1. 程式人生 > >Kafka Socket通訊+遇到的奇怪問題

Kafka Socket通訊+遇到的奇怪問題

1.使用Kafka2.0 API Socket建立Topic:

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.*;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class TopicTest {
    public static void main(String[] args){
        try {
            TopicTest topicTest=new TopicTest();
            topicTest.createTopics(10, (short) 1);
        }catch (IOException e){

        }
    }

    //Kafka對每一種操作都定義了一對Request和Response類,比如這裡用到的CreateTopicsRequest和CreateTopicsResponse
    public void createTopics(int partitions,short replicationFactor) throws IOException {
        Map<String, CreateTopicsRequest.TopicDetails> topics=new HashMap<>();
        topics.put("newtopic",new CreateTopicsRequest.TopicDetails(partitions,replicationFactor));
        CreateTopicsRequest request=new CreateTopicsRequest.Builder(topics,60000).build();
        ByteBuffer response=send("localhost",9092,request, ApiKeys.CREATE_TOPICS);
        CreateTopicsResponse.parse(response,request.version());
    }

    private ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKeys) throws IOException {
        Socket socket=new Socket(host,port);
        try {
            return send(request,apiKeys,socket);
        }finally {
            socket.close();
        }
    }

    private ByteBuffer send(AbstractRequest request, ApiKeys apiKeys, Socket socket) throws IOException {
        RequestHeader header=new RequestHeader(apiKeys,request.version(),"client",0);
        byte[] response=issueRequestAndWaitForResponse(socket,header,request);
        ByteBuffer responseBuffer=ByteBuffer.wrap(response);
        ResponseHeader.parse(responseBuffer);
        return responseBuffer;
    }

    private byte[] issueRequestAndWaitForResponse(Socket socket,RequestHeader header,AbstractRequest request) throws IOException {
        DataOutputStream dos=new DataOutputStream(socket.getOutputStream());
        byte[] serializedRequest=request.serialize(header).array();
        dos.writeInt(serializedRequest.length);
        dos.write(serializedRequest);
        dos.flush();
        dos.close();
        DataInputStream dis=new DataInputStream(socket.getInputStream());
        byte[] response=new byte[dis.readInt()];
        dis.readFully(response);
        return response;
    }
}


《Apache Kafka實戰》上構建請求的方法是:

ByteBuffer buffer=ByteBuffer.allocate(header.sizeOf()+request.sizeOf());
header.writeTo(buffer);
request.writeTo(buffer);
byte[] serializedRequest=buffer.array();


實測2.0API沒有sizeOfwriteTo方法
使用 request.serialize(header).array() 代替

執行後使用指令碼查詢topic:

[email protected]
:/usr/kafka# bin/kafka-topics.sh --list --zookeeper localhost:2181 newtopic

2.遇到的Kafka+Zookeeper奇怪問題:
啟動Zookeeper成功,但是無法啟動Kafka,提示超時:

[2018-08-24 07:59:48,997] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-08-24 07:59:55,000] WARN Client session timed out, have not heard from server in 6003ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
[2018-08-24 07:59:55,009] INFO Client session timed out, have not heard from server in 6003ms for sessionid 0x0, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)

已設定 zookeeper.connection.timeout.ms=10000 ,繼續調大到60000,仍然報錯
百度得到的答案是資料量過大,但是本機沒有多少資料
由於都是在本地執行,應該不存在網路問題,於是想到可能是埠無法連線,但是WSL下無法使用netstat檢視端口占用(輸出為空,不知道是個例還是都這樣)。
於是在cmd中使用:netstat -ano,發現了兩個佔用2181埠的程序(zookeeper實際只開了一個),資源管理器檢視PID發現兩個程序都是java
進入WSL,使用pkill java殺掉所有殘留程序,重新啟動zookeeper和kafka,順利啟動