1. 程式人生 > >3、kafka的Java程式碼操作

3、kafka的Java程式碼操作



maven依賴地址:http://www.mvnrepository.com/artifact/org.apache.kafka/kafka_2.10/0.8.2.0
maven依賴:  
<span style="white-space:pre">	</span><dependencies>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<version>0.8.2.0</version>
		</dependency>
	</dependencies>


生產者程式碼:
package kafka;

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class Kafkaproducer extends Thread {
	private String topic;
	
	public Kafkaproducer(String topic){
		super();
		this.topic=topic;
	}
	
	@Override
	public void run() {
		Producer<Integer, String> producer=CreateProducer();
		for (int i = 1; i < 10; i++) {
			String message="message"+i;
			producer.send(new KeyedMessage<Integer, String>(topic, message));
			System.out.println("傳送:"+message);
			try {
				sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	public Producer<Integer, String> CreateProducer(){
		Properties props=new Properties();
		props.setProperty("zookeeper.connect", "192.168.1.200:2181");
		props.setProperty("serializer.class", StringEncoder.class.getName());
		props.setProperty("metadata.broker.list", "192.168.1.200:9092");
		Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));
		return producer;
	}
	
	public static void main(String[] args) {
		new Kafkaproducer("test").start();
	}
	
}


eclipse的console上顯示的資訊:
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
傳送:message1
傳送:message2
傳送:message3
傳送:message4
傳送:message5
傳送:message6
傳送:message7
傳送:message8
傳送:message9



在終端上用命令啟動消費者:
[[email protected] kafka_2.10-0.8.2.1]# bin/kafka-console-consumer.sh --zookeeper 192.168.1.200:2181 --topic test --from-beginning


終端上顯示的資訊:
message1
message2
message3
message4
message5
message6
message7
message8
message9



==================================================================================================================================
消費者程式碼:


package kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;


public class KafkaConsumer extends Thread{
	private String topic;
	
	private KafkaConsumer(String topic) {
		super();
		this.topic=topic;
	}
	
	@Override
	public void run() {
		ConsumerConnector consumer = createConsumer();
		Map<String,Integer> topicCountMap=new HashMap<String, Integer>();
		topicCountMap.put(topic, 1);
		
		Map<String, List<KafkaStream<byte[], byte[]>>> MessageStreams = consumer.createMessageStreams(topicCountMap);
		KafkaStream<byte[], byte[]> kafkaStream = MessageStreams.get(topic).get(0);
		ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
		while (iterator.hasNext()) {
			String message = new String(iterator.next().message());
			System.out.println("接收到:"+message);
		}
	}
	
	public ConsumerConnector createConsumer(){
		Properties properties = new Properties();
		properties.setProperty("zookeeper.connect", "192.168.1.200:2181");
		properties.setProperty("group.id", "group1");
		ConsumerConnector createJavaConsumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
		return createJavaConsumerConnector;
	}
	
	public static void main(String[] args) {
		new KafkaConsumer("test").start();
	}
}



先啟動消費者,再啟動生產者類:
生產者console輸出內容:
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
傳送:message1
傳送:message2
傳送:message3
傳送:message4
傳送:message5
傳送:message6
傳送:message7
傳送:message8
傳送:message9


消費者console輸出內容:
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
接收到:message1
接收到:message2
接收到:message3
接收到:message4
接收到:message5
接收到:message6
接收到:message7
接收到:message8
接收到:message9


消費者終端同樣會收到並輸出:
message1
message2
message3
message4
message5
message6
message7
message8
message9




相關推薦

開啟運維之路之第 7 篇——RedisDesktopManager使用Keys通用操作Java程式碼操作基本的Redis

RedisDesktopManager下載地址:Redis桌面管理工具官方下載地址 安裝好,直接雙擊開啟。 說明:我本機的 IP 由於使用公司的 IP ,經常會變動,但不影響連線 Linux 虛擬機器。 現在發現個問題,無法連線到 Redis 。 解決過程: ①Redi

3myeclipse編譯/操作時自動儲存設定

一般來說myeclipse會自動儲存,有時也有意外 設定自動儲存: Window > Preferences  > General > Workspace > [x] Save automatically before build

3kafka的Java程式碼操作

maven依賴地址:http://www.mvnrepository.com/artifact/org.apache.kafka/kafka_2.10/0.8.2.0 maven依賴:  <span style="white-space:pre"> </

3Python文件操作工具 xlsxwriter 工具

sheet 設置 writer workbook odi amp $# file exce # _*_ encoding:utf-8 _*_import xlsxwriter#創建xlsx後綴名的excelexcel = xlsxwriter.Workbook(r‘D:\g

3Manipulating Results 結果操作

修改預設的content-type 執行結果的content-type是通過response中的返回值來自動推測的, 例如: val textResult = Ok("Hello World!") 將自動設定 Content-Type 的頭為

elasticsearch-6.4.3 java程式碼操作ES

elasticsearch-6.4.3 java程式碼操作ES      這次我講的是6.4.3版本的elasticsearch相關java程式碼,其餘版本的elasticsearch用這套程式碼不一定會好使,所以說看的時候請注意你的elasticsearch版

eclipse使用git進行程式碼修改合併GitHub程式碼同步和版本回退等操作

1、修改程式碼後提交 修改的檔案回出現在以下位置中,選中右鍵add index——》填寫commit message——》commit 將程式碼push到GitHub:右鍵專案——》team——》remote——》push——》填寫你的GitHub倉庫的uri(例如https://githu

3mysql資料庫的基本操作

操作環境:cmd命令視窗 1、建立資料庫 create database super; (推薦用英文命名) 2、資料庫儲存路徑檢視 show variables like '%datadir%'; 3、檢視現有的資料庫 show databases 4、使用資料庫 u

【專案知識點彙總】二JNI程式碼編譯方式camke 和 ndk 方式 -- Android Studio 操作

一、介紹 Android Studio 編譯JNI程式碼有兩種方式:cmake 和 ndk 方式 使用感受: 1、cmake方式會受到所用Android sdk版本的影響,主要是ndk的版本影響,沒有深入去探究原理 2、ndk方式可以跨Android sdk 版本執行

MQ 學習 3 php 程式碼 操作

<?php /** * Created by PhpStorm. * User: ASUS * Date: 2018/10/22 * Time: 19:51 */ namespace App\Services; use App\Models\RfImageA

git操作 —— 檢視倉庫更新程式碼更改賬戶忽略規則

1.檢視連線遠端的倉庫 git remote -v 2.git 更新伺服器程式碼到本地 沒有任何修改就用 git pull 3.更新程式碼到遠端伺服器 git add --all 檔案新增到版本控制器 git commit -m “本次提交描述” 該命令會

3關於匿名內部類一個小題目(補全程式碼

/* 匿名內部類面試題: 按照要求,補齊程式碼 interface Inter { void show(); } class Outer { //補齊程式碼 } class OuterDemo { public static void mai

基礎教程:3Xshell 6 個人版安裝與遠端操作連線伺服器

3.1 下載Xshell6 (1)開啟官網 https://www.netsarang.com/download/free_license.html (2)單擊“Xshell 6”圖示下的“Download”按鈕,進入下面頁面。預設情況下已經選擇“Home and school u

Spark2.0機器學習系列之3:決策樹及Spark 2.0-MLlibScikit程式碼分析

概述 分類決策樹模型是一種描述對例項進行分類的樹形結構。 決策樹可以看為一個if-then規則集合,具有“互斥完備”性質 。決策樹基本上都是 採用的是貪心(即非回溯)的演算法,自頂向下遞迴分治構造。 生成決策樹一般包含三個步驟: 特徵選擇 決策樹生成 剪枝

3學什麼技術之前端開發JS程式碼規範語法

學什麼技術之前端開發JS程式碼規範語法JS程式碼規範一(語法&格式篇)基本原則所有的程式碼都要符合可維護性原則 —— 簡單、便於閱讀。部分編碼原則是與效能原則相悖的, 如果遇到這種情況, 請優先遵守語法規範。 (注: 如果確實有不確定的 情況或者效能影響很大, 請聯絡

Dojo初探之3:dojo的DOM操作query操作和domConstruct元素位置操作(基於dojo1.11.2版本)

前言: 前面兩章講了dojo的基本規範和配置,當然這個配置不是必須的,當你有這需求的時候就可以用到dojo的config配置。 綴述: 這章開始真正講解dojo的所有基本操作,包含dom、quer

win10安裝.net framework 3.5 錯誤程式碼 錯誤程式碼 0x800F09060x800F081F0x800F0907

今天在控制面板裡安裝.net framework 3.5時遇到了錯誤,錯誤程式碼0x800F081F,搜了好久沒有解決,查了下官方的錯誤程式碼說明,解決方案在下面的連結,寫的也並不是非常清楚,圖上說的方法二都不知道指的是哪裡,感覺這個官方的support可能也是不知道從哪裡

呼叫JAVA API對HDFS檔案進行檔案的讀寫上傳下載刪除等操作程式碼詳解

Hadoop檔案系統  基本的檔案系統命令操作, 通過hadoop fs -help可以獲取所有的命令的詳細幫助檔案。 Java抽象類org.apache.hadoop.fs.FileSystem定義了hadoop的一個檔案系統介面。該類是一個抽象類,通過以下兩種靜態工廠方

Scala中檔案的讀取寫入控制檯輸入操作程式碼實戰

內容: 1、檔案的讀取、寫入操作 2、控制檯操作程式碼實戰 val file = Source.fromFile("E:\\WangJialin.txt") for(line <-file.getLines){println(file)

ThinkPHP 3.2 連線多個數據庫使用(MD)操作說明

<?php /** * 配置檔案 */ return array( // 預設連線資料庫 'DB_TYPE' => 'mysql', // 資料庫型別 'DB_H