Storm 系列(七)—— Storm 整合 Redis 詳解
一、簡介
Storm-Redis 提供了 Storm 與 Redis 的整合支援,你只需要引入對應的依賴即可使用:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>${storm.version}</version>
<type>jar</type>
</dependency>
Storm-Redis 使用 Jedis 作為 Redis 客戶端,並提供了如下三個基本的 Bolt 實現:
- RedisLookupBolt:從 Redis 中查詢資料;
- RedisStoreBolt:儲存資料到 Redis;
- RedisFilterBolt:查詢符合條件的資料。
RedisLookupBolt、RedisStoreBolt、RedisFilterBolt 均繼承自 AbstractRedisBolt 抽象類。我們可以通過繼承該抽象類實現自定義的 RedisBolt,對功能進行擴展。
二、整合案例
2.1 專案結構
這裡首先給出一個整合案例:進行詞頻統計並將最後的結果儲存到 Redis。專案結構如下:
用例原始碼下載地址:storm-redis-integration
2.2 專案依賴
專案主要依賴如下:
<properties> <storm.version>1.2.2</storm.version> </properties> <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-redis</artifactId> <version>${storm.version}</version> </dependency> </dependencies>
2.3 DataSourceSpout
/**
 * Spout that produces sample lines for the word-count examples.
 *
 * <p>Each call to {@link #nextTuple()} emits one line on the "line" field containing between 1 and
 * {@code list.size()} distinct words separated by tabs, then sleeps for one second.
 */
public class DataSourceSpout extends BaseRichSpout {

    /** Vocabulary the lines are drawn from; shuffled in place on every call to productData(). */
    private final List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
    /**
     * Created in open() rather than at field initialization: the spout is serialized on submit, and
     * every deserialized copy of a pre-built Random would replay the exact same sequence.
     */
    private transient Random random;
    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
        this.random = new Random();
    }

    @Override
    public void nextTuple() {
        // Simulate an incoming line of data.
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    /**
     * Builds one simulated line by shuffling the vocabulary and joining a random-length prefix.
     *
     * @return 1..list.size() distinct words joined by a tab character
     */
    private String productData() {
        Collections.shuffle(list, random);
        // nextInt(n) is already in [0, n); +1 gives a prefix length in [1, n].
        int endIndex = random.nextInt(list.size()) + 1;
        return String.join("\t", list.subList(0, endIndex));
    }
}
產生的模擬資料格式如下:
Spark HBase
Hive Flink Storm Hadoop HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm
2.4 SplitBolt
/**
 * Splits each incoming line (field "line") into words on the tab separator.
 *
 * <p>Emits one {@code (word, "1")} tuple per word, anchored to the input line, and acks the
 * input after all words have been emitted.
 */
public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        for (String word : line.split("\t")) {
            // Anchor to the input so a downstream failure is tracked back to this line.
            collector.emit(input, new Values(word, "1"));
        }
        // BaseRichBolt never acks on its own; without this, the input would time out
        // whenever the spout emits with message ids and acking is enabled.
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
2.5 CountBolt
/**
 * Keeps a running in-memory count per word and emits {@code (word, count)} after every update.
 *
 * <p>Counts are held per task, so the upstream grouping must send every occurrence of a word to
 * the same task (e.g. fieldsGrouping on "word") for the totals to be correct.
 */
public class CountBolt extends BaseRichBolt {

    private final Map<String, Integer> counts = new HashMap<>();
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        // merge inserts 1 for a new word or adds 1 to the existing count, returning the new value.
        int count = counts.merge(word, 1, Integer::sum);
        // Emit the updated total, anchored to the input, then ack it (BaseRichBolt does not auto-ack).
        collector.emit(input, new Values(word, String.valueOf(count)));
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
2.6 WordCountStoreMapper
實現 RedisStoreMapper 介面,定義 tuple 與 Redis 中資料的對映關係:即需要指定 tuple 中的哪個欄位為 key,哪個欄位為 value,並且儲存到 Redis 的何種資料結構中。
/**
 * Maps a {@code (word, count)} tuple onto the Redis hash named "wordCount".
 *
 * <p>The tuple's "word" field becomes the hash field and its "count" field the stored value.
 */
public class WordCountStoreMapper implements RedisStoreMapper {

