Kafka 實作紀錄
在使用queue之前需要先了解:
- 常見的queue可以分成broker和broker-less兩種架構
broker指的是有一個中介者負責訊息的處理和傳遞,例如像Kafka會利用中間的server去處理資料,包含分配資料到不同的partition等等
broker-less則是沒有中介者負責,例如像ZeroMq是由兩端直接建立連線傳遞資料 - 不同queue有各自不同的設計目的,例如Kafka一開始是設計給log系統使用,所以在處理小筆但大量資料時有很高的throughput,而像RabbitMq則注重在資料的準確性以及提供更多實作protocol,但效能上就比不上Kafka,實際運用需要考慮到自己的情境
為了評估工作上的情境是否可以採用Kafka而做了一些測試,會放在文章後面寫,接下來先記錄一些Kafka的特性:
- Kafka是broker架構,利用中間的broker server來管理訊息處理
- Kafka採用Zookeeper來管理分散式架構的broker,新的版本Kafka已經內建Zookeeper了不用再自己下載
- Producer和Consumer透過Topic來溝通,Producer在send時要指定Topic,Consumer在new出來以後要subscribe對應的Topic
- Topic內包含的Partition,利用Partition可以做到分散式運算,例如傳遞的訊息是一連串的數字,我需要對每個數字做運算,那就可以根據數字的末碼分成0~9十個Partition,根據末碼把這堆數字分成十堆,再建十個Consumer去消化掉每堆的數字,而不用只有一個Consumer去處理所有數字
- Kafka確保Partition內部資料的順序性,但是不保證不同Partition之間的順序
- 最值得注意的是,一個Partition只能對應到一個Consumer,所以假設一個Topic對應的Consumer Group裡面有十個Consumer,但是只有分出八個Partition,那就會有兩個Consumer是沒事做的
- 一個Consumer可以接收多個Partition的資料
- Consumer只支援從broker pull資料的模式,不支援broker主動push資料給他,主要是考量Consumer各自的消化能力不相同,如果全部都由broker去控制會有太多東西要設定,參考Kafka的設計理念:
http://kafka.apache.org/documentation/#design_pull - Kafka中的訊息以offset去紀錄順序,Consumer在pull訊息時可以選擇從特定的offset開始拉,或是從最新的資料開始拉…等等,offset順序性只保證在Partition內
這次工作上所需的情境是,每秒最多大約會有2萬筆資料throughput,每筆資料大小大約是600~700個bytes
可以先利用Kafka內建的效能測試工具去簡單測試:
kafka-producer-perf-test --topic test --num-records 100000 --record-size 1000 --producer-props bootstrap.servers=localhost:9092 --throughput 100000
這樣的設定是總共傳送10萬筆資料,每筆資料1000個bytes,Kafka的server建立在local的docker內,每秒最高throughput為10萬筆
因為是建立在本機的docker內,docker的設定只有2個CPU跟2G的RAM,跑起來的效果很差,每秒大約只有4000左右的throughput,但是當資料量大小降低到只有100bytes時,throughput可以拉高到每秒2~3萬筆
而當機器為4核8G時,1000bytes的資料throughput也可以到每秒2~3萬筆,所以可以符合工作需求,加上Kafka的資源也比較多,所以最後決定採用Kafka
踩到的坑:
本次測試除了使用內建測試工具外,也另外寫了Producer和Consumer,包成jar檔後放到兩台機器中,連線到Kafka server測試,測試結果和測試工具跑出來的差不多
主要的坑是在於,Kafka是建立在docker之內,利用docker-compose去管理Kafka和Zookeeper兩個container,在本機用localhost:9092去測試時都沒有問題,但把jar檔放到外部機器時卻都連不上server
google到的問題點都是在於,advertised.listeners=PLAINTEXT://localhost:9092,這邊必須要把localhost換成ip位置,但是設定完後問題仍沒有解決
最後在求助devops後,發現問題在docker-compose.override.yml的設定檔內,要把其中KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://locolhost:9092的localhost改成ip,最後問題才解決
最後補充:
當時用本機測試出來的效能很悲劇,所以跑去研究了Chronicle Queue想看看能不能用,但Chronicle Queue是在本機端內透過virtual memory的方式去做queue,也是broker-less的架構,如果要做host之間的queue的話必須要付費用企業版,而在發現Kafka要達到需求效能所需的硬體是可以接受而決定採用Kafka後,就決定不採用Chronicle Queue了
不過還是把Chronicle Queue的效能測試結果分享一下,60萬筆的資料,每筆資料大小1000bytes,總共送完只花了4秒!
如果是每2萬筆資料當作一輪,發送完這兩萬筆只需要約130毫秒的時間!在效率上比起Kafka要高非常多,如果需求是本機內服務的溝通要追求更高效能的話,是可以考慮使用的工具
Chronicle Queue的測試code有空再放上Github~