Kafka 參數解析
專有名詞簡介
- Producer:訊息生產者
- Consumer:訊息消費者
- Broker:傳遞訊息的中介者
- Topic:訊息的主題
- Partition:主題內的分區
訊息在傳遞和接收時主要指定是topic,除非要進行細節的操作才要去指定partition,一個topic裡面可以生成許多partition,訊息傳送時如果不指定partition則會複製到所有partition之中,而一個partition只能被一個consumer消費
Producer Config 設定
下列欄位為實作Kafka實體時需要用到的參數,星號為必須:
bootstrap.servers = localhost:9092 (*)
設定Kafka server的ip和port
key.serializer = org.apache.kafka.common.serialization.StringSerializer (*)
value.serializer = org.apache.kafka.common.serialization.StringSerializer (*)
訊息的序列化功能,producer在傳送時訊息可以是字串也可以是物件,在傳送時必須做序列化,除了String之外預設還支援ByteArray、ByteBuffer、Bytes、Double、Integer、Long,也可以實作自己的serializer,但必須要implement org.apache.kafka.common.serialization.Serializer
參考文章:https://www.opencodez.com/java/implement-custom-value-serializer-apache-kafka.htm
acks = all
Producer發送訊息時是否要等待訊息被確認收到,分為0, 1, all(-1)三種
0:producer不等待borker leader回應確認即當作發送成功,不斷發送訊息,因不做確認而有可能會缺漏訊息,設定為0則retries參數等於失效
1:broker leader收到後會直接回應確認,不等其他replica也保存好訊息
all:broker收到後會等到所有replica都保存好訊息後才回覆確認
retries = 0
當訊息發送失敗時重試的次數
max.in.flight.request.per.connection = 1
未被確認收到的訊息最多可以有幾筆準備被發送,當retries不為0時建議設定該值為1,代表在消息未被確認收到前不會再發送其他訊息,避免訊息重複被發送,但會降低效能
batch.size = 16348
Kafka producer的send是async方式,訊息會先被暫存到batch裡面然後再一次發送,該參數設定當batch量到達多少會進行發送,設定太少會讓發送次數過於頻繁導致效能下降,設定太多會消耗過多的memory在暫存batch
buffer.memory = 33554432
Producer可以用的memory大小,包含buffer訊息、進行壓縮所需使用的memory、等待被retry的訊息…等等
compression.typ = lz4
訊息壓縮編碼
Consumer Config設定
下列欄位為實作Kafka實體時需要用到的參數,星號為必須:
bootstrap.servers = localhost:9092 (*)
設定Kafka server的ip和port
key.deserializer = org.apache.kafka.common.serialization.StringDeserializer(*)
value.deserializer = org.apache.kafka.common.serialization.StringDeserializer (*)
訊息的反序列化功能,參照Producer Config設定
group.id = NASDAQ
設定consumer的群組,如果有多個group訂閱同個topic,該topic內的訊息會被複製發送到所有group中,而該訊息在同個group中只會有一個consumer去消費,不會被重複消費到
enable.auto.commit = true
consumer在消費訊息後,會通知broker自己消費到那個offset了,該值設定是否要自動通知broker
auto.commit.interval.ms = 1000
consumer多久要commit自己消費到的offset
auto.offset.reset = earliest
新來的consumer要從哪邊開始消費
earliest:當partition下有已commit的offset時,從該offset開始消費,否則從頭開始消費
latest:當partition下有已commit的offset時,從該offset開始消費,否則消費新產生message
none:當partition下有已commit的offset時,從該offset開始消費,否則拋exception
session.timeout.ms = 30000
kafka每個partition只能有一個consumer,當consumer超過設定時間沒有回應heartbeat時,該consumer會被標記失效,之後會release partition給其他consumer去消費