1. 程式人生 > >kafka(3)API使用

kafka(3)API使用

tostring 信息 bootstra 列表 con for edt rdb 對象實例

相關依賴

<!-- Kafka 依賴包 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.1</version>
</dependency>


<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>


<version>0.10.1.1</version>
</dependency>

一個簡單的Kafka生產者一般步驟如下:

創建 Properties 對象,設置生產者級別配置。以下3個配置是必須指定的。

(1)

  bootstrap.servers 配置連接 Kafka 代理列表,不必包含 Kafka 集群所有的代理地址,當連接上一個代理後,會從集群元數據信息中獲取其他存活的代理信息。但為了保證能夠成功連上 Kafka 集群,在多代理集群的情況下建議至少配置兩個代理。

  key.serializer :配置用於序列化消息 Key 的類。

  value.serializer :配置用於序列化消息實際數據的類。

(2)根據 Properties 對象實例化一個 KafkaProducer 對象。

(3)實例化 ProducerRecord 對象, 每條消息對應一個 ProducerRecord 對象。

(4)調用 KafkaProducer 發送消息的方法將 ProducerRecord 發送到 Kafka 相應節點。Kafka 提供了兩個發送消息的方法,即 send(ProducerRecord&lt;String,String&gt; record) 方法和 send(ProducerRecord&lt;String,String&gt; record, Callback callback) 方法,帶有回調函數的 send() 方法要實現 org.apache.kafka.clients.producer.Callback 接口。如果消息發送發生異常,Callback 接口的 onCompletion 會捕獲到相應異常。KafkaProducer 默認是異步發送消息,會將消息緩存到消息緩沖區中,當消息在消息緩沖區中累計到一定數量後作為一個 RecordBatch 再發送。生產者發送消息實質分兩個階段:第一階段是將消息發送到消息緩沖區;第二階段是 Sender 線程負責將緩沖區的消息發送到代理,執行真正的 I/O 操作,而在第一階段執行完後就返回一個 Future 對象,根據對 Future 對象處理方式的不同,KafkaProducer 支持兩種發送消息方式。

技術分享圖片

package com.kafka.action.chapter6.producer;

import java.text.DecimalFormat;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import com.kafka.action.chapter6.dto.StockQuotationinfo;


/**
 * 
* @Title: QuotationProducer.java  
* @Package com.kafka.action.chapter6.producer  
* @Description: 單線程生產者
* @author licl 
* @date 2018年9月9日
 */
/**
 * Single-threaded Kafka producer that sends randomly generated stock
 * quotations to a fixed topic, logging the resulting offset/partition
 * (or the exception) via an asynchronous send callback.
 */
public class QuotationProducer {
	// Total number of messages this instance will produce.
	private static final int MSG_SIZE = 100;
	// Target topic name.
	private static final String TOPIC = "test";
	// Kafka broker list (single broker here; at least two are recommended
	// in a multi-broker cluster).
	private static final String BROKER_LIST = "192.168.1.106:9092";
	// Shared Random instance; the original created a new Random on every
	// call to createQuotationinfo(), wasting seeding work.
	private static final Random RANDOM = new Random();

	private static KafkaProducer<String, String> producer = null;
	static {
		// 1. Build the Properties used to instantiate the KafkaProducer.
		Properties configs = initConfig();
		// 2. Initialize a KafkaProducer from those properties.
		producer = new KafkaProducer<String, String>(configs);
	}