    /** Name of the Redis hash that holds all word counts. */
    private static final String HASH_KEY = "wordCount";

    private final RedisDataTypeDescription description;

    public WordCountStoreMapper() {
        RedisDataTypeDescription.RedisDataType type = RedisDataTypeDescription.RedisDataType.HASH;
        this.description = new RedisDataTypeDescription(type, HASH_KEY);
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return this.description;
    }

    /** The word is used as the hash field. */
    @Override
    public String getKeyFromTuple(ITuple tuple) {
        final String word = tuple.getStringByField("word");
        return word;
    }

    /** The count (already a string) is used as the hash value. */
    @Override
    public String getValueFromTuple(ITuple tuple) {
        final String count = tuple.getStringByField("count");
        return count;
    }
}
2.7 WordCountToRedisApp
/**
* 進行詞頻統計 並將統計結果儲存到 Redis 中
*/
public class WordCountToRedisApp {
private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
private static final String SPLIT_BOLT = "splitBolt";
private static final String COUNT_BOLT = "countBolt";
private static final String STORE_BOLT = "storeBolt";
//在實際開發中這些引數可以將通過外部傳入 使得程式更加靈活
private static final String REDIS_HOST = "192.168.200.226";
private static final int REDIS_PORT = 6379;
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
// split
builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
// count
builder.setBolt(COUNT_BOLT, new CountBolt()).shuffleGrouping(SPLIT_BOLT);
// save to redis
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
builder.setBolt(STORE_BOLT, storeBolt).shuffleGrouping(COUNT_BOLT);
// 如果外部傳參 cluster 則代表線上環境啟動否則代表本地啟動
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterWordCountToRedisApp", new Config(), builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalWordCountToRedisApp",
new Config(), builder.createTopology());
}
}
}
2.8 啟動測試
可以直接使用本地模式執行,也可以打包後提交到伺服器叢集執行。本倉庫提供的原始碼預設採用 maven-shade-plugin 進行打包,打包命令如下:
# mvn clean package -D maven.test.skip=true
啟動後,檢視 Redis 中的資料:
三、storm-redis 實現原理
3.1 AbstractRedisBolt
RedisLookupBolt、RedisStoreBolt、RedisFilterBolt 均繼承自 AbstractRedisBolt 抽象類。和我們自定義實現的 Bolt 一樣,AbstractRedisBolt 也間接繼承自 BaseRichBolt(它直接繼承的是 BaseTickTupleAwareRichBolt)。
AbstractRedisBolt 中比較重要的是 prepare 方法。在該方法中,通過外部傳入的 Jedis 連線池配置(jedisPoolConfig 或 jedisClusterConfig)建立用於管理 Jedis 例項的容器 JedisCommandsInstanceContainer。
/**
 * Base class of RedisLookupBolt, RedisStoreBolt and RedisFilterBolt (excerpt of the storm-redis
 * source; the "......" lines mark code omitted in this article).
 *
 * Holds the output collector and, after prepare(), a JedisCommandsInstanceContainer built from
 * whichever of jedisPoolConfig / jedisClusterConfig was supplied.
 */
public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt {
protected OutputCollector collector;
// Built in prepare(); transient, so it is not part of the serialized bolt.
private transient JedisCommandsInstanceContainer container;
// Single-instance configuration; checked first in prepare().
private JedisPoolConfig jedisPoolConfig;
// Redis Cluster configuration; used only when jedisPoolConfig is null.
private JedisClusterConfig jedisClusterConfig;
......
/**
 * Stores the collector and builds the Jedis container from the configured pool or cluster settings.
 *
 * @throws IllegalArgumentException if neither jedisPoolConfig nor jedisClusterConfig is set
 */
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
// FIXME: stores map (stormConf), topologyContext and expose these to derived classes
this.collector = collector;
if (jedisPoolConfig != null) {
this.container = JedisCommandsContainerBuilder.build(jedisPoolConfig);
} else if (jedisClusterConfig != null) {
this.container = JedisCommandsContainerBuilder.build(jedisClusterConfig);
} else {
throw new IllegalArgumentException("Jedis configuration not found");
}
}
.......
}
JedisCommandsContainerBuilder 的 build() 方法如下。實際上它只是根據傳入的配置建立 JedisPool 或 JedisCluster,並將其包裝到對應的容器(JedisContainer 或 JedisClusterContainer)中。
/**
 * Builds a container backed by a single-instance JedisPool.
 * The pool uses DEFAULT_POOL_CONFIG plus the host, port, timeout, password and database from config.
 */
