1. 程式人生 > >storm_jdbc 最完整的版本,可貼上直接使用

storm_jdbc 最完整的版本,可貼上直接使用

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import com.google.common.collect.Lists;

import java.util.List;
import java.util.Map;
import java.util.Random;
/**
 * @author cwc
 * @date 2018年5月31日  
 * @description:儲存資料的spout,我的讀與寫共用的這一個spout,用於測試
 * @version 1.0.0 
 */
public class JdbcSpout extends BaseRichSpout {
	public static Random random =new Random();
	private static final long serialVersionUID = 1L;
	private SpoutOutputCollector collector;
	//模擬資料
	public static final List<Values> rows = Lists.newArrayList(
	            new Values("peter",random.nextInt(80),1),
	            new Values("bob",random.nextInt(60),2),
	            new Values("alice",random.nextInt(100),2));

	@Override
	public void nextTuple() {
		  Random rand = new Random();
	      Values row = rows.get(rand.nextInt(rows.size() - 1));
	      
//	      this.collector.emit(new Values("bob"));//用於佔位符查詢的欄位
	      this.collector.emit(row);//用於儲存寫入
	      System.out.println(row);
	      Thread.yield();
	}

	
	@Override
	public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
		this.collector =collector;
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		 declarer.declare(new Fields("name","age","sex"));//用於儲存寫入
//		 declarer.declare(new Fields("name"));//用於佔位符查詢的欄位
	}

}