storm_jdbc 最完整的版本,可貼上直接使用
阿新 • • 發佈:2018-12-12
import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import com.google.common.collect.Lists; import java.util.List; import java.util.Map; import java.util.Random; /** * @author cwc * @date 2018年5月31日 * @description:儲存資料的spout,我的讀與寫共用的這一個spout,用於測試 * @version 1.0.0 */ public class JdbcSpout extends BaseRichSpout { public static Random random =new Random(); private static final long serialVersionUID = 1L; private SpoutOutputCollector collector; //模擬資料 public static final List<Values> rows = Lists.newArrayList( new Values("peter",random.nextInt(80),1), new Values("bob",random.nextInt(60),2), new Values("alice",random.nextInt(100),2)); @Override public void nextTuple() { Random rand = new Random(); Values row = rows.get(rand.nextInt(rows.size() - 1)); // this.collector.emit(new Values("bob"));//用於佔位符查詢的欄位 this.collector.emit(row);//用於儲存寫入 System.out.println(row); Thread.yield(); } @Override public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) { this.collector =collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("name","age","sex"));//用於儲存寫入 // declarer.declare(new Fields("name"));//用於佔位符查詢的欄位 } }