public static JedisCommandsInstanceContainer build(JedisPoolConfig config) {
JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase());
return new JedisContainer(jedisPool);
}
/**
 * Builds a container backed by a JedisCluster connected to config's nodes.
 */
public static JedisCommandsInstanceContainer build(JedisClusterConfig config) {
// NOTE(review): config.getTimeout() is passed twice, presumably as both connection and socket timeout — confirm against the JedisCluster constructor of the Jedis version in use.
JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getTimeout(), config.getMaxRedirections(), config.getPassword(), DEFAULT_POOL_CONFIG);
return new JedisClusterContainer(jedisCluster);
}
3.2 RedisStoreBolt和RedisLookupBolt
RedisStoreBolt 中比較重要的是 process 方法。該方法從 storeMapper 中獲取 key/value 的值,並按照其儲存型別 dataType 呼叫 jedisCommand 的對應方法進行儲存。
RedisLookupBolt 的實現基本類似,從 lookupMapper 中獲取傳入的 key 值,並進行查詢操作。
/**
 * Writes each tuple to Redis (excerpt of the storm-redis source; "........." marks omitted code).
 *
 * The mapper supplies the key and value. The Redis command is chosen by the data type in the
 * mapper's RedisDataTypeDescription. On success the input is acked; any exception thrown inside
 * the try block is reported and the input is failed.
 */
public class RedisStoreBolt extends AbstractRedisBolt {
private final RedisStoreMapper storeMapper;
// Read once from the mapper's description at construction time.
private final RedisDataTypeDescription.RedisDataType dataType;
// Redis key used by HASH, SORTED_SET and GEO; for those types the tuple key becomes the field/member.
private final String additionalKey;
/** Store bolt for a single Redis instance configured through a JedisPoolConfig. */
public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
super(config);
this.storeMapper = storeMapper;
RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
this.dataType = dataTypeDescription.getDataType();
this.additionalKey = dataTypeDescription.getAdditionalKey();
}
/** Store bolt for a Redis Cluster configured through a JedisClusterConfig. */
public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) {
super(config);
this.storeMapper = storeMapper;
RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
this.dataType = dataTypeDescription.getDataType();
this.additionalKey = dataTypeDescription.getAdditionalKey();
}
/**
 * Stores one tuple with the Redis command that matches dataType, then acks or fails it.
 */
@Override
public void process(Tuple input) {
// NOTE: the mapper is called before the try block, so an exception here is not turned into fail(input).
String key = storeMapper.getKeyFromTuple(input);
String value = storeMapper.getValueFromTuple(input);
JedisCommands jedisCommand = null;
try {
jedisCommand = getInstance();
switch (dataType) {
case STRING:
jedisCommand.set(key, value);
break;
case LIST:
jedisCommand.rpush(key, value);
break;
case HASH:
// additionalKey names the hash; key is the field inside it.
jedisCommand.hset(additionalKey, key, value);
break;
case SET:
jedisCommand.sadd(key, value);
break;
case SORTED_SET:
// value is parsed as the score; key is the member.
jedisCommand.zadd(additionalKey, Double.valueOf(value), key);
break;
case HYPER_LOG_LOG:
jedisCommand.pfadd(key, value);
break;
case GEO:
// value must have the form "longitude:latitude"; key is the member name.
String[] array = value.split(":");
if (array.length != 2) {
throw new IllegalArgumentException("value structure should be longitude:latitude");
}
double longitude = Double.valueOf(array[0]);
double latitude = Double.valueOf(array[1]);
jedisCommand.geoadd(additionalKey, longitude, latitude, key);
break;
default:
throw new IllegalArgumentException("Cannot process such data type: " + dataType);
}
collector.ack(input);
} catch (Exception e) {
this.collector.reportError(e);
this.collector.fail(input);
} finally {
// Always hand the instance back, including when it is still null because getInstance() failed.
returnInstance(jedisCommand);
}
}
.........
}
3.3 JedisCommands
JedisCommands 介面中定義了常用的(針對單個 key 的)Redis 客戶端命令。它有三個實現類,分別是 Jedis、JedisCluster、ShardedJedis。Storm 中主要使用前兩種實現類,具體呼叫哪一個實現類來執行命令,由傳入的是 jedisPoolConfig 還是 jedisClusterConfig 來決定。
3.4 RedisMapper 和 TupleMapper
RedisMapper 和 TupleMapper 定義了 tuple 和 Redis 中的資料如何進行對映轉換。
1. TupleMapper
TupleMapper 主要定義了兩個方法:
getKeyFromTuple(ITuple tuple):從 tuple 中獲取哪個欄位作為 Key;
getValueFromTuple(ITuple tuple):從 tuple 中獲取哪個欄位作為 Value;
2. RedisMapper
定義了獲取資料型別的方法 getDataTypeDescription()。RedisDataTypeDescription 中的 RedisDataType 列舉類定義了所有可用的 Redis 資料型別:
/**
 * Describes how a mapper's data is stored in Redis: a RedisDataType plus an additional key
 * (for example the hash name for HASH, as WordCountStoreMapper uses it).
 * Excerpt only; "......" marks omitted code.
 */
