3、kafka的Java程式碼操作
maven依賴地址:http://www.mvnrepository.com/artifact/org.apache.kafka/kafka_2.10/0.8.2.0
maven依賴:
<span style="white-space:pre"> </span><dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.0</version> </dependency> </dependencies>
生產者程式碼:
package kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.serializer.StringEncoder; public class Kafkaproducer extends Thread { private String topic; public Kafkaproducer(String topic){ super(); this.topic=topic; } @Override public void run() { Producer<Integer, String> producer=CreateProducer(); for (int i = 1; i < 10; i++) { String message="message"+i; producer.send(new KeyedMessage<Integer, String>(topic, message)); System.out.println("傳送:"+message); try { sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } public Producer<Integer, String> CreateProducer(){ Properties props=new Properties(); props.setProperty("zookeeper.connect", "192.168.1.200:2181"); props.setProperty("serializer.class", StringEncoder.class.getName()); props.setProperty("metadata.broker.list", "192.168.1.200:9092"); Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props)); return producer; } public static void main(String[] args) { new Kafkaproducer("test").start(); } }
eclipse的console上顯示的資訊:
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 傳送:message1 傳送:message2 傳送:message3 傳送:message4 傳送:message5 傳送:message6 傳送:message7 傳送:message8 傳送:message9
在終端上用命令啟動消費者:
[[email protected] kafka_2.10-0.8.2.1]# bin/kafka-console-consumer.sh --zookeeper 192.168.1.200:2181 --topic test --from-beginning
終端上顯示的資訊:
message1
message2
message3
message4
message5
message6
message7
message8
message9
==================================================================================================================================
消費者程式碼:
package kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class KafkaConsumer extends Thread{
private String topic;
private KafkaConsumer(String topic) {
super();
this.topic=topic;
}
@Override
public void run() {
ConsumerConnector consumer = createConsumer();
Map<String,Integer> topicCountMap=new HashMap<String, Integer>();
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> MessageStreams = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> kafkaStream = MessageStreams.get(topic).get(0);
ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
while (iterator.hasNext()) {
String message = new String(iterator.next().message());
System.out.println("接收到:"+message);
}
}
public ConsumerConnector createConsumer(){
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "192.168.1.200:2181");
properties.setProperty("group.id", "group1");
ConsumerConnector createJavaConsumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
return createJavaConsumerConnector;
}
public static void main(String[] args) {
new KafkaConsumer("test").start();
}
}
先啟動消費者,再啟動生產者類:
生產者console輸出內容:
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
傳送:message1
傳送:message2
傳送:message3
傳送:message4
傳送:message5
傳送:message6
傳送:message7
傳送:message8
傳送:message9
消費者console輸出內容:
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
接收到:message1
接收到:message2
接收到:message3
接收到:message4
接收到:message5
接收到:message6
接收到:message7
接收到:message8
接收到:message9
消費者終端同樣會收到並輸出:
message1
message2
message3
message4
message5
message6
message7
message8
message9
相關推薦
開啟運維之路之第 7 篇——RedisDesktopManager使用、Keys通用操作、Java程式碼操作基本的Redis
RedisDesktopManager下載地址:Redis桌面管理工具官方下載地址 安裝好,直接雙擊開啟。 說明:我本機的 IP 由於使用公司的 IP ,經常會變動,但不影響連線 Linux 虛擬機器。 現在發現個問題,無法連線到 Redis 。 解決過程: ①Redi
3、myeclipse編譯/操作時自動儲存設定
一般來說myeclipse會自動儲存,有時也有意外 設定自動儲存: Window > Preferences > General > Workspace > [x] Save automatically before build
3、kafka的Java程式碼操作
maven依賴地址:http://www.mvnrepository.com/artifact/org.apache.kafka/kafka_2.10/0.8.2.0 maven依賴: <span style="white-space:pre"> </
3、Python文件操作工具 xlsxwriter 工具
sheet 設置 writer workbook odi amp $# file exce # _*_ encoding:utf-8 _*_import xlsxwriter#創建xlsx後綴名的excelexcel = xlsxwriter.Workbook(r‘D:\g
3、Manipulating Results 結果操作
修改預設的content-type 執行結果的content-type是通過response中的返回值來自動推測的, 例如: val textResult = Ok("Hello World!") 將自動設定 Content-Type 的頭為
elasticsearch-6.4.3 java程式碼操作ES
elasticsearch-6.4.3 java程式碼操作ES 這次我講的是6.4.3版本的elasticsearch相關java程式碼,其餘版本的elasticsearch用這套程式碼不一定會好使,所以說看的時候請注意你的elasticsearch版
eclipse使用git進行程式碼修改合併、GitHub程式碼同步和版本回退等操作
1、修改程式碼後提交 修改的檔案回出現在以下位置中,選中右鍵add index——》填寫commit message——》commit 將程式碼push到GitHub:右鍵專案——》team——》remote——》push——》填寫你的GitHub倉庫的uri(例如https://githu
3、mysql資料庫的基本操作
操作環境:cmd命令視窗 1、建立資料庫 create database super; (推薦用英文命名) 2、資料庫儲存路徑檢視 show variables like '%datadir%'; 3、檢視現有的資料庫 show databases 4、使用資料庫 u
【專案知識點彙總】二、JNI程式碼編譯方式camke 和 ndk 方式 -- Android Studio 操作
一、介紹 Android Studio 編譯JNI程式碼有兩種方式:cmake 和 ndk 方式 使用感受: 1、cmake方式會受到所用Android sdk版本的影響,主要是ndk的版本影響,沒有深入去探究原理 2、ndk方式可以跨Android sdk 版本執行
MQ 學習 3 php 程式碼 操作
<?php /** * Created by PhpStorm. * User: ASUS * Date: 2018/10/22 * Time: 19:51 */ namespace App\Services; use App\Models\RfImageA
git操作 —— 檢視倉庫、更新程式碼、更改賬戶、忽略規則
1.檢視連線遠端的倉庫 git remote -v 2.git 更新伺服器程式碼到本地 沒有任何修改就用 git pull 3.更新程式碼到遠端伺服器 git add --all 檔案新增到版本控制器 git commit -m “本次提交描述” 該命令會
3、關於匿名內部類一個小題目(補全程式碼)
/* 匿名內部類面試題: 按照要求,補齊程式碼 interface Inter { void show(); } class Outer { //補齊程式碼 } class OuterDemo { public static void mai
基礎教程:3、Xshell 6 個人版安裝與遠端操作連線伺服器
3.1 下載Xshell6 (1)開啟官網 https://www.netsarang.com/download/free_license.html (2)單擊“Xshell 6”圖示下的“Download”按鈕,進入下面頁面。預設情況下已經選擇“Home and school u
Spark2.0機器學習系列之3:決策樹及Spark 2.0-MLlib、Scikit程式碼分析
概述 分類決策樹模型是一種描述對例項進行分類的樹形結構。 決策樹可以看為一個if-then規則集合,具有“互斥完備”性質 。決策樹基本上都是 採用的是貪心(即非回溯)的演算法,自頂向下遞迴分治構造。 生成決策樹一般包含三個步驟: 特徵選擇 決策樹生成 剪枝
3、學什麼技術之前端開發JS程式碼規範語法
學什麼技術之前端開發JS程式碼規範語法JS程式碼規範一(語法&格式篇)基本原則所有的程式碼都要符合可維護性原則 —— 簡單、便於閱讀。部分編碼原則是與效能原則相悖的, 如果遇到這種情況, 請優先遵守語法規範。 (注: 如果確實有不確定的 情況或者效能影響很大, 請聯絡
Dojo初探之3:dojo的DOM操作、query操作和domConstruct元素位置操作(基於dojo1.11.2版本)
前言: 前面兩章講了dojo的基本規範和配置,當然這個配置不是必須的,當你有這需求的時候就可以用到dojo的config配置。 綴述: 這章開始真正講解dojo的所有基本操作,包含dom、quer
win10安裝.net framework 3.5 錯誤程式碼 錯誤程式碼 0x800F0906、0x800F081F、0x800F0907
今天在控制面板裡安裝.net framework 3.5時遇到了錯誤,錯誤程式碼0x800F081F,搜了好久沒有解決,查了下官方的錯誤程式碼說明,解決方案在下面的連結,寫的也並不是非常清楚,圖上說的方法二都不知道指的是哪裡,感覺這個官方的support可能也是不知道從哪裡
呼叫JAVA API對HDFS檔案進行檔案的讀寫、上傳下載、刪除等操作程式碼詳解
Hadoop檔案系統 基本的檔案系統命令操作, 通過hadoop fs -help可以獲取所有的命令的詳細幫助檔案。 Java抽象類org.apache.hadoop.fs.FileSystem定義了hadoop的一個檔案系統介面。該類是一個抽象類,通過以下兩種靜態工廠方
Scala中檔案的讀取、寫入、控制檯輸入操作程式碼實戰
內容: 1、檔案的讀取、寫入操作 2、控制檯操作程式碼實戰 val file = Source.fromFile("E:\\WangJialin.txt") for(line <-file.getLines){println(file)
ThinkPHP 3.2 連線多個數據庫使用(M、D)操作說明
<?php /** * 配置檔案 */ return array( // 預設連線資料庫 'DB_TYPE' => 'mysql', // 資料庫型別 'DB_H