Kafka 核心 API ==> AdminClient API
一、Kafka 核心 API
上文中對 Kafka 做了一些簡單的介紹,那麼在開發過程中我們如何去訪問 Kafka 呢?這就需要使用到本文將要介紹的Kafka客戶端API。下圖是官方文件中的一個圖,形象的描述了能與 Kafka整合的客戶端型別
Kafka的五類客戶端API型別如下:
- AdminClient API:允許管理和檢測Topic、broker以及其他Kafka例項,與Kafka自帶的指令碼命令作用類似
- Producer API:釋出訊息到1個或多個Topic,也就是生產者或者說釋出方需要用到的API
- Consumer API:訂閱1個或多個Topic,並處理產生的訊息,也就是消費者或者說訂閱方需要用到的API
- Stream API:高效地將輸入流轉換到輸出流,通常應用在一些流處理場景
- Connector API:從一些源系統或應用程式拉取資料到Kafka,如上圖中的DB
本文中,我們將主要介紹 AdminClient API。
二、AdminClient API
顯然,操作AdminClient API的前提是需要建立一個 AdminClient
例項。程式碼示例:
/** * 建立AdminClient客戶端物件 */ public static AdminClient createAdminClientByProperties() { Properties prop = newProperties(); // 配置Kafka服務的訪問地址及埠號 prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.182.128:9092"); // 建立AdminClient例項 return AdminClient.create(prop); } /** * 建立AdminClient的第二種方式 */ public static AdminClient createAdminClientByMap(){ Map<String, Object> conf = Maps.newHashMap();// 配置Kafka服務的訪問地址及埠號 conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.182.128:9092"); // 建立AdminClient例項 return AdminClient.create(conf); }
建立了 AdminClient
的例項物件後,我們就可以通過它提供的方法操作 Kafka,常用的方法如下:
方法名稱 | 作用 |
createTopics | 建立一個或多個Topic |
listTopics | 查詢Topic列表 |
deleteTopics | 刪除一個或多個Topic |
describeTopics | 查詢Topic的描述資訊 |
describeConfigs | 查詢Topic、Broker等的所有配置項資訊 |
alterConfigs | 用於修改Topic、Broker等的配置項資訊(該方法在新版本中被標記為已過期) |
incrementalAlterConfigs | 同樣也是用於修改Topic、Broker等的配置項資訊,但功能更多、更靈活,用於代替alterConfigs |
createPartitions | 用於調整Topic的Partition數量,只能增加不能減少或刪除,也就是說新設定的Partition數量必須大於等於之前的Partition數量 |
Tips:
describeTopics
和describeConfigs
的意義主要是在監控上,很多用於監控Kafka的元件都會使用到這兩個API,因為通過這兩個API可以獲取到Topic自身和周邊的詳細資訊
三、建立 Topic
使用createTopics
方法可以建立Topic,傳入的引數也與kafka-topics.sh
命令指令碼的引數一樣。程式碼示例:
/** * 建立一個或多個topic * * @param topicNames topic名稱的集合 */ public static void createTopic(List<String> topicNames) throws Exception { // 建立AdminClient客戶端物件 AdminClient adminClient = BuildAdminClient.createAdminClientByMap(); List<NewTopic> topicList = Lists.newArrayList(); /** * 定義topic資訊 * String name topic名 * int numPartitions 分割槽數 * short replicationFactor 副本數,必須不能大於broker數量 */ topicNames.forEach(topicName -> topicList.add( new NewTopic(topicName, 1, Short.parseShort("1")))); // 建立topic CreateTopicsResult result = adminClient.createTopics(topicList); // get方法是一個阻塞方法,一定要等到createTopics完成之後才進行下一步操作 result.all().get(); // 列印新建立的topic名 result.values().forEach((name, future) -> System.out.println("topicName:" + name)); // 關閉資源 adminClient.close(); }
四、刪除 Topic
deleteTopics
方法可以刪除一個或多個Topic,程式碼示例:
/** * 刪除一個或多個topic * * @param topicNames topic名稱的集合 */ public static void removeTopic(List<String> topicNames) throws Exception { // 建立AdminClient客戶端物件 AdminClient adminClient = BuildAdminClient.createAdminClientByProperties(); // 刪除topic集合 DeleteTopicsResult result = adminClient.deleteTopics(topicNames); // get方法是一個阻塞方法,一定要等到deleteTopics完成之後才進行下一步操作 result.all().get(); // 關閉資源 adminClient.close(); }
五、檢視 Topics 列表
listTopics
方法用於查詢Topic列表,通過傳入 ListTopicsOptions
引數可以設定一些可選項。程式碼示例:
/** * 獲取所有的topic資訊,包括Kafka內部的topic * 如:__consumer_offsets,internal=true */ public static void listTopicsWithOptions() throws Exception { // 建立AdminClient客戶端物件 AdminClient adminClient = BuildAdminClient.createAdminClientByProperties(); ListTopicsOptions options = new ListTopicsOptions(); // 列出內部的Topic options.listInternal(true); // 列出所有的topic ListTopicsResult result = adminClient.listTopics(options); // 獲取所有topic的名字,返回的是一個Set集合 Set<String> topicNames = result.names().get(); // 列印所有topic的名字 topicNames.forEach(System.out::println); // 獲取所有topic的資訊,返回的是一個Collection集合 // (name=hello-kafka, internal=false),internal代表是否為內部的topic Collection<TopicListing> topicListings = result.listings().get(); // 列印所有topic的資訊 topicListings.forEach(System.out::println); // 關閉資源 adminClient.close(); }
六、檢視 Topic 的描述資訊
一個 Topic 會有自身的描述資訊,例如:partition
的數量,副本集的數量,是否為 internal
等等。AdminClient
中提供了 describeTopics
方法來查詢這些描述資訊。程式碼示例:
/** * 獲取topic的描述資訊 * * topic name = a-topic, desc = (name=a-topic, internal=false, partitions=(partition=0, leader=192.168.182.128:9092 (id: 0 rack: null), replicas=192.168.182.128:9092 (id: 0 rack: null), isr=192.168.182.128:9092 (id: 0 rack: null)), authorizedOperations=null) * topic name = b-topic, desc = (name=b-topic, internal=false, partitions=(partition=0, leader=192.168.182.128:9092 (id: 0 rack: null), replicas=192.168.182.128:9092 (id: 0 rack: null), isr=192.168.182.128:9092 (id: 0 rack: null)), authorizedOperations=null) */ public static void describeTopics(List<String> topics) throws Exception { // 建立AdminClient客戶端物件 AdminClient adminClient = BuildAdminClient.createAdminClientByProperties(); // 獲取Topic的描述資訊 DescribeTopicsResult result = adminClient.describeTopics(topics); // 解析描述資訊結果, Map<String, TopicDescription> ==> topicName:topicDescription Map<String, TopicDescription> topicDescriptionMap = result.all().get(); topicDescriptionMap.forEach((topicName, description) -> System.out.printf("topic name = %s, desc = %s \n", topicName, description)); // 關閉資源 adminClient.close(); }
七、檢視 Topic 的配置資訊
除了Kafka自身的配置項外,其內部的Topic也會有非常多的配置項,我們可以通過describeConfigs
方法來獲取某個Topic中的配置項資訊。程式碼示例:
/** * 獲取topic的配置資訊 */ public static void describeConfigTopics(List<String> topicNames) throws Exception { // 建立AdminClient客戶端物件 AdminClient adminClient = BuildAdminClient.createAdminClientByMap(); List<ConfigResource> configResources = Lists.newArrayListWithCapacity(64); topicNames.forEach(topicName -> configResources.add( // 指定要獲取的源 new ConfigResource(ConfigResource.Type.TOPIC, topicName))); // 獲取topic的配置資訊 DescribeConfigsResult result = adminClient.describeConfigs(configResources); // 解析topic的配置資訊 Map<ConfigResource, Config> resourceConfigMap = result.all().get(); resourceConfigMap.forEach((configResource, config) -> System.out.printf("topic config ConfigResource = %s, Config = %s \n", configResource, config)); // 關閉資源 adminClient.close(); }
八、修改 Topic 的分割槽數量
在建立Topic時我們需要設定Partition的數量,但如果覺得初始設定的Partition數量太少了,那麼就可以使用createPartitions
方法來調整Topic的Partition數量,但是需要注意在Kafka中Partition只能增加不能減少。程式碼示例:
/** * 修改topic的分割槽數量 * 只能增加不能減少 */ public static void updateTopicPartition(List<String> topicNames, Integer partitionNum) throws Exception { // 建立AdminClient客戶端物件 AdminClient adminClient = BuildAdminClient.createAdminClientByMap(); // 構建修改分割槽的topic請求引數 Map<String, NewPartitions> newPartitions = Maps.newHashMap(); topicNames.forEach(topicName -> newPartitions.put(topicName, NewPartitions.increaseTo(partitionNum))); // 執行修改 CreatePartitionsResult result = adminClient.createPartitions(newPartitions); // get方法是一個阻塞方法,一定要等到createPartitions完成之後才進行下一步操作 result.all().get(); // 關閉資源 adminClient.close(); }
Tips:
- Partition的索引從0開始,所以第一個
partition=0
,第二個partition=1
九、修改 Topic 配置資訊
除了可以檢視Topic的配置項資訊外,AdminClient
還提供了相關方法來修改Topic配置項的值。在早期版本中,使用alterConfigs
方法來修改配置項。程式碼示例:
/** * 修改topic的配置資訊 * 使用舊版api:alterConfigs */ public static void updateTopicConfigOld(List<String> topicNames) throws Exception { // 建立AdminClient客戶端物件 AdminClient adminClient = BuildAdminClient.createAdminClientByMap(); List<ConfigResource> configResources = Lists.newArrayListWithCapacity(64); // 指定要修改的ConfigResource型別及名稱 topicNames.forEach(topicName -> configResources.add(new ConfigResource(ConfigResource.Type.TOPIC, topicName))); // 建立修改的配置項,配置項以ConfigEntry形式存在 Config config = new Config(Collections.singletonList(new ConfigEntry("preallocate", "true"))); // 引數構造 Map<ConfigResource, Config> configMap = Maps.newHashMap(); configResources.forEach(configResource -> configMap.put(configResource, config)); // 修改topic 配置,用的是老api,已經過時 AlterConfigsResult result = adminClient.alterConfigs(configMap); // get方法是一個阻塞方法,一定要等到alterConfigs完成之後才進行下一步操作 result.all().get(); // 關閉資源 adminClient.close(); }
執行以上程式碼,成功將 topic 的配置項 preallocate
的值改為了 true。
在新版本中則是使用 incrementalAlterConfigs
方法來修改Topic的配置項,該方法使用起來相對於 alterConfigs
要略微複雜一些,但因此功能更多、更靈活。程式碼示例:
/** * 修改topic的配置資訊 * 使用新版api:incrementalAlterConfigs */ public static void updateTopicConfigNew(List<String> topicNames) throws Exception { // 建立AdminClient客戶端物件 AdminClient adminClient = BuildAdminClient.createAdminClientByMap(); List<ConfigResource> configResources = Lists.newArrayListWithCapacity(64); // 指定要修改的ConfigResource型別及名稱 topicNames.forEach(topicName -> configResources.add(new ConfigResource(ConfigResource.Type.TOPIC, topicName))); // 配置項同樣以ConfigEntry形式存在,只不過增加了操作型別 // 以及能夠支援操作多個配置項,相對來說功能更多、更靈活 Collection<AlterConfigOp> configs = Lists.newArrayList( new AlterConfigOp(new ConfigEntry("preallocate", "false"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.5"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "false"), AlterConfigOp.OpType.SET)); // 引數構造 Map<ConfigResource, Collection<AlterConfigOp>> configMaps = Maps.newHashMap(); configResources.forEach(configResource -> configMaps.put(configResource, configs)); // 下面這個是新api.但是有些麻煩 // 在某些版本中,incrementalAlterConfigs方法可能會存在些問題,對單例項的Kafka支援得不是很好,會出現無法成功修改配置項的情況 AlterConfigsResult result = adminClient.incrementalAlterConfigs(configMaps); // get方法是一個阻塞方法,一定要等到incrementalAlterConfigs完成之後才進行下一步操作 result.all().get(); // 關閉資源 adminClient.close(); }Tips:
- 在某些版本中,
incrementalAlterConfigs
方法可能會存在些問題,對單例項的 Kafka 支援得不是很好,會出現無法成功修改配置項的情況,此時就可以使用alterConfigs
方法來代替。
執行以上程式碼,修改了三個配置項的值:preallocate、min.cleanable.dirty.ratio 和unclean.leader.election.enable。