public class RedisDataTypeDescription implements Serializable {
// Every Redis data structure that the storm-redis bolts can map tuples onto.
public enum RedisDataType { STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG, GEO }
......
}
3. RedisStoreMapper
RedisStoreMapper 繼承 TupleMapper 和 RedisMapper 介面,用於資料儲存時,沒有定義額外方法。
4. RedisLookupMapper
RedisLookupMapper 繼承 TupleMapper 和 RedisMapper 介面:
- 定義了 declareOutputFields 方法,宣告輸出的欄位。
- 定義了 toTuple 方法,將查詢結果組裝為 Storm 的 Values 的集合,並用於傳送。
下面的例子表示:從輸入 Tuple 中獲取 word 欄位作為 key,使用 RedisLookupBolt 進行查詢後,將 key 和查詢結果 value 組裝為 values,並傳送到下一個處理單元。
/**
 * Lookup mapper that reads a word's count from the Redis hash "wordCount".
 *
 * <p>The input tuple's "word" field is used as the hash field; each lookup result is emitted
 * as a single {@code (wordName, count)} tuple.
 */
class WordCountRedisLookupMapper implements RedisLookupMapper {

    private final String hashKey = "wordCount";
    private final RedisDataTypeDescription description;

    public WordCountRedisLookupMapper() {
        RedisDataTypeDescription.RedisDataType type = RedisDataTypeDescription.RedisDataType.HASH;
        this.description = new RedisDataTypeDescription(type, hashKey);
    }

    /** Pairs the looked-up word with the value returned from Redis. */
    @Override
    public List<Values> toTuple(ITuple input, Object value) {
        Values row = new Values(getKeyFromTuple(input), value);
        List<Values> rows = Lists.newArrayList();
        rows.add(row);
        return rows;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("wordName", "count");
        declarer.declare(outputFields);
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return this.description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        final String word = tuple.getStringByField("word");
        return word;
    }

    /** Lookups need only a key, so no value is extracted. */
    @Override
    public String getValueFromTuple(ITuple tuple) {
        final String noValue = null;
        return noValue;
    }
}
5. RedisFilterMapper
RedisFilterMapper 繼承 TupleMapper 和 RedisMapper 介面,用於查詢資料時,定義了 declareOutputFields 方法,宣告輸出的欄位。如下面的實現:
/** Declares the two output fields, "wordName" and "count". */
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    Fields outputFields = new Fields("wordName", "count");
    declarer.declare(outputFields);
}
四、自定義RedisBolt實現詞頻統計
4.1 實現原理
自定義 RedisBolt:主要利用 Redis 中雜湊結構的 hincrby key field increment 命令進行詞頻統計。在 Redis 中 hincrby 的執行效果如下。hincrby 可以將欄位的值按照指定的增量遞增(增量也可以為負數);如果該欄位不存在,會先將其值設為 0,再執行遞增。通過這個命令可以非常輕鬆地實現詞頻統計功能。
redis> HSET myhash field 5
(integer) 1
redis> HINCRBY myhash field 1
(integer) 6
redis> HINCRBY myhash field -1
(integer) 5
redis> HINCRBY myhash field -10
(integer) -5
redis>
4.2 專案結構
4.3 自定義RedisBolt的程式碼實現
/**
 * Custom RedisBolt that counts words with the Redis hash command {@code HINCRBY}.
 *
 * <p>For every tuple, the mapper's key is used as the hash field and its value, parsed as a long,
 * as the increment. The hash itself is the mapper's additional key. The tuple is acked on success
 * and failed (with the error reported) on any exception.
 */
