Flink的sink實戰之三:cassandra3
阿新 • • 發佈:2020-11-10
### 歡迎訪問我的GitHub
[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos)
內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
### 本篇概覽
本文是《Flink的sink實戰》系列的第三篇,主要內容是體驗Flink官方的cassandra connector,整個實戰如下圖所示,我們先從kafka獲取字串,再執行wordcount操作,然後將結果同時列印和寫入cassandra:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201110151126316-545035704.png)
### 全系列連結
1. [《Flink的sink實戰之一:初探》](https://blog.csdn.net/boling_cavalry/article/details/105597628)
2. [《Flink的sink實戰之二:kafka》](https://blog.csdn.net/boling_cavalry/article/details/105598224)
3. [《Flink的sink實戰之三:cassandra3》](https://blog.csdn.net/boling_cavalry/article/details/105598968)
4. [《Flink的sink實戰之四:自定義》](https://blog.csdn.net/boling_cavalry/article/details/105599511)
### 軟體版本
本次實戰的軟體版本資訊如下:
1. cassandra:3.11.6
2. kafka:2.4.0(scala:2.12)
3. jdk:1.8.0_191
4. flink:1.9.2
5. maven:3.6.0
6. flink所在作業系統:CentOS Linux release 7.7.1908
7. cassandra所在作業系統:CentOS Linux release 7.7.1908
8. IDEA:2018.3.5 (Ultimate Edition)
### 關於cassandra
本次用到的cassandra是三臺叢集部署的叢集,搭建方式請參考[《ansible快速部署cassandra3叢集》](https://xinchen.blog.csdn.net/article/details/105602584)
### 準備cassandra的keyspace和表
先建立keyspace和table:
1. cqlsh登入cassandra:
```shell
cqlsh 192.168.133.168
```
2. 建立keyspace(3副本):
```shell
CREATE KEYSPACE IF NOT EXISTS example
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'};
```
3. 建表:
```shell
CREATE TABLE IF NOT EXISTS example.wordcount (
word text,
count bigint,
PRIMARY KEY(word)
);
```
### 準備kafka的topic
1. 啟動kafka服務;
2. 建立名為test001的topic,參考命令如下:
```shell
./kafka-topics.sh \
--create \
--bootstrap-server 127.0.0.1:9092 \
--replication-factor 1 \
--partitions 1 \
--topic test001
```
3. 進入傳送訊息的會話模式,參考命令如下:
```shell
./kafka-console-producer.sh \
--broker-list kafka:9092 \
--topic test001
```
4. 在會話模式下,輸入任意字串然後回車,都會將字串訊息傳送到broker;
### 原始碼下載
如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos):
| 名稱 | 連結 | 備註|
| :-------- | :----| :----|
| 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 |
| git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 |
| git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |
這個git專案中有多個資料夾,本章的應用在flinksinkdemo資料夾下,如下圖紅框所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201110151126747-1186886631.png)
### 兩種寫入cassandra的方式
flink官方的connector支援兩種方式寫入cassandra:
1. Tuple型別寫入:將Tuple物件的欄位對齊到指定的SQL的引數中;
2. POJO型別寫入:通過DataStax,將POJO物件對應到註解配置的表和欄位中;
接下來分別使用這兩種方式;
### 開發(Tuple寫入)
1. [《Flink的sink實戰之二:kafka》](https://xinchen.blog.csdn.net/article/details/105598224)中建立了flinksinkdemo工程,在此繼續使用;
2. 在pom.xml中增加casandra的connector依賴:
```xml
```
3. 另外還要新增flink-streaming-scala依賴,否則編譯CassandraSink.addSink這段程式碼會失敗:
```xml
```
4. 新增CassandraTuple2Sink.java,這就是Job類,裡面從kafka獲取字串訊息,然後轉成Tuple2型別的資料集寫入cassandra,寫入的關鍵點是Tuple內容和指定SQL中的引數的匹配:
```java
package com.bolingcavalry.addsink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class CassandraTuple2Sink {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設定並行度
env.setParallelism(1);
//連線kafka用到的屬性物件
Properties properties = new Properties();
//broker地址
properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
//zookeeper地址
properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
//消費者的groupId
properties.setProperty("group.id", "flink-connector");
//例項化Consumer類
FlinkKafka