Kafka生產消費API JAVA實現
阿新 • • 發佈:2019-05-09
標籤 (Tags): Kafka, server, topic, args, serialization, Java
Maven依賴:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.0.0</version> </dependency>
Kafka生產者簡單接口JAVA實現:
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * Minimal Kafka producer example (new producer API, kafka-clients 0.10.x).
 * Sends a single String message to the given topic; delivery failures are
 * reported through the send() callback.
 */
public class KafkaProducer {
    public static void main(String[] args) throws Exception {
        String topic = "";      // target topic name
        String brokerList = ""; // bootstrap servers, e.g. "host1:9092,host2:9092"
        String message = "";    // payload to send

        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("acks", "1");           // wait for the partition leader's ack only
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // FIX: the original never closed the producer. send() is asynchronous, so the
        // JVM could exit before the record was actually transmitted. Producer extends
        // Closeable; try-with-resources flushes pending records on close().
        try (Producer<String, String> producer =
                 new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props)) {
            ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(topic, message);
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace(); // delivery failed after retries
                    }
                }
            });
        }
    }
}
Kafka消費者簡單接口JAVA實現
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Minimal example of the OLD (ZooKeeper-based) Kafka high-level consumer API.
 * Opens one stream on a single topic and prints every message it receives.
 */
public class KafkaConsumer {
    public static void main(String[] args) {
        String topic = "";     // topic to consume
        String zkConnect = ""; // ZooKeeper connect string, e.g. "zk1:2181"

        Properties prop = new Properties();
        prop.put("zookeeper.connect", zkConnect);
        prop.put("group.id", "group003");
        // Old-API value: "largest" = start from the latest offset when the group
        // has no committed offset (new-consumer equivalent: auto.offset.reset=latest).
        prop.put("auto.offset.reset", "largest");
        // NOTE: the original also set key.deserializer / value.deserializer here.
        // Those are NEW-consumer settings; this old API ignores them and always
        // hands back raw byte[] payloads, so they have been removed.

        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
        try {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1); // one consumer stream for this topic
            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
                consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
            // hasNext() blocks until a message arrives; loop runs until shutdown().
            while (iterator.hasNext()) {
                // FIX: decode explicitly as UTF-8 instead of the platform default charset.
                String msg = new String(iterator.next().message(), StandardCharsets.UTF_8);
                System.out.println("--------" + msg);
            }
        } finally {
            // FIX: the original never released the connector; shutdown() closes the
            // ZooKeeper session and stops the fetcher threads.
            consumer.shutdown();
        }
    }
}
Kafka新消費者接口JAVA實現
import org.apache.kafka.clients.consumer.*;
import java.util.Arrays;
import java.util.Properties;

/**
 * Minimal example of the NEW Kafka consumer API (kafka-clients 0.10.x).
 * Subscribes to one topic and prints each record's partition, offset and value.
 */
public class KafkaNewConsumer {
    public static void main(String[] args) {
        String topic = "";      // topic to consume
        String brokerList = ""; // bootstrap servers, e.g. "host1:9092"
        String group = "";      // consumer group id

        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", group);
        // Start from the earliest available offset when the group has no
        // committed offset yet.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // FIX: the original never closed the consumer, leaking its network
        // connections and leaving the group to time out. try-with-resources
        // closes it cleanly even if poll() throws.
        // (Fully qualified name kept, as in the original, to avoid any clash
        // with a same-named class elsewhere in this article's sources.)
        try (org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =
                 new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props)) {
            consumer.subscribe(Arrays.asList(topic));
            while (true) {
                // FIX: poll(10) was effectively a busy-wait; block up to 1 s per
                // poll instead (0.10.x poll() takes a long timeout in ms).
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("fetched from partition " + record.partition()
                        + ", offset: " + record.offset()
                        + ", message: " + record.value());
                }
            }
        }
    }
}
Kafka生產消費API JAVA實現