Simulating a producer and consumer with Kafka
ZooKeeper startup script:
#!/bin/sh
echo "start zookeeper server..."
hosts="hadoop0300 hadoop0301 hadoop0302"
for host in $hosts
do
ssh $host "source /etc/profile; /root/app/zookeeper-3.4.7/bin/zkServer.sh start"
done
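To confirm the ensemble actually came up, a status check can be run over the same hosts. This is a minimal sketch assuming the same install path and hostnames as the start script above; each node should report Mode: leader or Mode: follower:
#!/bin/sh
echo "check zookeeper status..."
for host in hadoop0300 hadoop0301 hadoop0302
do
ssh $host "source /etc/profile; /root/app/zookeeper-3.4.7/bin/zkServer.sh status"
done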
Kafka startup script:
#!/bin/bash
for host in hadoop0300 hadoop0301 hadoop0302
do
echo $host
ssh $host "source /etc/profile;/usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.9.0.1/config/server.properties"
done
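A matching shutdown script can follow the same pattern; this sketch assumes the stock kafka-server-stop.sh that ships in the same installation's bin directory:
#!/bin/bash
for host in hadoop0300 hadoop0301 hadoop0302
do
echo $host
ssh $host "source /etc/profile;/usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-stop.sh"
done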
//Time synchronization: ntpdate -u ntp.api.bz
//Start Kafka: /usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.9.0.1/config/server.properties
//Create a topic named test: /usr/local/kafka_2.11-0.9.0.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
//List all topics currently in the cluster: /usr/local/kafka_2.11-0.9.0.1/bin/kafka-topics.sh --list --zookeeper 192.168.88.130:2181
//Send messages from the shell (simulated producer): /usr/local/kafka_2.11-0.9.0.1/bin/kafka-console-producer.sh --broker-list 192.168.88.130:9092 --topic test
//Consume messages from the shell (simulated consumer, on another client): /usr/local/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper 192.168.88.130:2181 --from-beginning --topic test
//If you see the error kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries, the fix is to set host.name in server.properties to the broker's own IP address.
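For reference, the relevant entries in config/server.properties would look roughly like the following (example values only: broker.id must be unique on every node and host.name should be that node's own IP):
broker.id=0
host.name=192.168.88.130
zookeeper.connect=192.168.88.130:2181,192.168.88.131:2181,192.168.88.132:2181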
ProducerDemo (simulated producer):
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        // Producer configuration: where the messages should go
        Properties pro = new Properties();
        // Broker list of the Kafka cluster the messages are sent to
        pro.put("metadata.broker.list", "192.168.88.130:9092,192.168.88.131:9092,192.168.88.132:9092");
        // Message serialization class
        pro.put("serializer.class", "kafka.serializer.StringEncoder");
        // Wrap the properties in a ProducerConfig
        ProducerConfig config = new ProducerConfig(pro);
        // 1. Create the producer
        Producer<String, String> producer = new Producer<String, String>(config);
        // Send message0 .. message100 to the "test" topic
        for (int i = 0; i <= 100; i++) {
            producer.send(new KeyedMessage<String, String>("test", "message" + i));
        }
        // Release the producer's resources when done
        producer.close();
    }
}
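After ProducerDemo finishes, the messages can be verified with the console consumer from the shell commands above; since the loop runs from 0 to 100 it should print message0 through message100:
/usr/local/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper 192.168.88.130:2181 --from-beginning --topic test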
ConsumerDemo (simulated consumer):
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ConsumerDemo {
    // Topic to consume (which category of messages)
    private static final String topic = "test";
    // Number of consumer threads
    private static final Integer thread = 2;

    public static void main(String[] args) {
        // Consumer configuration
        Properties pro = new Properties();
        // ZooKeeper connection string for the cluster
        pro.put("zookeeper.connect", "192.168.88.130:2181,192.168.88.131:2181,192.168.88.132:2181");
        // Consumers work in groups; specify the consumer group
        pro.put("group.id", "testGroup");
        // Where to start when the group has no committed offset:
        // smallest = from the first message in the topic, largest = only messages produced after the consumer starts
        pro.put("auto.offset.reset", "smallest");
        ConsumerConfig config = new ConsumerConfig(pro);
        // Create the consumer connector
        kafka.javaapi.consumer.ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
        // A consumer can read several topics; map each topic to the number of streams (threads) for it
        Map<String, Integer> topicMaps = new HashMap<String, Integer>();
        topicMaps.put(topic, thread);
        // Create the message streams
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicMaps);
        // Get the streams for our topic
        List<KafkaStream<byte[], byte[]>> kafkaStreams = consumerMap.get(topic);
        // One thread per stream, each continuously pulling messages from Kafka
        for (final KafkaStream<byte[], byte[]> kafkaStream : kafkaStreams) {
            // Start a thread that consumes messages
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // Iterate over the stream; this blocks until new messages arrive
                    for (MessageAndMetadata<byte[], byte[]> msg : kafkaStream) {
                        // Decode and print one message
                        String message = new String(msg.message());
                        System.out.println(message);
                    }
                }
            }).start();
        }
    }
}
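To test the pair end to end, start ConsumerDemo first and then run ProducerDemo in another terminal; the two consumer threads should print message0 through message100 between them. Because auto.offset.reset is set to smallest, a brand-new consumer group will also replay any messages already stored in the topic.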