kafka API消費資料,指定分割槽消費,分割槽,攔截器
阿新 • • 發佈:2018-12-27
producer傳送訊息,consumer消費訊息
public class producer1 { public static void main(String[] args) { Properties prop = new Properties(); //1.配置kafka節點地址 prop.put("bootstrap.servers","192.168.232.132:9092"); //2.傳送訊息是否應答 prop.put("acks","all"); //3.配置傳送訊息失敗重試 prop.put("retries","0"); //4.配置批量處理訊息大小 prop.put("batch.size" ,"10241"); //5.配置批量處理資料延遲 prop.put("linger.ms","5"); //6.配置記憶體緩衝大小 prop.put("buffer.memory","1234321"); //7.資訊傳送前必須序列化 prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); //例項化 KafkaProducer<String,String> prodecer = new KafkaProducer<String,String>(prop); for (int i = 0; i <99; i++){ prodecer.send(new ProducerRecord<String, String>("aa", "hah" + i), new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (metadata!=null) { System.out.println(metadata.topic() + "..." + metadata.offset() + "..." + metadata.partition()); } } }); } prodecer.close(); } }
public class Consumer1 { public static void main(String[] args) { //1.配置消費者屬性 Properties prop = new Properties(); //配置屬性 //伺服器地址指定 prop.put("bootstrap.servers", "192.168.232.132:9092"); //配置消費者組 prop.put("group.id", "g1"); //配置是否自動確認offset prop.put("enable.auto.commit", "true"); //序列化 prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //2.例項消費者 final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop); //4.釋放資源 執行緒安全 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { if(consumer != null) { consumer.close(); } } })); //訂閱訊息主題 consumer.subscribe(Arrays.asList("aa")); //3.拉訊息 推push 拉poll while(true) { ConsumerRecords<String,String> records = consumer.poll(1000); //遍歷訊息 for(ConsumerRecord<String,String> record:records) { System.out.println(record.topic() + "------" + record.value()); } } } }
設定分割槽與指定分割槽消費
public class Patition1 implements Partitioner{ //設定 public void configure(Map<String, ?> configs) { } //分割槽邏輯 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { return 1; } //釋放資源 public void close() { } }
public class Producer2 {
public static void main(String[] args) {
// 1.配置生產者屬性(指定多個引數)
Properties prop = new Properties();
// 引數配置
// kafka節點的地址
prop.put("bootstrap.servers", "192.168.232.132:9092");
// 傳送訊息是否等待應答
prop.put("acks", "all");
// 配置傳送訊息失敗重試
prop.put("retries", "0");
// 配置批量處理訊息大小
prop.put("batch.size", "10241");
// 配置批量處理資料延遲
prop.put("linger.ms", "5");
// 配置記憶體緩衝大小
prop.put("buffer.memory", "12341235");
// 訊息在傳送前必須序列化
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("partitioner.class", "com.itstar.kafka.kafka_producer.Patition1");
//2.例項化producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
//3.傳送訊息
for(int i = 0;i<99;i++) {
producer.send(new ProducerRecord<String, String>("yuandan", "hunterhenshuai" + i), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
//如果metadata不為null 拿到當前的資料偏移量與分割槽
if(metadata != null) {
System.out.println(metadata.topic() + "----" + metadata.offset() + "----" + metadata.partition());
}
}
});
}
//4.關閉資源
producer.close();
}
}
public class consumer {
public static void main(String[] args) {
Properties prop = new Properties();
//配置節點
prop.put("bootstrap.servers","192.168.232.132:9092");
//配置消費者組
prop.put("group.id","tt1");
//配置自動獲取確定offset
prop.put("enable.auto.commit","true");
//序列化
prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//例項化consumer
final KafkaConsumer<String,String> consume = new KafkaConsumer<String, String>(prop);
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
if (consume!=null){
consume.close();
}
}
}));
TopicPartition pp = new TopicPartition("aa",1);
//這個是指定分割槽消費
// consume.assign(Arrays.asList(p));
// 指定offset開始讀取
// consume.seekToBeginning(Arrays.asList(p));
//列印分割槽
List<PartitionInfo> parlist = consume.partitionsFor("aa");
for(PartitionInfo p : parlist){
System.out.println(p.toString());
}
//消費所有分割槽的,新增到List,然後assign這個List
List<TopicPartition> list = new ArrayList<TopicPartition>();
for (PartitionInfo p : parlist){
TopicPartition top = new TopicPartition("shengdan",p.partition());
list.add(top);
}
consume.assign(Arrays.asList(pp));
while (true){
ConsumerRecords<String,String> records = consume.poll(1000);
for(ConsumerRecord<String,String> record : records){
System.out.println(record.topic() +"---"+record.value());
}
}
}
}
攔截器
public class TimeInterceptor implements ProducerInterceptor<String, String>{
//配置資訊
public void configure(Map<String, ?> configs) {
}
//業務邏輯
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return new ProducerRecord<String, String>(
record.topic(),
record.partition(),
record.key(),
System.currentTimeMillis() + "-" + record.value());
}
//傳送失敗呼叫
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
//關閉資源
public void close() {
}
在producer裡
//攔截器
ArrayList<String> inList = new ArrayList<String>();
inList.add("com.itstare.kafka.interceptor.TimeInterceptor");
prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, inList);
就可以使用攔截器,相當於一個過濾的作用