kafka的流計算框架
阿新 • • 發佈:2018-12-27
需求:
producer: 傳送例如 aa-zz
consumer:收到zz
通過-切分得到後面的,如果沒有-就正常輸出
Processor
public class LogProcessor implements Processor<byte [],byte []> { ProcessorContext context ; //初始化 public void init(ProcessorContext context) { //傳輸 this.context = context; } //具體業務邏輯 public void process(byte[] key, byte[] value) { //獲取資料 String line = new String(value); //切分資料 if (line.contains("-")){ line = line.split("-")[1]; } //輸出資料 context.forward(key,line.getBytes()); } //釋放資源 public void close() { } }
application
public class Application { public static void main(String[] args) { String onetopic = "t1"; String twotopic = "t2"; Properties prop = new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.232.132:9092,192.168.232.133:9092,192.168.232.134:9092"); StreamsConfig s = new StreamsConfig(prop); //初始化拓撲 Topology Builder = new Topology(); Builder.addSource("source",onetopic).addProcessor("processor", new ProcessorSupplier() { public Processor<byte[],byte[]> get() { return new LogProcessor(); } },"source").addSink("Sink",twotopic,"processor"); //輸出 KafkaStreams ka = new KafkaStreams(Builder,prop); //可以是Builder,s,我試過,好像過時了..... ka.start(); } }
kafka可以做流計算,但是不適合