1. 程式人生 > 其它 >spark-streaming整合Kafka處理實時資料

spark-streaming整合Kafka處理實時資料

在這篇文章裡,我們模擬了一個場景,實時分析訂單資料,統計實時收益。

場景模擬

我試圖覆蓋工程上最為常用的一個場景:

1)首先,向Kafka裡實時的寫入訂單資料,JSON格式,包含訂單ID-訂單型別-訂單收益

2)然後,spark-streaming每十秒實時去消費kafka中的訂單資料,並以訂單型別分組統計收益

3)最後,spark-streaming統計結果實時的存入本地MySQL。

前提條件

安裝

1)spark:我使用的yarn-client模式下的spark,環境中叢集客戶端已經搞定

2)zookeeper:我使用的是這個叢集:10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181

3)kafka:我使用的是standalone模式:10.93.21.21:9093

4)mysql:10.93.84.53:3306

語言

python:pykafka,pip install pykafka

java:spark,spark-streaming

下面開始

1、資料寫入kafka

  • kafka寫入

我們使用pykafka模擬資料實時寫入,程式碼如下:

kafka_producer.py

# -* coding:utf8 *-  
import time
import json
import uuid
import random
import threading
from pykafka import KafkaClient

# 建立kafka例項
hosts = '10.93.21.21:9093'
client = KafkaClient(hosts=hosts)

# 列印一下有哪些topic
print client.topics  

# 建立kafka producer控制代碼
topic = client.topics['kafka_spark']
producer = topic.get_producer()


# work
def work():
    while 1:
        msg = json.dumps({
            "id": str(uuid.uuid4()).replace('-', ''),
            "type": random.randint(1, 5),
            "profit": random.randint(13, 100)})
        producer.produce(msg)

# 多執行緒執行
thread_list = [threading.Thread(target=work) for i in range(10)]
for thread in thread_list:
    thread.setDaemon(True)
    thread.start()

time.sleep(60)

# 關閉控制代碼, 退出
producer.stop()

可以看到,我們寫入的形式是一個json,訂單id是一個uuid,訂單型別type從1-5隨機,訂單收益profit從13-100隨機,形如

{"id": ${uid}, "type": 1, "profit": 30}

注意:1)python對kafka的讀寫不需要藉助zookeeper,2)使用多執行緒的形式寫入,讓資料量具有一定的規模。

執行producer,會持續寫入資料1分鐘。

python kafka_producer.py
  • 驗證一下

kafka_consumer.py

# -* coding:utf8 *-
from pykafka import KafkaClient

hosts = '10.93.21.21:9093'
client = KafkaClient(hosts=hosts)
# 消費者
topic = client.topics['kafka_spark']
consumer = topic.get_simple_consumer(consumer_group='test', auto_commit_enable=True, auto_commit_interval_ms=1,
                                     consumer_id='test')
for message in consumer:
    if message is not None:
        print message.offset, message.value

 執行,可以消費kafka剛才寫入的資料

python kafka_consumer.py

2、spark-streaming

1)先解決依賴

其中比較核心的是spark-streaming和kafka整合包spark-streaming-kafka_2.10,還有spark引擎spark-core_2.10

json和mysql看大家愛好。

pom.xml

<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.19</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>commons-dbcp</groupId>
            <artifactId>commons-dbcp</artifactId>
            <version>1.4</version>
        </dependency>
    </dependencies>

2)MySQL準備

  • 建表

我們的結果去向是MySQL,先建立一個結果表。

id:主鍵,自增id

type:訂單型別

profit:每個spark batch聚合出的訂單收益結果

time:時間戳

CREATE TABLE `order` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `type` int(11) DEFAULT NULL,
  `profit` int(11) DEFAULT NULL,
  `time` mediumtext,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=56 DEFAULT CHARSET=utf8
  • Java客戶端

採用了單例執行緒池的模式簡單寫了一下。

ConnectionPool.java

package com.xiaoju.dqa.realtime_streaming;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;


public class ConnectionPool {
    private static LinkedList<Connection> connectionQueue;

    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public synchronized static Connection getConnection() {
        try {
            if (connectionQueue == null) {
                connectionQueue = new LinkedList<Connection>();
                for (int i = 0; i < 5; i++) {
                    Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://10.93.84.53:3306/big_data",
                            "root",
                            "1234");
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connectionQueue.poll();

    }
    public  static void returnConnection(Connection conn){connectionQueue.push(conn);}
}

3)程式碼實現

我用java寫的,不會用scala很尷尬。

即時用java整個的處理過程依然比較簡單。跟常見的wordcount也沒有多大的差別。

SparkStreaming特點

spark的特點就是RDD,通過對RDD的操作,來遮蔽分散式運算的複雜度。

而spark-streaming的操作物件是RDD的時間序列DStream,這個序列的生成是跟batch的選取有關。例如我這裡Batch是10s一個,那麼每隔10s會產出一個RDD,對RDD的切割和序列的生成,spark-streaming對我們透明瞭。唯一暴露給我們的DStream和原生RDD的使用方式基本一致。

這裡需要講解一下MySQL寫入注意的事項。

MySQL寫入

在處理mysql寫入時使用了foreachPartition方法,即,在foreachPartition中使用borrow mysql控制代碼。

這樣做的原因是:

1)你無法再Driver端建立mysql控制代碼,並通過序列化的形式傳送到worker端

