Kafka+SparkStreaming解析Json資料並插入Hbase,包含部分業務邏輯
阿新 發佈:2019-01-31
以下程式碼是在學習Spark時候自己寫的例子,還不成熟,僅供記錄和參考
下邊直接上程式碼,我在我覺得有用的位置加了比較詳細的註解
import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.client.Put; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; import com.alibaba.fastjson.JSONObject; public class KafkaStream_Json { static final String ZK_QUORUM = "devhadoop3:2181,devhadoop2:2181,devhadoop1:2181"; static final String GROUP = "spark_json_test_group"; static final String TOPICSS = "spark_json_test2"; static final String NUM_THREAD = "5"; @SuppressWarnings({ "serial" }) public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("json_test").setMaster("local[2]"); conf.set("spark.testing.memory", "2147480000");// 後面的值大於512m即可 JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(20)); int numThreads = Integer.parseInt(NUM_THREAD); Map<String, Integer> topicMap = new HashMap<String, Integer>(); String[] topics = TOPICSS.split(","); for (String topic : topics) { topicMap.put(topic, numThreads); } JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, ZK_QUORUM, GROUP, topicMap);// 原始資料 JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {// 這裡返回的應該就是一個Json字串了 public String call(Tuple2<String, String> tuple2) { return tuple2._2(); } }); JavaDStream<JSONObject> words_2 = lines.flatMap(new FlatMapFunction<String, JSONObject>() {// 把資料轉換成json @Override public Iterable<JSONObject> call(String jsonStr) throws Exception 
{ List<JSONObject> arr = new ArrayList<JSONObject>(); JSONObject obj = JSONObject.parseObject(jsonStr); System.out.println("收到的資料" + jsonStr); arr.add(obj); return arr; } }); JavaDStream<JSONObject> words = words_2.persist();// 快取也可以根據實際業務儲存,也可以用cache,cache只支援MEMORY_ONLY級別快取 // 如果上邊不快取,那麼type1和type2輸出的時候,都需要重新執行以下lines,words_2的操作,那麼效率將會很低 // 業務分流,根據業務編號先區分出不同的訊息,業務1 JavaDStream<JSONObject> type1 = words.filter(new Function<JSONObject, Boolean>() { @Override public Boolean call(JSONObject v1) throws Exception { return "1".equals(v1.getString("type")); } }); // 業務2的資料 JavaDStream<JSONObject> type2 = words.filter(new Function<JSONObject, Boolean>() { @Override public Boolean call(JSONObject v1) throws Exception { return "2".equals(v1.getString("type")); } }); JavaDStream<JSONObject> type1_2 = type1.map(new Function<JSONObject, JSONObject>() { @Override public JSONObject call(JSONObject v1) throws Exception { /* * 對v1進行業務處理,但是最終結果是在type1_2,類似於string的 substring函式 * * 必須用一個新的去接而不是改變type1裡的v1的值 * * 這裡即使我們改變的起始是v1但是實際上type1裡的v1並沒有變化 */ v1.put("context", "測試哈哈哈"); return v1; } }); type1.print();// type1_2.print();// type2.print(); /* * 下邊是迴圈是獲得真正資料的一種方式 ,foreachRDD也相當於是一種輸出 */ type1_2.foreachRDD(new VoidFunction<JavaRDD<JSONObject>>() { @Override public void call(JavaRDD<JSONObject> rdd) throws Exception { System.out.println("123333333333333333333333333333"); List<Put> puts = new ArrayList<Put>(); System.out.println("外部" + puts.hashCode()); List<JSONObject> dataList = rdd.collect(); for (JSONObject t : dataList) { System.out.println(t.getString("name")); Put put = new Put(t.getString("name").getBytes()); put.addColumn("data".getBytes(), "name".getBytes(), t.getString("name").getBytes()); put.addColumn("data".getBytes(), "age".getBytes(), t.getString("age").getBytes()); put.addColumn("data".getBytes(), "type".getBytes(), t.getString("type").getBytes()); put.addColumn("data".getBytes(), "context".getBytes(), t.getString("context").getBytes()); puts.add(put); // 
System.out.println("內部" + puts.hashCode());//這裡的puts,hashCode每次都不一樣,但是確實是最後都加入到一個List裡了 } if (puts.size() > 0) { System.out.println("陣列大小"+puts.size()); HbaseInsert.getInstance().insertHbase("lwb_test", puts); } } }); jssc.start();// jssc.awaitTermination();// } }
這個是批量插入HBase的隨便寫的一個插入類
import java.io.IOException; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; public class HbaseInsert { private static HbaseInsert hbaseInsert; private static Configuration configuration; private static String zkHost = "devhadoop3,devhadoop2,devhadoop1"; private static String zkPort = "2181"; private static String zkParent = "/hbase-unsecure"; private static Connection connection; private HbaseInsert() { configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", zkHost); configuration.set("hbase.zookeeper.property.clientPort", zkPort); configuration.set("zookeeper.znode.parent", zkParent); try { connection = ConnectionFactory.createConnection(configuration); } catch (IOException e) { e.printStackTrace(); } } public static synchronized HbaseInsert getInstance() { if (hbaseInsert == null) { hbaseInsert = new HbaseInsert(); } return hbaseInsert; } public void insertHbase(String tablename, List<Put> puts) { Table table = null; try { table = connection.getTable(TableName.valueOf(tablename)); table.put(puts); } catch (IOException e) { e.printStackTrace(); } finally { if (table != null) { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
下邊是我測試往kafka裡插入資料的程式碼
import java.util.Properties; import com.alibaba.fastjson.JSONObject; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class KafkaProducer { public static void main(String[] aaa) { Properties props = new Properties(); // 此處配置的是kafka的埠 props.put("metadata.broker.list", "192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181");// 這裡必須用域名 // kafka.serializer. props.put("request.required.acks", "-1"); props.put("serializer.class", "kafka.serializer.StringEncoder"); Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props)); for (int i = 0; i < 10; i++) { JSONObject obj = new JSONObject(); obj.put("name", "name"+i); obj.put("age", i); obj.put("type", String.valueOf(i%4)); producer.send(new KeyedMessage<String, String>("spark_json_test2", obj.toJSONString()));// } producer.close(); } }