Flink自定義一個簡單source
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.lucene.analysis.CachingTokenFilter; import java.util.Random; public class MySelfSourceTest01 { public static void main(String[] args) { Logger.getLogger("org").setLevel(Level.OFF); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStreamSource = env.addSource(new SourceFunction<String>() { @Override public void run(SourceContext<String> ctx) throws Exception { Random random = new Random();
// 迴圈可以不停的讀取靜態資料 while (true) { int nextInt = random.nextInt(100); ctx.collect("random : " + nextInt); Thread.sleep(1000); } } @Override public void cancel() { } }); WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { String[] sps = value.split(":"); return new Tuple2<>(value, Integer.parseInt(sps[1].trim())); } }).keyBy(0).timeWindow(Time.seconds(5)); SingleOutputStreamOperator<String> apply = window.apply(new WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow>() { @Override public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception { input.forEach(x -> { System.out.println("apply function -> " + x.f0); out.collect(x.f0); }); } }); apply.print(); try { env.execute("myself_source_test01"); } catch (Exception e) { e.printStackTrace(); } } }
相關推薦
Flink自定義一個簡單source
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.t
springboot 自定義一個簡單的 starter
1.新建專案 。 啟動器只用來做依賴匯入; 專門來寫一個自動配置模組; idea 下建立空專案 hello-spring-boot-starter 新增兩個子模組 spring-boot-starter-autoconfigurer, spring-boot-starter-hell
自定義一個簡單的迭代器(line_iterator)
STL是容器、迭代器、演算法三位一體的好東西,使用STL編寫的程式看起來非常簡潔。比如從cin輸入若干字串,每一字串佔一行,然後將這些字串按字典序排序並輸出到cout中,相關的程式碼如下所示: /* 迭代器的使用 * created: btwsmile
Visual C++網路程式設計經典案例詳解 第5章 網頁瀏覽器 HTTP響應 實體資料 自定義一個簡單的訊息體結構
總之,伺服器返回的響應訊息類似於C++語言中的結構體 訊息頭和訊息體就是這個結構體裡面的元素。 使用者在使用HTTP程式設計時, 可以根據需要自定義一個結構體儲存該訊息資料。 例如,自定義一個簡單的訊息結構體 typedef struct { char *messagehead; /
Django自定義一個簡單的中介軟體,並使用此中介軟體
1、在最近做的專案中,需要每個頁面訪問的時候判斷是否登入,沒登入的話就跳轉到登入頁面,因此抽出個公共方法,並自定義箇中間件是很有必要的,這樣就可以用註解方式去使用這個自定義的中介軟體,就如Django自帶的@login_required一樣。 2、因此首先在專案的目錄底下新
自定義View(簡單一個畫板)
畫板 程式碼 public class PaintView extends View { private Paint mPaint;//畫筆工具 private Path mPath;//路徑 public PaintView(Conte
自定義一個更好用的SwipeRefreshLayout(彈力拉伸效果詳解)(轉載)
dsc drag 常數 lane swipe loading 數據改變 高中數學 tca 轉自: 自定義一個更好用的SwipeRefreshLayout(彈力拉伸效果詳解) 前言 熟悉SwipeRefreshLayout的同學一定知道,SwipeRefreshLayout是
2.Border Layout 自定義一個Layout來完成布局。
log 自定義 min int size ger 官方文檔 implement for 目標: 1.每一個被添加到布局裏的控件都是QLayoutItem,我們根據方位添加。 2.定義一個結構體 ItemWrapper。裏面包含QLayoutItem
自定義一個校驗器--------------------------完成用戶註冊時候,對username是否符合規則以及時候已經存在於數據庫的校驗
實例 check ajax -- value ava .cn java 數據 實例: <!-- 自定義校驗表單--> $.validator.addMethod( "checkusername", //校驗規則名稱,類似於required
c++primer,自定義一個復數類
opera 指針 隨著 per call 拷貝構造函數 會銷 局部變量 eal 1 #include<iostream> 2 #include<string> 3 #include<vector> 4 #include<a
Android零基礎入門第24節:自定義View簡單使用
子類 protect jin 討論 我們 @+ amp 進階 運行程序 當我們開發中遇到Android原生的組件無法滿足需求時,這時候就應該自定義View來滿足這些特殊的組件需求。 一、概述 很多初入Android開發的程序員,對於Android自定義View可能比較
關於自定義一個上傳的file按鈕
ner receive play display list 之前 引入 image oot 在input中html給我們一個 type file用來做文件上傳的功能,比如 但是這樣的樣式,實在難看,在開發的時候看了layui和bootstrap的點擊上傳,都很不錯。 前
如何自定義一個長度可變數組
方式 urn img 數組長度 字符串數組 個數 很多 由於 lac 摘要:本文主要寫了如何自定義一個長度可變數組 數組是在程序設計中,為了處理方便,把具有相同類型的若幹元素按無序的形式組織起來的一種形式 在定義之初,數組的長度就被定義 新建數組有很多方式 下面兩個都可
Java類載入器( CLassLoader ) 死磕5: 自定義一個檔案系統的classLoader
【正文】Java類載入器( CLassLoader ) 死磕5: 自定義一個檔案系統classLoader 本小節目錄 5.1. 自定義類載入器的基本流程 5.2. 入門案例:自定義檔案系統類載入器 5.3. 案例的環境配置 5.4 FileClassLoader 案例實現步驟 5
使用java反射,自定義springMvc簡單案例
目前javaWeb開發領域,SpringMvc已經是絕大部分中小公司必選框架,那麼springMvc是如何實現的呢。這裡通過一個簡單的小案例來演示一下。 首先看一下案例的結構圖 目前springBoot專案比較流行,這裡新建一個springBoot專案,先引入專案依賴
flutter - 點選事件(一) - 自定義一個方便的點選控制元件
android中,所有View都可以直接setOnClickListener, RN中也有TouchableHightlight這樣的控制元件可以直接套在外面,ios中也可以有UIControl 這樣的控制元件可以直接新增點選事件. 那麼flutter中有嗎? 答案自然是有. Ges
flutter 如何自定義一個loadmore / 載入更多
寫在前面 這類的庫在pub上有很多 我為什麼要自定義呢 首先是專案需要,並且這種庫普適性高,抽取出來今後複用也方便點 另外記錄一下編碼思路,方便後續檢視 pub地址 pub國內映象 github 使用說明 匯入說明看這裡 或 中文映象 看看構造方法 一共
Vue 自定義一個外掛的用法及案例
1.開發外掛 install有兩個引數,第一個是Vue構造器,第二個引數是一個可選的選項物件 MyPlugin.install = function (Vue, options) { // 1. 新增全域
React Native:自定義一個導航欄,改變狀態列背景,隱藏狀態列
設計開發過程中,導航欄都會有所不同,這時候使用RN就需要自定義一個想要的導航欄了,RN中文網有講專門ios的導航欄(NavigatorIOS),可以不用自定義。 首先定義自定義導航欄的一些屬性的約束,記得npm install --save prop-types然後引入import Prop
python實現使用者自定義一個矩形的輸出
補充知識:python中的print 函式在列印的時候末尾會自動補全換行,python3.0以上版本可以用 print(‘abc’,end = ‘’)來換掉換行,直接再一行中列印 如果不加end的話,最後一個預設值是\n,如果加了end=’’,就把\n給去掉了,變成一個空的字串print()