1. 程式人生 > >kafka版本1.1.0 javaAPI實現生產者、消費者

kafka版本1.1.0 javaAPI實現生產者、消費者

一、環境準備:

1、maven工程中引入依賴:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.1.0</version>
    </dependency>
2、建立kafkaProperties
public class KafkaProperties {
    //blocker連線
    public static final String broker = "192.168.80.140:9092";
    //topic
    public static final String topic = "kafka_api";
}
3、本機主機是否能telnet通(kafka節點),如果不通需要關閉防火牆4、修改kafka中server.properties配置檔案:
hostname和埠是用來建議給生產者和消費者使用的,如果沒有設定,將會使用listeners的配置,如果listeners也沒有配置,將使用java.net.InetAddress.getCanonicalHostName()來獲取這個hostname和port,基本就是localhost。
"PLAINTEXT"表示協議,可選的值有PLAINTEXT和SSL,hostname可以指定IP地址,也可以用"0.0.0.0"表示對所有的網路介面有效,如果hostname為空表示只對預設的網路介面有效
如果你沒有配置advertised.listeners,就使用listeners的配置通告給訊息的生產者和消費者,這個過程是在生產者和消費者獲取源資料(metadata)。
1)listeners=PLAINTEXT://192.168.80.140:90922)advertised.listeners=PLAINTEXT://192.168.80.140:9092(#0.9.x以後的版本advertised.host.name 和 advertised.host.port)3)zookeeper.connect=192.168.80.140:2181

二、java API

1、實現生產者(Producer)


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class MyProducer extends Thread {
    private String topic;
    private KafkaProducer<String,String> producer;

    public MyProducer(String topic) {
        this.topic = topic;

        Properties properties = new Properties();
        properties.put("bootstrap.servers",KafkaProperties.broker);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(properties);
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true){
            String message = "message" + messageNo;
            System.out.println("send = "+message);
            producer.send(new ProducerRecord<String, String>(topic,message));
            messageNo ++;

            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args){
        new MyProducer(KafkaProperties.topic).start();
    }
}

生產者控制檯輸出結果:

2、實現消費者(Consumer)


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class MyConsumer extends Thread {
    private String topic;
    KafkaConsumer<String, String> consumer;

    public MyConsumer() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", KafkaProperties.broker);
        properties.setProperty("group.id","test");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList(KafkaProperties.topic));
    }

    @Override
    public void run() {
        while (true){
            ConsumerRecords<String,String> records = consumer.poll(100);
            for (ConsumerRecord<String,String> recode: records) {
                System.out.println("recodeOffset = " + recode.offset() + "recodeValue = " + recode.value());
            }
        }
    }
    public static void main(String[] args){
        new MyConsumer().start();
    }
}

消費者控制檯輸出結果:


相關推薦

kafka版本1.1.0 javaAPI實現生產者消費者

一、環境準備:1、maven工程中引入依賴: <dependency> <groupId>org.apache.kafka</groupId>

Kafka的簡單介紹與使用,生產者消費者JavaApi

一、簡介 2、實時流資料管道,可以在 3、構建流式引用 4、是一個分散式流式處理平臺, 統稱訊息佇列或訊息中介軟體,有生產者和消費者之分 消費者去kafka中拉資料(而不是kafka給資料) 其實kafka就是一個臨時儲存的外掛,但是這個外掛效能很強大 kafka 是用scala編譯的 0.

Java實現Kafka生產者消費者

一、生產者 import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Prope

QT 總結(一)(1.某位設10 2.載入dlllib庫 3.char * 轉換為 wchar_t * 4.textBrowser 顯示亂碼 追加 5.checkBox的使能)

今天寫小程式遇到的問題,做個小總結: 1.如何對某一位置0或者置1? 寫成巨集,方便移植 #define setbit(x,y) x|=(1<<y) //將X的第Y位置1 #define clrbit(x,y) x&=~(1<<y) //將

使用Python多線程實現生產者消費者模型

watermark vpd 51cto 實現 this read sleep get DG 1,我所使用到的python版本 2,下面編寫具體的實現過程 import threadingimport time import Queue #首先生成一個隊列q =Queue.