	/**
	 * Initializes the producer configuration: broker list plus the two
	 * mandatory key/value serializer classes.
	 */
	private static Properties initConfig() {
		Properties properties = new Properties();
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
				StringSerializer.class.getName());
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
				StringSerializer.class.getName());
		return properties;
	}

	/**
	 * Produces one fake stock-quotation record with a random stock code
	 * in [600100, 600110) and a price fluctuating around 11.
	 */
	private static StockQuotationinfo createQuotationinfo() {
		StockQuotationinfo quotationinfo = new StockQuotationinfo();
		// Random integer in [0,10) added to 600100 forms the stock code.
		Integer stockCode = 600100 + RANDOM.nextInt(10);
		// Random float in [0,1).
		float random = (float) Math.random();
		// Rise/fall rule: roughly half the quotes should move down.
		// BUG FIX: the original tested (random / 2 < 0.5), which is always
		// true for random in [0,1), so every price change was negative.
		if (random < 0.5f) {
			random = -random;
		}
		// Keep two decimal places.
		// NOTE(review): DecimalFormat uses the default locale; in a locale
		// with a comma decimal separator Float.valueOf would throw — confirm
		// the target runtime locale.
		DecimalFormat decimalFormat = new DecimalFormat(".00");
		// Latest price fluctuates around 11.
		quotationinfo.setCurrentPrice(Float.valueOf(decimalFormat
				.format(11 + random)));
		// Fixed previous-day closing price.
		quotationinfo.setPreClosePrice(11.80f);
		// Opening price.
		quotationinfo.setOpenPrice(11.5f);
		// Lowest price; ignores the 10% limit and whether the current
		// price is already lower.
		quotationinfo.setLowPrice(10.5f);
		// Highest price; ignores the 10% limit and whether the current
		// price is already higher.
		quotationinfo.setHighPrice(12.5f);
		quotationinfo.setStockCode(stockCode.toString());
		quotationinfo.setTradeTime(System.currentTimeMillis());
		// BUG FIX: the original passed the literal string
		// " 股票- + stockCode" instead of concatenating the stock code.
		quotationinfo.setStockName("股票-" + stockCode);
		return quotationinfo;
	}

	/**
	 * Sends MSG_SIZE quotation messages asynchronously, pausing 2s after
	 * every tenth submission, and closes the producer when done.
	 */
	public static void main(String[] args) {
		ProducerRecord<String, String> record = null;
		StockQuotationinfo quotationinfo = null;
		try {
			int num = 0;
			for (int i = 0; i < MSG_SIZE; i++) {
				quotationinfo = createQuotationinfo();
				record = new ProducerRecord<String, String>(TOPIC, null,
						quotationinfo.getTradeTime(),
						quotationinfo.getStockCode(), quotationinfo.toString());
				// Asynchronous send with a callback reporting the outcome.
				producer.send(record, new Callback() {
					@Override
					public void onCompletion(RecordMetadata metadata, Exception exception) {
						if (exception != null) {
							System.out.println("Send message occurs exception");
							exception.printStackTrace();
						} else {
							System.out.println(String.format("offset:%s,partition:%s",
									metadata.offset(), metadata.partition()));
						}
					}
				});
				if (num++ % 10 == 0) {
					// Sleep 2s every 10 messages (also before the first batch).
					Thread.sleep(2000L);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// close() flushes buffered messages before releasing resources.
			producer.close();
		}
	}

}

  

package com.kafka.action.chapter6.producer;

import java.text.DecimalFormat;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
 * 
* @Title: QuotationProducer.java  
* @Package com.kafka.action.chapter6.producer  
* @Description: 多線程生產者
* @date 2018年9月9日
 */
import com.kafka.action.chapter6.dto.StockQuotationinfo;

public class KafkaProducerThread implements Runnable {
	
	// 設置實例生產消息的總數
	private static final int MSG_SIZE = 100;
	private static final String TOPIC = "test";
	private KafkaProducer<String, String> producer = null;
	private ProducerRecord<String, String> record = null;
	StockQuotationinfo quotationinfo = null;

	ExecutorService executor = Executors.newFixedThreadPool(10);
	long current = System.currentTimeMillis();

	private static StockQuotationinfo createQuotationinfo() {
		StockQuotationinfo quotationinfo = new StockQuotationinfo();
		// 隨機產生 1-10 之間的整數,然後與 600100 相加組成股票代碼
		Random r = new Random();
		Integer stockCode = 600100 + r.nextInt(10);
		// /隨機產生一個 0-1之間的浮點數
		float random = (float) Math.random();
		// 設置漲跌規則
		if (random / 2 < 0.5) {
			random = -random;
		}
		// 設置保存兩位有效數字
		DecimalFormat decimalFormat = new DecimalFormat(".00");
		// 設置最新價在 11元浮動
		quotationinfo.setCurrentPrice(Float.valueOf(decimalFormat
				.format(11 + random)));
		// 設置昨日收盤價為固定值
		quotationinfo.setPreClosePrice(11.80f);
		// 設置開盤價
		quotationinfo.setOpenPrice(11.5f);
		// 設置最低價,並不考慮 10% 限制,/以及當前價是否已是最低價
		quotationinfo.setLowPrice(10.5f);
		// 設置最高價,並不考慮 10 %限制/以及當前價是否已是最高價
		quotationinfo.setHighPrice(12.5f);
		quotationinfo.setStockCode(stockCode.toString());
		quotationinfo.setTradeTime(System.currentTimeMillis());
		quotationinfo.setStockName(" 股票- + stockCode");
		return quotationinfo;

	}

	@Override
	public void run() {
		// 1.線程池
		try {
			for (int i = 0; i < MSG_SIZE; i++) {
				quotationinfo = createQuotationinfo();
				record = new ProducerRecord<String, String>(TOPIC, null,
						quotationinfo.getTradeTime(),
						quotationinfo.getStockCode(), quotationinfo.toString());
				executor.submit(new KafkaProducerThread(producer, record));
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			producer.close();
			executor.shutdown();
		}

	}

	public KafkaProducerThread(KafkaProducer<String, String> producer,
			ProducerRecord<String, String> record) {
		this.producer = producer;
		this.record = record;
	}

}

  

package com.kafka.action.chapter6.dto;

import java.io.Serializable;

/**
 * Plain serializable DTO carrying one stock quotation: identification
 * (code/name), trade timestamp, and the day's price points.
 */
public class StockQuotationinfo implements Serializable {

	private static final long serialVersionUID = 1L;

	// Stock code, e.g. "600101".
	private String stockCode;
	// Human-readable stock name.
	private String stockName;
	// Trade time as epoch milliseconds.
	private long tradeTime;
	// Previous day's closing price.
	private float preClosePrice;
	// Opening price.
	private float openPrice;
	// Current price; at market close this is the day's closing price.
	private float currentPrice;
	// Today's highest price.
	private float highPrice;
	// Today's lowest price.
	private float lowPrice;

	/** No-arg constructor for bean-style population via setters. */
	public StockQuotationinfo() {
		super();
	}

	/** All-args constructor. */
	public StockQuotationinfo(String stockCode, String stockName,
			long tradeTime, float preClosePrice, float openPrice,
			float currentPrice, float highPrice, float lowPrice) {
		super();
		this.stockCode = stockCode;
		this.stockName = stockName;
		this.tradeTime = tradeTime;
		this.preClosePrice = preClosePrice;
		this.openPrice = openPrice;
		this.currentPrice = currentPrice;
		this.highPrice = highPrice;
		this.lowPrice = lowPrice;
	}

	public String getStockCode() {
		return stockCode;
	}

	public void setStockCode(String stockCode) {
		this.stockCode = stockCode;
	}

	public String getStockName() {
		return stockName;
	}

	public void setStockName(String stockName) {
		this.stockName = stockName;
	}

	public long getTradeTime() {
		return tradeTime;
	}

	public void setTradeTime(long tradeTime) {
		this.tradeTime = tradeTime;
	}

	public float getPreClosePrice() {
		return preClosePrice;
	}

	public void setPreClosePrice(float preClosePrice) {
		this.preClosePrice = preClosePrice;
	}

	public float getOpenPrice() {
		return openPrice;
	}

	public void setOpenPrice(float openPrice) {
		this.openPrice = openPrice;
	}

	public float getCurrentPrice() {
		return currentPrice;
	}

	public void setCurrentPrice(float currentPrice) {
		this.currentPrice = currentPrice;
	}

	public float getHighPrice() {
		return highPrice;
	}

	public void setHighPrice(float highPrice) {
		this.highPrice = highPrice;
	}

	public float getLowPrice() {
		return lowPrice;
	}

	public void setLowPrice(float lowPrice) {
		this.lowPrice = lowPrice;
	}

	public static long getSerialversionuid() {
		return serialVersionUID;
	}

	@Override
	public String toString() {
		return "StockQuotationinfo [stockCode=" + stockCode + ", stockName="
				+ stockName + ", tradeTime=" + tradeTime + ", preClosePrice="
				+ preClosePrice + ", openPrice=" + openPrice
				+ ", currentPrice=" + currentPrice + ", highPrice=" + highPrice
				+ ", lowPrice=" + lowPrice + "]";
	}

}

 

消費者

package demo2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;


/**
 * Demo consumers against the test topic: one relying on client-side
 * automatic offset commits, one committing offsets manually after a
 * batch of records has been "persisted".
 */
public class MyKafkaConsumer {

    /**
     * Polls forever with enable.auto.commit=true; offsets are committed in
     * the background every auto.commit.interval.ms.
     */
    @Test
    public void comsumeMsgAutoCommit() {
        Properties config = new Properties();
        config.put("bootstrap.servers", Constants.KAFKA_SERVER_ADRESS + ":" + Constants.KAFKA_SERVER_PORT);
        config.put("group.id", Constants.GROUP_ID);
        config.put("enable.auto.commit", "true");
        config.put("auto.commit.interval.ms", "1000");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
        consumer.subscribe(Arrays.asList(Constants.MY_TOPIC));

        // Endless poll loop; never returns.
        while (true) {
            for (ConsumerRecord<String, String> record : consumer.poll(100)) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            sleep(1);
        }
    }

    /**
     * Polls with enable.auto.commit=false, accumulates records, and calls
     * commitSync() only after a batch has been written to the "database".
     */
    @Test
    public void consumerMsgManualCommit() {
        Properties config = new Properties();
        config.put("bootstrap.servers", Constants.KAFKA_SERVER_ADRESS + ":" + Constants.KAFKA_SERVER_PORT);
        config.put("group.id", Constants.GROUP_ID);
        config.put("max.poll.records", 10);
        config.put("auto.offset.reset", "earliest");
        config.put("enable.auto.commit", "false");
        config.put("auto.commit.interval.ms", "1000");
        config.put("session.timeout.ms", "30000");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
        consumer.subscribe(Arrays.asList(Constants.MY_TOPIC));

        final int minBatchSize = 100;
        List<ConsumerRecord<String, String>> batch = new ArrayList<>();
        // Endless poll loop; never returns.
        while (true) {
            for (ConsumerRecord<String, String> record : consumer.poll(100)) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                batch.add(record);
                System.out.println(batch.size());
            }
            // Commit only once a full batch has been persisted.
            if (batch.size() >= minBatchSize) {
                System.out.println("進入手動提交offset");
                insertIntoDb(batch);
                consumer.commitSync();
                batch.clear();
            }
        }
    }

    /** Stand-in for real persistence: just prints each record. */
    private void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
        for (ConsumerRecord<String, String> saved : buffer) {
            System.out.printf("insertIntoDb:offset = %d, key = %s, value = %s%n", saved.offset(), saved.key(), saved.value());
        }
    }

    /** Sleeps the given number of seconds, swallowing interruption after logging it. */
    private void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}

  

package demo2;

import java.net.InetAddress;

/** Shared connection settings for the demo producer/consumer classes. */
public class Constants {

    // Consumer group id used by the demo consumers.
    static final String GROUP_ID = "test_group";
    // Topic the demos produce to and consume from.
    static final String MY_TOPIC = "test";
    // Kafka broker address and port.
    static final String KAFKA_SERVER_ADRESS = "192.168.1.106";
    static final int KAFKA_SERVER_PORT = 9092;

}

  

kafka(3)API使用