1. 程式人生 > 實用技巧 >flink按事件時間排序

flink按事件時間排序

1.事件時間提取器

class CustomerStatusChangedWatermark extends AscendingTimestampExtractor<CustomerStatusChangedEvent> {
        @Override
        public long extractAscendingTimestamp(CustomerStatusChangedEvent customerStatusChangedEvent) {
            return customerStatusChangedEvent.getEventTime();
        }
    }

2. 測試

public class WatermarkTest {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<CustomerStatusChangedEvent> stream = env.fromElements(
                
new CustomerStatusChangedEvent(1001L, 1, 2, DateUtils.toTimeStamp(LocalDateTime.now())), new CustomerStatusChangedEvent(1011L, 1, 2, DateUtils.toTimeStamp(LocalDateTime.now().minusSeconds(20))), new CustomerStatusChangedEvent(1021L, 1, 2, DateUtils.toTimeStamp(LocalDateTime.now().minusSeconds(
30))), new CustomerStatusChangedEvent(1031L, 1, 2, DateUtils.toTimeStamp(LocalDateTime.now().plusSeconds(20))) ); DataStream<Long> watermarkStream = stream .assignTimestampsAndWatermarks(new CustomerStatusChangedWatermark()) .map(new MapFunction<CustomerStatusChangedEvent, Long>() { @Override public Long map(CustomerStatusChangedEvent p) throws Exception { return p.getCustomerId(); } }); watermarkStream.print(); try { env.execute("按事件時間排序"); } catch (Exception ex) { } } }

3.輸出

4> 1001
3> 1031
1> 1011
2> 1021