1. 程式人生 > >Kafka 低階API 檢視topic

Kafka 低階API 檢視topic

package com.zsb.test.util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.Map.Entry;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;

/**
 * Date: 2016年5月26日 <br>
 * @author zhoushanbin
 */
public class KafkaInfoTools {

	public KafkaInfoTools() {
		
	}
	
	public static long getLastOffset(SimpleConsumer consumer, String topic,
			int partition, long whichTime, String clientName) {
		TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
				partition);
		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
				whichTime, 1));
		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
				requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
				clientName);
		OffsetResponse response = consumer.getOffsetsBefore(request);

		if (response.hasError()) {
			System.out
					.println("Error fetching data Offset Data the Broker. Reason: "
							+ response.errorCode(topic, partition));
			return 0;
		}
		long[] offsets = response.offsets(topic, partition);
		return offsets[0];
	}

	public static TreeMap<Integer,PartitionMetadata> findLeader(List<String> a_seedBrokers,
			int a_port, String a_topic) {
		TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>();
		for (String seed : a_seedBrokers) {
			SimpleConsumer consumer = null;
			try {
				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
						"leaderLookup"+new Date().getTime());
				List<String> topics = Collections.singletonList(a_topic);
				TopicMetadataRequest req = new TopicMetadataRequest(topics);
				kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

				List<TopicMetadata> metaData = resp.topicsMetadata();
				for (TopicMetadata item : metaData) {
					for (PartitionMetadata part : item.partitionsMetadata()) {
						map.put(part.partitionId(), part);
					}
				}
			} catch (Exception e) {
				System.out.println("Error communicating with Broker [" + seed
						+ "] to find Leader for [" + a_topic + ", ] Reason: " + e);
			} finally {
				if (consumer != null)
					consumer.close();
			}
		}
		return map;
	}
	
	public static void main(String[] args) {
		
		
		String topic = "topic-test";
		
		String seed = "10.166.198.20";
		
		int port = 39091;

		List<String> seeds = new ArrayList<String>();
		seeds.add(seed);
		TreeMap<Integer,PartitionMetadata> metadatas = findLeader(seeds, port, topic);
		
		int sum = 0;
		
		for (Entry<Integer,PartitionMetadata> entry : metadatas.entrySet()) {
			int partition = entry.getKey();
			String leadBroker = entry.getValue().leader().host();
			String clientName = "Client_" + topic + "_" + partition;
			SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000,
					64 * 1024, clientName);
			long readOffset = getLastOffset(consumer, topic, partition,
					kafka.api.OffsetRequest.LatestTime(), clientName);
			sum += readOffset;
			System.out.println(partition+":"+readOffset);
			if(consumer!=null)consumer.close();
		}
		System.out.println("總和:"+sum);
	}
}

相關推薦

Kafka 低階API 檢視topic

package com.zsb.test.util; import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; imp

Kafka Java API操作topic

