Storm和Redis native的整合
阿新 • • 發佈:2019-01-30
Storm-redis provides basic Bolt implementations, RedisLookupBolt
and RedisStoreBolt
.Storm提供了兩種Blot,從Redis查詢和插入Redis
public class WordSpout implements IRichSpout { boolean isDistributed; SpoutOutputCollector collector; Random random; public static final String[] words = {"apple", "orange", "banana", "HAP", "IBM"}; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.collector = spoutOutputCollector; this.random = new Random(); } @Override public void close() { } @Override public voidactivate() { } @Override public void deactivate() { } @Override public void nextTuple() { final String word = words[random.nextInt(words.length)]; System.out.println("Spoutword:" + word); collector.emit(new Values(word), UUID.randomUUID()); } @Overridepublic void ack(Object o) { } @Override public void fail(Object o) { } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
public class PersistentWordCount { private static final String WORD_SPOUT = "WORD_SPOUT"; private static final String COUNT_BOLT = "COUNT_BOLT"; private static final String STORE_BOLT = "STORE_BOLT"; private static final String TEST_REDIS_HOST = "localhost"; private static final int TEST_REDIS_PORT = 6379; public static void main(String[] args) throws Exception { Config config = new Config(); String host = TEST_REDIS_HOST; int port = TEST_REDIS_PORT; if (args.length >= 2) { host = args[0]; port = Integer.parseInt(args[1]); } JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(host).setPort(port).build(); WordSpout spout = new WordSpout(); WordCounterBlot bolt = new WordCounterBlot(); RedisStoreMapper storeMapper = setupStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);// wordSpout ==> countBolt ==> RedisBolt TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(WORD_SPOUT, spout, 2); builder.setBolt(COUNT_BOLT, bolt, 2).fieldsGrouping(WORD_SPOUT, new Fields("word")); builder.setBolt(STORE_BOLT, storeBolt, 2).shuffleGrouping(COUNT_BOLT); if (args.length == 3) { StormSubmitter.submitTopology(args[3], config, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("reidsTopo", config, builder.createTopology()); /* Thread.sleep(1000); cluster.shutdown();*/ } } private static RedisStoreMapper setupStoreMapper() { return new WordCountStoreMapper(); } private static class WordCountStoreMapper implements RedisStoreMapper { private RedisDataTypeDescription description; private final String hashKey = "wordCount"; public WordCountStoreMapper() { description = new RedisDataTypeDescription( RedisDataTypeDescription.RedisDataType.HASH, hashKey); } @Override public RedisDataTypeDescription getDataTypeDescription() { return description; } @Override public String getKeyFromTuple(ITuple tuple) { System.out.println("-----------word:" + tuple.getStringByField("word")); return tuple.getStringByField("word"); } @Override public String getValueFromTuple(ITuple tuple) { System.out.println("************count:" + tuple.getStringByField("count")); return tuple.getStringByField("count"); } } }