編寫Java程式向Kafka叢集生產並消費資料
阿新 • • 發佈:2018-12-29
一.Kafka生產資料
1.預備知識:
- 1.程式設計環境如下:
01.使用windows的intellij編寫java程式,連線到本地虛擬機器上的kafka叢集,生產和消費資料。
02.一定要注意配置等問題,否則會導致無法連線到zookeeper和kafka叢集。
2.Kafka直接生產資料
1.程式碼如下:
/*
1.use the Kafka1.0.0 alone don't have any problems.But if i use Kafka1.0.0 with spark2.2.0 and scala
2.11.8,there is a problem.
*/
import org.apache .kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent .TimeUnit;
public class TestProducer {
public static void main(String[] args) {
Properties props = new Properties();//New configuration file
props.put("bootstrap.servers", "192.168.211.3:9092");//you should write specific ip address rather than localhost
props.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer");//StringSerializer/IntegerSerializer/or other self-defined Serializer.
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//the producer produce to specified topic
String topic = "dblab";
Producer<String, String> procuder = new KafkaProducer<String,String>(props);
for (int i = 1; i <= 10; i++) {
String value = "value: " + i;
//produce message through ProducerRecord<string,String>
ProducerRecord<String, String> msg = new ProducerRecord<String, String>(topic, value);
System.out.println(msg);
procuder.send(msg);
}
System.out.println("send message over.");
procuder.close(100,TimeUnit.MILLISECONDS);
}
}
2.Kafka通過讀取csv檔案生產模擬資料
1.程式碼如下:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
/*
1.the class ReadCsvAndSendToKafka is Read Csv file And Send this content To Kafka
2.
*/
public class ReadCsvAndSendToKafka {
public static void main(String[] args) {
Properties properties = new Properties();//get an properties
properties.put("bootstrap.servers","192.168.211.3:2181");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
try {
//the filePath is a varibales
if(args.length = 0){
System.out.println("you should enter a parameter");
return ;
}
//class BufferedReader has a better efficiency than class Reader
BufferedReader reader = new BufferedReader(new FileReader(args[0]));
String firstLine = reader.readLine();//read the first line
System.out.println("The first line is :"+firstLine);
String value = null;
String topic = "dblab";//the topic name in kafka is dblab
while((value=reader.readLine())!=null){
/*
1.the message send to kafka is a specific pattern,the pattern is ProducerRecord,it is a key-value pair
2.the key-value is topic-value
*/
ProducerRecord<String, String> message = new ProducerRecord<String, String>(topic,value);
Thread.sleep(1000);//print every one second
producer.send(message);
//System.out.println(value);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.kafka通過java程式消費資料
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class TestConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.211.3:2181");//get a connection to zookeeper.very important!rather than broker's port
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//StringDeserializer
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//properties.put("group.id", "MyGroup");
Consumer<String,String> consumer = new KafkaConsumer<String,String>(properties);
consumer.subscribe(Arrays.asList("dblab","MyTest"));
while(true){
ConsumerRecords<String,String> records = consumer.poll(100);
for(ConsumerRecord re:records){
System.out.println(re.value());
}
}
}
}