kafka java 主題及分割槽 副本操作程式碼
阿新 • • 發佈:2019-01-05
package com.jingshan.topic; import java.util.Properties; import org.apache.kafka.common.security.JaasUtils; import kafka.admin.AdminUtils; import kafka.admin.BrokerMetadata; import kafka.server.ConfigType; import kafka.utils.ZkUtils; import scala.collection.Map; import scala.collection.Seq; /** * 主題管理: 例項化 ZkUtils 物件 建立主題:AdminUtils.createTopic()建立主題 * * @author Administrator * */ public class MyTopic { // 系統引數定義 // ZK 連線位置 可以多個 // private final static String ZK_CONNECT = "192.168.246.100:2181,192.168.246.101:2181"; private final static String ZK_CONNECT = "192.168.246.100:2181"; // session 過期時間 private final static int SESSION_TIMEOUT = 30000; // 連線超時時間 private final static int CONNECT_TIMEOUT = 30000; // 入口函式 public static void main(String[] args) { System.out.println("MyTopic.main()"); Properties properties = new Properties(); // 新建主題 // createTopic("kafka-top-4", 1, 1, properties ); // 修改主題 // modifyTopicConfig("kafka-top-4",properties); // 刪除主題 // delTopic("kafka-top-4"); // 調整分割槽 // modifyPartition("kafka-top-3",1,1); // 增加分割槽 因為只有一臺機器,這個就不演示了 // addPartition("kafka-top-3",2,"1:1,2:2"); } // 建立主題 /** * * @param topic 主題名稱 * @param partition 分割槽數量 * @param repilca 副本數量 * @param properties 配置引數 * 預設的建立主題的指令碼為: * kafka-topics.sh --create --zookeeper server-1:2181,server-2:2181,server-3:2181 --replication-factor 2 --partition 3 --topic kafka-action */ public static void createTopic(String topic,int partition, int repilca,Properties properties) { // 初始化 ZkUtils 工具物件,方便主題管理 ZkUtils zkUtils = null; try { // 嘗試建立主題 // 例項化 主題 按照 當前配置的引數 zkUtils = ZkUtils.apply(ZK_CONNECT, SESSION_TIMEOUT,CONNECT_TIMEOUT,JaasUtils.isZkSecurityEnabled()); // AdminUtils 管理工具 if(!AdminUtils.topicExists(zkUtils, topic)) { // 不存在主題 具體進行建立 AdminUtils.createTopic(zkUtils, topic, partition, repilca, properties, AdminUtils.createTopic$default$6()); }else { // 存在了 怎麼著 System.out.println("Topic: "+topic+" is all exist! "); } } catch (Exception e) { // TODO: 異常錯誤資訊處理 e.printStackTrace(); } finally { // TODO: 無論怎麼著都執行 zkUtils.close(); } } // 修改主題 級別配置 /** * 根據主題名字,以及引數進行修改。 * @param topic * @param properties */ public static void modifyTopicConfig(String topic,Properties properties) { ZkUtils zkUtils = null; try { // 例項化 ZkUtils zkUtils = ZkUtils.apply(ZK_CONNECT, SESSION_TIMEOUT,CONNECT_TIMEOUT,JaasUtils.isZkSecurityEnabled()); // 首先獲取當前已有的配置,這裡是查詢主題級別的配置,因此指定配置型別為Topic Properties curProp = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(),topic); // 融合最新的主題配置 curProp.putAll(properties); // 替換當前 主題配置執行資訊 AdminUtils.changeTopicConfig(zkUtils, topic, curProp); } catch (Exception e) { // TODO: 異常錯誤資訊處理 e.printStackTrace(); } finally { // TODO: 無論怎麼著都執行 zkUtils.close(); } } // 刪除主題 /** * * @param topic */ public static void delTopic(String topic) { ZkUtils zkUtils = null; try { // 例項化 ZkUtils 關鍵是 連線地址 zkUtils = ZkUtils.apply(ZK_CONNECT, SESSION_TIMEOUT,CONNECT_TIMEOUT,JaasUtils.isZkSecurityEnabled()); // 執行刪除指定主題動作 AdminUtils.deleteTopic(zkUtils, topic); } catch (Exception e) { // TODO: 異常錯誤資訊處理 e.printStackTrace(); } finally { // TODO: 無論怎麼著都執行 zkUtils.close(); } } // 增加分割槽 /** * * @param topic 主題名稱 * @param partition 分割槽總數,這個直接是最終的數量 * @param partitions 副本分配方案 格式為 "2:1,3:1" 這個表示的是2個分割槽分別對應的副本情況,0分割槽對應brokerid為2,1的,1分割槽對應的副本ID為 3,1 * 不同的分割槽的副本用逗號分隔,同一個分割槽的多個副本之間用冒號分隔 * 同時需要注意的是,副本分配方案要包括已有分割槽的副本分配資訊,根據分配順序從左到右依次與分割槽對應,分割槽編號遞增 */ public static void addPartition(String topic,int partition,String partitions) { ZkUtils zkUtils = null; try { // 例項化 ZkUtils 關鍵是 連線地址 zkUtils = ZkUtils.apply(ZK_CONNECT, SESSION_TIMEOUT,CONNECT_TIMEOUT,JaasUtils.isZkSecurityEnabled()); // 執行刪除指定主題動作 AdminUtils.addPartitions(zkUtils, topic, partition, partitions, true, AdminUtils.addPartitions$default$6()); } catch (Exception e) { // TODO: 異常錯誤資訊處理 e.printStackTrace(); } finally { // TODO: 無論怎麼著都執行 zkUtils.close(); } } // 分割槽副本重新配置 /** * * @param topic 主題 * @param partition 分割槽數 * @param repilca 副本數 * 通過修改,達成重新分配的目的 * 步驟: * 1 例項化 ZkUtils * 2 獲取代理元資料 (BrokerMetadata) 資訊 * 3 生成分割槽副本分配方案,當然也可以自定義分配方案 * 4 呼叫 createOrUpdateTopicPartitionAssignmentPathInZK()方法完成副本分配 * 5 釋放與 zookeeper的連線 */ public static void modifyPartition(String topic,int partition, int repilca) { ZkUtils zkUtils = null; try { // 1 例項化 ZkUtils zkUtils = ZkUtils.apply(ZK_CONNECT, SESSION_TIMEOUT,CONNECT_TIMEOUT,JaasUtils.isZkSecurityEnabled()); // 2 獲取代理元資料 (BrokerMetadata) 資訊 Seq<BrokerMetadata> brokerMetadata = AdminUtils.getBrokerMetadatas(zkUtils, AdminUtils.getBrokerMetadatas$default$2(), AdminUtils.getBrokerMetadatas$default$3()); // 3 生成分割槽副本分配方案,當然也可以自定義分配方案 Map<Object, Seq<Object>> replicaAssign = AdminUtils.assignReplicasToBrokers(brokerMetadata, partition, repilca, AdminUtils.assignReplicasToBrokers$default$4(), AdminUtils.assignReplicasToBrokers$default$5()); // 4 呼叫 createOrUpdateTopicPartitionAssignmentPathInZK()方法完成副本分配 AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssign, null, true); } catch (Exception e) { // TODO: 異常錯誤資訊處理 e.printStackTrace(); } finally { // TODO: 無論怎麼著都執行 // 5 釋放與 zookeeper的連線 zkUtils.close(); } } }