flink按事件時間排序
阿新 • • 發佈:2020-09-17
1.事件時間提取器
class CustomerStatusChangedWatermark extends AscendingTimestampExtractor<CustomerStatusChangedEvent> { @Override public long extractAscendingTimestamp(CustomerStatusChangedEvent customerStatusChangedEvent) { return customerStatusChangedEvent.getEventTime(); } }
2. 測試
public class WatermarkTest { public static void main(String[] args) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<CustomerStatusChangedEvent> stream = env.fromElements(new CustomerStatusChangedEvent(1001L, 1, 2, DateUtils.toTimeStamp(LocalDateTime.now())), new CustomerStatusChangedEvent(1011L, 1, 2, DateUtils.toTimeStamp(LocalDateTime.now().minusSeconds(20))), new CustomerStatusChangedEvent(1021L, 1, 2, DateUtils.toTimeStamp(LocalDateTime.now().minusSeconds(30))), new CustomerStatusChangedEvent(1031L, 1, 2, DateUtils.toTimeStamp(LocalDateTime.now().plusSeconds(20))) ); DataStream<Long> watermarkStream = stream .assignTimestampsAndWatermarks(new CustomerStatusChangedWatermark()) .map(new MapFunction<CustomerStatusChangedEvent, Long>() { @Override public Long map(CustomerStatusChangedEvent p) throws Exception { return p.getCustomerId(); } }); watermarkStream.print(); try { env.execute("按事件時間排序"); } catch (Exception ex) { } } }
3.輸出
4> 1001
3> 1031
1> 1011
2> 1021