kafka-streams進行簡單的資料清洗
阿新 • 發佈:2018-12-23
package com.terry.kafkastream;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

import java.util.Properties;

/**
 * Simple data-cleaning pipeline: reads records from topic "t1", passes each
 * value through {@link LogProcessor} (which strips the "-" prefix from values
 * like "terry-henshuai"), and writes the cleaned records to topic "t2".
 */
public class Application {

    // NOTE(review): the original declared this method as "a(String[] args)",
    // which the JVM cannot use as an entry point; renamed to main.
    public static void main(String[] args) {
        // 1. Source topic and sink topic for the cleaning step.
        String oneTopic = "t1";
        String twoTopic = "t2";

        // 2. Streams configuration: application id and broker address.
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata:9092");

        // 3. Build the topology: Source -> Processor -> Sink.
        //    (The original also constructed an unused StreamsConfig instance;
        //    KafkaStreams takes the Properties directly, so it was removed.)
        Topology topology = new Topology();
        topology.addSource("Source", oneTopic)
                .addProcessor("Processor", new ProcessorSupplier<byte[], byte[]>() {
                    @Override
                    public Processor<byte[], byte[]> get() {
                        // A fresh processor instance per stream task.
                        return new LogProcessor();
                    }
                }, "Source")
                .addSink("Sink", twoTopic, "Processor");

        // 4. Instantiate and start the streams application.
        KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);
        kafkaStreams.start();
    }
}
package com.terry.kafkastream;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

import java.nio.charset.StandardCharsets;

/**
 * Data-cleaning processor: for a value of the form "terry-henshuai" it forwards
 * only the part after the first "-" (i.e. "henshuai"); values without "-" are
 * forwarded unchanged.
 */
public class LogProcessor implements Processor<byte[], byte[]> {

    // Context captured in init(); used to forward cleaned records downstream.
    private ProcessorContext processorContext;

    @Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
    }

    @Override
    public void process(byte[] key, byte[] value) {
        // 1. Decode the record value with an explicit charset so the result
        //    does not depend on the JVM's platform-default encoding.
        String s = new String(value, StandardCharsets.UTF_8);

        // 2. If the value contains "-", keep only the part after the first "-".
        //    Guard against values like "terry-": a trailing separator makes
        //    split() return a single-element array, and the original unguarded
        //    split[1] threw ArrayIndexOutOfBoundsException.
        if (s.contains("-")) {
            String[] split = s.split("-");
            if (split.length > 1) {
                s = split[1];
            }
        }
        processorContext.forward(key, s.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void close() {
        // No resources to release.
    }
}