
Learning Flink from 0 to 1 (25): Reading data from Redis as a Flink source

The data in Redis:
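The region-to-country mapping is kept in a Redis hash named areas; the entries below are reproduced from the hset commands listed in the code comments further down:

hset areas AREA_US US
hset areas AREA_CT TW,HK
hset areas AREA_AR PK,KW,SA
hset areas AREA_IN IN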

We need to implement the SourceFunction interface and specify its generic type parameter, i.e., the type of the data emitted after the Redis data has been processed. Since we need to return key-value pairs, a HashMap is the natural choice here.
pom.xml

<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.3</version>
</dependency>

Java code:

package ryx.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.util.HashMap;
import java.util.Map;

/**
 * Redis stores the mapping between regions and their countries:
 *   hset areas AREA_US US
 *   hset areas AREA_CT TW,HK
 *   hset areas AREA_AR PK,KW,SA
 *   hset areas AREA_IN IN
 *
 *   ./bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic allDataClean --from-beginning
 *
 * Since we need to return key-value pairs, a HashMap is used as the output type.
 */
public class MyRedisSource implements SourceFunction<HashMap<String, String>> {

    private Logger logger = LoggerFactory.getLogger(MyRedisSource.class);

    private boolean isRunning = true;
    private Jedis jedis = null;
    private final long SLEEP_MILLION = 5000;

    @Override
    public void run(SourceContext<HashMap<String, String>> ctx) throws Exception {
        this.jedis = new Jedis("hadoop01", 6379);
        HashMap<String, String> kVMap = new HashMap<String, String>();
        while (isRunning) {
            try {
                kVMap.clear();
                // Read the whole hash: key = region, value = comma-separated country codes
                Map<String, String> areas = jedis.hgetAll("areas");
                for (Map.Entry<String, String> entry : areas.entrySet()) {
                    String key = entry.getKey();
                    String value = entry.getValue();
                    String[] splits = value.split(",");
                    System.out.println("key:" + key + ",--value:" + value);
                    for (String split : splits) {
                        // Invert the mapping: key = country, value = region
                        kVMap.put(split, key);
                    }
                }
                if (kVMap.size() > 0) {
                    ctx.collect(kVMap);
                } else {
                    logger.warn("The data fetched from Redis is empty");
                }
                Thread.sleep(SLEEP_MILLION);
            } catch (JedisConnectionException e) {
                // Reconnect if the connection to Redis is lost
                logger.warn("Redis connection exception, reconnecting", e.getCause());
                jedis = new Jedis("hadoop01", 6379);
            } catch (Exception e) {
                logger.warn("Source exception", e.getCause());
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        if (jedis != null) {
            jedis.close();
        }
    }
}

The output is:
key:AREA_US,--value:US
key:AREA_CT,--value:TW,HK
key:AREA_AR,--value:PK,KW,SA
key:AREA_IN,--value:IN

Each value is then split on commas into individual country codes; each country code is combined with its key (the region) and put into the HashMap, which is emitted through the SourceContext object in the run method as the output of this source.
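For completeness, here is a minimal sketch of how this source might be wired into a streaming job. The surrounding class RedisSourceJob and the job name are illustrative additions, not part of the original code, and the pre-1.12 SourceFunction-style addSource API is assumed:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;

public class RedisSourceJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Attach the custom source; Flink calls run() and every HashMap passed to
        // ctx.collect() becomes one element of the stream.
        DataStreamSource<HashMap<String, String>> countryToArea = env.addSource(new MyRedisSource());

        // Print each emitted country -> region map for verification.
        countryToArea.print();

        env.execute("MyRedisSource demo");
    }
}

With the sample data above, every 5 seconds the stream receives one HashMap of inverted pairs, e.g. US -> AREA_US, TW -> AREA_CT, HK -> AREA_CT, PK -> AREA_AR, and so on.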