C++11實現生產者消費者

#include <iostream> #include <thread> #include <mutex> #include <deque> #include <vector> #include <condition

Lock和condition實現生產者消費者模型

package ThreadLearn; /** * 面試題:寫一個固定容量同步容器,擁有put和get方法,以及getCount方法, * 能夠支援2個生產者執行緒以及10個消費者執行緒的阻塞呼叫 * * 使用wait和notify/notifyAll來實現 * * 使用L

SpringBoot+RabbitMq實現生產者消費者的多種情景

一、新建maven工程:springboot-rabbitmq 二、引入springboot和rabbitmq的依賴  <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3

佇列實現生產者消費者

import threading,time,queue,randomdef scz(): while True: num=random.randint(1,100000) q.put(num)#放進佇列 print("生產者產生了%d資料" %(num)) time.sleep(5) q.task_do

多執行緒模擬實現生產者消費者模型

多執行緒模擬實現生產者/消費者模型 package com.chow.queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java

RabbitMQ實現生產者消費者(帶註釋)

1、生產者:rabbitmq_publisher.php <?php date_default_timezone_set("Asia/Shanghai"); //配置資訊 $conn_args = array( 'host' => '127.0.0.1', 'po

用ReentrantLock和Condition實現生產者消費者模式 waitnotify應用場景(生產者-消費者模式)

前面一篇文章《wait、notify應用場景(生產者-消費者模式)》是一種生產者消費者模式實現,今晚這是Lock方式實現,下面是原始碼: 生產者程式碼: /** * 生產者 * * @author tangquanbin * @date 2018/12/18 22:10 */ public

利用wait()和notify()實現生產者消費者問題

     生產者與消費者問題是併發程式設計裡面的經典問題,下面用wait()和notify()來實現消費者執行緒和生產者執行緒的併發執行。    說之前先講幾個概念:    wait()與slee

在python中實現生產者消費者的例子(一):使用multiprocessing和pipe()

本文介紹如何用multiprocessing模組及pipe實現生產者和消費者的例子 程式碼例項如下: import multiprocessing #定義消費者 def consumer(pipe):     output_p,input_p=pipe     input

Java阻塞佇列實現生產者消費者

生產者 import java.util.Random; import java.util.concurrent.BlockingQueue; //生產者 public class Producer implements Runnable{ private final Block

linux多執行緒學習(七)——實現生產者消費者

在上一篇文章中,利用訊號量實現了執行緒間的互斥,這一篇將要利用訊號量的互斥同步機制來實現一個經典例項,就是“生產者和消費者”。 1、簡單描述生產者和消費者的問題。 有一個緩衝區和兩個執行緒:生產者和消費者。生產者把產品放入緩衝區,而消費者從緩衝區中拿走。當緩衝區滿時,生產者必

作業系統-使用訊號量實現生產者消費者(C++實現

常用函式: HANDLE WINAPI CreateSemaphore(                  _In_opt_  LPSECURITY_ATTRIBUTES lpSemaphoreAttributes   _In_      LONG lInitialCoun

實現生產者消費者模式

產品 style 等待 pytho ask 緩沖 制造 目錄 tex 實現生產者與消費者模式 目錄 生產者與消費者模式 實現 生產者與消費者模式 什麽是生產者消費者模式 生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼

Java中通過wait和notify來實現生產者消費者模式

今天通過介紹一下如何通過wait和notify來實現生產者和消費者模式。 通過synchronized同步程式碼塊實現執行緒的同步操作,從而保證資料的一致性。下面具體介紹一下這個模式的實現過程。 1.首先編寫一個生產者類: public class Producer imp

java多執行緒--ReentrantLock實現生產者消費者模式

一.本例實現 :一對一交替列印, 一.生產者邏輯 :每次只允許一個生產者來進行生產操作(生產者之間互斥訪問倉庫),必須等消費者取走資料之後,才能進行下一次的生產 二.消費者邏輯 :每次只允許一個消費者來進行生產操作(消費者之間互斥訪問倉庫),必須等生產者生產資料之後,才能