1. 程式人生 > >基於kafka模擬生產者和消費者

基於kafka模擬生產者和消費者

zookeeper的啟動指令碼:

#!/bin/sh
echo "start zookeeper server..."

hosts="hadoop0300 hadoop0301 hadoop0302"

for host in $hosts
do
  ssh $host  "source /etc/profile; /root/app/zookeeper-3.4.7/bin/zkServer.sh start"
done

kafka的啟動指令碼:

#!/bin/bash

for host in hadoop0300 hadoop0301 hadoop0302
do
echo $host
ssh [email protected]
$host "source /etc/profile;/usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.9.0.1/config/server.properties" done

//時間同步 ntpdate -u ntp.api.bz

//啟動kafka /usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.9.0.1/config/server.properties

//建立一個topci為test /usr/local/kafka_2.11-0.9.0.1/bin./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

//檢視當前叢集裡面所有的topic /usr/local/kafka_2.11-0.9.0.1/bin/kafka-topics.sh --list --zookeeper 192.168.88.130:2181

//通過shell命令傳送訊息(模擬生產者) /usr/local/kafka_2.11-0.9.0.1/bin/kafka-console-producer.sh --broker-list 192.168.88.130:9092 --topic test

//通過shell消費訊息(模擬消費者,另一客戶端) /usr/local/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper 192.168.88.130:2181 --from-beginning --topic test

//如果報的是下面的錯 kafka.common.FailedToSendMessageException Failed to send messages after 3 tries 解決:將server.properties裡面的host.name該為自己的ip地址

ProducerDemo模擬生產者:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        //建立producer配置資訊,發到哪裡去
        Properties pro = new Properties();
        //指定訊息傳送到kafka叢集
        pro.put("metadata.broker.list","192.168.88.130:9092,192.168.88.131:9092,192.168.88.132:9092");
        //指定訊息序列化方式
        pro.put("serializer.class","kafka.serializer.StringEncoder");
        //配置資訊包裝
        ProducerConfig config = new ProducerConfig(pro);
        //1.建立producer
        Producer<String,String> producer = new Producer<String, String>(config);

        for (int i = 0; i <= 100; i++) {
            producer.send(new KeyedMessage<String,String>("test","message"+i));
        }
    }
}
ConsumerDemo模擬消費者:

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ConsumerDemo {

    //指定消費的主題(哪個類別的訊息)
    private static final String topic = "test";
    //指定執行緒個數
    private static final Integer thread = 2;

    public static void main(String[] args) {
        //建立消費者的配置資訊
        Properties pro = new Properties();
        //指定連線zookeeper的訊息
        pro.put("zookeeper.connect","192.168.88.130:2181,192.168.88.131:2181,192.168.88.132:2181");
        //消費者是以組的形式消費,指定消費組資訊
        pro.put("group.id","testGroup");
        //配置消費訊息的開始位置,從偏移量為0的開始消費,smallest代表從topic的第一條訊息開始消費
        //對應的largest:代表從我們的消費者啟動之後該topic下新產生的訊息開始消費
        pro.put("auto.offset.reset","smallest");
        //
        ConsumerConfig config = new ConsumerConfig(pro);
        //建立消費者
        kafka.javaapi.consumer.ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
        //消費者可以消費多個topic資料,建立一個map存放top資訊
        Map<String,Integer> topicMaps = new HashMap<String,Integer>();
        topicMaps.put(topic,thread);
        //建立資訊流
        Map<String,List<KafkaStream<byte[],byte[]>>> consumerMap=
        consumer.createMessageStreams(topicMaps);
        //獲取topic資訊
        List<KafkaStream<byte[],byte[]>> kafkaStreams = consumerMap.get(topic);
        //一直迴圈kafka拉取訊息
        for(final KafkaStream<byte[],byte[]> kafkaStream: kafkaStreams){
            //建立一個執行緒,消費訊息
            new Thread(new Runnable() {
                @Override
                public void run() {
                    //迴圈讀取每一條訊息
                    for(MessageAndMetadata<byte[],byte[]> msg:kafkaStream){
                        //讀到一條訊息
                        String message =new String(msg.message());
                        System.out.println(message);
                    }
                }
            }).start();
        }
    }
}