Flink Development: Implementing Flink Functions
阿新 · Published 2020-12-02
filter, map, and flatMap in Flink
filter
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class BlanksFilterFunction implements FilterFunction<Tuple2<String, String>> {

    @Override
    public boolean filter(Tuple2<String, String> tupleValue) throws Exception {
        // Drop null tuples and tuples whose key or value is an empty string.
        if (tupleValue == null) {
            return false;
        }
        String colKey = tupleValue.f0;
        String colValue = tupleValue.f1;
        return !"".equals(colKey) && !"".equals(colValue);
    }
}
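Usage is a single call on the stream; a minimal sketch, assuming a DataStream<Tuple2<String, String>> named pairs (a hypothetical name, not from the original):

// Hypothetical pipeline: keep only tuples where both fields are non-blank.
DataStream<Tuple2<String, String>> nonBlank = pairs.filter(new BlanksFilterFunction());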
map
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;

public class IdeSubscribeParseFunction implements MapFunction<String, TesteBean> {

    @Override
    public TesteBean map(String value) throws Exception {
        if (StringUtils.isBlank(value)) {
            return null;
        }
        JSONObject js = null;
        try {
            js = JSON.parseObject(value);
        } catch (Exception e) {
            return null;
        }
        if (js != null && !js.isEmpty()) {
            // Extract the jobName field
            String jobName = js.getString("jobName");
            TesteBean ideRecord = new TesteBean();
            ideRecord.setjobName(jobName);
            return ideRecord;
        }
        return null;
    }
}
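Note that map always emits exactly one record per input and cannot drop records by itself, so the nulls returned above need to be filtered out immediately (not every Flink serializer tolerates null records). A minimal sketch, assuming a DataStream<String> named lines (hypothetical):

// Hypothetical pipeline: parse each line, then drop the nulls from blank or unparseable input.
DataStream<TesteBean> records = lines
        .map(new IdeSubscribeParseFunction())
        .filter(record -> record != null);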
flatMap
import java.util.StringTokenizer;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Implements a string tokenizer that splits sentences into words as a user-defined FlatMapFunction.
 * The function takes a line (String) and splits it into multiple pairs in the form of "(word,1)"
 * ({@code Tuple2<String, Integer>}).
 */
public class SelectEnglishAndTokenizeFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {

    private transient ObjectMapper jsonParser;

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        if (jsonParser == null) {
            jsonParser = new ObjectMapper();
        }
        JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
        boolean isEnglish = jsonNode.has("user")
                && jsonNode.get("user").has("lang")
                && jsonNode.get("user").get("lang").asText().equals("en");
        boolean hasText = jsonNode.has("text");

        if (isEnglish && hasText) {
            // Tokenize the message of the tweet
            StringTokenizer tokenizer = new StringTokenizer(jsonNode.get("text").asText());

            // Split the message into (word, 1) pairs
            while (tokenizer.hasMoreTokens()) {
                String result = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase();
                if (!result.equals("")) {
                    out.collect(new Tuple2<>(result, 1));
                }
            }
        }
    }
}
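For context, a sketch of how this tokenizer is typically wired into a word count; the source stream tweets is hypothetical, assumed to carry raw tweet JSON:

// Hypothetical pipeline: tokenize, group by word, keep a running count per word.
DataStream<Tuple2<String, Integer>> wordCounts = tweets
        .flatMap(new SelectEnglishAndTokenizeFlatMap())
        .keyBy(value -> value.f0)   // group by word
        .sum(1);                    // running count per word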
getKey
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

public class NameKeySelector implements KeySelector<Tuple2<String, Integer>, String> {

    @Override
    public String getKey(Tuple2<String, Integer> value) {
        // Key each tuple by its first field (the name)
        return value.f0;
    }
}
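The selector plugs into keyBy; a minimal sketch, assuming a DataStream<Tuple2<String, Integer>> named counts (hypothetical):

// Hypothetical pipeline: partition by name, then keep a running sum per key.
KeyedStream<Tuple2<String, Integer>, String> byName = counts.keyBy(new NameKeySelector());
DataStream<Tuple2<String, Integer>> totals = byName.sum(1);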
Spark's FlatMapFunction
// Convert each line of Array[Byte] to String, and split into words.
// WORD_SEPARATOR is assumed here to be a precompiled java.util.regex.Pattern, e.g. Pattern.compile(" ").
JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
    @Override
    public Iterator<String> call(byte[] line) {
        String s = new String(line, StandardCharsets.UTF_8);
        return Arrays.asList(WORD_SEPARATOR.split(s)).iterator();
    }
});
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

public class TestMatchFlatFunction implements FlatMapFunction<Row, Row> {

    Map<String, TestBean> sysMap = null;

    /**
     * Used to pass variables to the executors via a broadcast variable.
     * @param useBroad the broadcast map of system code to TestBean
     */
    public TestMatchFlatFunction(Broadcast<Map<String, TestBean>> useBroad) {
        sysMap = useBroad.getValue();
    }

    @Override
    public Iterator<Row> call(Row monitorRow) throws Exception {
        String system_cd = monitorRow.getAs("system").toString().toLowerCase();
        String query_commond = monitorRow.getAs("sql");
        String crowdid = monitorRow.getAs("crowdid");

        TestBean syTestBean = sysMap.get(system_cd);
        Set<TestBean> testSet = null;
        if (syTestBean != null) {
            testSet = syTestBean.parseCrowd(query_commond);
        }

        List<Row> resultLs = new ArrayList<>();
        if (testSet != null && !testSet.isEmpty()) {
            for (TestBean parsedTest : testSet) {
                String test_cd = parsedTest.getTest_cd();
                Row row = RowFactory.create(system_cd, test_cd);
                resultLs.add(row);
            }
        }
        return resultLs.iterator();
    }
}
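The broadcast variable is created once on the driver, then handed to the function and applied with an explicit Row encoder; a sketch under assumed names (spark, monitorDs, sysMapOnDriver, and resultSchema are all hypothetical):

// Hypothetical driver-side wiring for the function above.
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
Broadcast<Map<String, TestBean>> useBroad = jsc.broadcast(sysMapOnDriver);

Dataset<Row> matched = monitorDs.flatMap(
        new TestMatchFlatFunction(useBroad),
        RowEncoder.apply(resultSchema));   // resultSchema: a StructType with the (system, test_cd) columns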
References:
Flink source code
Spark source code