
Kafka Tools


References:

https://cwiki.apache.org/confluence/display/KAFKA/System+Tools

https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools

http://kafka.apache.org/documentation.html#quickstart

http://kafka.apache.org/documentation.html#operations

Kafka ships with a fairly powerful set of tools to make day-to-day operation easier. The ones that are needed most often are collected below.

Starting and stopping the Kafka server

bin/kafka-server-start.sh config/server.properties
bin/kafka-server-stop.sh
JMX_PORT=9999 nohup bin/kafka-server-start.sh config/server.properties &

Topic management

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181

Describe the details of a topic

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Change the number of partitions of a topic (the count can only be increased)

bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 3 --topic test

Topic deletion is only officially supported from 0.8.2 onwards, and is currently in beta

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

List problematic partitions

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --unavailable-partitions --topic test
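
In the same vein, kafka-topics.sh --describe also accepts --under-replicated-partitions (the counterpart of --unavailable-partitions), which lists partitions whose in-sync replica set has fallen behind the full replica list:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --under-replicated-partitions --topic test
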
Modify per-topic configuration

> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 \
        --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic \
    --config max.message.bytes=128000
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic \
    --deleteConfig max.message.bytes

Cluster expansion

Adding a new broker to the cluster is straightforward, but partitions of existing topics are not migrated to it automatically.
The migration has to be triggered by hand, and Kafka provides a fairly convenient tool for it.

--generate: produce a candidate reassignment plan

Given a list of topics and a list of brokers, the tool proposes a reassignment plan.

Fully migrating topics onto the new brokers

> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
            {"topic": "foo2"}],
 "version":1
}
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate 
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Proposed partition reassignment configuration

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}

The tool prints both the current assignment and the proposed reassignment plan.

Save both at the same time: the proposed plan is what gets passed to --execute, and the current assignment can be used for a rollback.

--execute: start the reassignment

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}

--verify: check the current status of the reassignment

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully 
Reassignment of partition [foo2,2] completed successfully

Migrating only selected partitions of selected topics to specific replicas

This moves partition 0 of topic foo1 to brokers 5,6 and partition 1 of topic foo2 to brokers 2,3:

> cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
               {"topic":"foo2","partition":1,"replicas":[3,4]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Decommissioning brokers

The current release cannot plan a broker decommission for you; that support is due in 0.8.2. In the meantime it means manually moving every replica off the broker being retired.
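
Until then, the same kafka-reassign-partitions.sh tool can be used with a hand-written plan that simply leaves the retiring broker out of every replica list. A minimal sketch, assuming broker 4 is being retired and its replicas should land on broker 5 (the topics and assignments are illustrative):

> cat decommission-broker-4.json
{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[3,5]},
               {"topic":"foo2","partition":1,"replicas":[2,5]}]}
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file decommission-broker-4.json --execute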

Increasing the replication factor

Increase the replica count of partition 0 from 1 to 3; the existing replica is on broker 5, and new replicas are added on brokers 6 and 7.

> cat increase-replication-factor.json
{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

Producer console

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a message
This is another message

Once the producer is running, every line you type is sent as a message to that topic on the broker.
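
Since the console producer reads from standard input, an existing file of messages can also be piped in (messages.txt here is just an example name):

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < messages.txt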

Consumer console

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Reading the topic from the beginning like this, the full data set can be replayed over and over.
Why can every run replay everything? Because the console consumer generates a random group.id on each start:
consumerProps.put("group.id", "console-consumer-" + new Random().nextInt(100000))
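
Conversely, leaving out --from-beginning makes the consumer print only messages produced after it starts:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test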

Consumer Offset Checker

This displays the offset status of a consumer group. --group is a required argument; if --topic is not given, it defaults to all topics.

Displays the: Consumer Group, Topic, Partitions, Offset, logSize, Lag, Owner for the specified set of Topics and Consumer Group

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker

required argument: [group]
Option Description
------ -----------
--broker-info Print broker info
--group Consumer group.
--help Print this message.
--topic Comma-separated list of consumer
topics (all topics if absent).
--zkconnect ZooKeeper connect string. (default: localhost:2181)

Example,

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv

Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 21 21 0 none
pv page_visits 1 19 19 0 none
pv page_visits 2 20 20 0 none

Export Zookeeper Offsets

Dumps the offset information stored in ZK to a file in the format shown below.

A utility that retrieves the offsets of broker partitions in ZK and prints to an output file in the following format:

/consumers/group1/offsets/topic1/1-0:286894308
/consumers/group1/offsets/topic1/2-0:284803985

bin/kafka-run-class.sh kafka.tools.ExportZkOffsets

required argument: [zkconnect]
Option Description
------ -----------
--group Consumer group.
--help Print this message.
--output-file Output file
--zkconnect ZooKeeper connect string. (default: localhost:2181)
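
Putting the options together, a run against the pv group from the earlier examples might look like this (the output path is arbitrary):

bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect localhost:2181 --group pv --output-file /tmp/pv-offsets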

Update Offsets In Zookeeper

This one is quite handy for replay. The Kafka documentation is not much help here; how to use the tool only becomes clear after reading the source code.

A utility that updates the offset of every broker partition to the offset of earliest or latest log segment file, in ZK.

bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK

USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic

Example,

bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits

Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 0 21 21 none
pv page_visits 1 0 19 19 none
pv page_visits 2 0 20 20 none

You can see that the offsets have been reset to 0, so Lag now equals logSize.

A more direct approach is to look inside ZooKeeper itself.

Connect with zkCli.sh and use ls to browse the znodes; the layout is listed below, followed by a short example session.

Broker Node Registry

/brokers/ids/[0...N] --> host:port (ephemeral node)

Broker Topic Registry

/brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)

Consumer Id Registry

/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)

Consumer Offset Tracking

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value ((persistent node)

Partition Owner registry

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)
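
A short zkCli.sh session following this layout, reusing the pv group and page_visits topic from the examples above (the exact child-znode naming under offsets can differ between versions):

zkCli.sh -server localhost:2181
ls /consumers/pv/offsets/page_visits
get /consumers/pv/offsets/page_visits/0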
