Implementing kafka-topics.sh functionality in Java via ZooKeeper
阿新 • Published: 2018-12-13
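Requirement
For work, I needed to increase the partition count of every topic on the Kafka cluster in our test environment. There are a lot of topics, so running the command by hand for each one was not realistic. (While writing this post it occurred to me that the command line would have worked after all: a script with a loop would do it, and would probably be simpler -_-||.) So I looked for a way to do it from code, by connecting as a client.
The code is as follows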
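For comparison, the scripted route mentioned in the aside above could look roughly like the sketch below. It only builds the command lines, one `kafka-topics.sh --alter` call per topic, and leaves running them to the reader. The class name, the method name and the topic names are hypothetical, and the script path assumes you are in the Kafka installation directory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: build one "kafka-topics.sh --alter" command line per topic.
final class AlterCommandBuilder {

    // zkConnect is the ZooKeeper host:port; partitions is the new total per topic.
    static List<String> buildCommands(String zkConnect, List<String> topics, int partitions) {
        List<String> commands = new ArrayList<>();
        for (String topic : topics) {
            commands.add("./bin/kafka-topics.sh --zookeeper " + zkConnect
                    + " --alter --topic " + topic
                    + " --partitions " + partitions);
        }
        return commands;
    }

    public static void main(String[] args) {
        // Hypothetical topic names, for illustration only.
        List<String> topics = Arrays.asList("orders", "payments");
        buildCommands("10.1.24.216:2181", topics, 4).forEach(System.out::println);
    }
}
```

The printed lines can be pasted into a shell or written to a script file. The topic list itself could come from `kafka-topics.sh --zookeeper <host:port> --list`.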
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;
import scala.collection.JavaConversions;
import scala.collection.Map;
import scala.collection.Seq;
import java.util.List;
import java.util.Properties;
/**
* Created by NightWatch on 2018/12/11.
*/
public class TopicSh {

    // ZooKeeper connection string (host:port) of the Kafka cluster
    private static final String hostAndPort = "10.1.24.216:2181";

    // Open a ZkUtils client with 30 s session and connection timeouts
    private static ZkUtils connect() {
        return ZkUtils.apply(hostAndPort, 30000, 30000, JaasUtils.isZkSecurityEnabled());
    }

    // Create a topic with 1 partition and a replication factor of 1
    @Test
    public void createTopic() {
        ZkUtils zkUtils = connect();
        try {
            AdminUtils.createTopic(zkUtils, "topic-name", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
        } finally {
            zkUtils.close();
        }
    }

    // List all topics
    @Test
    public void listTopicOne() {
        ZkUtils zkUtils = connect();
        try {
            Map<String, Properties> map = AdminUtils.fetchAllTopicConfigs(zkUtils);
            java.util.Map<String, Properties> javaMap = JavaConversions.mapAsJavaMap(map);
            for (java.util.Map.Entry<String, Properties> entry : javaMap.entrySet()) {
                // Topic name
                String topic = entry.getKey();
                System.out.println(topic);
                // TODO: I have not worked out what this value holds yet (judging by the
                // method name, probably the topic's config); will look into it when I have time
                Properties value = entry.getValue();
            }
        } finally {
            zkUtils.close();
        }
    }

    // List all topics; this version is a bit simpler
    @Test
    public void listTopicTwo() {
        ZkUtils zkUtils = connect();
        try {
            Seq<String> allTopics = zkUtils.getAllTopics();
            List<String> topicsList = JavaConversions.seqAsJavaList(allTopics);
            // TODO: I do not fully understand this syntax either, same as above
            topicsList.forEach(System.out::println);
        } finally {
            zkUtils.close();
        }
    }

    // Change the partition count of every topic to 4
    @Test
    public void alterTopicPartition() {
        ZkUtils zkUtils = connect();
        try {
            List<String> topics = JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
            for (String topic : topics) {
                AdminUtils.addPartitions(zkUtils, topic, 4, "", true, RackAwareMode.Enforced$.MODULE$);
            }
        } finally {
            zkUtils.close();
        }
    }
}
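One thing to watch in `alterTopicPartition`: as far as I know, Kafka only lets a topic's partition count grow, so `addPartitions` is expected to throw for any topic that already has 4 or more partitions, which would stop the loop halfway. A minimal sketch of filtering the topic list first is shown below. It is plain Java and does not touch Kafka. The `currentCounts` map is assumed to have been collected beforehand, the class and method names are hypothetical, and treating names that start with `__` as internal topics (such as `__consumer_offsets`) is a convention I am assuming here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: choose the topics whose partition count can actually be raised to the target.
final class PartitionPlan {

    // currentCounts maps topic name -> current number of partitions (assumed input).
    static List<String> topicsToExpand(Map<String, Integer> currentCounts, int target) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : currentCounts.entrySet()) {
            // Assumption: names starting with "__" are Kafka-internal topics and are left alone.
            boolean internal = entry.getKey().startsWith("__");
            if (!internal && entry.getValue() < target) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical topics and counts, for illustration only.
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("orders", 1);
        counts.put("logs", 6);
        counts.put("__consumer_offsets", 50);
        System.out.println(topicsToExpand(counts, 4)); // prints [orders]
    }
}
```

Only the topics returned by `topicsToExpand` would then be passed to `AdminUtils.addPartitions`.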
Problems encountered
Running the code produced the following error:
org.I0Itec.zkclient.exception.ZkException: org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /brokers/ids
at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:68)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1000)
at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:676)
at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:672)
at kafka.utils.ZkUtils.getChildrenParentMayNotExist(ZkUtils.scala:568)
at kafka.utils.ZkUtils.getAllBrokersInCluster(ZkUtils.scala:161)
at kafka.admin.AdminUtils$.getBrokerMetadatas(AdminUtils.scala:380)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:402)
at kafka.admin.AdminUtils.createTopic(AdminUtils.scala)
at com.bicon.kafka.TopicSh.createTopic(TopicSh.java:26)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /brokers/ids
at org.apache.zookeeper.KeeperException.create(KeeperException.java:113)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1472)
at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1500)
at org.I0Itec.zkclient.ZkConnection.getChildren(ZkConnection.java:119)
at org.I0Itec.zkclient.ZkClient$4.call(ZkClient.java:679)
at org.I0Itec.zkclient.ZkClient$4.call(ZkClient.java:676)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:990)
... 35 more
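The error says NoAuth, meaning the client is not authorized. At first I guessed it might be a file permission problem,
so I took a look at ZooKeeper's dataDir directory: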
[zookeeper]# ll
total 4
drwxr-xr-x 2 root root 4096 Dec  7 09:41 version-2
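Sure enough, non-root users have no write permission there, but I doubted that this was the cause. A quick search on Baidu suggested that it really was not.
It turns out that ZooKeeper nodes can have ACLs set on them. Log in to the ZooKeeper shell to check: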
[kafka_2.11-0.11.0.1]# ./bin/zookeeper-shell.sh 28.2.5.89:2181
Connecting to 28.2.5.89:2181
Welcome to ZooKeeper!
JLine support is disabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
ls /
[cluster, controller, brokers, zookeeper, kafka-acl, kafka-acl-changes, admin, isr_change_notification, controller_epoch, kafka-manager, consumers, latest_producer_id_block, config]
getAcl /brokers/ids
'ip,'10.1.24.216
: cdrwa
Sure enough, someone had set an ACL on it. So I changed the ACL:
setAcl /brokers/ids world:anyone:cdrwa
cZxid = 0x5
ctime = Wed Dec 06 20:17:05 CST 2017
mZxid = 0x5
mtime = Wed Dec 06 20:17:05 CST 2017
pZxid = 0x1fd7
cversion = 63
dataVersion = 0
aclVersion = 2
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
getAcl /brokers/ids
'world,'anyone
: cdrwa
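To read the ACL lines in the session above: an entry has the form scheme, id, permissions. `'ip,'10.1.24.216 : cdrwa` grants create, delete, read, write and admin only to clients connecting from 10.1.24.216, while `world:anyone` grants them to every client. The failing call in the stack trace was `getChildren` on /brokers/ids, which needs the read permission, so a client coming from any other address would presumably be rejected with NoAuth. The sketch below parses the `scheme:id:perms` form used by `setAcl` into its parts. It is a standalone illustration, not ZooKeeper's own parser; the class name is hypothetical, and the bit values are the ones I understand ZooKeeper to use for its permissions.

```java
// Sketch: parse a "scheme:id:perms" ACL string such as "world:anyone:cdrwa".
final class AclSpec {
    // Permission bits, matching ZooKeeper's READ/WRITE/CREATE/DELETE/ADMIN values.
    static final int READ = 1, WRITE = 2, CREATE = 4, DELETE = 8, ADMIN = 16;

    final String scheme;
    final String id;
    final int perms;

    private AclSpec(String scheme, String id, int perms) {
        this.scheme = scheme;
        this.id = id;
        this.perms = perms;
    }

    static AclSpec parse(String spec) {
        // Split on the first and last colon so that ids containing ':' stay intact.
        int first = spec.indexOf(':');
        int last = spec.lastIndexOf(':');
        if (first < 0 || first == last) {
            throw new IllegalArgumentException("expected scheme:id:perms, got " + spec);
        }
        int perms = 0;
        for (char c : spec.substring(last + 1).toCharArray()) {
            switch (c) {
                case 'r': perms |= READ; break;
                case 'w': perms |= WRITE; break;
                case 'c': perms |= CREATE; break;
                case 'd': perms |= DELETE; break;
                case 'a': perms |= ADMIN; break;
                default: throw new IllegalArgumentException("unknown permission: " + c);
            }
        }
        return new AclSpec(spec.substring(0, first), spec.substring(first + 1, last), perms);
    }

    // True if every bit in perm is granted by this entry.
    boolean allows(int perm) {
        return (perms & perm) == perm;
    }

    public static void main(String[] args) {
        AclSpec acl = parse("world:anyone:cdrwa");
        System.out.println(acl.scheme + " / " + acl.id + " / read allowed: " + acl.allows(READ));
    }
}
```

A narrower fix than `world:anyone` should also be possible: `setAcl` accepts a comma-separated list, so something like `setAcl /brokers/ids ip:10.1.24.216:cdrwa,ip:<client-ip>:cdrwa` would keep the existing entry and add one for the client machine, where `<client-ip>` is a placeholder for that machine's address.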
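Ran the program again: success!
Note: this is a test environment, so loosening the ACL like this does no harm here, but please do not follow my example.
Finally, here is the article that pointed me in the right direction: https://my.oschina.net/anxiaole/blog/1814143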