Understanding Spark in Depth: a demo of structured streams (Spark Streaming + Spark SQL for processing structured data)
阿新 • Published: 2019-01-26
I've recently been working on a project that combines Spark Streaming and Spark SQL to process structured data. Below is a small example; take it if you find it useful!
package com.unistack.tamboo.compute.process.impl;

import com.alibaba.fastjson.JSONArray;
import com.google.common.collect.Maps;
import com.unistack.tamboo.compute.process.StreamProcess;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * @author hero.li
 * Processing stream data with Spark SQL
 */
public class SqlProcess implements StreamProcess {
    private static Logger LOGGER = LoggerFactory.getLogger(SqlProcess.class);
    private Properties outputInfo;
    private String toTopic;

    /**
     * Expected configuration format:
     * {"datasources":[{"password":"welcome1","port":"3308","ip":"192.168.1.192","dbName":"test","dbType":"MYSQL","dataSourceName":"191_test","username":"root","tableName":"t1"},
     *                 {"password":"welcome1","port":"3308","ip":"192.168.1.191","dbName":"test","dbType":"MYSQL","dataSourceName":"191_test","username":"root","tableName":"t1"}],
     *  "sql":"select * from ....",
     *  "windowLen":"window length, a multiple of 2 seconds",
     *  "windowSlide":"slide interval, a multiple of 2"}
     */
    public SqlProcess(Properties outputInfo, String toTopic) {
        this.outputInfo = outputInfo;
        this.toTopic = toTopic;
    }

    @Override
    public void logic(JavaRDD<ConsumerRecord<String, String>> rdd) {
        // Iterate over each partition; per-record processing would go here.
        rdd.foreachPartition(itr -> {
            while (itr.hasNext()) {
                String record = itr.next().value();
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        // Load the MySQL JDBC driver.
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }

        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        SparkSession spark = SparkSession.builder().appName("test_kane").getOrCreate();

        // Register the MySQL table t2 as a temporary view so it can be joined with the stream.
        Map<String, String> map = Maps.newHashMap();
        // map.put("url", "jdbc:mysql://x.x.x.x:3309/test?user=root&password=welcome1&characterEncoding=UTF8");
        map.put("url", "jdbc:mysql://x.x.x.x:3309/test?characterEncoding=UTF8");
        map.put("user", "root");
        map.put("password", "welcome1");
        map.put("dbtable", "t2");
        Dataset<Row> hiveJob = spark.read().format("jdbc").options(map).load();
        hiveJob.createOrReplaceTempView("t2");

        // Kafka consumer configuration (SASL/PLAIN over SASL_PLAINTEXT).
        System.setProperty("java.security.auth.login.config", "/Users/frank/Desktop/shell/lyh.conf");
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "x.x.x.x:9999");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("group.id", String.valueOf(System.currentTimeMillis()));
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", true);
        kafkaParams.put("sasl.mechanism", "PLAIN");
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");

        Collection<String> topics = Arrays.asList("xxTopic");
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        // For each micro-batch: parse the JSON messages into a DataFrame, register it as a
        // temporary view, and join it with the MySQL table t2 via Spark SQL.
        stream.map(r -> r.value())
              .foreachRDD((JavaRDD<String> rdd) -> {
                  if (rdd.count() > 0) {
                      Dataset<Row> df = spark.read().json(spark.createDataset(rdd.rdd(), Encoders.STRING()));
                      df.createOrReplaceTempView("streamData");
                      df.cache();
                      try {
                          Dataset<Row> aggregators = spark.sql("select a.*,b.* from streamData a join t2 b on a.id = b.id");
                          String[] colsName = aggregators.columns();
                          Iterator<Row> itr = aggregators.toLocalIterator();
                          while (itr.hasNext()) {
                              Row row = itr.next();
                              for (int i = 0; i < colsName.length; i++) {
                                  String cn = colsName[i];
                                  Object as = row.getAs(cn);
                                  System.out.print(cn + "=" + as + ", ");
                              }
                              System.out.println();
                          }
                      } catch (Exception e) {
                          LOGGER.error("failed to run the join query on the current batch", e);
                      }
                  }
              });

        jssc.start();
        jssc.awaitTermination();
    }
}
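The demo above only prints the joined rows to the console, and the toTopic field is never actually used. As a rough idea of the write-back side, the snippet below sketches how each batch's join result might be pushed to an output Kafka topic using the batch Kafka sink. This is a minimal sketch, not part of the original code: it assumes the spark-sql-kafka-0-10 package is on the classpath, and the broker address and topic name are placeholders.

// Minimal sketch (assumption: spark-sql-kafka-0-10 is available; broker/topic are placeholders).
// Could replace the System.out printing inside foreachRDD.
Dataset<Row> joined = spark.sql("select a.*, b.* from streamData a join t2 b on a.id = b.id");
joined.selectExpr("to_json(struct(*)) AS value")            // the Kafka sink expects a 'value' column
      .write()
      .format("kafka")
      .option("kafka.bootstrap.servers", "x.x.x.x:9999")    // placeholder broker
      .option("kafka.security.protocol", "SASL_PLAINTEXT")  // mirror the consumer's SASL settings
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("topic", "outTopic")                          // placeholder for toTopic
      .save();

Each row is serialized to a JSON string with to_json(struct(*)) so the whole joined record lands in the Kafka message value; a key column could be added the same way if ordering by key matters.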