Kafka Socket Communication + a Strange Problem Encountered
阿新 • Published: 2018-11-05
1. Creating a topic over a raw Socket with the Kafka 2.0 API:
```java
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.*;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class TopicTest {
    public static void main(String[] args) {
        try {
            TopicTest topicTest = new TopicTest();
            topicTest.createTopics(10, (short) 1);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Kafka defines a Request/Response class pair for every operation,
    // e.g. the CreateTopicsRequest and CreateTopicsResponse used here
    public void createTopics(int partitions, short replicationFactor) throws IOException {
        Map<String, CreateTopicsRequest.TopicDetails> topics = new HashMap<>();
        topics.put("newtopic", new CreateTopicsRequest.TopicDetails(partitions, replicationFactor));
        CreateTopicsRequest request = new CreateTopicsRequest.Builder(topics, 60000).build();
        ByteBuffer response = send("localhost", 9092, request, ApiKeys.CREATE_TOPICS);
        CreateTopicsResponse.parse(response, request.version());
    }

    private ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKeys) throws IOException {
        Socket socket = new Socket(host, port);
        try {
            return send(request, apiKeys, socket);
        } finally {
            socket.close();
        }
    }

    private ByteBuffer send(AbstractRequest request, ApiKeys apiKeys, Socket socket) throws IOException {
        RequestHeader header = new RequestHeader(apiKeys, request.version(), "client", 0);
        byte[] response = issueRequestAndWaitForResponse(socket, header, request);
        ByteBuffer responseBuffer = ByteBuffer.wrap(response);
        ResponseHeader.parse(responseBuffer);
        return responseBuffer;
    }

    private byte[] issueRequestAndWaitForResponse(Socket socket, RequestHeader header, AbstractRequest request) throws IOException {
        DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
        byte[] serializedRequest = request.serialize(header).array();
        dos.writeInt(serializedRequest.length); // 4-byte size prefix, then the payload
        dos.write(serializedRequest);
        dos.flush();
        // Note: do NOT close dos here. Closing a socket's output stream closes the
        // whole socket, so the response below could never be read.
        DataInputStream dis = new DataInputStream(socket.getInputStream());
        byte[] response = new byte[dis.readInt()];
        dis.readFully(response);
        return response;
    }
}
```
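The trickiest part of the exchange above is the wire framing: each Kafka request and response is a 4-byte big-endian length prefix followed by the payload, which is what the `writeInt`/`readFully` pair implements. Below is a minimal standalone sketch of that same pattern over in-memory streams (the class and method names are illustrative, not Kafka API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class FramingDemo {
    // Write a size-prefixed frame: a 4-byte big-endian length, then the payload.
    static byte[] frame(byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        dos.writeInt(payload.length);
        dos.write(payload);
        dos.flush();
        return bos.toByteArray();
    }

    // Read one frame back: the length first, then exactly that many bytes.
    static byte[] unframe(byte[] wire) throws IOException {
        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(wire));
        byte[] payload = new byte[dis.readInt()];
        dis.readFully(payload);
        return payload;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] wire = frame(payload);
        System.out.println(wire.length); // 9 = 4-byte prefix + 5-byte payload
        System.out.println(new String(unframe(wire), StandardCharsets.UTF_8)); // hello
    }
}
```

`readFully` matters here: a plain `read` may return fewer bytes than requested, while `readFully` blocks until the whole payload has arrived.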
The book 《Apache Kafka實戰》 constructs the request like this:
```java
ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
header.writeTo(buffer);
request.writeTo(buffer);
byte[] serializedRequest = buffer.array();
```
In practice, the 2.0 API no longer has the sizeOf and writeTo methods;
use request.serialize(header).array() instead.
After running it, list the topics with the bundled script:
```
[email protected]:/usr/kafka# bin/kafka-topics.sh --list --zookeeper localhost:2181
newtopic
```
2. A strange Kafka + Zookeeper problem:
Zookeeper started successfully, but Kafka would not start, reporting a timeout:
```
[2018-08-24 07:59:48,997] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-08-24 07:59:55,000] WARN Client session timed out, have not heard from server in 6003ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
[2018-08-24 07:59:55,009] INFO Client session timed out, have not heard from server in 6003ms for sessionid 0x0, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
```
zookeeper.connection.timeout.ms had already been set to 10000; raising it to 60000 made no difference, the same error persisted.
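For reference, the setting in question is a broker-side option in Kafka's config/server.properties; a fragment with the values tried above (assuming a default single-node local setup):

```properties
# Broker-side ZooKeeper settings (values mirror the ones tried above)
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=60000
```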
The answers a Baidu search turned up blamed an overly large data volume, but this machine holds very little data.
Since everything runs locally, a network problem seemed unlikely, so the next suspicion was that the port could not be connected to. Under WSL, however, netstat could not show port usage (the output was empty; unclear whether this is an isolated case or the general behavior).
So in a Windows cmd prompt I ran netstat -ano and found two processes occupying port 2181 (only one zookeeper instance had actually been started); checking the PIDs in Task Manager showed that both processes were java.
Back in WSL, pkill java killed all the leftover processes; after that, zookeeper and kafka both started cleanly.
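The root cause here, a stale process still holding port 2181, can also be detected from code: if binding the port fails, something else owns it. A small sketch of that check (the class name and hard-coded port are illustrative):

```java
import java.io.IOException;
import java.net.ServerSocket;

public class PortCheck {
    // Returns true if we can bind the port, i.e. nothing else is holding it.
    static boolean isPortFree(int port) {
        try (ServerSocket ignored = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            return false; // bind failed: the port is occupied
        }
    }

    public static void main(String[] args) {
        int port = 2181; // ZooKeeper's default client port
        System.out.println(port + (isPortFree(port) ? " is free" : " is occupied"));
    }
}
```

Run while the stale java processes were still alive, a check like this would report the port as occupied, pointing at the same conclusion netstat -ano gave.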