1. 程式人生 > >kafka 協議層API示例

kafka 協議層API示例

眾所周知,Kafka自己實現了一套二進位制協議(binary protocol)用於各種功能的實現,比如傳送訊息,獲取訊息,提交位移以及建立topic等。具體協議規範參見:Kafka協議 這套協議的具體使用流程為:

1.客戶端建立對應協議的請求

2.客戶端傳送請求給對應的broker

3.broker處理請求,併發送response給客戶端

雖然Kafka提供的大量的指令碼工具用於各種功能的實現,但很多時候我們還是希望可以把某些功能以程式設計的方式嵌入到另一個系統中。這時使用Java API的方式就顯得異常地靈活了。本文我將嘗試給出Java API底層框架的一個範例,同時也會針對“建立topic”和“檢視位移”這兩個主要功能給出對應的例子。 需要提前說明的是,本文給出的範例並沒有考慮Kafka叢集開啟安全的情況。另外Kafka的KIP4應該一直在優化命令列工具以及各種管理操作,有興趣的讀者可以關注這個KIP。

本文中用到的API依賴於kafka-clients,所以如果你使用Maven構建的話,請加上:

?
1 2 3 4 5 < dependency >
< groupId >org.apache.kafka</ groupId > < artifactId >kafka-clients</ artifactId > < version >0.10.2.0</
version > </ dependency >

如果是gradle,請加上:

?
1 compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.2.0'

底層框架

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 /** * 傳送請求主方法 * @param host 目標broker的主機名 * @param port 目標broker的埠 * @param request 請求物件 * @param apiKey 請求型別 * @return 序列化後的response * @throws IOException */ public ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKey) throws IOException { Socket socket = connect(host, port); try { return send(request, apiKey, socket); } finally { socket.close(); } } /** * 傳送序列化請求並等待response返回 * @param socket 連向目標broker的socket * @param request 序列化後的請求 * @return 序列化後的response * @throws IOException */ private byte [] issueRequestAndWaitForResponse(Socket socket, byte [] request) throws IOException { sendRequest(socket, request); return getResponse(socket); } /** * 傳送序列化請求給socket * @param socket 連向目標broker的socket * @param request 序列化後的請求 * @throws IOException */ private void sendRequest(Socket socket, byte [] request) throws IOException { DataOutputStream dos = new DataOutputStream(socket.getOutputStream()); dos.writeInt(request.length); dos.write(request); dos.flush(); } /** * 從給定socket處獲取response * @param socket 連向目標broker的socket * @return 獲取到的序列化後的response * @throws IOException */ private byte [] getResponse(Socket socket) throws IOException { DataInputStream dis = null ; try { dis = new DataInputStream(socket.getInputStream()); byte [] response = new byte [dis.readInt()]; dis.readFully(response); return response; } finally { if (dis != null ) { dis.close(); } } } /** * 建立Socket連線 * @param hostName 目標broker主機名 * @param port 目標broker服務埠, 比如9092 * @return 建立的Socket連線 * @throws IOException */ private Socket connect(String hostName, int port) throws IOException { return new Socket(hostName, port); } /** * 向給定socket傳送請求 * @param request 請求物件 * @param apiKey 請求型別, 即屬於哪種請求 * @param socket 連向目標broker的socket * @return 序列化後的response * @throws IOException */ private ByteBuffer send(AbstractRequest request, ApiKeys apiKey, Socket socket) throws IOException { RequestHeader header = new RequestHeader(apiKey.id, request.version(), "client-id" , 0 ); ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf()); header.writeTo(buffer); request.writeTo(buffer); byte [] serializedRequest = buffer.array(); byte [] response = issueRequestAndWaitForResponse(socket, serializedRequest); ByteBuffer responseBuffer = ByteBuffer.wrap(response); ResponseHeader.parse(responseBuffer); return responseBuffer; }

有了這些方法的鋪墊,我們就可以建立具體的請求了。

建立topic

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 /** * 建立topic * 由於只是樣例程式碼,有些東西就硬編碼寫到程式裡面了(比如主機名和埠),各位看官自行修改即可 * @param topicName topic名 * @param partitions 分割槽數 * @param replicationFactor 副本數 * @throws IOException */ public void createTopics(String topicName, int partitions, short replicationFactor) throws IOException { Map<String, CreateTopicsRequest.TopicDetails> topics = new HashMap<>(); // 插入多個元素便可同時建立多個topic topics.put(topicName, new CreateTopicsRequest.TopicDetails(partitions, replicationFactor)); int creationTimeoutMs = 60000 ; CreateTopicsRequest request = new CreateTopicsRequest.Builder(topics, creationTimeoutMs).build(); ByteBuffer response = send( "localhost" , 9092 , request, ApiKeys.CREATE_TOPICS); CreateTopicsResponse.parse(response, request.version()); }

檢視位移

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 /** * 獲取某個consumer group下的某個topic分割槽的位移 * @param groupID group id * @param topic topic名 * @param parititon 分割槽號 * @throws IOException */ public void getOffsetForPartition(String groupID, String topic, int parititon) throws IOException { TopicPartition tp = new TopicPartition(topic, parititon); OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, singletonList(tp)) .setVersion(( short ) 2 ).build(); ByteBuffer response = send( "localhost" , 9092 , request, ApiKeys.OFFSET_FETCH); OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version()); OffsetFetchResponse.PartitionData partitionData = resp.responseData().get(tp); System.out.println(partitionData.offset); }
?
1 2 3 4 5 6 7 8 9 10 11 12 /** * 獲取某個consumer group下所有topic分割槽的位移資訊 * @param groupID group id * @return (topic分割槽 --> 分割槽資訊)的map * @throws IOException */ public Map<TopicPartition, OffsetFetchResponse.PartitionData> getAllOffsetsForGroup(String groupID) throws IOException { OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, null ).setVersion(( short ) 2 ).build(); ByteBuffer response = send( "localhost" , 9092 , request, ApiKeys.OFFSET_FETCH); OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version()); return resp.responseData(); }