kafka中生產者和消費者API
阿新 • • 發佈:2017-06-03
actor 成功 edm icc per class 持久化 spout payment
使用idea實現相關API操作,先要再pom.xml重添加Kafka依賴:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.8.2</artifactId> <version>0.8.1</version> <exclusions> <exclusion> <artifactId>jmxtools</artifactId> <groupId>com.sun.jdmk</groupId> </exclusion> <exclusion> <artifactId>jmxri</artifactId> <groupId>com.sun.jmx</groupId> </exclusion> <exclusion> <artifactId>jms</artifactId> <groupId>javax.jms</groupId> </exclusion> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency>
Kafka生產者API:
1 package cn.itcast.storm.kafka.simple; 2 3 import kafka.javaapi.producer.Producer; 4 import kafka.producer.KeyedMessage; 5 import kafka.producer.ProducerConfig; 6 7 import java.util.Properties; 8 import java.util.UUID; 9 10 /** 11 * 這是一個簡單的Kafka producer代碼 12 * 包含兩個功能:13 * 1、數據發送 14 * 2、數據按照自定義的partition策略進行發送 15 * 16 * 17 * KafkaSpout的類 18 */ 19 public class KafkaProducerSimple { 20 public static void main(String[] args) { 21 /** 22 * 1、指定當前kafka producer生產的數據的目的地 23 * 創建topic可以輸入以下命令,在kafka集群的任一節點進行創建。 24 * bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test25 */ 26 String TOPIC = "orderMq"; 27 /** 28 * 2、讀取配置文件 29 */ 30 Properties props = new Properties(); 31 /* 32 * key.serializer.class默認為serializer.class 33 */ 34 props.put("serializer.class", "kafka.serializer.StringEncoder"); 35 /* 36 * kafka broker對應的主機,格式為host1:port1,host2:port2 37 */ 38 props.put("metadata.broker.list", "kafka01:9092,kafka02:9092,kafka03:9092"); 39 /* 40 * request.required.acks,設置發送數據是否需要服務端的反饋,有三個值0,1,-1 41 * 0,意味著producer永遠不會等待一個來自broker的ack,這就是0.7版本的行為。 42 * 這個選項提供了最低的延遲,但是持久化的保證是最弱的,當server掛掉的時候會丟失一些數據。 43 * 1,意味著在leader replica已經接收到數據後,producer會得到一個ack。 44 * 這個選項提供了更好的持久性,因為在server確認請求成功處理後,client才會返回。 45 * 如果剛寫到leader上,還沒來得及復制leader就掛了,那麽消息才可能會丟失。 46 * -1,意味著在所有的ISR都接收到數據後,producer才得到一個ack。 47 * 這個選項提供了最好的持久性,只要還有一個replica存活,那麽數據就不會丟失 48 */ 49 props.put("request.required.acks", "1"); 50 /* 51 * 可選配置,如果不配置,則使用默認的partitioner partitioner.class 52 * 默認值:kafka.producer.DefaultPartitioner 53 * 用來把消息分到各個partition中,默認行為是對key進行hash。 54 */ 55 props.put("partitioner.class", "cn.itcast.storm.kafka.MyLogPartitioner"); 56 // props.put("partitioner.class", "kafka.producer.DefaultPartitioner"); 57 /** 58 * 3、通過配置文件,創建生產者 59 */ 60 Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props)); 61 /** 62 * 4、通過for循環生產數據 63 */ 64 for (int messageNo = 1; messageNo < 100000; messageNo++) { 65 // String messageStr = new String(messageNo + "註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey," + 66 // "註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + 67 // "註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + 68 // "註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + 69 // "註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + 70 // "註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + 71 // "註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + 72 // "註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + 73 // "用來配合自定義的MyLogPartitioner進行數據分發"); 74 75 /** 76 * 5、調用producer的send方法發送數據 77 * 註意:這裏需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發 78 */ 79 producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "itcast")); 80 } 81 } 82 }
Kafka消費者API:
1 package cn.itcast.storm.kafka.simple; 2 3 import kafka.consumer.Consumer; 4 import kafka.consumer.ConsumerConfig; 5 import kafka.consumer.ConsumerIterator; 6 import kafka.consumer.KafkaStream; 7 import kafka.javaapi.consumer.ConsumerConnector; 8 import kafka.message.MessageAndMetadata; 9 10 import java.util.HashMap; 11 import java.util.List; 12 import java.util.Map; 13 import java.util.Properties; 14 import java.util.concurrent.ExecutorService; 15 import java.util.concurrent.Executors; 16 17 public class KafkaConsumerSimple implements Runnable { 18 public String title; 19 public KafkaStream<byte[], byte[]> stream; 20 public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) { 21 this.title = title; 22 this.stream = stream; 23 } 24 @Override 25 public void run() { 26 System.out.println("開始運行 " + title); 27 ConsumerIterator<byte[], byte[]> it = stream.iterator(); 28 /** 29 * 不停地從stream讀取新到來的消息,在等待新的消息時,hasNext()會阻塞 30 * 如果調用 `ConsumerConnector#shutdown`,那麽`hasNext`會返回false 31 * */ 32 while (it.hasNext()) { 33 MessageAndMetadata<byte[], byte[]> data = it.next(); 34 String topic = data.topic(); 35 int partition = data.partition(); 36 long offset = data.offset(); 37 String msg = new String(data.message()); 38 System.out.println(String.format( 39 "Consumer: [%s], Topic: [%s], PartitionId: [%d], Offset: [%d], msg: [%s]", 40 title, topic, partition, offset, msg)); 41 } 42 System.out.println(String.format("Consumer: [%s] exiting ...", title)); 43 } 44 45 public static void main(String[] args) throws Exception{ 46 Properties props = new Properties(); 47 props.put("group.id", "dashujujiagoushi"); 48 props.put("zookeeper.connect", "zk01:2181,zk02:2181,zk03:2181"); 49 props.put("auto.offset.reset", "largest"); 50 props.put("auto.commit.interval.ms", "1000"); 51 props.put("partition.assignment.strategy", "roundrobin"); 52 ConsumerConfig config = new ConsumerConfig(props); 53 String topic1 = "orderMq"; 54 String topic2 = "paymentMq"; 55 //只要ConsumerConnector還在的話,consumer會一直等待新消息,不會自己退出 56 ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config); 57 //定義一個map 58 Map<String, Integer> topicCountMap = new HashMap<>(); 59 topicCountMap.put(topic1, 3); 60 //Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是對應的流 61 Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap); 62 //取出 `kafkaTest` 對應的 streams 63 List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic1); 64 //創建一個容量為4的線程池 65 ExecutorService executor = Executors.newFixedThreadPool(3); 66 //創建20個consumer threads 67 for (int i = 0; i < streams.size(); i++) 68 executor.execute(new KafkaConsumerSimple("消費者" + (i + 1), streams.get(i))); 69 } 70 }
kafka自定義patition:
1 package cn.itcast.storm.kafka; 2 3 import kafka.producer.Partitioner; 4 import kafka.utils.VerifiableProperties; 5 import org.apache.log4j.Logger; 6 7 8 public class MyLogPartitioner implements Partitioner { 9 private static Logger logger = Logger.getLogger(MyLogPartitioner.class); 10 11 public MyLogPartitioner(VerifiableProperties props) { 12 } 13 14 public int partition(Object obj, int numPartitions) { 15 return Integer.parseInt(obj.toString())%numPartitions; 16 // return 1; 17 } 18 19 }
kafka中生產者和消費者API