[Notes] Setting up a Kafka cluster and Java client examples
①. Kafka depends on ZooKeeper for cluster coordination, so set up a ZooKeeper cluster before building the Kafka cluster:
https://my.oschina.net/u/2486137/blog/1537389
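For reference, a minimal three-node ensemble configuration looks roughly like the sketch below. The hostnames zk1/zk2/zk3 match the zookeeper.connect setting used later in server.properties; the data directory and ports are assumptions, see the linked post for the full walkthrough.

# conf/zoo.cfg on each ZooKeeper node (sketch)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper    # each node also needs a myid file (1, 2 or 3) in this directory
clientPort=2181
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888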
②. Download Kafka (the binary release) from the Apache Kafka website.
Make sure you download the binary release rather than the source release, otherwise the broker fails at startup with an error such as: could not find main class kafka.Kafka.
I am using the Scala 2.10 build here (the startup log below reports Kafka 0.10.0.1).
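A download-and-extract sketch for each broker node; the exact archive name and mirror URL are assumptions (the logs below show the install directory as /usr/local/kafka_2.10):

wget https://archive.apache.org/dist/kafka/0.10.0.1/kafka_2.10-0.10.0.1.tgz
tar -xzf kafka_2.10-0.10.0.1.tgz -C /usr/local/
cd /usr/local/kafka_2.10-0.10.0.1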
③. Configure the config/server.properties file:
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# Unique identifier of this broker within the cluster (similar to ZooKeeper's myid).
# Even if the broker's IP address changes, consumers are unaffected as long as broker.id stays the same.
broker.id=0

# Whether topics may be deleted. If false, deleting a topic with the admin tools has no effect.
delete.topic.enable=true

# Whether topics are created automatically. Default is true; false is recommended, in which case
# topics must be created manually before use. When enabled, either of these two requests
# triggers automatic topic creation:
#   1. a producer writes to a topic that does not exist
#   2. a consumer reads from a topic that does not exist
auto.create.topics.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured it uses the value returned from
# java.net.InetAddress.getCanonicalHostName().
#   FORMAT:  listeners = security_protocol://host_name:port
#   EXAMPLE: listeners = PLAINTEXT://your.host.name:9092
# Default: PLAINTEXT on port 9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value of "listeners" if configured, otherwise the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Number of threads handling network requests; normally does not need to be changed.
num.network.threads=3

# Number of threads doing disk I/O; should be at least the number of disks.
num.io.threads=8

# Socket send buffer (SO_SNDBUF); -1 means use the OS default.
socket.send.buffer.bytes=102400

# Socket receive buffer (SO_RCVBUF); -1 means use the OS default.
socket.receive.buffer.bytes=102400

# Maximum size of a request the socket server will accept (protects the server against OOM).
# message.max.bytes must be smaller than socket.request.max.bytes and can be overridden per topic at creation time.
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files.
log.dirs=/tmp/kafka-logs

# Default number of log partitions per topic (Kafka's default is 1). More partitions allow greater
# consumption parallelism, but also mean more files across the brokers.
# Can be overridden when the topic is created.
num.partitions=3

# Number of threads per data directory used for log recovery at startup and flushing at shutdown.
# Increase this for installations whose data dirs sit on a RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem, but by default the OS cache is only fsync()'d lazily.
# The following settings control the flush of data to disk. Trade-offs:
#   1. Durability: unflushed data may be lost if you are not using replication.
#   2. Latency: very large flush intervals may lead to latency spikes, because a lot of data must be flushed at once.
#   3. Throughput: the flush is generally the most expensive operation; a small flush interval may cause excessive seeks.
# The flush policy can be expressed as a time interval or a message count (or both), globally and per topic.

# Number of messages to accept before forcing a flush (fsync) to disk. Disk I/O is slow but is a necessary
# part of durability, so this value trades reliability against performance: too large and each fsync takes
# long (I/O stalls); too small and fsync happens very often, adding latency to client requests.
# Messages not yet fsynced are lost if the server fails and the topic is not replicated.
#log.flush.interval.messages=10000

# Maximum time a message can sit in the log before a flush is forced. Controlling the flush only by
# message count is not enough; this triggers an fsync when the interval since the last flush is reached
# even if the message threshold has not been.
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following settings control the disposal of log segments. Segments can be deleted after a period of
# time or after a given size has accumulated; if both policies are configured, a segment is deleted as soon
# as *either* criterion is met. Deletion always starts with the oldest log.

# Minimum age of a log file before it is eligible for deletion: segments older than 168 hours may be deleted.
log.retention.hours=168

# Size-based retention: segments are pruned from the log as long as the remaining segments
# do not drop below log.retention.bytes.
#log.retention.bytes=1073741824

# Maximum size of a single log segment file. When this size is reached, a new segment is created.
log.segment.bytes=1073741824

# Interval (ms) at which the log cleaner checks segments against the retention policies above.
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# ZooKeeper connection string: comma separated host:port pairs, each corresponding to a zk server,
# e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# An optional chroot suffix can be appended to specify the root directory for all kafka znodes.
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

# Timeout in ms for connecting to ZooKeeper.
zookeeper.connection.timeout.ms=6000
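The file above is for a single broker. For a three-node cluster (the describe output in step ⑥ shows brokers 1, 2 and 3), each node needs its own broker.id and, typically, an explicit listeners entry; a sketch of the per-node overrides for the third broker (hostnames k1 and k3 appear in the logs below, k2 is assumed by analogy, and the log directory matches the one in the startup log):

broker.id=3
listeners=PLAINTEXT://k3:9092
log.dirs=/usr/local/kafka_2.10/kafka-logs

Then start Kafka on each broker node:

bin/kafka-server-start.sh -daemon config/server.properties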
④. Check the startup log:
[2017-09-16 19:22:12,567] INFO KafkaConfig values:
    advertised.host.name = null
    advertised.listeners = null
    advertised.port = null
    authorizer.class.name =
    auto.create.topics.enable = true
    auto.leader.rebalance.enable = true
    background.threads = 10
    broker.id = 3
    broker.id.generation.enable = true
    broker.rack = null
    compression.type = producer
    connections.max.idle.ms = 600000
    controlled.shutdown.enable = true
    controlled.shutdown.max.retries = 3
    controlled.shutdown.retry.backoff.ms = 5000
    controller.socket.timeout.ms = 30000
    default.replication.factor = 1
    delete.topic.enable = false
    fetch.purgatory.purge.interval.requests = 1000
    group.max.session.timeout.ms = 300000
    group.min.session.timeout.ms = 6000
    host.name =
    inter.broker.protocol.version = 0.10.1-IV2
    leader.imbalance.check.interval.seconds = 300
    leader.imbalance.per.broker.percentage = 10
    listeners = PLAINTEXT://k1:9092
    log.cleaner.backoff.ms = 15000
    log.cleaner.dedupe.buffer.size = 134217728
    log.cleaner.delete.retention.ms = 86400000
    log.cleaner.enable = true
    log.cleaner.io.buffer.load.factor = 0.9
    log.cleaner.io.buffer.size = 524288
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    log.cleaner.min.cleanable.ratio = 0.5
    log.cleaner.min.compaction.lag.ms = 0
    log.cleaner.threads = 1
    log.cleanup.policy = [delete]
    log.dir = /tmp/kafka-logs
    log.dirs = /usr/local/kafka_2.10/kafka-logs
    log.flush.interval.messages = 9223372036854775807
    log.flush.interval.ms = null
    log.flush.offset.checkpoint.interval.ms = 60000
    log.flush.scheduler.interval.ms = 9223372036854775807
    log.index.interval.bytes = 4096
    log.index.size.max.bytes = 10485760
    log.message.format.version = 0.10.1-IV2
    log.message.timestamp.difference.max.ms = 9223372036854775807
    log.message.timestamp.type = CreateTime
    log.preallocate = false
    log.retention.bytes = -1
    log.retention.check.interval.ms = 300000
    log.retention.hours = 168
    log.retention.minutes = null
    log.retention.ms = null
    log.roll.hours = 168
    log.roll.jitter.hours = 0
    log.roll.jitter.ms = null
    log.roll.ms = null
    log.segment.bytes = 1073741824
    log.segment.delete.delay.ms = 60000
    max.connections.per.ip = 2147483647
    max.connections.per.ip.overrides =
    message.max.bytes = 1000012
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    min.insync.replicas = 1
    num.io.threads = 8
    num.network.threads = 3
    num.partitions = 3
    num.recovery.threads.per.data.dir = 1
    num.replica.fetchers = 1
    offset.metadata.max.bytes = 4096
    offsets.commit.required.acks = -1
    offsets.commit.timeout.ms = 5000
    offsets.load.buffer.size = 5242880
    offsets.retention.check.interval.ms = 600000
    offsets.retention.minutes = 1440
    offsets.topic.compression.codec = 0
    offsets.topic.num.partitions = 50
    offsets.topic.replication.factor = 3
    offsets.topic.segment.bytes = 104857600
    port = 9092
    principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
    producer.purgatory.purge.interval.requests = 1000
    queued.max.requests = 500
    quota.consumer.default = 9223372036854775807
    quota.producer.default = 9223372036854775807
    quota.window.num = 11
    quota.window.size.seconds = 1
    replica.fetch.backoff.ms = 1000
    replica.fetch.max.bytes = 1048576
    replica.fetch.min.bytes = 1
    replica.fetch.response.max.bytes = 10485760
    replica.fetch.wait.max.ms = 500
    replica.high.watermark.checkpoint.interval.ms = 5000
    replica.lag.time.max.ms = 10000
    replica.socket.receive.buffer.bytes = 65536
    replica.socket.timeout.ms = 30000
    replication.quota.window.num = 11
    replication.quota.window.size.seconds = 1
    request.timeout.ms = 30000
    reserved.broker.max.id = 1000
    sasl.enabled.mechanisms = [GSSAPI]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.principal.to.local.rules = [DEFAULT]
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism.inter.broker.protocol = GSSAPI
    security.inter.broker.protocol = PLAINTEXT
    socket.receive.buffer.bytes = 102400
    socket.request.max.bytes = 104857600
    socket.send.buffer.bytes = 102400
    ssl.cipher.suites = null
    ssl.client.auth = none
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    unclean.leader.election.enable = true
    zookeeper.connect = zk1:2181,zk2:2181,zk3:2181
    zookeeper.connection.timeout.ms = 6000
    zookeeper.session.timeout.ms = 6000
    zookeeper.set.acl = false
    zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2017-09-16 19:22:12,910] INFO starting (kafka.server.KafkaServer)
[2017-09-16 19:22:13,183] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2017-09-16 19:22:13,183] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2017-09-16 19:22:13,241] INFO Connecting to zookeeper on zk1:2181,zk2:2181,zk3:2181 (kafka.server.KafkaServer)
[2017-09-16 19:22:15,475] INFO Cluster ID = YkyEXTiPR62G5jdo1v6rKQ (kafka.server.KafkaServer)
[2017-09-16 19:22:15,570] INFO Log directory '/usr/local/kafka_2.10/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2017-09-16 19:22:15,708] INFO Loading logs. (kafka.log.LogManager)
[2017-09-16 19:22:15,723] INFO Logs loading complete in 15 ms. (kafka.log.LogManager)
[2017-09-16 19:22:21,676] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2017-09-16 19:22:21,844] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2017-09-16 19:22:21,850] WARN No meta.properties file under dir /usr/local/kafka_2.10/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-09-16 19:22:22,028] INFO Awaiting socket connections on k3:9092. (kafka.network.Acceptor)
[2017-09-16 19:22:22,032] INFO [Socket Server on Broker 3], Started 1 acceptor threads (kafka.network.SocketServer)
[2017-09-16 19:22:22,081] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-09-16 19:22:22,092] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-09-16 19:22:22,174] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-09-16 19:22:22,181] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-09-16 19:22:22,186] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-09-16 19:22:22,218] INFO [GroupCoordinator 3]: Starting up. (kafka.coordinator.GroupCoordinator)
[2017-09-16 19:22:22,220] INFO [GroupCoordinator 3]: Startup complete. (kafka.coordinator.GroupCoordinator)
[2017-09-16 19:22:22,233] INFO [Group Metadata Manager on Broker 3]: Removed 0 expired offsets in 8 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2017-09-16 19:22:22,890] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2017-09-16 19:22:22,992] INFO Creating /brokers/ids/3 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2017-09-16 19:22:23,087] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-09-16 19:22:23,090] INFO Registered broker 3 at path /brokers/ids/3 with addresses: PLAINTEXT -> EndPoint(192.168.1.137,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2017-09-16 19:22:23,092] WARN No meta.properties file under dir /usr/local/kafka_2.10/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-09-16 19:22:23,498] INFO [Kafka Server 3], started (kafka.server.KafkaServer)
[2017-09-16 19:32:22,220] INFO [Group Metadata Manager on Broker 3]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
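The "Registered broker 3 at path /brokers/ids/3" line confirms that this broker registered itself in ZooKeeper. A quick way to check that all brokers joined the cluster is the ZooKeeper CLI (a sketch; the zkCli.sh path depends on where ZooKeeper is installed):

/usr/local/zookeeper/bin/zkCli.sh -server zk1:2181
ls /brokers/ids        # should list [1, 2, 3] once all three brokers are up
get /brokers/ids/3     # shows the endpoint registered by broker 3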
⑤. Create a topic:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test-1 --partitions 3 --replication-factor 3 --config max.message.bytes=64000 --config flush.messages=1
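The two --config flags override the broker defaults for this topic only: max.message.bytes caps the size of a single message, and flush.messages=1 forces a flush to disk after every message. For reference, other day-to-day topic operations use the same tool:

bin/kafka-topics.sh --zookeeper localhost:2181 --list                        # list all topics
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic <topic-name> # works because delete.topic.enable=true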
⑥. Inspect the topic:
The describe output shows the topic, its partitions, replication factor, leaders and in-sync replicas (ISR):
[root@localhost kafka_2.10]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test-1
Topic:test-1    PartitionCount:3    ReplicationFactor:3    Configs:max.message.bytes=64000,flush.messages=1
    Topic: test-1    Partition: 0    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
    Topic: test-1    Partition: 1    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
    Topic: test-1    Partition: 2    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
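Each line shows, for one partition, which broker is the leader, the full replica list, and the in-sync replicas. Before writing any Java code, the topic can be smoke-tested with the console tools shipped in the same distribution (a sketch; on 0.10 the console consumer can still be pointed at ZooKeeper):

bin/kafka-console-producer.sh --broker-list k1:9092,k2:9092,k3:9092 --topic test-1
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test-1 --from-beginning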
⑦. Java client code:
Producer:
// Required imports (kafka-clients 0.10.0.1):
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// KafkaCfg, KafkaProducerInterceptor and KafkaHandle are the author's helper classes (see the sketch after this snippet).
// Note: this interceptor list is never passed to the producer ("interceptor.classes" stays null in the log below),
// so the interceptor has no effect as written.
List<ProducerInterceptor<Integer, String>> interceptors = new ArrayList<ProducerInterceptor<Integer, String>>();
interceptors.add(new KafkaProducerInterceptor());

Properties props = new Properties();
props.put("bootstrap.servers", KafkaCfg.BROCKER_LIST);
props.put("key.serializer", IntegerSerializer.class);
props.put("value.serializer", StringSerializer.class);
props.put("compression.type", "gzip");

KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(props);
String content = "";
for (int i = 0; i < 100; i++) {
    content = "hello:" + (i + 1);
    // key = loop index, value = "hello:n"; the default partitioner spreads the keys over the 3 partitions
    ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("test-1", i, content);
    producer.send(record, new KafkaHandle()); // asynchronous send with a completion callback
    System.out.println("async message:" + content);
}
// Close (which also flushes) so that buffered records are delivered before the JVM exits.
producer.close();
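The snippet references helper classes that the post does not show. A minimal sketch of what they might look like; the names are taken from the snippet and the broker list from the ProducerConfig log below, but the bodies and the GROUP_ID/TOPIC2 values are assumptions (the unused KafkaProducerInterceptor is omitted):

public class KafkaCfg {
    public static final String BROCKER_LIST = "192.168.1.135:9092,192.168.1.136:9092,192.168.1.137:9092";
    public static final String GROUP_ID = "test-group";   // assumed value
    public static final String TOPIC = "test-1";
    public static final String TOPIC2 = "test-2";          // assumed second topic
}

// Callback invoked when the broker acknowledges (or rejects) an asynchronously sent record.
public class KafkaHandle implements org.apache.kafka.clients.producer.Callback {
    public void onCompletion(org.apache.kafka.clients.producer.RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.println("ack: " + metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset());
        }
    }
}

Running the producer prints the SLF4J warnings, the ProducerConfig dump, and then the 100 "async message" lines: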
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/E:/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.1/log4j-slf4j-impl-2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/E:/repository/org/slf4j/slf4j-log4j12/1.7.1/slf4j-log4j12-1.7.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
722 [main] INFO o.a.k.c.p.ProducerConfig - ProducerConfig values:
interceptor.classes = null
request.timeout.ms = 30000
ssl.truststore.password = null
retry.backoff.ms = 100
buffer.memory = 33554432
batch.size = 16384
ssl.keymanager.algorithm = SunX509
receive.buffer.bytes = 32768
ssl.key.password = null
ssl.cipher.suites = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.service.name = null
ssl.provider = null
max.in.flight.requests.per.connection = 5
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
bootstrap.servers = [192.168.1.135:9092, 192.168.1.136:9092, 192.168.1.137:9092]
client.id =
max.request.size = 1048576
acks = 1
linger.ms = 0
sasl.kerberos.kinit.cmd = /usr/bin/kinit
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
metadata.fetch.timeout.ms = 60000
ssl.endpoint.identification.algorithm = null
ssl.keystore.location = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
ssl.truststore.location = null
ssl.keystore.password = null
block.on.buffer.full = false
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
metrics.sample.window.ms = 30000
security.protocol = PLAINTEXT
metadata.max.age.ms = 300000
ssl.protocol = TLS
sasl.kerberos.min.time.before.relogin = 60000
timeout.ms = 30000
connections.max.idle.ms = 540000
ssl.trustmanager.algorithm = PKIX
metric.reporters = []
ssl.truststore.type = JKS
compression.type = gzip
retries = 0
max.block.ms = 60000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
send.buffer.bytes = 131072
reconnect.backoff.ms = 50
metrics.num.samples = 2
ssl.keystore.type = JKS
794 [main] INFO o.a.k.c.p.ProducerConfig - ProducerConfig values:
interceptor.classes = null
request.timeout.ms = 30000
ssl.truststore.password = null
retry.backoff.ms = 100
buffer.memory = 33554432
batch.size = 16384
ssl.keymanager.algorithm = SunX509
receive.buffer.bytes = 32768
ssl.key.password = null
ssl.cipher.suites = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.service.name = null
ssl.provider = null
max.in.flight.requests.per.connection = 5
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
bootstrap.servers = [192.168.1.135:9092, 192.168.1.136:9092, 192.168.1.137:9092]
client.id = producer-1
max.request.size = 1048576
acks = 1
linger.ms = 0
sasl.kerberos.kinit.cmd = /usr/bin/kinit
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
metadata.fetch.timeout.ms = 60000
ssl.endpoint.identification.algorithm = null
ssl.keystore.location = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
ssl.truststore.location = null
ssl.keystore.password = null
block.on.buffer.full = false
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
metrics.sample.window.ms = 30000
security.protocol = PLAINTEXT
metadata.max.age.ms = 300000
ssl.protocol = TLS
sasl.kerberos.min.time.before.relogin = 60000
timeout.ms = 30000
connections.max.idle.ms = 540000
ssl.trustmanager.algorithm = PKIX
metric.reporters = []
ssl.truststore.type = JKS
compression.type = gzip
retries = 0
max.block.ms = 60000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
send.buffer.bytes = 131072
reconnect.backoff.ms = 50
metrics.num.samples = 2
ssl.keystore.type = JKS
798 [main] INFO o.a.k.c.u.AppInfoParser - Kafka version : 0.10.0.1
798 [main] INFO o.a.k.c.u.AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5
async message:hello:1
async message:hello:2
async message:hello:3
async message:hello:4
async message:hello:5
async message:hello:6
async message:hello:7
async message:hello:8
async message:hello:9
async message:hello:10
async message:hello:11
async message:hello:12
async message:hello:13
async message:hello:14
async message:hello:15
async message:hello:16
async message:hello:17
async message:hello:18
async message:hello:19
async message:hello:20
async message:hello:21
async message:hello:22
async message:hello:23
async message:hello:24
async message:hello:25
async message:hello:26
async message:hello:27
async message:hello:28
async message:hello:29
async message:hello:30
async message:hello:31
async message:hello:32
async message:hello:33
async message:hello:34
async message:hello:35
async message:hello:36
async message:hello:37
async message:hello:38
async message:hello:39
async message:hello:40
async message:hello:41
async message:hello:42
async message:hello:43
async message:hello:44
async message:hello:45
async message:hello:46
async message:hello:47
async message:hello:48
async message:hello:49
async message:hello:50
async message:hello:51
async message:hello:52
async message:hello:53
async message:hello:54
async message:hello:55
async message:hello:56
async message:hello:57
async message:hello:58
async message:hello:59
async message:hello:60
async message:hello:61
async message:hello:62
async message:hello:63
async message:hello:64
async message:hello:65
async message:hello:66
async message:hello:67
async message:hello:68
async message:hello:69
async message:hello:70
async message:hello:71
async message:hello:72
async message:hello:73
async message:hello:74
async message:hello:75
async message:hello:76
async message:hello:77
async message:hello:78
async message:hello:79
async message:hello:80
async message:hello:81
async message:hello:82
async message:hello:83
async message:hello:84
async message:hello:85
async message:hello:86
async message:hello:87
async message:hello:88
async message:hello:89
async message:hello:90
async message:hello:91
async message:hello:92
async message:hello:93
async message:hello:94
async message:hello:95
async message:hello:96
async message:hello:97
async message:hello:98
async message:hello:99
async message:hello:100
Consumer:
// Required imports (plus the author's KafkaCfg class shown above):
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.google.gson.Gson;

ExecutorService fixedPool = Executors.newFixedThreadPool(3);
fixedPool.execute(new Runnable() {
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaCfg.BROCKER_LIST);
        props.put("group.id", KafkaCfg.GROUP_ID);
        // The two zookeeper.* properties below belong to the old ZooKeeper-based consumer;
        // the new KafkaConsumer ignores them (it only logs a warning).
        props.put("zookeeper.session.timeout.ms", "60000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("enable.auto.commit", "true");      // commit offsets automatically
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");   // latest, earliest or none
        props.put("key.deserializer", IntegerDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(props);
        // A consumer may subscribe to several topics at once by passing them as a list.
        consumer.subscribe(Arrays.asList(KafkaCfg.TOPIC, KafkaCfg.TOPIC2));
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("record:" + new Gson().toJson(record));
            }
        }
    }
});
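The loop above never exits and the consumer is never closed. If the application needs to shut down cleanly, the usual pattern is to call wakeup() from another thread and catch the resulting exception in the polling thread; a sketch (it assumes the consumer reference is visible to both threads):

// in the polling thread:
try {
    while (true) {
        ConsumerRecords<Integer, String> records = consumer.poll(100);
        // handle records as above
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected during shutdown
} finally {
    consumer.close();
}

// from a shutdown hook or another thread:
consumer.wakeup();

Sample output, with each ConsumerRecord serialized to JSON by Gson: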
record:{"topic":"test-1","partition":0,"offset":17,"timestamp":1505629339505,"timestampType":"CREATE_TIME","checksum":3084842117,"serializedKeySize":4,"serializedValueSize":7,"key":1,"value":"hello:2"}
record:{"topic":"test-1","partition":0,"offset":18,"timestamp":1505629339520,"timestampType":"CREATE_TIME","checksum":2036504617,"serializedKeySize":4,"serializedValueSize":7,"key":7,"value":"hello:8"}
record:{"topic":"test-1","partition":0,"offset":19,"timestamp":1505629339520,"timestampType":"CREATE_TIME","checksum":2096183246,"serializedKeySize":4,"serializedValueSize":7,"key":8,"value":"hello:9"}
record:{"topic":"test-1","partition":0,"offset":20,"timestamp":1505629339524,"timestampType":"CREATE_TIME","checksum":1567468433,"serializedKeySize":4,"serializedValueSize":8,"key":14,"value":"hello:15"}
record:{"topic":"test-1","partition":0,"offset":21,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":2250809392,"serializedKeySize":4,"serializedValueSize":8,"key":15,"value":"hello:16"}
record:{"topic":"test-1","partition":0,"offset":22,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":795944797,"serializedKeySize":4,"serializedValueSize":8,"key":17,"value":"hello:18"}
record:{"topic":"test-1","partition":0,"offset":23,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":1596880373,"serializedKeySize":4,"serializedValueSize":8,"key":21,"value":"hello:22"}
record:{"topic":"test-1","partition":0,"offset":24,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":2549012433,"serializedKeySize":4,"serializedValueSize":8,"key":26,"value":"hello:27"}
record:{"topic":"test-1","partition":0,"offset":25,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":3946489373,"serializedKeySize":4,"serializedValueSize":8,"key":30,"value":"hello:31"}
record:{"topic":"test-1","partition":0,"offset":26,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":4171966126,"serializedKeySize":4,"serializedValueSize":8,"key":32,"value":"hello:33"}
record:{"topic":"test-1","partition":0,"offset":27,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":3143199368,"serializedKeySize":4,"serializedValueSize":8,"key":33,"value":"hello:34"}
record:{"topic":"test-1","partition":0,"offset":28,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":889962223,"serializedKeySize":4,"serializedValueSize":8,"key":35,"value":"hello:36"}
record:{"topic":"test-1","partition":0,"offset":29,"timestamp":1505629339529,"timestampType":"CREATE_TIME","checksum":658609139,"serializedKeySize":4,"serializedValueSize":8,"key":38,"value":"hello:39"}
record:{"topic":"test-1","partition":0,"offset":30,"timestamp":1505629339530,"timestampType":"CREATE_TIME","checksum":1769068338,"serializedKeySize":4,"serializedValueSize":8,"key":42,"value":"hello:43"}
record:{"topic":"test-1","partition":0,"offset":31,"timestamp":1505629339531,"timestampType":"CREATE_TIME","checksum":3207409220,"serializedKeySize":4,"serializedValueSize":8,"key":44,"value":"hello:45"}
record:{"topic":"test-1","partition":2,"offset":17,"timestamp":1505629339518,"timestampType":"CREATE_TIME","checksum":3419956930,"serializedKeySize":4,"serializedValueSize":7,"key":2,"value":"hello:3"}
record:{"topic":"test-1","partition":2,"offset":18,"timestamp":1505629339519,"timestampType":"CREATE_TIME","checksum":2857189508,"serializedKeySize":4,"serializedValueSize":7,"key":5,"value":"hello:6"}
record:{"topic":"test-1","partition":2,"offset":19,"timestamp":1505629339519,"timestampType":"CREATE_TIME","checksum":2866062050,"serializedKeySize":4,"serializedValueSize":7,"key":6,"value":"hello:7"}
record:{"topic":"test-1","partition":2,"offset":20,"timestamp":1505629339522,"timestampType":"CREATE_TIME","checksum":577748521,"serializedKeySize":4,"serializedValueSize":8,"key":12,"value":"hello:13"}
record:{"topic":"test-1","partition":2,"offset":21,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":1649992521,"serializedKeySize":4,"serializedValueSize":8,"key":16,"value":"hello:17"}
record:{"topic":"test-1","partition":0,"offset":32,"timestamp":1505629339533,"timestampType":"CREATE_TIME","checksum":2322283505,"serializedKeySize":4,"serializedValueSize":8,"key":47,"value":"hello:48"}
record:{"topic":"test-1","partition":0,"offset":33,"timestamp":1505629339535,"timestampType":"CREATE_TIME","checksum":2329901557,"serializedKeySize":4,"serializedValueSize":8,"key":48,"value":"hello:49"}
record:{"topic":"test-1","partition":2,"offset":22,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":3854334725,"serializedKeySize":4,"serializedValueSize":8,"key":18,"value":"hello:19"}
record:{"topic":"test-1","partition":2,"offset":23,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":1792756199,"serializedKeySize":4,"serializedValueSize":8,"key":19,"value":"hello:20"}
record:{"topic":"test-1","partition":2,"offset":24,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":2514692525,"serializedKeySize":4,"serializedValueSize":8,"key":22,"value":"hello:23"}
record:{"topic":"test-1","partition":2,"offset":25,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":1562610569,"serializedKeySize":4,"serializedValueSize":8,"key":25,"value":"hello:26"}
record:{"topic":"test-1","partition":2,"offset":26,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":3501401355,"serializedKeySize":4,"serializedValueSize":8,"key":28,"value":"hello:29"}
record:{"topic":"test-1","partition":2,"offset":27,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":2946838050,"serializedKeySize":4,"serializedValueSize":8,"key":31,"value":"hello:32"}
record:{"topic":"test-1","partition":2,"offset":28,"timestamp":1505629339529,"timestampType":"CREATE_TIME","checksum":2695171007,"serializedKeySize":4,"serializedValueSize":8,"key":36,"value":"hello:37"}
record:{"topic":"test-1","partition":2,"offset":29,"timestamp":1505629339530,"timestampType":"CREATE_TIME","checksum":3877831509,"serializedKeySize":4,"serializedValueSize":8,"key":40,"value":"hello:41"}
record:{"topic":"test-1","partition":2,"offset":30,"timestamp":1505629339530,"timestampType":"CREATE_TIME","checksum":2747042666,"serializedKeySize":4,"serializedValueSize":8,"key":41,"value":"hello:42"}
record:{"topic":"test-1","partition":2,"offset":31,"timestamp":1505629339531,"timestampType":"CREATE_TIME","checksum":4222789243,"serializedKeySize":4,"serializedValueSize":8,"key":45,"value":"hello:46"}
record:{"topic":"test-1","partition":2,"offset":32,"timestamp":1505629339531,"timestampType":"CREATE_TIME","checksum":830470691,"serializedKeySize":4,"serializedValueSize":8,"key":46,"value":"hello:47"}
record:{"topic":"test-1","partition":1,"offset":19,"timestamp":1505629339461,"timestampType":"CREATE_TIME","checksum":27654439,"serializedKeySize":4,"serializedValueSize":7,"key":0,"value":"hello:1"}
record:{"topic":"test-1","partition":1,"offset":20,"timestamp":1505629339519,"timestampType":"CREATE_TIME","checksum":2877195336,"serializedKeySize":4,"serializedValueSize":7,"key":3,"value":"hello:4"}
record:{"topic":"test-1","partition":1,"offset":21,"timestamp":1505629339519,"timestampType":"CREATE_TIME","checksum":2833341777,"serializedKeySize":4,"serializedValueSize":7,"key":4,"value":"hello:5"}
record:{"topic":"test-1","partition":1,"offset":22,"timestamp":1505629339520,"timestampType":"CREATE_TIME","checksum":1116560893,"serializedKeySize":4,"serializedValueSize":8,"key":9,"value":"hello:10"}
record:{"topic":"test-1","partition":1,"offset":23,"timestamp":1505629339520,"timestampType":"CREATE_TIME","checksum":2285896101,"serializedKeySize":4,"serializedValueSize":8,"key":10,"value":"hello:11"}
record:{"topic":"test-1","partition":1,"offset":24,"timestamp":1505629339522,"timestampType":"CREATE_TIME","checksum":672893159,"serializedKeySize":4,"serializedValueSize":8,"key":11,"value":"hello:12"}
record:{"topic":"test-1","partition":1,"offset":25,"timestamp":1505629339522,"timestampType":"CREATE_TIME","checksum":1637741071,"serializedKeySize":4,"serializedValueSize":8,"key":13,"value":"hello:14"}
record:{"topic":"test-1","partition":2,"offset":33,"timestamp":1505629339543,"timestampType":"CREATE_TIME","checksum":3620398696,"serializedKeySize":4,"serializedValueSize":8,"key":51,"value":"hello:52"}
record:{"topic":"test-1","partition":2,"offset":34,"timestamp":1505629339545,"timestampType":"CREATE_TIME","checksum":242342934,"serializedKeySize":4,"serializedValueSize":8,"key":52,"value":"hello:53"}
record:{"topic":"test-1","partition":2,"offset":35,"timestamp":1505629339547,"timestampType":"CREATE_TIME","checksum":2840039757,"serializedKeySize":4,"serializedValueSize":8,"key":53,"value":"hello:54"}
record:{"topic":"test-1","partition":1,"offset":26,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":464649674,"serializedKeySize":4,"serializedValueSize":8,"key":20,"value":"hello:21"}
record:{"topic":"test-1","partition":1,"offset":27,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":3591464331,"serializedKeySize":4,"serializedValueSize":8,"key":23,"value":"hello:24"}
record:{"topic":"test-1","partition":1,"offset":28,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":2254864424,"serializedKeySize":4,"serializedValueSize":8,"key":24,"value":"hello:25"}
record:{"topic":"test-1","partition":1,"offset":29,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":3670479813,"serializedKeySize":4,"serializedValueSize":8,"key":27,"value":"hello:28"}
record:{"topic":"test-1","partition":1,"offset":30,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":1843557739,"serializedKeySize":4,"serializedValueSize":8,"key":29,"value":"hello:30"}
record:{"topic":"test-1","partition":1,"offset":31,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":1905538768,"serializedKeySize":4,"serializedValueSize":8,"key":34,"value":"hello:35"}
record:{"topic":"test-1","partition":1,"offset":32,"timestamp":1505629339529,"timestampType":"CREATE_TIME","checksum":3985428395,"serializedKeySize":4,"serializedValueSize":8,"key":37,"value":"hello:38"}
record:{"topic":"test-1","partition":1,"offset":33,"timestamp":1505629339529,"timestampType":"CREATE_TIME","checksum":3427427349,"serializedKeySize":4,"serializedValueSize":8,"key":39,"value":"hello:40"}
record:{"topic":"test-1","partition":1,"offset":34,"timestamp":1505629339530,"timestampType":"CREATE_TIME","checksum":713267988,"serializedKeySize":4,"serializedValueSize":8,"key":43,"value":"hello:44"}
record:{"topic":"test-1","partition":1,"offset":35,"timestamp":1505629339536,"timestampType":"CREATE_TIME","checksum":813675607,"serializedKeySize":4,"serializedValueSize":8,"key":49,"value":"hello:50"}
record:{"topic":"test-1","partition":1,"offset":36,"timestamp":1505629339541,"timestampType":"CREATE_TIME","checksum":2006019882,"serializedKeySize":4,"serializedValueSize":8,"key":50,"value":"hello:51"}