Kafka Consumption under the Actor Model
Project Background
Kafka production and consumption, implemented with Akka.
Implementation Approach
The application builds a coordinator actor and selects the read/write mode from configuration. The coordinator creates the corresponding producer and consumer actors and adds them to a write queue and a read queue. During initialization, each producer/consumer actor sends the coordinator a registration message containing its own path. On receiving that message, the coordinator starts a one-shot timer; when the scheduled time elapses, it sends a command terminating that actor. Once both the write queue and the read queue are empty, the whole system is shut down.
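A minimal sketch of that registration/timed-stop handshake, stripped of Kafka (the Register message and the actor names here are illustrative, not the project's real types):

import akka.actor.{Actor, ActorRef, Props}
import scala.concurrent.duration._

// Illustrative only: a child registers its path with the parent on start,
// and the parent schedules a one-shot stop for it.
case class Register(ref: ActorRef)

class Worker extends Actor {
  override def preStart(): Unit = context.parent ! Register(self)
  override def receive: Receive = Actor.emptyBehavior
}

class Parent extends Actor {
  import context.dispatcher
  context.actorOf(Props[Worker], "worker")
  override def receive: Receive = {
    case Register(ref) ⇒
      // after the scheduled time, tell ourselves to terminate this child
      context.system.scheduler.scheduleOnce(20.seconds, self, ("stop", ref))
    case ("stop", ref: ActorRef) ⇒ context.stop(ref)
  }
}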
Caveats
KafkaStream blocks its calling thread, so consumption must run on a separate thread pool; the rest is straightforward.
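A minimal sketch of that isolation, assuming a fixed pool sized like common.threadNum (the object name and loop body are placeholders, not the project's real consumer):

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockingPoolSketch extends App {
  // Dedicated pool for blocking consumption, sized like common.threadNum.
  private val pool = Executors.newFixedThreadPool(10)
  private val blockingEc = ExecutionContext.fromExecutor(pool)

  // The blocking iteration (e.g. kafkaStream.iterator().foreach(...))
  // runs inside a Future on the dedicated pool, keeping the actor's
  // default dispatcher free.
  val work = Future {
    Thread.sleep(100) // stand-in for a blocking poll
    "blocking work done off the default dispatcher"
  }(blockingEc)

  println(Await.result(work, 1.second))
  pool.shutdown()
}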
Common Settings
Configuration uses Typesafe Config; logging uses Logback.
application.conf
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  log-dead-letters = 0
}
common {
  numMessage = "20000"  # default number of messages to send
  scheduler.time = "20" # default scheduler time (seconds)
  mode = "readwrite"    # mode: one of write, read, readwrite
  threadNum = "10"      # thread-pool size, to isolate the blocking Kafka consumer
  actor = "10"          # number of actors per mode
  timeout = "100"       # consumer poll timeout (ms)
}
consumer {
  bootstrap.servers = "ctao-machine:9092"
  group.id = "mytest"
  zookeeper.connect = "ctao-machine:2181"
  host = "ctao-machine"
  port = "2181"
  bufferSize = "100"
  clientId = "typesafe"
  topic = "testctao"
  zookeeper.sync.time.ms = "200"
  auto.commit.interval.ms = "1000"
  zookeeper.session.timeout.ms = "5000"
  zookeeper.connection.timeout.ms = "10000"
  rebalance.backoff.ms = "2000"
  rebalance.max.retries = "10"
  key.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
  value.deserializer = "com.linewell.akkakafka.common.deserializer.LongDeserializer"
}
producer {
  metadata.broker.list = "ctao-machine:9092"
  key.serializer = "org.apache.kafka.common.serialization.StringSerializer"
  value.serializer = "com.linewell.akkakafka.common.serializer.LongSerializer"
  bootstrap.servers = "ctao-machine:9092"
}
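A minimal sketch of reading these settings with Typesafe Config (the key names match the file above; the object name is illustrative):

import com.typesafe.config.ConfigFactory

object ConfigSketch extends App {
  private val conf = ConfigFactory.load() // loads application.conf from the classpath
  val mode          = conf.getString("common.mode")       // "read" | "write" | "readwrite"
  val numActor      = conf.getInt("common.actor")
  val schedulerTime = conf.getInt("common.scheduler.time")
  println(s"mode=$mode actors=$numActor schedulerTime=${schedulerTime}s")
}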
Logging
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n%rEx</pattern>
    </encoder>
  </appender>
  <logger name="org.apache" level="WARN"/>
  <logger name="kafka" level="OFF"/>
  <logger name="kafka.network.Processor" level="OFF" /> <!-- To silence expected IOExceptions on consumer shutdown -->
  <logger name="org.apache.zookeeper.jmx" level="ERROR"/> <!-- To silence expected JMX errors on ZK shutdown -->
  <logger name="kafka.server.KafkaApis" level="OFF" /> <!-- To silence expected occasional AdminOperationException on startup -->
  <logger name="kafka.producer.async.DefaultEventHandler" level="OFF" /> <!-- To silence expected occasional 'Failed to collate messages by topic' on startup -->
  <logger name="org.I0Itec" level="WARN"/>
  <logger name="com.linewell" level="DEBUG"/>
  <root level="ERROR">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>
Modes
There are three modes: read, write, and read-write.
package com.linewell.akkakafka.kafka.bean

/**
 * Created by ctao on 16-1-23.
 * Modes.
 */
sealed trait Mode

object Mode {

  /**
   * Read mode.
   */
  case object Read extends Mode

  /**
   * Write mode.
   */
  case object Write extends Mode

  /**
   * Mixed read-write mode.
   */
  case object Readwrite extends Mode
}
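Because Mode is sealed, any match over it is checked for exhaustiveness by the compiler; a small illustrative example:

import com.linewell.akkakafka.kafka.bean.Mode

object ModeSketch {
  // Illustrative: the compiler warns if a Mode case is missing here.
  def describe(mode: Mode): String = mode match {
    case Mode.Read      ⇒ "consume only"
    case Mode.Write     ⇒ "produce only"
    case Mode.Readwrite ⇒ "produce and consume"
  }
}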
Command Types
package com.linewell.akkakafka.kafka.bean

import akka.actor.ActorRef

/**
 * Created by ctao on 16-1-23.
 * Command trait.
 */
sealed trait Command

object Command {

  /**
   * A read actor finished initializing.
   * @param actorRef the actor's reference (path)
   */
  case class ReadInitialized(actorRef: ActorRef) extends Command

  /**
   * A write actor finished initializing.
   * @param actorRef the actor's reference (path)
   */
  case class WriteInitialized(actorRef: ActorRef) extends Command

  /**
   * Stop an actor.
   * @param actorRef the actor's reference (path)
   */
  case class Stop(actorRef: ActorRef) extends Command

  /**
   * Shut down the system.
   */
  case object Shutdown extends Command

  /**
   * Start the task.
   * @param num initial number of actors
   */
  case class StartNumber(num: Int) extends Command

  /**
   * Start consuming.
   */
  case object StartConsume extends Command

  /**
   * Start producing.
   */
  case object StartProduce extends Command
}
Consumer
First, the implementation based on the Kafka clients library (KafkaConsumer):
package com.linewell.akkakafka.kafka.consume

import akka.actor.SupervisorStrategy.Escalate
import akka.actor._
import com.linewell.akkakafka.common.util.KafkaConsumerConfig
import com.linewell.akkakafka.kafka.bean.Command.{ReadInitialized, StartConsume}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConversions._

/**
 * Created by ctao on 16-1-27.
 * Consumer implemented with the Kafka client (KafkaConsumer).
 */
class NumberConsumerByKafkaClient extends Actor with ActorLogging with NumberConsume {

  /**
   * The consumer.
   */
  private var consumer: KafkaConsumer[String, Long] = _

  private lazy val conf = ConfigFactory.load()
  private val topic = conf.getString("consumer.topic")
  private val timeOut = conf.getLong("common.timeout")

  /**
   * Called before the actor starts.
   */
  override def preStart(): Unit = {
    initConsumer()
    context.parent ! ReadInitialized(self)
    self ! StartConsume
  }

  /**
   * On StartConsume, poll once and re-send StartConsume to self, so
   * polling continues until the actor is stopped without permanently
   * blocking the mailbox.
   */
  override def receive: Receive = {
    case StartConsume ⇒
      consume(timeOut)
      self ! StartConsume
  }

  /**
   * Called before the actor stops.
   * @throws java.lang.Exception on failure
   */
  @throws[Exception](classOf[Exception])
  override def postStop(): Unit = {
    log.debug("stopping all consumer")
    // unsubscribe, then close
    consumer.unsubscribe()
    consumer.close()
    log.debug("stop all consumer")
  }

  /**
   * Consume: poll within the timeout and log each record.
   * @param timeOut poll timeout in milliseconds
   */
  private def consume(timeOut: Long): Unit = {
    consumer.poll(timeOut).foreach { record ⇒
      log.info(s"${self.path.toString} receive ${record.key} value ${record.value} " +
        s"offset ${record.offset} partition ${record.partition} topic ${record.topic}")
    }
    consumer.commitAsync()
  }

  /**
   * Supervision strategy.
   */
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case e: Exception ⇒
      // handle failing kafka
      log.error(s"Read failed $e")
      Escalate
  }

  /**
   * Initialize the consumer and subscribe to the topic.
   */
  private def initConsumer() = {
    log.debug(s"Config ${KafkaConsumerConfig()}")
    consumer = new KafkaConsumer[String, Long](KafkaConsumerConfig())
    consumer.subscribe(Vector(topic))
  }
}

object NumberConsumerByKafkaClient {

  /**
   * Props for the consumer actor.
   * @return Props wrapping the actor
   */
  def props: Props = Props(new NumberConsumerByKafkaClient)
}
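The Coordinator further below wires up the KafkaStream variant; to use this client-based consumer instead, only the reader props inside buildWriteOrBuildRead need to change. A sketch of the one-line swap:

// In Coordinator.buildWriteOrBuildRead, create the client-based reader instead:
val reader = Some(context.actorOf(NumberConsumerByKafkaClient.props, name = s"readerActor-$x"))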
The other implementation uses the blocking KafkaStream:
package com.linewell.akkakafka.kafka.consume

import java.util.concurrent.Executors

import akka.actor.SupervisorStrategy.Escalate
import akka.actor._
import com.linewell.akkakafka.common.deserializer.{Decoder, LongDeserializer}
import com.linewell.akkakafka.common.util.KafkaConsumerConfig
import com.linewell.akkakafka.kafka.bean.Command._
import com.linewell.akkakafka.kafka.consume.NumberConsumerByKafkaStream.Consume
import com.typesafe.config.ConfigFactory
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
import kafka.message.MessageAndMetadata
import org.apache.kafka.common.serialization.StringDeserializer

import scala.async.Async._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.{Failure, Success, Try}

/**
 * Created by ctao on 16-1-23.
 * Consumer implemented with KafkaStream, which blocks.
 */
class NumberConsumerByKafkaStream extends Actor with ActorLogging {

  val conf = ConfigFactory.load()

  /**
   * The consumer connection.
   */
  private var consumer: Try[ConsumerConnector] = _

  /**
   * Thread-pool size for Kafka consumption: the blocking work runs
   * in its own pool, off the actor's dispatcher.
   */
  val threadNum = conf.getInt("common.threadNum")
  private val executor = Executors.newFixedThreadPool(threadNum)

  val topic = conf.getString("consumer.topic")

  /**
   * The KafkaStreams; cleared before shutdown.
   */
  private var streams: Option[List[KafkaStream[String, Long]]] = None

  override def receive: Receive = {
    // On StartConsume, create the streams and send each one to self.
    case StartConsume ⇒ consumer.foreach { (consumer: ConsumerConnector) ⇒
      val consumerStreams = consumer.createMessageStreams(Map(topic → 1),
        Decoder(topic, new StringDeserializer),
        Decoder(topic, new LongDeserializer))
      streams = Option(consumerStreams(topic))
      if (streams.isDefined) {
        log.info(s"Got streams ${streams.get.length} $streams")
        streams.get.foreach { kafkaStream ⇒
          self ! Consume(kafkaStream)
        }
      }
    }

    // On Consume, run the blocking iteration on the dedicated pool.
    case Consume(kafkaStream) ⇒
      log.info(s"Handling KafkaStream ${kafkaStream.clientId}")
      implicit val executionContextExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
      async {
        kafkaStream.iterator().foreach {
          case msg: MessageAndMetadata[String, Long] ⇒
            log.info(s"${self.path.toString} : kafkaStream ${kafkaStream.clientId} " +
              s" received offset ${msg.offset} partition ${msg.partition} value ${msg.message}")
        }
      }
  }

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case e: Exception ⇒
      // handle failing kafka
      log.error(s"Read failed $e")
      Escalate
  }

  /**
   * Called before the actor starts: create the consumer connector,
   * register with the parent on success, or request shutdown on failure.
   */
  override def preStart(): Unit = {
    super.preStart()
    consumer = Try(Consumer.create(consumerConfig))
    consumer match {
      case Success(c) ⇒
        context.parent ! ReadInitialized(self)
        self ! StartConsume
      case Failure(e) ⇒
        log.error(e, "Could not create kafkaConsumer")
        context.parent ! Shutdown
    }
  }

  /**
   * Called before the actor stops.
   * @throws java.lang.Exception on failure
   */
  @throws[Exception](classOf[Exception])
  override def postStop(): Unit = {
    if (streams.isDefined) {
      log.debug("cleaning streams")
      streams.get.foreach(_.clear())
      log.debug("cleaned streams")
    }
    log.debug("stopping all consumer")
    consumer.foreach(_.shutdown())
    log.debug("stop all consumer")
    log.debug("shutting down execution")
    executor.shutdown()
    log.debug("shutdown execution")
  }

  /**
   * Consumer configuration.
   */
  private val consumerConfig: ConsumerConfig = new ConsumerConfig(KafkaConsumerConfig())
}

object NumberConsumerByKafkaStream {

  /**
   * Props wrapping the consumer actor.
   * @return Props
   */
  def props: Props = Props(new NumberConsumerByKafkaStream())

  /**
   * Message carrying a KafkaStream to consume.
   * @param kafkaStream the stream
   */
  private case class Consume(kafkaStream: KafkaStream[String, Long])
}
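The Decoder helper passed to createMessageStreams is not shown in the post; a plausible reconstruction (an assumption, not the author's actual code) bridges the new-API Deserializer to the old consumer's kafka.serializer.Decoder:

package com.linewell.akkakafka.common.deserializer

import kafka.serializer.{Decoder ⇒ OldDecoder}
import org.apache.kafka.common.serialization.Deserializer

// Assumed implementation: adapt a Kafka Deserializer[T] to the old
// consumer API's Decoder[T] for a fixed topic.
case class Decoder[T](topic: String, deserializer: Deserializer[T]) extends OldDecoder[T] {
  override def fromBytes(bytes: Array[Byte]): T = deserializer.deserialize(topic, bytes)
}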
Producer
package com.linewell.akkakafka.kafka.produce

import akka.actor.SupervisorStrategy.Resume
import akka.actor._
import akka.event.LoggingReceive
import com.linewell.akkakafka.common.util.KafkaProducerConfig
import com.linewell.akkakafka.kafka.bean.Command.{StartProduce, WriteInitialized}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer._

/**
 * Created by ctao on 16-1-23.
 */
class NumberProducer extends Actor with ActorLogging {

  private val conf = ConfigFactory.load()

  /**
   * Number of messages to produce.
   */
  val numMessage = conf.getInt("common.numMessage")
  val topic = conf.getString("consumer.topic")

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case e: Exception ⇒
      // can handle failing here
      log.error(s"Write failed $e")
      Resume
  }

  private var producer: KafkaProducer[String, Long] = _

  /**
   * Called before the actor starts.
   */
  override def preStart(): Unit = {
    producer = initProducer()
    context.parent ! WriteInitialized(self)
    self ! StartProduce
  }

  /**
   * On StartProduce, call the produce function.
   */
  override def receive: Receive = LoggingReceive {
    case StartProduce ⇒ produce(producer, numMessage)
  }

  /**
   * Called before the actor stops.
   * @throws java.lang.Exception on failure
   */
  @throws[Exception](classOf[Exception])
  override def postStop(): Unit = {
    log.debug("closing producer")
    producer.close()
    log.debug("closed producer")
  }

  /**
   * Produce numMessage records; the callback logs either the record
   * metadata or the error.
   * @param producer   the producer
   * @param numMessage number of messages
   */
  private def produce(producer: KafkaProducer[String, Long], numMessage: Int): Unit = {
    (1 to numMessage).foreach { messageNum ⇒
      val message = new ProducerRecord[String, Long](topic, (messageNum + 1).toString, messageNum)
      producer.send(message, new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
          val maybeMetadata = Option(metadata)
          val maybeException = Option(exception)
          if (maybeMetadata.isDefined) {
            log.info(s"actor ${self.path.toString}: $messageNum onCompletion offset ${metadata.offset}, partition ${metadata.partition}")
          }
          if (maybeException.isDefined) {
            log.error(exception, s"$messageNum onCompletion received error")
          }
        }
      })
    }
  }

  /**
   * Initialize the producer.
   * @return the producer
   */
  private def initProducer(): KafkaProducer[String, Long] = {
    log.debug(s"Config ${KafkaProducerConfig()}")
    new KafkaProducer[String, Long](KafkaProducerConfig())
  }
}

object NumberProducer {
  def props: Props = Props(new NumberProducer)
}
Coordination
package com.linewell.akkakafka.kafka.coordination

import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.stream.ActorMaterializer
import com.linewell.akkakafka.kafka.bean.Command._
import com.linewell.akkakafka.kafka.bean.{Command, Mode}
import com.linewell.akkakafka.kafka.consume.NumberConsumerByKafkaStream
import com.linewell.akkakafka.kafka.produce.NumberProducer
import com.typesafe.config.ConfigFactory

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

/**
 * Created by ctao on 16-1-23.
 * Coordinator.
 */
class Coordinator extends Actor with ActorLogging {

  import Coordinator._

  val conf = ConfigFactory.load()

  /**
   * Scheduler time, used like a timer.
   */
  val schedulerTime = conf.getInt("common.scheduler.time")

  /**
   * Queue of writer actors.
   */
  val writerBuffer: ArrayBuffer[Option[ActorRef]] = new ArrayBuffer[Option[ActorRef]]()

  /**
   * Queue of reader actors.
   */
  val readerBuffer: ArrayBuffer[Option[ActorRef]] = new ArrayBuffer[Option[ActorRef]]()

  /**
   * Materializer.
   */
  lazy val mat = ActorMaterializer()(context)

  override def receive: Receive = {
    // An InitialMessage carrying StartNumber and a mode: build the actors.
    case msg@InitialMessage(StartNumber(num), mode) ⇒
      log.debug(s"Starting the numbers coordinator with $msg")
      buildWriteOrBuildRead(mode, num)

    // Any other InitialMessage is unrecognised.
    case msg: InitialMessage ⇒
      log.error(s"Did not understand $msg")
      log.error("shutdown")
      context.system.shutdown()

    // Each actor reports to its parent once initialized. On a reader's
    // registration, schedule a one-shot Stop(actor) message to self.
    case ReadInitialized(actorRef) ⇒
      log.debug(s"Reader initialized: ${actorRef.path.toString}")
      context.system.scheduler.scheduleOnce(schedulerTime.seconds, self, Stop(actorRef))
      log.debug(s"end scheduler stop ${actorRef.path.toString}")

    // Same for a writer's registration.
    case WriteInitialized(actorRef) ⇒
      log.debug(s"Writer initialized: ${actorRef.path.toString}")
      context.system.scheduler.scheduleOnce(schedulerTime.seconds, self, Stop(actorRef))
      log.debug(s"end scheduler stop ${actorRef.path.toString}")

    // On the self-sent Stop message, stop the actor and remove it from
    // the corresponding queue. (The original post only removed it from
    // the queue; without context.stop the child would keep running.)
    case Stop(actorRef) ⇒
      log.debug(s"Stopping ${actorRef.path.toString}")
      context.stop(actorRef)
      writerBuffer -= Some(actorRef)
      readerBuffer -= Some(actorRef)
      log.debug(s"writeBuffer.length ${writerBuffer.length} and readerBuffer.length ${readerBuffer.length}")
      // Once both queues are empty, schedule the system Shutdown message.
      if (writerBuffer.isEmpty && readerBuffer.isEmpty) {
        context.system.scheduler.scheduleOnce(1.seconds, self, Shutdown)
      }

    // Stop the system.
    case Shutdown ⇒
      log.debug("Shutting down the app")
      context.system.shutdown()
      log.info("shutdown the app")
  }

  /**
   * Build readers and/or writers according to the mode.
   * @param mode     the mode
   * @param numActor number of actors
   */
  def buildWriteOrBuildRead(mode: Mode, numActor: Int): Unit = mode match {
    // Write mode: add the given number of writers to the write queue.
    case Mode.Write ⇒
      log.debug("write mode")
      (1 to numActor).foreach { x ⇒
        val writer = Some(context.actorOf(NumberProducer.props, name = s"writerActor-$x"))
        writerBuffer += writer
      }

    // Read mode: add the given number of readers to the read queue.
    case Mode.Read ⇒
      log.debug("read mode")
      (1 to numActor).foreach { x ⇒
        val reader = Some(context.actorOf(NumberConsumerByKafkaStream.props, name = s"readerActor-$x"))
        readerBuffer += reader
      }

    // Read-write mode: add the given number of each to both queues.
    case Mode.Readwrite ⇒
      log.debug("readwrite mode")
      (1 to numActor).foreach { x ⇒
        val writer = Some(context.actorOf(NumberProducer.props, name = s"writerActor-$x"))
        val reader = Some(context.actorOf(NumberConsumerByKafkaStream.props, name = s"readerActor-$x"))
        writerBuffer += writer
        readerBuffer += reader
      }
  }
}

object Coordinator {

  /**
   * Initial message.
   * @param name the command
   * @param mode the mode
   */
  case class InitialMessage(name: Command, mode: Mode)
}
Application
package com.linewell.akkakafka.kafka.application

import akka.actor.{ActorSystem, Props}
import com.linewell.akkakafka.kafka.bean.Command.StartNumber
import com.linewell.akkakafka.kafka.bean.Mode.{Read, Readwrite, Write}
import com.linewell.akkakafka.kafka.coordination.Coordinator
import com.linewell.akkakafka.kafka.coordination.Coordinator.InitialMessage
import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory

/**
 * Created by ctao on 16-1-23.
 */
object Application extends App {

  val log = LoggerFactory.getLogger(this.getClass)
  val system = ActorSystem("Kafka")

  /**
   * The coordinator actor.
   */
  val coordinator = system.actorOf(Props(new Coordinator), name = "coordinator")

  val conf = ConfigFactory.load()
  private val numActor = conf.getInt("common.actor")

  log.info("start app")

  /**
   * The application mode. conf.getString always returns a String, so
   * the match is done directly on the upper-cased value, with a
   * default case for unknown modes.
   */
  val appMode = conf.getString("common.mode").toUpperCase match {
    case "READ" ⇒ Read
    case "WRITE" ⇒ Write
    case "READWRITE" ⇒ Readwrite
    case other ⇒ throw new IllegalArgumentException(s"can't load mode: $other")
  }

  /**
   * Start the coordination.
   */
  coordinator ! InitialMessage(StartNumber(numActor), appMode)
}
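Because ConfigFactory.load() gives JVM system properties the highest precedence, any of these settings can be overridden per run without editing application.conf, e.g. java -Dcommon.mode=read -Dcommon.actor=5 -jar app.jar (jar name illustrative).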
Kafka Configuration Classes
package com.linewell.akkakafka.common.util

import java.util.Properties

import com.typesafe.config.ConfigFactory

/**
 * Created by ctao on 16-1-27.
 * Kafka consumer configuration.
 */
trait KafkaConsumerConfig extends Properties {

  import KafkaConsumerConfig._

  private val consumerPrefixWithDot = consumerPrefix + "."

  val allKeys = Seq(groupId,
    zookeeperConnect,
    zookeeperConnectionTimeOut,
    zookeeperSessionTimeOut,
    reBalanceBackOff,
    reBalanceMaxRetries,
    keyDeserializer,
    valueDeserializer,
    servers
  )

  lazy val conf = ConfigFactory.load()

  // Copy each configured key into the Properties, stripping the
  // "consumer." prefix so Kafka sees its native key names.
  allKeys.foreach { key ⇒
    if (conf.hasPath(key)) {
      put(key.replace(consumerPrefixWithDot, ""), conf.getString(key))
    }
  }
}

object KafkaConsumerConfig {
  val consumerPrefix = "consumer"

  // Consumer keys
  val groupId = s"$consumerPrefix.group.id"
  val zookeeperConnect = s"$consumerPrefix.zookeeper.connect"
  val topic = s"$consumerPrefix.topic"
  val zookeeperSessionTimeOut = s"$consumerPrefix.zookeeper.session.timeout.ms"
  val zookeeperConnectionTimeOut = s"$consumerPrefix.zookeeper.connection.timeout.ms"
  val reBalanceBackOff = s"$consumerPrefix.rebalance.backoff.ms"
  val reBalanceMaxRetries = s"$consumerPrefix.rebalance.max.retries"
  val keyDeserializer = s"$consumerPrefix.key.deserializer"
  val valueDeserializer = s"$consumerPrefix.value.deserializer"
  val servers = s"$consumerPrefix.bootstrap.servers"

  def apply(): KafkaConsumerConfig = new KafkaConsumerConfig {}
}
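KafkaConsumerConfig() therefore yields a java.util.Properties whose keys have the consumer. prefix stripped, which is what KafkaConsumer and ConsumerConfig expect. A quick check (object name illustrative; values per the application.conf above):

import com.linewell.akkakafka.common.util.KafkaConsumerConfig

object ConsumerPropsCheck extends App {
  val props = KafkaConsumerConfig() // Properties with "consumer." stripped from each key
  println(props.getProperty("group.id"))          // mytest
  println(props.getProperty("bootstrap.servers")) // ctao-machine:9092
}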
package com.linewell.akkakafka.common.util

import java.util.Properties

import com.typesafe.config.ConfigFactory

/**
 * Created by ctao on 16-1-25.
 * Kafka producer configuration.
 */
trait KafkaProducerConfig extends Properties {

  import KafkaProducerConfig._

  private val producerPrefixWithDot = producerPrefix + "."

  private val allKeys = Seq(
    brokers,
    keySerializer,
    valueSerializer,
    partitioner,
    requiredAcks,
    servers
  )

  lazy val conf = ConfigFactory.load()

  // Copy each configured key into the Properties, stripping the
  // "producer." prefix so Kafka sees its native key names.
  allKeys.foreach { key ⇒
    if (conf.hasPath(key)) {
      put(key.replace(producerPrefixWithDot, ""), conf.getString(key))
    }
  }
}

object KafkaProducerConfig {
  val producerPrefix = "producer"

  // Producer keys
  val brokers = s"$producerPrefix.metadata.broker.list"
  val keySerializer = s"$producerPrefix.key.serializer"
  val valueSerializer = s"$producerPrefix.value.serializer"
  val servers = s"$producerPrefix.bootstrap.servers"
  val partitioner = s"$producerPrefix.partitioner.class"
  val requiredAcks = s"$producerPrefix.request.required.acks"

  def apply(): KafkaProducerConfig = new KafkaProducerConfig {}
}
Serialization and Deserialization
package com.linewell.akkakafka.common.serializer

import java.util

import org.apache.kafka.common.serialization.Serializer

/**
 * Created by ctao on 16-1-26.
 * Long serializer, implementing Kafka's Serializer.
 */
class LongSerializer extends Serializer[Long] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  override def serialize(topic: String, data: Long): Array[Byte] = BigInt(data).toByteArray

  override def close(): Unit = ()
}

package com.linewell.akkakafka.common.deserializer

import java.util

import org.apache.kafka.common.serialization.Deserializer

/**
 * Created by ctao on 16-1-26.
 * Long deserializer.
 */
class LongDeserializer extends Deserializer[Long] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  override def close(): Unit = ()

  override def deserialize(topic: String, data: Array[Byte]): Long = BigInt(data).toLong
}