kafka模擬生產者-消費者以及自定義分割槽
阿新 • • 發佈:2018-12-02
基本概念
kafka中的重要角色
broker:一臺kafka伺服器就是一個broker,一個叢集可有多個broker,一個broker可以容納多個topic
topic:可以理解為一個訊息佇列的名字
partition:分割槽,為了實現擴充套件性,一個topic可以分佈到多個broker上,一個topic可以被分成多個partition,partition中的每條訊息 都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將訊息發給consumer,不保證一個topic的整體的順序。也就是說,一個topic在叢集中可以有多個partition 。kafka有Key Hash演算法和Round Robin演算法兩種分割槽策略。
producer:訊息的生產者,是向kafka發訊息的客戶端
consumer:訊息消費者,向broker取訊息的客戶端
offset:偏移量,用來記錄consumer消費訊息的位置
Consumer Group:消費組,訊息系統有兩類,一是廣播,二是訂閱釋出。
編碼實現
建立一個生產者
package sancen.kafka import java.util.Properties import kafka.producer.{KeyedMessage, Producer, ProducerConfig} /** * 類名 ProducerDemo * 作者 彭三青 * 建立時間 2018-11-26 9:49 * 版本 1.0 * 描述: $ 實現一個生產者,把模擬資料傳送到kafka叢集 */ object ProducerDemo { def main(args: Array[String]): Unit = { // 定義一個接收資料的topic val topic = "test" // 建立配置資訊 val props = new Properties() // 指定序列化類 props.put("serializer.class", "kafka.serializer.StringEncoder") // 指定kafka列表 props.put("metadata.broker.list", "SC01:9092, SC01:9092, SC03:9092") // 設定傳送資料後的響應方式 props.put("request.required.acks", "0") // 指定分割槽器 // props.put("partitioner.class", "kafka.producer.DefaultPartitioner // 自定義分割槽器 props.put("partitioner.class", "day01.kafka.CustomPartitioner") // 建立producer物件 val config: ProducerConfig = new ProducerConfig(props) // 建立生產者物件 val producer: Producer[String, String] = new Producer(config) // 模擬資料 for(i <- 1 to 10000){ val msg = s"$i : producer send data" producer.send(new KeyedMessage[String, String](topic, msg)) //key偏移量,也可以給空 v實際的資料 Thread.sleep(500) } } }
建立消費者
package sancen.kafka import java.util.Properties import java.util.concurrent.{ExecutorService, Executors} import kafka.consumer._ import scala.collection.mutable /** * 類名 ConsumerDemo * 作者 彭三青 * 建立時間 2018-11-26 10:08 * 版本 1.0 * 描述: $ 建立一個Consumer消費kafka的資料 */ class ConsumerDemo(val consumer: String, val stream: KafkaStream[Array[Byte], Array[Byte]]) extends Runnable{ override def run(): Unit = { val it: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator() while (it.hasNext()){ val data = it.next() val topic = data.topic val partition = data.partition val offset = data.offset val msg: String = new String(data.message()) println(s"Consumer:$consumer, topic:$topic, partiton:$partition, offset:$offset, msg:$msg") } } } object ConsumerDemo { def main(args: Array[String]): Unit = { // 定義獲取的topic val topic = "test" // 定義一個map,用來儲存多個topic key:topic名稱,value:指定執行緒數用來獲取topic的資料 val topics = new mutable.HashMap[String, Int]() // 要求就要傳一個map,可以放一個或者多個topic topics.put(topic, 2) // 配置資訊 val props = new Properties() // 指定consumer組名 props.put("group.id", "group02") // 指定zk列表 props.put("zookeeper.connect", "SC01:2181,SC02:2181,SC03:2181") // 指定offset異常時需要獲取的offset值 props.put("auto.offset.reset", "smallest") // 建立配置資訊 val config = new ConsumerConfig(props) // 建立consumer物件 val consumer: ConsumerConnector = Consumer.create(config) // 獲取資料,返回的map型別中key:topic名稱,value:topic對應的資料 val streams: collection.Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]] = consumer.createMessageStreams(topics) // 獲取指定topic的資料 val stream: Option[List[KafkaStream[Array[Byte], Array[Byte]]]] = streams.get(topic) // 建立固定大小的執行緒池 val pool: ExecutorService = Executors.newFixedThreadPool(3) for(i <- 0 until stream.size){ pool.execute(new ConsumerDemo(s"Consumer:$i", stream.get(i))) } } }
建立自定義分割槽類
package sancen.kafka
import kafka.producer.Partitioner
import kafka.utils.VerifiableProperties
import org.apache.kafka.common.utils.Utils
/**
* 類名 CustomPartitioner
* 作者 彭三青
* 建立時間 2018-11-26 20:29
* 版本 1.0
* 描述: $
*/
// 要實現自定義分割槽器必須要繼承Partitioner
class CustomPartitioner(props: VerifiableProperties) extends Partitioner{
override def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}
}
程式測試
後臺啟動kafka叢集
kafka-server-start.sh kafka_2.11-0.9.0.1/config/server.properties &
在kafka叢集上建立一個名為test的topic,指定分割槽為2,一般一個topic對應一個分割槽
kafka-topics.sh --create --zookeeper SC01:2181 --replication-factor 2 --partitions 2 --topic test
分別執行ProducerDemo和ConsumerDemo則可以在ConsumerDemo端視窗打印出資訊