2)如果你在處理rdd中建立mysql控制代碼,很容易對每一條資料建立一個控制代碼,在處理過程中很快記憶體就會溢位。

OrderProfitAgg.java

package com.xiaoju.dqa.realtime_streaming;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.Statement;
import java.util.*;


/*
*   生產者可以選用kafka自帶的producer指令碼
*   bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test
* */
public class OrderProfitAgg {

    public static void main(String[] args) throws InterruptedException {
        /*
        *   kafka所註冊的zk叢集
        * */
        String zkQuorum = "10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181";

        /*
        *   spark-streaming消費kafka的topic名稱, 多個以逗號分隔
        * */
        String topics = "kafka_spark,kafka_spark2";

        /*
        *   消費組 group
        * */
        String group = "bigdata_qa";

        /*
        *   消費程序數
        * */
        int numThreads = 2;

        /*
        *   選用yarn佇列模式, spark-streaming程式的app名稱是"order profit"
        * */
        SparkConf sparkConf = new SparkConf().setMaster("yarn-client").setAppName("order profit");

        /*
        *   建立sc, 全域性唯一, 設定logLevel可以列印一些東西到控制檯
        * */
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        sc.setLogLevel("WARN");

        /*
        *   建立jssc, spark-streaming的batch是每10s劃分一個
        * */
        JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));

        /*
        *   準備topicMap
        * */
        Map<String ,Integer> topicMap = new HashMap<String, Integer>();
        for (String topic : topics.split(",")) {
            topicMap.put(topic, numThreads);
        }

        /*
        *   kafka資料流
        * */
        List<JavaPairReceiverInputDStream<String, String>> streams = new ArrayList<JavaPairReceiverInputDStream<String, String>>();
        for (int i = 0; i < numThreads; i++) {
            streams.add(KafkaUtils.createStream(jssc, zkQuorum, group, topicMap));
        }
        /*
        *   從kafka消費資料的RDD
        * */
        JavaPairDStream<String, String> streamsRDD = streams.get(0);
        for (int i = 1; i < streams.size(); i++) {
            streamsRDD = streamsRDD.union(streams.get(i));
        }

        /*
        *   kafka訊息形如: {"id": ${uuid}, "type": 1, "profit": 35}
        *   統計結果, 以type分組的總收益
        *   mapToPair, 將kafka消費的資料, 轉化為type-profit key-value對
        *   reduceByKey, 以type分組, 聚合profit
        * */
        JavaPairDStream<Integer, Integer> profits = streamsRDD.mapToPair(new PairFunction<Tuple2<String, String>, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<String, String> s_tuple2) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s_tuple2._2);
                return new Tuple2<Integer, Integer>(jsonObject.getInteger("type"), jsonObject.getInteger("profit"));
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });

        /*
        *   輸出結果到MySQL
        *   需要為每一個partition建立一個MySQL控制代碼, 使用foreachPartition
        * */
        profits.foreachRDD(new Function<JavaPairRDD<Integer, Integer>, Void>() {
            @Override
            public Void call(JavaPairRDD<Integer, Integer> integerIntegerJavaPairRDD) throws Exception {

                integerIntegerJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<Integer, Integer>>>() {
                    @Override
                    public void call(Iterator<Tuple2<Integer, Integer>> tuple2Iterator) throws Exception {
                        Connection connection = ConnectionPool.getConnection();
                        Statement stmt = connection.createStatement();
                        long timestamp = System.currentTimeMillis();
                        while(tuple2Iterator.hasNext()) {
                            Tuple2<Integer, Integer> tuple = tuple2Iterator.next();
                            Integer type = tuple._1;
                            Integer profit = tuple._2;
                            String sql = String.format("insert into `order` (`type`, `profit`, `time`) values (%s, %s, %s)", type, profit, timestamp);
                            stmt.executeUpdate(sql);
                        }
                        ConnectionPool.returnConnection(connection);
                    }
                });
                return null;
            }
        });

        /*
        *   開始計算, 等待計算結束
        * */
        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            jssc.close();
        }
    }
}

4)打包方法

編寫pom.xml build tag。

mvn clean package打包即可。

pom.xml

<build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <!--這裡要替換成jar包main方法所在類 -->
                            <!--<mainClass>com.bigdata.qa.hotdog.driver.WordCount</mainClass>-->
                            <mainClass>com.xiaoju.dqa.realtime_streaming.OrderProfitAgg</mainClass>

                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- 指定在打包節點執行jar包合併操作 -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

3、執行與結果

1)執行kafka_producer.py

python kafka_producer.py

2) 執行spark-streaming

這裡使用的是預設引數提交yarn佇列。

spark-submit --queue=root.XXXX realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar

3)檢視結果

到MySQL中檢視結果,每隔10秒會聚合出type=1-5的5條資料。

例如第一條資料,就是type=4這種型別的業務,在10s內收益是555473元。業務量驚人啊。哈哈。

完結。