1. 程式人生 > >kafka-stream資料清洗

kafka-stream資料清洗

1、資料清洗業務類LogProcessor

package com.css.kafka.kafka_stream;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

/**
 * 資料清洗*/
public class LogProcessor implements Processor<byte[], byte[]>{

    private ProcessorContext context;
    
    
//初始化 public void init(ProcessorContext context) { //傳輸 this.context = context; } //具體業務邏輯 public void process(byte[] key, byte[] value) { //1.拿到訊息資料,轉成字串 String message = new String(value); //2.如果包含- 去除 if (message.contains("-")) {
//3.把- 去掉 之後去掉左側資料 message = message.split("-")[1]; } //4.傳送資料 context.forward(key, message.getBytes()); } //釋放資源 public void close() { } }

2、Application類

package com.css.kafka.kafka_stream;

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorSupplier; /** * 需求:對資料進行清洗操作 * * 思路:wo-henshuai 把-和wo清洗掉*/ public class Application { public static void main(String[] args) { //1.定義主題 傳送到 另外一個主題中 資料清洗 String oneTopic = "t1"; String twoTopic = "t2"; //2.設定屬性 Properties prop = new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092,192.168.146.133:9092,192.168.146.134:9092"); //3.例項物件 StreamsConfig config = new StreamsConfig(prop); //4.流計算 拓撲 Topology builder = new Topology(); //5.定義kafka元件資料來源 builder.addSource("Source", oneTopic).addProcessor("Processor", new ProcessorSupplier<byte[], byte[]>() { public Processor<byte[], byte[]> get() { return new LogProcessor(); } //從哪裡來 }, "Source") //到哪裡去 .addSink("Sink", twoTopic, "Processor"); //6.例項化kafkaStream KafkaStreams kafkaStreams = new KafkaStreams(builder, prop); kafkaStreams.start(); } }

3、執行Application類的main方法

4、在hd09-1機器上建立主題t1

bin/kafka-topics.sh --zookeeper hd09-1:2181 --create --replication-factor 3 --partition 1 --topic t1

5、在hd09-2機器上啟動消費者

bin/kafka-console-consumer.sh --bootstrap-server hd09-2:9092 --topic t2 --from-beginning --consumer.config config/consumer.properties

6、在hd09-1機器上啟動生產者

bin/kafka-console-producer.sh --broker-list hd09-1:9092 --topic t1

7、此時在hd09-1機器kafka生產者上輸入  wo-henshuai,在hd09-2消費者機器上會顯示henshuai,即完成了資料清洗,如下圖。