Kafka Consumption under the Actor Model

Project Background

Kafka production and consumption, implemented with Akka.

Implementation Approach

The application builds a coordinator actor and picks a read/write mode from configuration. The coordinator creates the corresponding producer and consumer actors and adds them to a write queue and a read queue. During initialization, each producer/consumer actor sends the coordinator a registration message containing its own path. On receiving that message, the coordinator starts a one-shot timer; when the scheduled time elapses, it sends itself a command to stop that actor. Once both the write queue and the read queue are empty, the whole system is shut down.
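As a distilled sketch of this lifecycle protocol (the full classes follow below; the 20-second window, the CoordinatorSketch name and the Set of pending actors are placeholders for illustration, and the message names mirror the Command types defined later):

import akka.actor.{Actor, ActorRef}
import com.linewell.akkakafka.kafka.bean.Command
import scala.concurrent.duration._

class CoordinatorSketch extends Actor {
  import context.dispatcher
  private var pending = Set.empty[ActorRef]

  override def receive: Receive = {
    case Command.ReadInitialized(ref) ⇒
      // a worker registered itself from preStart: arm its stop timer
      pending += ref
      context.system.scheduler.scheduleOnce(20.seconds, self, Command.Stop(ref))
    case Command.Stop(ref) ⇒
      // the scheduling window elapsed: stop the worker and forget it
      context.stop(ref)
      pending -= ref
      if (pending.isEmpty) context.system.shutdown() // everything drained
  }
}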

Caveats

KafkaStream blocks the thread it runs on, so its consumption loop must run on a dedicated thread pool; everything else is straightforward.
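A minimal sketch of that isolation (the pool size and the endless loop are stand-ins; the real consumer below wraps kafkaStream.iterator() the same way):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

object BlockingIsolationSketch extends App {
  // a fixed pool reserved for blocking work, so the actor dispatcher stays free
  implicit val blockingEc: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  Future {
    // stand-in for kafkaStream.iterator().foreach(...), which blocks indefinitely
    while (true) Thread.sleep(1000)
  }
}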

Common Setup

Configuration uses Typesafe Config; logging uses Logback.
application.conf

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  log-dead-letters = 0
}

common {
  numMessage = "20000" #預設傳送的訊息數
  scheduler.time = "20" #預設的排程實踐
  mode = "readwrite" #模式,分為write,read和readwrite三種模式
  threadNum = "10" #提供的資源池的執行緒數,主要防止kafkaconsumer造成的阻塞
actor = "10" timeout = "100" } consumer { bootstrap.servers = "ctao-machine:9092" group.id = "mytest" zookeeper.connect = "ctao-machine:2181" host = "ctao-machine" port = "2181" bufferSize = "100" clientId = "typesafe" topic = "testctao" zookeeper.sync.time.ms = "200" auto.commit.interval
.ms = "1000" zookeeper.session.timeout.ms = "5000" zookeeper.connection.timeout.ms = "10000" rebalance.backoff.ms = "2000" rebalance.max.retries = "10" key.deserializer = "org.apache.kafka.common.serialization.StringDeserializer" value.deserializer = "com.linewell.akkakafka.common.deserializer.LongDeserializer" } producer { metadata.broker.list = "ctao-machine:9092" key.serializer = "org.apache.kafka.common.serialization.StringSerializer" value.serializer = "com.linewell.akkakafka.common.serializer.LongSerializer" bootstrap.servers = "ctao-machine:9092" }
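A quick sanity check that this file resolves as expected (a throwaway sketch; it assumes application.conf is on the classpath):

import com.typesafe.config.ConfigFactory

object ConfigCheck extends App {
  val conf = ConfigFactory.load()            // reads application.conf from the classpath
  println(conf.getString("common.mode"))     // readwrite
  println(conf.getInt("common.threadNum"))   // 10
  println(conf.getString("consumer.topic"))  // testctao
}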

Logging

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n%rEx</pattern>
        </encoder>
    </appender>

    <logger name="org.apache" level="WARN"/>
    <logger name="kafka" level="OFF"/>
    <logger name="kafka.network.Processor" level="OFF" /> <!-- To silence expected IOExceptions on consumer shutdown -->
    <logger name="org.apache.zookeeper.jmx" level="ERROR"/>  <!-- To silence expected JMX errors on ZK shutdown -->
    <logger name="kafka.server.KafkaApis" level="OFF" /> <!-- To silence expected occasional AdminOperationException on startup -->
    <logger name="kafka.producer.async.DefaultEventHandler" level="OFF" /> <!-- To silence expected occasional 'Failed to collate messages by topic' on startup -->
    <logger name="org.I0Itec" level="WARN"/>
    <logger name="com.linewell" level="DEBUG"/>

    <root level="ERROR">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>

Modes

There are three modes: read, write, and read-write.


package com.linewell.akkakafka.kafka.bean

/**
  * Created by ctao on 16-1-23.
  * Mode
  */
sealed trait Mode

object Mode {

  /**
    * Read mode
    */
  case object Read extends Mode

  /**
    * Write mode
    */
  case object Write extends Mode

  /**
    * Read-write (mixed) mode
    */
  case object Readwrite extends Mode
}
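Because Mode is sealed, any match over it is checked for exhaustiveness at compile time; a small sketch (ModeSketch and describe are made-up names):

import com.linewell.akkakafka.kafka.bean.Mode

object ModeSketch {
  // the compiler warns if any Mode case is missing here
  def describe(mode: Mode): String = mode match {
    case Mode.Read      ⇒ "consume only"
    case Mode.Write     ⇒ "produce only"
    case Mode.Readwrite ⇒ "produce and consume"
  }
}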

Command Types

package com.linewell.akkakafka.kafka.bean

import akka.actor.ActorRef

/**
  * Created by ctao on 16-1-23.
  * Command trait
  */
sealed trait Command

object Command {

  /**
    * Read-mode actor finished initializing
    * @param actorRef the actor reference
    */
  case class ReadInitialized(actorRef: ActorRef) extends Command

  /**
    * Write-mode actor finished initializing
    * @param actorRef the actor reference
    */
  case class WriteInitialized(actorRef: ActorRef) extends Command

  /**
    * Stop an actor
    * @param actorRef the actor reference
    */
  case class Stop(actorRef: ActorRef) extends Command

  /**
    * Shut down the system
    */
  case object Shutdown extends Command

  /**
    * Start the task
    * @param num initial number of actors
    */
  case class StartNumber(num: Int) extends Command

  /**
    * Start consuming
    */
  case object StartConsume extends Command

  /**
    * Start producing
    */
  case object StartProduce extends Command

}

Consumers

First, the one implemented with the new Kafka client:

package com.linewell.akkakafka.kafka.consume

import akka.actor.SupervisorStrategy.Escalate
import akka.actor._
import com.linewell.akkakafka.common.util.KafkaConsumerConfig
import com.linewell.akkakafka.kafka.bean.Command.{ReadInitialized, StartConsume}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConversions._

/**
  * Created by ctao on 16-1-27.
  * Consumer class implemented with the Kafka client
  */
class NumberConsumerByKafkaClient extends Actor with ActorLogging with NumberConsume {

  /**
    * The consumer
    */
  private var consumer: KafkaConsumer[String, Long] = _
  private lazy val conf = ConfigFactory.load()
  private val topic = conf.getString("consumer.topic")
  private val timeOut = conf.getLong("common.timeout")

  /**
    * Called before the actor starts
    */
  override def preStart(): Unit = {
    initConsumer()
    context.parent ! ReadInitialized(self)
    self ! StartConsume
  }

  /**
    * On StartConsume, poll one batch, then re-send StartConsume so polling
    * continues until the coordinator stops this actor
    */
  override def receive: Receive = {
    case StartConsume ⇒
      consume(timeOut)
      self ! StartConsume
  }

  /**
    * Called before the actor stops
    * @throws java.lang.Exception on error
    */
  @throws[Exception](classOf[Exception])
  override def postStop(): Unit = {
    log.debug("stopping all consumer")

    /**
      * Unsubscribe
      */
    consumer.unsubscribe()

    /**
      * Close
      */
    consumer.close()
    log.debug("stop all consumer")

  }


  /**
    * Consume method
    * @param timeOut poll timeout in milliseconds
    */
  private def consume(timeOut: Long): Unit = {
    consumer.poll(timeOut).foreach { record ⇒
      log.info(s"${self.path.toString} receive ${record.key} value ${record.value} " +
        s"offset ${record.offset} partition ${record.partition} topic ${record.topic}")
    }
    consumer.commitAsync()

  }

  /**
    * Supervision strategy for this actor
    */
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case e: Exception ⇒
      //handle failing kafka
      log.error(s"Read failed $e")
      Escalate
  }

  /**
    * Initialize the consumer
    */
  private def initConsumer() = {
    log.debug(s"Config ${KafkaConsumerConfig()}")
    consumer = new KafkaConsumer[String, Long](KafkaConsumerConfig())
    consumer.subscribe(Vector(topic))
  }
}


object NumberConsumerByKafkaClient {
  /**
    * Props for the consumer actor
    * @return Props wrapping the actor
    */
  def props: Props = Props(new NumberConsumerByKafkaClient)
}

Another one, implemented with the blocking KafkaStream:

package com.linewell.akkakafka.kafka.consume

import java.util.concurrent.Executors

import akka.actor.SupervisorStrategy.Escalate
import akka.actor._
import com.linewell.akkakafka.common.deserializer.{Decoder, LongDeserializer}
import com.linewell.akkakafka.common.util.KafkaConsumerConfig
import com.linewell.akkakafka.kafka.bean.Command._
import com.linewell.akkakafka.kafka.consume.NumberConsumerByKafkaStream.Consume
import com.typesafe.config.ConfigFactory
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
import kafka.message.MessageAndMetadata
import org.apache.kafka.common.serialization.StringDeserializer
import scala.async.Async._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.{Failure, Success, Try}

/**
  * Created by ctao on 16-1-23.
  * Consumer implemented with KafkaStream; involves blocking
  */
class NumberConsumerByKafkaStream extends Actor with ActorLogging {

  val conf = ConfigFactory.load()
  /**
    * Consumer connection
    */
  private var consumer: Try[ConsumerConnector] = _
  /**
    * Thread-pool size, reserved for Kafka consumption so the blocking calls run in a separate pool
    */
  val threadNum = conf.getInt("common.threadNum")
  private val executor = Executors.newFixedThreadPool(threadNum)
  val topic = conf.getString("consumer.topic")
  /**
    * The KafkaStreams; cleared before shutdown
    */
  private var streams: Option[List[KafkaStream[String, Long]]] = None

  override def receive: Receive = {
    /**
      * On StartConsume, create the streams and hand each one to self
      */
    case StartConsume ⇒ consumer.foreach { (consumer: ConsumerConnector) ⇒
      val consumerStreams = consumer.createMessageStreams(Map(topic → 1)
        , Decoder(topic, new StringDeserializer),
        Decoder(topic, new LongDeserializer))

      streams = Option(consumerStreams(topic))
      if (streams.isDefined) {
        log.info(s"Got streams ${streams.get.length} $streams")
        streams.get.foreach { kafkaStream ⇒
          self ! Consume(kafkaStream)

        }
      }
    }

    /**
      * For each stream, run the blocking iteration on the pool
      */
    case Consume(kafkaStream) ⇒
      log.info(s"Handling KafkaStream ${kafkaStream.clientId}")
      implicit val executionContextExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
      async {
        kafkaStream.iterator().foreach {
          case msg: MessageAndMetadata[String, Long] ⇒
            log.info(s"${self.path.toString} : kafkaStream ${kafkaStream.clientId} " +
              s" received offset ${msg.offset}  partition ${msg.partition} value ${msg.message}")
        }
      }


  }


  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case e: Exception ⇒
      //handle failing kafka
      log.error(s"Read failed $e")
      Escalate
  }

  /**
    * Pre-start hook
    */
  override def preStart(): Unit = {
    super.preStart()
    consumer = Try(Consumer.create(consumerConfig))
    consumer match {
      case Success(c) ⇒ context.parent ! ReadInitialized(self)
        self ! StartConsume
      case Failure(e) ⇒
        log.error(e, "Could not create kafkaConsumer")
        context.parent ! Shutdown
    }


  }

  /**
    * Pre-stop hook
    * @throws java.lang.Exception on error
    */
  @throws[Exception](classOf[Exception])
  override def postStop(): Unit = {
    if (streams.isDefined) {
      log.debug("cleaning streams")
      streams.get.foreach(_.clear())
      log.debug("cleaned streams")
    }
    log.debug("stopping all consumer")
    consumer.foreach(_.shutdown())
    log.debug("stop all consumer")
    log.debug("shutting down execution")
    executor.shutdown()
    log.debug("shutdown execution")
  }

  /**
    * Consumer configuration
    */
  private val consumerConfig: ConsumerConfig = new ConsumerConfig(KafkaConsumerConfig())

}


object NumberConsumerByKafkaStream {

  /**
    * Props wrapping the consumer
    * @return Props
    */
  def props: Props = Props(new NumberConsumerByKafkaStream())

  /**
    * Message carrying a KafkaStream
    * @param kafkaStream the stream
    */
  private case class Consume(kafkaStream: KafkaStream[String, Long])

}

Producer

package com.linewell.akkakafka.kafka.produce

import akka.actor.SupervisorStrategy.Resume
import akka.actor._
import akka.event.LoggingReceive
import com.linewell.akkakafka.common.util.KafkaProducerConfig
import com.linewell.akkakafka.kafka.bean.Command.{StartProduce, WriteInitialized}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer._


/**
  * Created by ctao on 16-1-23.
  */
class NumberProducer extends Actor with ActorLogging {
  private val conf = ConfigFactory.load()
  /**
    * Number of messages to produce
    */
  val numMessage = conf.getInt("common.numMessage")

  val topic = conf.getString("consumer.topic")


  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case e: Exception ⇒
      //can handle failing here
      log.error(s"Write failed $e")
      Resume
  }

  private var producer: KafkaProducer[String, Long] = _

  /**
    * Run before the actor starts
    */
  override def preStart(): Unit = {
    producer = initProducer()
    context.parent ! WriteInitialized(self)
    self ! StartProduce
  }

  /**
    * On receiving StartProduce, call the produce function
    */
  override def receive: Receive = LoggingReceive {
    case StartProduce ⇒ produce(producer, numMessage)
  }

  /**
    * Called before the actor stops
    * @throws java.lang.Exception on error
    */
  @throws[Exception](classOf[Exception])
  override def postStop(): Unit = {
    log.debug("closing producer")
    producer.close()
    log.debug("closed producer")
  }

  /**
    * Produce function
    * @param producer the producer
    * @param numMessage number of messages
    */
  private def produce(producer: KafkaProducer[String, Long], numMessage: Int): Unit = {
    (1 to numMessage).foreach { messageNum ⇒
      val message = new ProducerRecord[String, Long](topic, (messageNum + 1).toString, messageNum)
      producer.send(message, new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
          val maybeMetadata = Option(metadata)
          val maybeException = Option(exception)
          if (maybeMetadata.isDefined) {
            log.info(s"actor ${self.path.toString}: $messageNum onCompletion offset ${metadata.offset},partition ${metadata.partition}")
          }
          if (maybeException.isDefined) {
            log.error(exception, s"$messageNum onCompletion received error")
          }
        }
      })


    }
  }

  /**
    * Initialization
    * @return the producer
    */
  private def initProducer(): KafkaProducer[String, Long] = {
    log.debug(s"Config ${KafkaProducerConfig()}")
    new KafkaProducer[String, Long](KafkaProducerConfig())

  }
}


object NumberProducer {
  def props: Props = Props(new NumberProducer)
}
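Besides the callback style above, KafkaProducer.send also returns a java.util.concurrent.Future, so the metadata can be awaited synchronously when simplicity matters more than throughput. A standalone sketch (BlockingSendSketch is a made-up name; it reuses the config helpers shown further below, and the key/value are arbitrary):

import com.linewell.akkakafka.common.util.KafkaProducerConfig
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

object BlockingSendSketch extends App {
  val producer = new KafkaProducer[String, Long](KafkaProducerConfig())
  // send() returns a java.util.concurrent.Future; get() blocks until the broker acks
  val metadata: RecordMetadata =
    producer.send(new ProducerRecord[String, Long]("testctao", "key", 1L)).get()
  println(s"offset ${metadata.offset} partition ${metadata.partition}")
  producer.close()
}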

Coordination


package com.linewell.akkakafka.kafka.coordination

import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.stream.ActorMaterializer
import com.linewell.akkakafka.kafka.bean.Command._
import com.linewell.akkakafka.kafka.bean.{Command, Mode}
import com.linewell.akkakafka.kafka.consume.NumberConsumerByKafkaStream
import com.linewell.akkakafka.kafka.produce.NumberProducer
import com.typesafe.config.ConfigFactory

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

/**
  * Created by ctao on 16-1-23.
  * Coordinator class
  */
class Coordinator extends Actor with ActorLogging {

  import Coordinator._

  val conf = ConfigFactory.load()

  /**
    * Scheduling window, acting as a timer
    */
  val schedulerTime = conf.getInt("common.scheduler.time")

  /**
    * Writer actor queue
    */
  val writerBuffer: ArrayBuffer[Option[ActorRef]] = new ArrayBuffer[Option[ActorRef]]()

  /**
    * Reader actor queue
    */
  val readerBuffer: ArrayBuffer[Option[ActorRef]] = new ArrayBuffer[Option[ActorRef]]()

  /**
    * Materializer
    */
  lazy val mat = ActorMaterializer()(context)

  override def receive: Receive = {
    /**
      * On a message carrying StartNumber and a mode, call the build method
      */
    case msg@InitialMessage(StartNumber(num), mode) ⇒
      log.debug(s"Starting the numbers coordinator with $msg")
      buildWriteOrBuildRead(mode, num)

    /**
      * Unrecognized message type
      */
    case msg: InitialMessage ⇒
      log.error(s"Did not understand $msg")
      log.error("shutdown")
      context.system.shutdown()

    /**
      * Each actor messages its parent after initializing; on a read-initialized message,
      * start a one-shot scheduler that sends Stop(actor) to self after the scheduling window
      */
    case ReadInitialized(actorRef) ⇒
      log.debug(s"Reader initialized :${actorRef.path.toString}")
      context.system.scheduler.scheduleOnce(schedulerTime.seconds, self, Stop(actorRef))
      log.debug(s"end scheduler stop ${actorRef.path.toString}")
    /**
      * Each actor messages its parent after initializing; on a write-initialized message,
      * start a one-shot scheduler that sends Stop(actor) to self after the scheduling window
      */
    case WriteInitialized(actorRef) ⇒
      log.debug(s"Writer initialized:${actorRef.path.toString}")
      context.system.scheduler.scheduleOnce(schedulerTime.seconds, self, Stop(actorRef))
      log.debug(s"end scheduler stop ${actorRef.path.toString}")

    /**
      * On the self-sent Stop message, stop the actor and drop it from its queue
      */
    case Stop(actorRef) ⇒
      log.debug(s"Stopping ${actorRef.path.toString}")
      context.stop(actorRef)

      writerBuffer -= Some(actorRef)
      readerBuffer -= Some(actorRef)

      log.debug(s"writeBuffer.length ${writerBuffer.length} and readerBuffer.length ${readerBuffer.length}")

      /**
        * If both queues are empty, schedule a Shutdown message to self
        */
      if (writerBuffer.isEmpty && readerBuffer.isEmpty) {
        context.system.scheduler.scheduleOnce(1.seconds, self, Shutdown)
      }

    /**
      * Shut down the system
      */
    case Shutdown ⇒
      log.debug("Shutting down the app")
      context.system.shutdown()
      log.info("shutdown the app")
  }


  /**
    * Build readers/writers for the given mode
    * @param mode the mode
    * @param numActor number of actors
    */
  def buildWriteOrBuildRead(mode: Mode, numActor: Int): Unit = mode match {
    /**
      * Write mode: add the given number of producer actors to the write queue
      */
    case Mode.Write ⇒
      log.debug("write mode")
      (1 to numActor).foreach { x ⇒
        val writer = Some(context.actorOf(NumberProducer.props, name = s"writerActor-$x"))
        writerBuffer += writer
      }

    /**
      * Read mode: add the given number of consumer actors to the read queue
      */
    case Mode.Read ⇒
      log.debug("read mode")
      (1 to numActor).foreach { x ⇒
        val reader = Some(context.actorOf(NumberConsumerByKafkaStream.props, name = s"readerActor-$x"))
        readerBuffer += reader
      }

    /**
      * Read-write mode: add the given number of actors to each queue
      */
    case Mode.Readwrite ⇒
      log.debug("readwrite mode")
      (1 to numActor).foreach { x ⇒
        val writer = Some(context.actorOf(NumberProducer.props, name = s"writerActor-$x"))
        val reader = Some(context.actorOf(NumberConsumerByKafkaStream.props, name = s"readerActor-$x"))
        writerBuffer += writer
        readerBuffer += reader
      }


  }
}


object Coordinator {

  /**
    * Initialization message
    * @param name the command
    * @param mode the mode
    */
  case class InitialMessage(name: Command, mode: Mode)

}

Application


package com.linewell.akkakafka.kafka.application

import akka.actor.{ActorSystem, Props}
import com.linewell.akkakafka.kafka.bean.Command.StartNumber
import com.linewell.akkakafka.kafka.bean.Mode.{Readwrite, Write, Read}
import com.linewell.akkakafka.kafka.coordination.Coordinator
import com.linewell.akkakafka.kafka.coordination.Coordinator.InitialMessage
import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory

/**
  * Created by ctao on 16-1-23.
  */
object Application extends App {

  val log = LoggerFactory.getLogger(this.getClass)

  val system = ActorSystem("Kafka")

  /**
    * The coordinator actor
    */
  val coordinator = system.actorOf(Props(new Coordinator), name = "coordinator")

  val conf = ConfigFactory.load()
  private val numActor = conf.getInt("common.actor")
  log.info(s"start app")

  /**
    * The app's mode
    */
  val appMode = conf.getString("common.mode").toUpperCase match {
    case "READ" ⇒ Read
    case "WRITE" ⇒ Write
    case "READWRITE" ⇒ Readwrite
    case other ⇒ throw new IllegalArgumentException(s"can't load mode: $other")
  }

  /**
    * Start the coordination
    */
  coordinator ! InitialMessage(StartNumber(numActor), appMode)

}

Kafka Configuration Classes


package com.linewell.akkakafka.common.util

import java.util.Properties

import com.typesafe.config.ConfigFactory

/**
  * Created by ctao on 16-1-27.
  * Kafka consumer configuration
  */
trait KafkaConsumerConfig extends Properties {

  import KafkaConsumerConfig._

  private val consumerPrefixWithDot = consumerPrefix + "."

  val allKeys = Seq(groupId,
    zookeeperConnect,
    zookeeperConnectionTimeOut,
    zookeeperSessionTimeOut,
    reBalanceBackOff,
    reBalanceMaxRetries,
    keyDeserializer,
    valueDeserializer,
    servers
  )

  lazy val conf = ConfigFactory.load()

  allKeys.foreach { key ⇒
    if (conf.hasPath(key)) {
      put(key.replace(consumerPrefixWithDot, ""), conf.getString(key))
    }
  }

}


object KafkaConsumerConfig {

  val consumerPrefix = "consumer"
  //Consumer Keys
  val groupId = s"$consumerPrefix.group.id"
  val zookeeperConnect = s"$consumerPrefix.zookeeper.connect"
  val topic = s"$consumerPrefix.topic"
  val zookeeperSessionTimeOut = s"$consumerPrefix.zookeeper.session.timeout.ms"
  val zookeeperConnectionTimeOut = s"$consumerPrefix.zookeeper.connection.timeout.ms"
  val reBalanceBackOff = s"$consumerPrefix.rebalance.backoff.ms"
  val reBalanceMaxRetries = s"$consumerPrefix.rebalance.max.retries"
  val keyDeserializer = s"$consumerPrefix.key.deserializer"
  val valueDeserializer = s"$consumerPrefix.value.deserializer"
  val servers = s"$consumerPrefix.bootstrap.servers"
  def apply(): KafkaConsumerConfig = new KafkaConsumerConfig {}
}
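Since the trait extends java.util.Properties, apply() yields Properties ready to hand to the Kafka client, with the consumer. prefix already stripped. A quick check against the values in application.conf (ConsumerConfigCheck is a made-up name):

import com.linewell.akkakafka.common.util.KafkaConsumerConfig

object ConsumerConfigCheck extends App {
  val props = KafkaConsumerConfig()
  // keys now match what the Kafka APIs expect, prefix stripped
  assert(props.getProperty("group.id") == "mytest")
  assert(props.getProperty("bootstrap.servers") == "ctao-machine:9092")
}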


package com.linewell.akkakafka.common.util

import java.util.Properties

import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerConfig

/**
  * Created by ctao on 16-1-25.
  * Kafka producer configuration
  */
trait KafkaProducerConfig extends Properties {

  import KafkaProducerConfig._

  private val producerPrefixWithDot = producerPrefix + "."

  private val allKeys = Seq(
    brokers,
    keySerializer,
    valueSerializer,
    partitioner,
    requiredAcks,
    servers
  )

  lazy val conf = ConfigFactory.load()
  allKeys.foreach { key ⇒
    if (conf.hasPath(key)) {
      put(key.replace(producerPrefixWithDot, ""), conf.getString(key))
    }
  }
}

object KafkaProducerConfig {

  val producerPrefix = "producer"


  //Producer Keys
  val brokers = s"$producerPrefix.metadata.broker.list"
  val keySerializer = s"$producerPrefix.key.serializer"
  val valueSerializer = s"$producerPrefix.value.serializer"
  val servers = s"$producerPrefix.bootstrap.servers"
  val partitioner = s"$producerPrefix.partitioner.class"
  val requiredAcks = s"$producerPrefix.request.required.acks"


  def apply(): KafkaProducerConfig = new KafkaProducerConfig {}

}

Serialization and Deserialization

package com.linewell.akkakafka.common.serializer

import java.util

import org.apache.kafka.common.serialization.Serializer

/**
  * Created by ctao on 16-1-26.
  * Long serializer, an implementation of Kafka's Serializer
  */
class LongSerializer extends Serializer[Long] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  override def serialize(topic: String, data: Long): Array[Byte] = BigInt(data).toByteArray

  override def close(): Unit = ()
}



package com.linewell.akkakafka.common.deserializer

import java.util

import org.apache.kafka.common.serialization.Deserializer

/**
  * Created by ctao on 16-1-26.
  * Long deserializer
  */
class LongDeserializer extends Deserializer[Long] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  override def close(): Unit = ()

  override def deserialize(topic: String, data: Array[Byte]): Long = BigInt(data).toLong
}