Kafka官方提供了兩個指令碼來管理topic,包括topic的增刪改查。其中kafka-topics.sh負責topic的建立與刪除;kafka-configs.sh指令碼負責topic的修改和查詢,但很多使用者都更加傾向於使用程式API的方式對topic進行操作。 (adsby

kafka檢視topic和訊息內容命令

1、查詢topic,進入kafka目錄: bin/kafka-topics.sh --list --zookeeper localhost:2181   2、查詢topic內容: bin/kafka-console-consumer.sh --bootstrap-

這幾天折騰spark的kafka低階API createDirectStream的一些總結。

大家都知道在spark1.3版本後,kafkautil裡面提供了兩個建立dstream的方法,一個是老版本中有的createStream方法,還有一個是後面新加的createDirectStream方法。關於這兩個方法的優缺點,官方已經說的很詳細(http://spark.

Kafka技術內幕:消費者(高階和低階API)和 協調者

生產者傳送訊息時在客戶端就按照節點和Partition進行分組,屬於同一個目標節點的多個Partition會作為同一個請求傳送到服務端,作為目標節點的服務端也可以處理來自不同生產者客戶端的請求。如果從網路層通訊來看,客戶端和服務端都會使用佇列的方式確保順序地客戶端傳送請求

kafka 檢視topic消費

sudo su su kafka 加證書前: cd home/kafka/software/kafka/bin ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic  monitor --fr

Kafka筆記整理(二):Kafka Java API使用

大數據 Kafka Java [TOC] Kafka筆記整理(二):Kafka Java API使用 下面的測試代碼使用的都是下面的topic: $ kafka-topics.sh --describe hadoop --zookeeper uplooking01:2181,uplooking0

Hadoop生態圈-KafkaAPI之生產者-消費者

HA size ron 作品 消費 消費者 hadoop ado 原創                     Hadoop生態圈-Kafka的API之生產者-消費者                                             作者:尹正傑 版權

kafka producer API v0.8.2 是一個分水嶺

kafka v0.8.2是一個分水嶺,對於kafka v0.8.x以前的kafka producer API,用如下的import標頭檔案 //for kafka client before v0.8

Kafka 學習筆記(3)——kafka java API

1 新建maven 工程 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi=

Kafka消費者API簡介

舊版消費者      當前kafka版本還保留著Scala版本的兩套消費者,被稱為舊版消費者。舊版消費者屬於kafka核心模組,分別為SimpleConsumer(低階Low-Level)和ZookeeperConsumerConnector(高階

storm整合kafka新版API(offset In Kafka)示例

本例storm版本為1.1.0  kafka版本為2.11.0.10.0.1 匯入maven依賴 <!--引入storm --> <dependency> <groupId>org.apache.storm&

kafka(2) ===》 broker/topic常規配置

broker: 1.broker.id: 叢集中每一個broker的broker.id需要是唯一的整數,用來標識broker 2.port:啟動埠,預設9092(不建議修改) 3.zookeeper.connect: zookeeper地址      h

storm整合kafka新版API(0.8版本之後)

本例storm版本為1.1.0  kafka版本為2.11.0.10.0.1 匯入maven依賴 <!--引入storm --> <dependency> <groupId>org.apache.storm&l

kafka如何徹底刪除topic及資料

轉載:https://blog.csdn.net/belalds/article/details/80575751 前言: 刪除kafka topic及其資料,嚴格來說並不是很難的操作。但是,往往給kafka 使用者帶來諸多問題。專案組之前接觸過多個開發者,發現都會偶然出現無法

Kafka 如何讀取offset topic內容 (v0.11.0.0前和以後)

眾所周知,由於Zookeeper並不適合大批量的頻繁寫入操作,新版Kafka已推薦將consumer的位移資訊儲存在Kafka內部的topic中,即__consumer_offsets topic,並且預設提供了kafka_consumer_groups.sh指令碼供使用者檢視consumer資訊。

kafka 高階api使用示例

一、基本概念   Kafka集成了Producer/Consumer連線Broker的客戶端工具,但是在訊息處理方面,這兩者主要用於服務端(Broker)的簡單操作,如:     1.建立Topic     2.羅列出已存在的Topic     3.對已有Topic的Produce/Cons

Kafkakafka動態獲取某個topic的partition資訊

現在有這樣一種場景,系統啟動前,預期abc topic建立了一個100個分割槽,大概用個一段時間,之後可能會動態新增分割槽數,這就要求生產者在生產資料時,能夠動態實時的獲取分割槽數,做到及時有效的雜湊生效,讓資料進入新增的分割槽,kafka的jar包裡倒是有這麼一個api可以

十一 kafka資料安全,以及Spark Kafka Streaming API

一基本網址 http://spark.apache.org/docs/1.6.2/api/java/index.html 在API中搜索org.apache.spark.streaming.kafka 二spark對接kafka流兩種方案 在org.apache.spark.streaming.k

kafka如何直接檢視log檔案中的資訊

我們在使用kafka的過程中有時候可以需要檢視我們生產的訊息的各種資訊,這些都是被記錄在卡夫卡的log檔案中的。由於log檔案的特殊格式,我們是無法直接檢視log檔案中的資訊的。本文提供一下方式檢視kafka的log檔案中所記錄的資訊。 執行命令 bin/