1. 程式人生 > >kafka java 主題及分割槽 副本操作程式碼

kafka java 主題及分割槽 副本操作程式碼

package com.jingshan.topic;

import java.util.Properties;

import org.apache.kafka.common.security.JaasUtils;

import kafka.admin.AdminUtils;
import kafka.admin.BrokerMetadata;
import kafka.server.ConfigType;
import kafka.utils.ZkUtils;
import scala.collection.Map;
import scala.collection.Seq;

/**
 * 主題管理: 例項化 ZkUtils 物件 建立主題:AdminUtils.createTopic()建立主題
 * 
 * @author Administrator
 *
 */
public class MyTopic {
	// 系統引數定義
	// ZK 連線位置 可以多個
	// private final static String ZK_CONNECT = "192.168.246.100:2181,192.168.246.101:2181";
	private final static String ZK_CONNECT = "192.168.246.100:2181";
	// session 過期時間
	private final static int SESSION_TIMEOUT = 30000;
	// 連線超時時間
	private final static int CONNECT_TIMEOUT = 30000;
	
	
	// 入口函式
	public static void main(String[] args) {
		System.out.println("MyTopic.main()");
		Properties properties = new Properties();
		// 新建主題
		// createTopic("kafka-top-4", 1, 1, properties );
		// 修改主題
		// modifyTopicConfig("kafka-top-4",properties);
		// 刪除主題 
		// delTopic("kafka-top-4");
		// 調整分割槽
		// modifyPartition("kafka-top-3",1,1);
		// 增加分割槽  因為只有一臺機器,這個就不演示了
		// addPartition("kafka-top-3",2,"1:1,2:2");
	}
	// 建立主題
	/**
	 * 
	 * @param topic 主題名稱
	 * @param partition 分割槽數量
	 * @param repilca 副本數量
	 * @param properties 配置引數
	 * 預設的建立主題的指令碼為:
	 * kafka-topics.sh --create --zookeeper server-1:2181,server-2:2181,server-3:2181 --replication-factor 2 --partition 3 --topic kafka-action
 	 */
	public static void createTopic(String topic,int partition, int repilca,Properties properties) {
		// 初始化 ZkUtils 工具物件,方便主題管理
		ZkUtils zkUtils = null;
		try {
			// 嘗試建立主題
			// 例項化 主題  按照 當前配置的引數
			zkUtils = ZkUtils.apply(ZK_CONNECT, SESSION_TIMEOUT,CONNECT_TIMEOUT,JaasUtils.isZkSecurityEnabled());
			// AdminUtils 管理工具
			if(!AdminUtils.topicExists(zkUtils, topic)) {
				// 不存在主題 具體進行建立
				AdminUtils.createTopic(zkUtils, topic, partition, repilca, properties, AdminUtils.createTopic$default$6());			
			}else {
				// 存在了 怎麼著
				System.out.println("Topic: "+topic+" is all exist! ");
			}
		} catch (Exception e) {
			// TODO: 異常錯誤資訊處理
			e.printStackTrace();
		} finally {
			// TODO: 無論怎麼著都執行
			zkUtils.close();
		}
		
	}
	// 修改主題 級別配置
	/**
	 * 根據主題名字,以及引數進行修改。
	 * @param topic
	 * @param properties
	 */
	public static void modifyTopicConfig(String topic,Properties properties) {
		ZkUtils zkUtils = null;
		try {
			// 例項化 ZkUtils 
			zkUtils = ZkUtils.apply(ZK_CONNECT, SESSION_TIMEOUT,CONNECT_TIMEOUT,JaasUtils.isZkSecurityEnabled());
			// 首先獲取當前已有的配置,這裡是查詢主題級別的配置,因此指定配置型別為Topic
			Properties curProp = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(),topic);
			// 融合最新的主題配置
			curProp.putAll(properties);
			// 替換當前 主題配置執行資訊
			AdminUtils.changeTopicConfig(zkUtils, topic, curProp);
		} catch (Exception e) {
			// TODO: 異常錯誤資訊處理
			e.printStackTrace();
		} finally {
			// TODO: 無論怎麼著都執行
			zkUtils.close();
		}
	}
	
	// 刪除主題
	/**
	 * 
	 * @param topic
	 */
	public static void delTopic(String topic) {
		ZkUtils zkUtils = null;
		try {
			// 例項化 ZkUtils 關鍵是 連線地址
			zkUtils = ZkUtils.apply(ZK_CONNECT, SESSION_TIMEOUT,CONNECT_TIMEOUT,JaasUtils.isZkSecurityEnabled());
			// 執行刪除指定主題動作
			AdminUtils.deleteTopic(zkUtils, topic);
		} catch (Exception e) {
			// TODO: 異常錯誤資訊處理
			e.printStackTrace();
		} finally {
			// TODO: 無論怎麼著都執行
			zkUtils.close();
		}
	}
	// 增加分割槽
	/**
	 * 
	 * @param topic 主題名稱
	 * @param partition 分割槽總數,這個直接是最終的數量
	 * @param partitions 副本分配方案 格式為 "2:1,3:1" 這個表示的是2個分割槽分別對應的副本情況,0分割槽對應brokerid為2,1的,1分割槽對應的副本ID為 3,1
	 * 不同的分割槽的副本用逗號分隔,同一個分割槽的多個副本之間用冒號分隔
	 * 同時需要注意的是,副本分配方案要包括已有分割槽的副本分配資訊,根據分配順序從左到右依次與分割槽對應,分割槽編號遞增
	 */
	public static void addPartition(String topic,int partition,String partitions) {
		ZkUtils zkUtils = null;
		try {
			// 例項化 ZkUtils 關鍵是 連線地址
			zkUtils = ZkUtils.apply(ZK_CONNECT, SESSION_TIMEOUT,CONNECT_TIMEOUT,JaasUtils.isZkSecurityEnabled());
			// 執行刪除指定主題動作
			AdminUtils.addPartitions(zkUtils, topic, partition, partitions, true, AdminUtils.addPartitions$default$6());
		} catch (Exception e) {
			// TODO: 異常錯誤資訊處理
			e.printStackTrace();
		} finally {
			// TODO: 無論怎麼著都執行
			zkUtils.close();
		}		
	}
	
	// 分割槽副本重新配置
	/**
	 * 
	 * @param topic  主題
	 * @param partition 分割槽數
	 * @param repilca 副本數
	 * 通過修改,達成重新分配的目的
	 * 步驟:
	 * 1 例項化 ZkUtils
	 * 2 獲取代理元資料 (BrokerMetadata) 資訊
	 * 3 生成分割槽副本分配方案,當然也可以自定義分配方案
	 * 4 呼叫 createOrUpdateTopicPartitionAssignmentPathInZK()方法完成副本分配
	 * 5 釋放與 zookeeper的連線
	 */
	public static void modifyPartition(String topic,int partition, int repilca) {
		ZkUtils zkUtils = null;
		try {
			// 1 例項化 ZkUtils
			zkUtils = ZkUtils.apply(ZK_CONNECT, SESSION_TIMEOUT,CONNECT_TIMEOUT,JaasUtils.isZkSecurityEnabled());
			// 2 獲取代理元資料 (BrokerMetadata) 資訊
			Seq<BrokerMetadata> brokerMetadata = AdminUtils.getBrokerMetadatas(zkUtils, AdminUtils.getBrokerMetadatas$default$2(), AdminUtils.getBrokerMetadatas$default$3());
			// 3 生成分割槽副本分配方案,當然也可以自定義分配方案
			Map<Object, Seq<Object>> replicaAssign = AdminUtils.assignReplicasToBrokers(brokerMetadata, partition, repilca, AdminUtils.assignReplicasToBrokers$default$4(), AdminUtils.assignReplicasToBrokers$default$5());
			// 4 呼叫 createOrUpdateTopicPartitionAssignmentPathInZK()方法完成副本分配
			AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssign, null, true);
		} catch (Exception e) {
			// TODO: 異常錯誤資訊處理
			e.printStackTrace();
		} finally {
			// TODO: 無論怎麼著都執行
			// 5 釋放與 zookeeper的連線
			zkUtils.close();
		}	
	}
	
}