3.Kafka API應用
阿新 • 發佈:2018-11-06
一.專案環境搭建
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.lv</groupId>
  <artifactId>kafka-study</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>kafka-study</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.2.2</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>kafka-study</finalName>
    <plugins>
      <!-- Build a fat jar so the demo classes can be run directly on the server. -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <!-- "single" is the supported goal; the old "assembly" goal is
                   deprecated and forks the whole lifecycle. -->
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
二.編寫程式碼
1.生產者
package kafka;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Demo producer: sends ten keyed String records to the topic configured under
 * the {@code producer.topic} key in {@code producer.properties}.
 *
 * <p>The two generic parameters of {@link Producer} are the record key type and
 * the record value type; both are String here, matching the configured
 * StringSerializer for key and value.
 */
public class KafkaProducerOps {

    public static void main(String[] args) throws IOException {
        // Load the producer configuration from the classpath.
        // try-with-resources closes the stream (the original leaked it).
        Properties prop = new Properties();
        try (InputStream in =
                KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties")) {
            prop.load(in);
        }

        String[] players = new String[] { "科比", "詹姆斯", "杜蘭特", "庫裡" };
        Random random = new Random();
        // Topic name is loop-invariant; look it up once instead of per record.
        String topic = prop.getProperty(Constants.KAFKA_PRODUCER_TOPIC);

        // Close the producer when done: send() is asynchronous and batches
        // records in memory, so without close() (which flushes) the records
        // may never reach the broker before the JVM exits.
        try (Producer<String, String> producer = new KafkaProducer<String, String>(prop)) {
            int start = 1;
            for (int i = start; i <= start + 9; i++) {
                String key = i + "";
                String value = "今天<--" + players[random.nextInt(players.length)] + "-->得分非常高!";
                producer.send(new ProducerRecord<String, String>(topic, key, value));
            }
        }
    }
}
生產者的配置檔案:
############################# Producer Basics #############################

# Brokers used for bootstrapping knowledge about the rest of the cluster.
# format: host1:port1,host2:port2 ...
bootstrap.servers=192.168.2.15:9092,192.168.2.15:9093,192.168.2.15:9094

# Compression codec for all generated data: none, gzip, snappy, lz4
compression.type=none

# Custom partitioner class (the default partitioner spreads data randomly).
partitioner.class=kafka.MyKafkaPartitioner

# Optional tuning knobs, left at their defaults:
# max time the client waits for the response of a request
#request.timeout.ms=
# how long KafkaProducer.send / partitionsFor will block
#max.block.ms=
# delay to allow batching of records
#linger.ms=
# maximum size of a request in bytes
#max.request.size=
# default batch size in bytes per partition
#batch.size=
# total memory the producer can use to buffer unsent records
#buffer.memory=

# Custom topic name read by KafkaProducerOps.
producer.topic=nbaStar

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
2.消費者
package kafka;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Demo consumer: subscribes to the "nbaStar" topic and polls forever,
 * printing offset, partition, key and value for every record received.
 */
public class KafkaConsumerOps {

    public static void main(String[] args) throws IOException {
        // Load the consumer configuration from the classpath.
        // try-with-resources closes the stream (the original leaked it).
        Properties properties = new Properties();
        try (InputStream in =
                KafkaConsumerOps.class.getClassLoader().getResourceAsStream("consumer.properties")) {
            properties.load(in);
        }

        Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // Subscribe to the topic the producer writes to.
        Collection<String> topics = Arrays.asList("nbaStar");
        consumer.subscribe(topics);

        // Poll loop: each poll blocks for up to 1000 ms waiting for records.
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.format("%d\t%d\t%s\t%s\n",
                        consumerRecord.offset(),
                        consumerRecord.partition(),
                        consumerRecord.key(),
                        consumerRecord.value());
            }
        }
    }
}
消費者的配置檔案:
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
# NOTE(review): KafkaConsumerOps uses the Java KafkaConsumer, which connects
# via bootstrap.servers; the zookeeper.* settings below look unused by it --
# confirm before relying on them.
zookeeper.connect=192.168.2.15:2181,192.168.2.15:2182,192.168.2.15:2183
# Brokers used by the Java consumer for its initial connection.
bootstrap.servers=192.168.2.15:9092,192.168.2.15:9093,192.168.2.15:9094
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
# consumer group id (consumers sharing this id split the topic's partitions)
group.id=test-consumer-group
# consumer timeout (disabled)
#consumer.timeout.ms=5000
# Deserializers matching the producer's String serializers.
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
3.自定義分割槽
package kafka;

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * Custom partitioner: records with a key are placed by key hash; keyless
 * records are spread randomly across the topic's partitions.
 */
public class MyKafkaPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> config) {
        // No configuration needed.
    }

    @Override
    public void close() {
        // No resources to release.
    }

    /**
     * Chooses the target partition for a record.
     *
     * @param topic      topic name
     * @param key        record key (may be null)
     * @param keyBytes   serialized key (may be null)
     * @param value      record value
     * @param valueBytes serialized value
     * @param cluster    current cluster metadata
     * @return a partition index in [0, partitionCountForTopic)
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer partitionNums = cluster.partitionCountForTopic(topic);
        int targetPartition = -1;
        if (key == null || keyBytes == null) {
            // Keyless record: pick a random partition. ThreadLocalRandom avoids
            // allocating a new Random on every call of this hot path.
            targetPartition = ThreadLocalRandom.current().nextInt(partitionNums);
        } else {
            int hashCode = key.hashCode();
            // Math.floorMod guards against negative hash codes: a plain
            // hashCode % partitionNums would return a negative, invalid
            // partition index whenever hashCode < 0.
            targetPartition = Math.floorMod(hashCode, partitionNums);
            System.out.println(
                    "key:" + key + ",value" + value + ", hashCode: " + hashCode + ", partition: " + targetPartition);
        }
        return targetPartition;
    }
}
4.打包上傳到伺服器執行