Spark 消費Kafka資料
阿新 • • 發佈:2018-12-16
注意:雖然標題提到 Spark,下面的範例程式碼其實是 Kafka 0.9 高階消費者(high-level consumer)API 的純 Java 程式,並未使用 Spark RDD,也不是 Spark Streaming。
導maven包:
注意版本哦,要跟自己機器的一致
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.9.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.0</version>
</dependency>
導包:
import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties; import java.sql.Connection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties;
程式碼:
複製貼上,加簡單修改即可使用。
public class KafkaConsumer { private final ConsumerConnector consumer; private final static String TOPIC="test";//你要消費的topic private final static String sql=""; private KafkaConsumer(){ Properties props=new Properties(); //zookeeper props.put("zookeeper.connect","192.168.163.120:2181");//你的zookeeper地址 //topic props.put("group.id","logstest");//設定組 //Zookeeper 超時 props.put("zookeeper.session.timeout.ms", "4000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); props.put("serializer.class", "kafka.serializer.StringEncoder"); ConsumerConfig config=new ConsumerConfig(props); consumer= kafka.consumer.Consumer.createJavaConsumerConnector(config); } void consume(){ Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(TOPIC, new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder); KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0); ConsumerIterator<String, String> it = stream.iterator(); try{ int messageCount = 0; while (it.hasNext()){ System.out.println(it.next().message()); messageCount++; if(messageCount%10 == 0){ System.out.println("Consumer端一共消費了" + messageCount + "條訊息!"); } } }catch (Exception e){ e.printStackTrace(); } } public static void main(String[] args) { new KafkaConsumer().consume(); } }
希望能幫到有需要的朋友。