Flink 聯結器 - 流處理 - 讀寫 Redis
阿新 • • 發佈:2020-11-25
寫入redis
// Attach a Redis sink to the result stream. The diamond operator lets the compiler
// infer the sink's element type instead of falling back to the raw RedisSink type,
// which would disable generic type checking and emit an unchecked warning.
resultStream.addSink(
        new RedisSink<>(FlinkUtils.getRedisSinkConfig(parameters), new MyRedisMapper()));
getRedisSinkConfig
/**
 * Builds the Redis Sentinel connection configuration used by the Redis sink.
 *
 * <p>The sentinel addresses are read from {@code PropertiesUtil.REDIS_HOSTS} as a
 * comma-separated list. Whitespace around each entry is stripped and empty entries
 * are ignored. Master name, password, pool sizes and connection timeout are read
 * from their respective {@code PropertiesUtil} keys.
 *
 * @param parameterTool job parameters holding the Redis settings
 * @return the Sentinel configuration built from those settings
 * @throws RuntimeException if the hosts property is missing
 * @throws IllegalArgumentException if the hosts property contains no usable address
 */
public static FlinkJedisSentinelConfig getRedisSinkConfig(ParameterTool parameterTool) {
    // getRequired reports the missing key by name; a plain get() would return null
    // and surface only as a NullPointerException on split().
    String redisHosts = parameterTool.getRequired(PropertiesUtil.REDIS_HOSTS);

    Set<String> sentinels = new HashSet<>();
    for (String host : redisHosts.split(",")) {
        // Tolerate "a:1, b:2" style lists and stray commas.
        String trimmed = host.trim();
        if (!trimmed.isEmpty()) {
            sentinels.add(trimmed);
        }
    }
    if (sentinels.isEmpty()) {
        throw new IllegalArgumentException(
                "No Redis sentinel address configured under key '"
                        + PropertiesUtil.REDIS_HOSTS + "'");
    }

    return new FlinkJedisSentinelConfig.Builder()
            .setSentinels(sentinels)
            .setMasterName(parameterTool.get(PropertiesUtil.REDIS_MASTER))
            .setPassword(parameterTool.get(PropertiesUtil.REDIS_PASSWORD))
            .setMaxIdle(parameterTool.getInt(PropertiesUtil.REDIS_POOL_MAXIDEL))
            .setMaxTotal(parameterTool.getInt(PropertiesUtil.REDIS_POOL_MAXTOTAL))
            .setConnectionTimeout(parameterTool.getInt(PropertiesUtil.REDIS_TIMEOUT))
            .build();
}
MyRedisMapper
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; /** * @Auther WeiJiQian * @描述 Redis 儲存的key和value */ public class MyRedisMapper implements RedisMapper<Tuple2<String, String>> { /** * 設定使用的redis資料結構型別,和key的名詞 * 通過RedisCommand設定資料結構型別 * Returns descriptor which defines data type. * * @return data type descriptor */ @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.SETEX, Constant.REDIS_KEY_TTL); } /** * 設定value中的鍵值對 key的值 * Extracts key from data. * * @return key */ @Override public String getKeyFromData(Tuple2<String, String> stringStringTuple2) { return stringStringTuple2.f0; } /** * 設定value中的鍵值對 value的值 * Extracts value from data. * * @return value */ @Override public String getValueFromData(Tuple2<String, String> tuple2) { return tuple2.f1; } }