Storm: Reading from a MySQL Table and Writing to Another MySQL Table
1. Spout:
The spout pages through the source table 10,000 rows at a time, tracks the highest id it has emitted, and sends each row downstream as a single string tuple.

package com.TestStorm;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class TestSpout extends BaseRichSpout {

    private String url = "jdbc:mysql://192.168.0.44:3306/mysql";
    private String username = "root";
    private String password = "mysql";

    private ResultSet rs;
    private Statement state;
    private SpoutOutputCollector collector;
    private int id = 0; // highest id read so far, used to resume paging

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            Connection conn = DriverManager.getConnection(url, username, password);
            state = conn.createStatement();
            rs = state.executeQuery("select * from storm where id > " + id + " order by id limit 10000");
            this.collector = collector;
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
    }

    public void nextTuple() {
        try {
            if (rs.next()) {
                id = rs.getInt(1); // remember the last id for the next batch
                String str = rs.getString(1) + " " + rs.getString(2);
                System.out.println(str);
                collector.emit(new Values(str));
            } else {
                // batch exhausted: fetch the next 10000 rows after the last id
                rs.close();
                rs = state.executeQuery("select * from storm where id > " + id + " order by id limit 10000");
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("str"));
    }
}
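The post does not show the table definitions. For reference, here is a minimal sketch of source and target tables that would satisfy the spout's query (an integer id in column 1, a string in column 2) and the bolt's single-column insert; the helper class and the column names word/str are assumptions, not part of the original:

package com.TestStorm;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical one-off helper: creates tables matching what the
// topology reads and writes. Names and types are assumed.
public class CreateTables {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.0.44:3306/mysql", "root", "mysql");
             Statement st = conn.createStatement()) {
            // source table: the spout reads an int id and a second string column
            st.execute("create table if not exists storm (id int primary key, word varchar(255))");
            // target table: the bolt inserts one string value per tuple
            st.execute("create table if not exists newStorm (str varchar(512))");
        }
    }
}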
2. Bolt:
The bolt opens its own JDBC connection in prepare() and inserts every incoming tuple into the newStorm table.

package com.TestStorm;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class TestBolt extends BaseBasicBolt {

    private Connection conn;
    private Statement state;

    public void prepare(Map stormConf, TopologyContext context) {
        System.out.println("............bolt............");
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://192.168.0.44:3306/mysql", "root", "mysql");
            state = conn.createStatement();
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = String.valueOf(input.getValueByField("str"));
        try {
            // write each incoming tuple into the target table
            state.execute("insert into newStorm values('" + str + "')");
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("str"));
    }
}
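One caveat with the bolt above: the tuple value is concatenated directly into the SQL string, so any quote character in the data will break the insert. A sketch of the same bolt using a PreparedStatement instead (the class name SafeTestBolt is mine, not from the original post):

package com.TestStorm;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

// Variant of TestBolt: a parameterized insert lets the JDBC driver
// handle escaping, so quotes or backslashes in the data are safe.
public class SafeTestBolt extends BaseBasicBolt {

    private PreparedStatement ps;

    public void prepare(Map stormConf, TopologyContext context) {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://192.168.0.44:3306/mysql", "root", "mysql");
            ps = conn.prepareStatement("insert into newStorm values(?)");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            ps.setString(1, input.getStringByField("str")); // driver escapes the value
            ps.executeUpdate();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("str"));
    }
}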
3. Job:
The job wires the spout and bolt together and submits the topology to the cluster, taking the topology name from the command line.

package com.TestStorm;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;

public class Job {
    public static void main(String[] args)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Testspout", new TestSpout(), 1);
        builder.setBolt("Testbolt", new TestBolt(), 10).shuffleGrouping("Testspout");

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(2);

        // the topology name is taken from the command line (args[0])
        StormSubmitter.submitTopology(args[0], config, builder.createTopology());
    }
}
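Since the topology name comes from args[0], it is supplied when submitting with the storm client, for example (the jar name is a placeholder):

storm jar TestStorm.jar com.TestStorm.Job mysqlCopyTopology

For a quick test without a cluster, the same wiring can run in local mode. A sketch, assuming storm-core is on the classpath rather than marked provided (the class and topology names here are mine):

package com.TestStorm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

// Local-mode variant of Job for testing on a single machine.
public class LocalJob {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Testspout", new TestSpout(), 1);
        builder.setBolt("Testbolt", new TestBolt(), 10).shuffleGrouping("Testspout");

        Config config = new Config();
        config.setDebug(false);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("mysqlCopyLocal", config, builder.createTopology());
        Thread.sleep(60_000); // let the topology run for a minute, then stop
        cluster.shutdown();
    }
}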