public class RedisCountStoreBolt extends AbstractRedisBolt {

    private final RedisStoreMapper storeMapper;
    private final RedisDataTypeDescription.RedisDataType dataType;
    private final String additionalKey;

    /**
     * @param config      connection settings for a single Redis instance
     * @param storeMapper maps tuples to hash field / increment; must describe a HASH
     * @throws IllegalArgumentException if the mapper's data type is not HASH
     */
    public RedisCountStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
        super(config);
        this.storeMapper = storeMapper;
        RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
        this.dataType = dataTypeDescription.getDataType();
        this.additionalKey = dataTypeDescription.getAdditionalKey();
        // HINCRBY only applies to hashes. The type never changes, so reject a wrong mapper once,
        // when the topology is built, instead of failing every tuple at runtime.
        if (dataType != RedisDataTypeDescription.RedisDataType.HASH) {
            throw new IllegalArgumentException("Cannot process such data type for Count: " + dataType);
        }
    }

    /**
     * Increments the tuple's hash field by the tuple's value.
     *
     * <p>Mapper access and parsing happen inside the try block, so a malformed tuple is failed
     * instead of propagating out of the bolt.
     */
    @Override
    protected void process(Tuple tuple) {
        JedisCommands jedisCommand = null;
        try {
            String key = storeMapper.getKeyFromTuple(tuple);
            // Parse before borrowing a connection so a bad value never holds a pooled instance.
            long increment = Long.parseLong(storeMapper.getValueFromTuple(tuple));
            jedisCommand = getInstance();
            jedisCommand.hincrBy(additionalKey, key, increment);
            collector.ack(tuple);
        } catch (Exception e) {
            this.collector.reportError(e);
            this.collector.fail(tuple);
        } finally {
            // Same contract as RedisStoreBolt: hand the instance back even if it is still null.
            returnInstance(jedisCommand);
        }
    }

    /** This bolt is a sink and emits nothing. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
4.4 CustomRedisCountApp
/**
 * Word count using the custom RedisCountStoreBolt: split words go straight to Redis, where
 * HINCRBY performs the counting.
 *
 * <p>A first argument of {@code cluster} submits to a Storm cluster; anything else runs locally.
 */
public class CustomRedisCountApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String STORE_BOLT = "storeBolt";

    private static final String REDIS_HOST = "192.168.200.226";
    private static final int REDIS_PORT = 6379;

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
        builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
        // HINCRBY is atomic in Redis, so any store task may receive any word.
        builder.setBolt(STORE_BOLT, createCountStoreBolt()).shuffleGrouping(SPLIT_BOLT);

        boolean runOnCluster = args.length > 0 && args[0].equals("cluster");
        if (!runOnCluster) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalCustomRedisCountApp", new Config(), builder.createTopology());
            return;
        }
        try {
            StormSubmitter.submitTopology("ClusterCustomRedisCountApp", new Config(), builder.createTopology());
        } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
            e.printStackTrace();
        }
    }

    /** Builds the bolt that increments per-word counters in Redis. */
    private static RedisCountStoreBolt createCountStoreBolt() {
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost(REDIS_HOST)
                .setPort(REDIS_PORT)
                .build();
        RedisStoreMapper storeMapper = new WordCountStoreMapper();
        return new RedisCountStoreBolt(poolConfig, storeMapper);
    }
}
參考資料
- Storm Redis Integration
更多大資料系列文章可以參見 GitHub 開源專案: 大資料入門指南