Kafka 低階API 檢視topic
package com.zsb.test.util; import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.Map.Entry; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.TopicAndPartition; import kafka.javaapi.OffsetResponse; import kafka.javaapi.PartitionMetadata; import kafka.javaapi.TopicMetadata; import kafka.javaapi.TopicMetadataRequest; import kafka.javaapi.consumer.SimpleConsumer; /** * Date: 2016年5月26日 <br> * @author zhoushanbin */ public class KafkaInfoTools { public KafkaInfoTools() { } public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo( whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { System.out .println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition)); return 0; } long[] offsets = response.offsets(topic, partition); return offsets[0]; } public static TreeMap<Integer,PartitionMetadata> findLeader(List<String> a_seedBrokers, int a_port, String a_topic) { TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>(); for (String seed : a_seedBrokers) { SimpleConsumer consumer = null; try { consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup"+new Date().getTime()); List<String> topics = Collections.singletonList(a_topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); List<TopicMetadata> metaData = resp.topicsMetadata(); for (TopicMetadata item : metaData) { for (PartitionMetadata part : item.partitionsMetadata()) { map.put(part.partitionId(), part); } } } catch (Exception e) { System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", ] Reason: " + e); } finally { if (consumer != null) consumer.close(); } } return map; } public static void main(String[] args) { String topic = "topic-test"; String seed = "10.166.198.20"; int port = 39091; List<String> seeds = new ArrayList<String>(); seeds.add(seed); TreeMap<Integer,PartitionMetadata> metadatas = findLeader(seeds, port, topic); int sum = 0; for (Entry<Integer,PartitionMetadata> entry : metadatas.entrySet()) { int partition = entry.getKey(); String leadBroker = entry.getValue().leader().host(); String clientName = "Client_" + topic + "_" + partition; SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName); long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName); sum += readOffset; System.out.println(partition+":"+readOffset); if(consumer!=null)consumer.close(); } System.out.println("總和:"+sum); } }
相關推薦
Kafka 低階API 檢視topic
package com.zsb.test.util; import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; imp
Kafka Java API操作topic
Kafka官方提供了兩個指令碼來管理topic,包括topic的增刪改查。其中kafka-topics.sh負責topic的建立與刪除;kafka-configs.sh指令碼負責topic的修改和查詢,但很多使用者都更加傾向於使用程式API的方式對topic進行操作。 (adsby
kafka檢視topic和訊息內容命令
1、查詢topic,進入kafka目錄: bin/kafka-topics.sh --list --zookeeper localhost:2181 2、查詢topic內容: bin/kafka-console-consumer.sh --bootstrap-
這幾天折騰spark的kafka的低階API createDirectStream的一些總結。
大家都知道在spark1.3版本後,kafkautil裡面提供了兩個建立dstream的方法,一個是老版本中有的createStream方法,還有一個是後面新加的createDirectStream方法。關於這兩個方法的優缺點,官方已經說的很詳細(http://spark.
Kafka技術內幕:消費者(高階和低階API)和 協調者
生產者傳送訊息時在客戶端就按照節點和Partition進行分組,屬於同一個目標節點的多個Partition會作為同一個請求傳送到服務端,作為目標節點的服務端也可以處理來自不同生產者客戶端的請求。如果從網路層通訊來看,客戶端和服務端都會使用佇列的方式確保順序地客戶端傳送請求
kafka 檢視topic消費
sudo su su kafka 加證書前: cd home/kafka/software/kafka/bin ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic monitor --fr
Kafka筆記整理(二):Kafka Java API使用
大數據 Kafka Java [TOC] Kafka筆記整理(二):Kafka Java API使用 下面的測試代碼使用的都是下面的topic: $ kafka-topics.sh --describe hadoop --zookeeper uplooking01:2181,uplooking0
Hadoop生態圈-Kafka的API之生產者-消費者
HA size ron 作品 消費 消費者 hadoop ado 原創 Hadoop生態圈-Kafka的API之生產者-消費者 作者:尹正傑 版權
kafka producer API v0.8.2 是一個分水嶺
kafka v0.8.2是一個分水嶺,對於kafka v0.8.x以前的kafka producer API,用如下的import標頭檔案 //for kafka client before v0.8
Kafka 學習筆記(3)——kafka java API
1 新建maven 工程 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi=
Kafka消費者API簡介
舊版消費者 當前kafka版本還保留著Scala版本的兩套消費者,被稱為舊版消費者。舊版消費者屬於kafka核心模組,分別為SimpleConsumer(低階Low-Level)和ZookeeperConsumerConnector(高階
storm整合kafka新版API(offset In Kafka)示例
本例storm版本為1.1.0 kafka版本為2.11.0.10.0.1 匯入maven依賴 <!--引入storm --> <dependency> <groupId>org.apache.storm&
kafka(2) ===》 broker/topic常規配置
broker: 1.broker.id: 叢集中每一個broker的broker.id需要是唯一的整數,用來標識broker 2.port:啟動埠,預設9092(不建議修改) 3.zookeeper.connect: zookeeper地址 h
storm整合kafka新版API(0.8版本之後)
本例storm版本為1.1.0 kafka版本為2.11.0.10.0.1 匯入maven依賴 <!--引入storm --> <dependency> <groupId>org.apache.storm&l
kafka如何徹底刪除topic及資料
轉載:https://blog.csdn.net/belalds/article/details/80575751 前言: 刪除kafka topic及其資料,嚴格來說並不是很難的操作。但是,往往給kafka 使用者帶來諸多問題。專案組之前接觸過多個開發者,發現都會偶然出現無法
Kafka 如何讀取offset topic內容 (v0.11.0.0前和以後)
眾所周知,由於Zookeeper並不適合大批量的頻繁寫入操作,新版Kafka已推薦將consumer的位移資訊儲存在Kafka內部的topic中,即__consumer_offsets topic,並且預設提供了kafka_consumer_groups.sh指令碼供使用者檢視consumer資訊。
kafka 高階api使用示例
一、基本概念 Kafka集成了Producer/Consumer連線Broker的客戶端工具,但是在訊息處理方面,這兩者主要用於服務端(Broker)的簡單操作,如: 1.建立Topic 2.羅列出已存在的Topic 3.對已有Topic的Produce/Cons
【Kafka】kafka動態獲取某個topic的partition資訊
現在有這樣一種場景,系統啟動前,預期abc topic建立了一個100個分割槽,大概用個一段時間,之後可能會動態新增分割槽數,這就要求生產者在生產資料時,能夠動態實時的獲取分割槽數,做到及時有效的雜湊生效,讓資料進入新增的分割槽,kafka的jar包裡倒是有這麼一個api可以
十一 kafka資料安全,以及Spark Kafka Streaming API
一基本網址 http://spark.apache.org/docs/1.6.2/api/java/index.html 在API中搜索org.apache.spark.streaming.kafka 二spark對接kafka流兩種方案 在org.apache.spark.streaming.k
kafka如何直接檢視log檔案中的資訊
我們在使用kafka的過程中有時候可以需要檢視我們生產的訊息的各種資訊,這些都是被記錄在卡夫卡的log檔案中的。由於log檔案的特殊格式,我們是無法直接檢視log檔案中的資訊的。本文提供一下方式檢視kafka的log檔案中所記錄的資訊。 執行命令 bin/