springboot+kafka+sparkstreaming: producing and consuming data - a super-simple example
Kafka producer example:
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
public class LiveServerLog {
private static final Logger LOGGER = LoggerFactory.getLogger(LiveServerLog.class);
private int retry;
private static KafkaProducer<String, String> kafkaProducer;
private static final LiveServerLog INSTANCE = new LiveServerLog();
private LiveServerLog() {
}
public static final LiveServerLog getInstance() {
return INSTANCE;
}
/**
* Initialize the Kafka producer.
* @param retry number of retries when a send fails
*/
public void initConfig(int retry) {
this.retry = retry;
if (null == kafkaProducer) {
Properties props = new Properties();
InputStream inStream = null;
try {
inStream = this.getClass().getClassLoader()
.getResourceAsStream("kafka.properties");
props.load(inStream);
kafkaProducer = new KafkaProducer<String, String>(props);
} catch (IOException e) {
LOGGER.error("kafkaProducer初始化失敗:" + e.getMessage(), e);
} finally {
if (null != inStream) {
try {
inStream.close();
} catch (IOException e) {
LOGGER.error("kafkaProducer初始化失敗:" + e.getMessage(), e);
}
}
}
}
}
/**
* Send a message through kafkaProducer.
* @param topic   destination topic
* @param message message payload
*/
public void sendKafkaMessage(String topic, String message) {
/*
* 1. If a partition is specified, the message goes only to that partition, e.g. random.nextInt(2).
* 2. If both a partition and a key are specified, the message still goes to the specified partition; the key is not used for routing, e.g. random.nextInt(2), "".
* 3. If neither a partition nor a key is specified, messages are distributed across the topic's partitions.
* 4. If only a key is specified, the partition is chosen by hashing the key.
* (See the ProducerRecord sketch after this class for the matching constructors.)
*/
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
topic, message);
// send() is asynchronous: it adds the message to a buffer and returns immediately, letting the producer batch messages for higher throughput
// The Kafka producer is thread-safe, so a single instance can be shared for sending
kafkaProducer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
LOGGER.error("kafka message send failed: " + exception.getMessage(), exception);
retryKafkaMessage(topic, message);
} else {
System.out.println(metadata.topic() + "-" + metadata.partition());
}
}
});
}
/**
* Retry when a Kafka send fails, up to the configured number of retries.
*/
private void retryKafkaMessage(String topic, String retryMessage) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
topic, retryMessage);
for (int i = 1; i <= retry; i++) {
try {
kafkaProducer.send(record);
return;
} catch (Exception e) {
LOGGER.error("kafka傳送訊息失敗:" + e.getMessage(), e);
retryKakfaMessage(topic, retryMessage);
}
}
}
}
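The routing rules listed in the comment inside sendKafkaMessage correspond to the different ProducerRecord constructors. Below is a minimal sketch of those variants; the key and partition values are made up for illustration and are not part of the original service:

import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerRecordRoutingSketch {
    public static void main(String[] args) {
        String topic = "live_server_log";
        String message = "user1,2021-04-23 12:00:00,1"; // hypothetical payload

        // Explicit partition: always sent to partition 1; the key is stored but not used for routing (rules 1 and 2)
        ProducerRecord<String, String> fixedPartition =
                new ProducerRecord<String, String>(topic, 1, "user1", message);

        // Key only: the partition is chosen by hashing the key, so the same key always lands on the same partition (rule 4)
        ProducerRecord<String, String> keyed =
                new ProducerRecord<String, String>(topic, "user1", message);

        // No partition and no key: messages are spread across the topic's partitions (rule 3)
        ProducerRecord<String, String> unkeyed =
                new ProducerRecord<String, String>(topic, message);

        System.out.println(fixedPartition.partition() + " / " + keyed.key() + " / " + unkeyed.key());
    }
}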
kafka.properties
bootstrap.servers=10.105.1.4:9092,10.105.1.5:9092,10.105.1.6:9092
acks=1
retries=3
batch.size=1000
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
client.id=producer.Live_Server.Log
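Before wiring up the Spark Streaming job, you can verify that the producer's messages actually reach the topic with a small stand-alone consumer. This is only a verification sketch: it reuses the broker list and topic above, the group id live_server_log_check is made up, and poll(long) is the signature of the 0.10.x client declared in pom1.xml.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;

public class LiveServerLogConsumerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.105.1.4:9092,10.105.1.5:9092,10.105.1.6:9092");
        props.put("group.id", "live_server_log_check"); // made-up group id, separate from the Spark job
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {
            consumer.subscribe(Collections.singletonList("live_server_log"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.partition() + "-" + record.offset() + ": " + record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}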
Spring Boot caller example:
import net.sf.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
@RestController
@SpringBootApplication
@EnableTransactionManagement
@EnableScheduling
@RequestMapping(value = "/LiveService/*")
public class LiveService {
private final static Logger log = LoggerFactory.getLogger(LiveService.class);
public static void main(String[] args) throws Exception {
SpringApplication.run(LiveService.class, args);
}
@RequestMapping(value = "/", produces = {"text/plain;charset=UTF-8"})
@ResponseBody
public String returnString() {
return "Hello LiveService";
}
/**
* Record a log entry: read the request body and forward the "log" field to Kafka.
*/
@RequestMapping(value = "LiveServerLog", produces = {"application/json;charset=UTF-8"})
@ResponseBody
public void LiveServerLog(HttpServletRequest request, HttpServletResponse response) {
try {
JSONObject condition = getStringFromStream(request);
String logMessage = condition.getString("log");
LiveServerLog.getInstance().initConfig(3);
LiveServerLog.getInstance().sendKafkaMessage("live_server_log", logMessage);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/**
* Read the JSON body of the request.
*/
private JSONObject getStringFromStream(HttpServletRequest req) {
ServletInputStream is;
try {
is = req.getInputStream();
int nRead = 1;
int nTotalRead = 0;
byte[] bytes = new byte[102400];
while (nRead > 0) {
nRead = is.read(bytes, nTotalRead, bytes.length - nTotalRead);
if (nRead > 0)
nTotalRead = nTotalRead + nRead;
}
String str = new String(bytes, 0, nTotalRead, "utf-8");
return JSONObject.fromObject(str);
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
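The LiveServerLog endpoint reads the raw request body and expects a JSON object with a log field. A minimal caller sketch follows; the host, port and sample payload are assumptions, and with the class-level mapping /LiveService/* the method should resolve to /LiveService/LiveServerLog.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class LiveServerLogPostSketch {
    public static void main(String[] args) throws Exception {
        // Assumed host and port; adjust to wherever LiveService is deployed
        URL url = new URL("http://localhost:8080/LiveService/LiveServerLog");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json;charset=UTF-8");

        // Hypothetical comma-separated payload; the (commented-out) createRow on the Spark side expects three fields
        String body = "{\"log\":\"user1,2021-04-23 12:00:00,1\"}";
        OutputStream out = conn.getOutputStream();
        try {
            out.write(body.getBytes("UTF-8"));
        } finally {
            out.close();
        }

        System.out.println("HTTP " + conn.getResponseCode());
        conn.disconnect();
    }
}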
Spark Streaming consumer example:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.slf4j.LoggerFactory
import java.util.ResourceBundle
class live_server_log
object live_server_log {
private val LOGGER = LoggerFactory.getLogger(classOf[live_server_log])
def main(args: Array[String]): Unit = {
try {
val conf = new SparkConf().setAppName("live_server_log") // master and deploy mode are supplied by spark2-submit; use .setMaster("local[2]") for local testing
// Spark 2 style: build the SparkSession first
val ss = SparkSession.builder.config(conf).getOrCreate()
val ssc = new StreamingContext(ss.sparkContext, Seconds(5)) // 5-second batch interval
val prop = ResourceBundle.getBundle("app")
val bootstrapServers = prop.getString("bootstrap.servers")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> bootstrapServers, // kafka 叢集
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "Kafka Broker Default Group",
"auto.offset.reset" -> "earliest", // 每次都是從頭開始消費(from-beginning),可配置其他消費方式
"enable.auto.commit" -> (false: java.lang.Boolean) //手動提交偏移量
)
val topics = Array("live_server_log") // topics to subscribe to; more than one can be listed
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
// val list = List(
// StructField("S_USER_NAME", StringType, nullable = true),
// StructField("D_CREATE", StringType, nullable = true)
// )
// val schema = StructType(list)
stream.foreachRDD(rdd => {
// capture the offset ranges of this batch
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
if (!rdd.isEmpty()) rdd.foreach(line => {
println(line.value())
// choose your own way to persist the record
//insertLog(line.value())
// val log_rdd1 = rdd.map(r => {
// createRow(r.value().toString())
// })
// val dataFrame = ss.createDataFrame(log_rdd1, schema)
// val date = Common.getToday(Common.DateFormat.targetDAY.getValue)
// dataFrame.write.format("parquet").mode(SaveMode.Append).save("hdfs://10.105.1.1:8020/user/hive/warehouse/default_db/log/" + date)
})
// commit the offsets once the batch has been processed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
ssc.start()
ssc.awaitTermination()
// ...
} catch {
case ex: Exception => {
ex.printStackTrace() // prints to stderr
LOGGER.error("kafka消費訊息失敗:" + ex.getMessage, ex)
}
}
}
/**
* Build a Row from a comma-separated log line.
* @return Row
*/
def createRow(s: String): Row = {
val l = s.split(",")
Row(l(0), l(1), l(2))
}
/**
* Persist the log line to the database.
*/
def insertLog(s: String) {
if (!s.trim().isEmpty) {
val l = s.split(",")
//call_oracle.getInstance().callLiveClientLog(l(0), l(1), l(2).toInt)
}
}
}
Submit to Spark 2 for execution (--class must come before the application jar, otherwise it is treated as an application argument):
spark2-submit --master yarn --deploy-mode cluster --driver-memory 1g --executor-memory 1g --class com.kafka.live_server_log /home/kafka/live_server_log.jar
pom1.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<logback.version>1.2.3</logback.version>
<scala.maven.version>3.2.0</scala.maven.version>
<scala.binary.version>2.10</scala.binary.version>
<scala.version>2.10.5</scala.version>
<spark.version>1.6.0-cdh5.14.0</spark.version>
</properties>
<dependencies>
<!-- jdbc -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-cdh5.14.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.14.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<!-- hadoop mr -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.0-cdh5.14.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- Parquet columnar storage -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.8.1</version>
</dependency>
<!-- JSON parsing (json-lib) -->
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
pom2.xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.0.cloudera2</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.0.cloudera2</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.0.cloudera2</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
————————————————
Copyright notice: this is an original article by CSDN blogger 「史詩級大菠蘿」, licensed under CC 4.0 BY-SA. Please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/weixin_43827665/article/details/116052515