Kafka 安裝與部署(單機版)與kafkaDemo除錯測試(包含JAVA Demo)
1. kafka_2.10-0.10.2.0.tar
1.解壓kafka_2.10-0.10.2.0安裝包
tar -xvf kafka_2.10-0.10.2.0.tar
2.配置kafka
cd /software/kafka_2.10-0.10.2.0/conf
(1) server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults ############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. broker.id=0 # Switch to enable topic deletion or not, default value is false #delete.topic.enable=true ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://192.168.1.104:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). advertised.listeners=PLAINTEXT://192.168.1.104:9092 hostname=192.168.1.104 # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # The number of threads handling network requests num.network.threads=3 # The number of threads doing disk I/O num.io.threads=8 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 ############################# Log Basics ############################# # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=1 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. # This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync # the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs here: # 1. Durability: Unflushed data may be lost if you are not using replication. # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk #log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush #log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# # The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log. # The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining # segments don't drop below log.retention.bytes. Functions independently of log.retention.hours. #log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=192.168.1.104:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000
3.啟動kafka
啟動zookeeper:nohup bin/zookeeper-server-start.sh config/zookeeper.properties 1>zookeeper.log 2>zookeeper.err &
啟動kafka:nohup bin/kafka-server-start.sh config/server.properties &
4.單機測試:
(1)生產者
bin/kafka-console-producer.sh --broker-list 192.168.1.104:9092 --topic test
輸進訊息: lmx
(2)消費者
bin/kafka-console-consumer.sh --zookeeper 192.168.1.104:2181 --topic test --from-beginning 收到訊息: lmx
4.JAVA程式碼測試:
(1)配置類:ConfigureAPI.class
package kafkaDemo; public class ConfigureAPI { public final static String GROUP_ID = "test"; public final static String TOPIC = "test-lmx"; public final static int BUFFER_SIZE = 64 * 1024; public final static int TIMEOUT = 20000; public final static int INTERVAL = 10000; public final static String BROKER_LIST = "192.168.1.104:9092,192.168.1.105:9092"; // 去資料間隔 public final static int GET_MEG_INTERVAL = 1000; }
( 2 ) 生產者類:JProducer.class
package kafkaDemo;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class JProducer implements Runnable
{
private Producer<String, String> producer;
public JProducer()
{
Properties props = new Properties();
props.put("bootstrap.servers", ConfigureAPI.BROKER_LIST);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("request.required.acks", "-1");
producer = new KafkaProducer<String, String>(props);
}
@Override
public void run()
{
// TODO Auto-generated method stub
try
{
String data = "hello lmx!";
producer.send(new ProducerRecord<String, String>(ConfigureAPI.TOPIC, data));
System.out.println(data);
}
catch (Exception e)
{
// TODO: handle exception
e.getStackTrace();
}
finally
{
producer.close();
}
}
public static void main(String[] args)
{
ExecutorService threadPool = Executors.newCachedThreadPool();
threadPool.execute(new JProducer());
threadPool.shutdown();
}
}
執行效果:
package kafkaDemo;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
public class JConsumer implements Runnable
{
private KafkaConsumer<String, String> consumer;
private JConsumer()
{
Properties props = new Properties();
props.put("bootstrap.servers", ConfigureAPI.BROKER_LIST);
props.put("group.id", ConfigureAPI.GROUP_ID);
props.put("enable.auto.commit", true);
props.put("auto.commit.interval.ms", 1000);
props.put("session.timeout.ms", 30000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(ConfigureAPI.TOPIC)); // 多個topic逗號隔開
}
@Override
public void run()
{
// TODO Auto-generated method stub
while (true)
{
System.out.println("poll Server message");
ConsumerRecords<String, String> records = consumer.poll(ConfigureAPI.GET_MEG_INTERVAL);
for (ConsumerRecord<String, String> record : records)
{
handleMeg(record.value());
}
}
}
private void handleMeg(String record)
{
System.out.println(record);
}
public static void main(String[] args)
{
ExecutorService threadPool = Executors.newCachedThreadPool();
threadPool.execute(new JConsumer());
threadPool.shutdown();
}
}
執行效果:
附屬某些錯誤解決辦法:
(1) 錯誤:Unable to connect to zookeeper server '192.168.1.104:2181' with timeout of 4000 ms
解決辦法:
1.防火牆要關閉
使用service iptables stop 關閉防火牆
使用service iptables status確認
使用chkconfig iptables off禁用防火牆
2.只打開2181埠
iptables -I INPUT -p tcp --dport 2181 -j ACCEPT
(2) 錯誤:kafka Failed to send messages after 3 tries
解決辦法:
修改server.properties
listeners=PLAINTEXT://192.168.1.104:9092
advertised.listeners=PLAINTEXT://192.168.1.104:9092
hostname=192.168.1.104
總結不好多多擔待,文章只單純個人總結,如不好勿噴,技術有限,有錯漏麻煩指正提出。本人QQ:373965070
相關推薦
Kafka 安裝與部署(單機版)與kafkaDemo除錯測試(包含JAVA Demo)
部署需要的包:1. kafka_2.10-0.10.2.0.tar1.解壓kafka_2.10-0.10.2.0安裝包tar -xvf kafka_2.10-0.10.2.0.tar 2.配置kafkacd /software/kafka_2.10-0.10.2.0/con
Hbase的安裝與部署(叢集版)
HBase 部署與使用 部署 Zookeeper 正常部署 $ ~/modules/zookeeper-3.4.5/bin/zkServer.sh start 首先保證 Zookeeper 叢集的正常部署,並啟動之: Hadoop 正常部署 $ ~/modules/hadoop-2.7.2/sbi
一步教你Docker安裝搭建redis(單機版)
1.Docker 安裝 Redis 方案一:使用docker拉取映象 查詢Docker Hub上的redis映象 #docker search redis 拉取
hadoop2.7.3在centos7上部署安裝(單機版)
hadoop2.7.3在centos7上部署安裝(單機版) (1)hadoop2.7.3下載 (前提:先安裝java環境) 下載地址:http://hadoop.apache.org/releases.html (注意是binary檔案,source那個是原始
redis原始碼安裝(單機版)
● 下載redis3的穩定版本 ● 上傳至linux解壓 [[email protected] ~]# tar -zxvf redis-3.2.12.tar.gz -C /usr/local/src/ ● 進入原始碼包編譯並安裝 [[email protec
Hadoop部署(三)——CentOS 7部署Hadoop(單機版)
測試環境 Linux系統版本:CentOS 7 64位 Hadoop部署方式介紹 Hadoop部署方式分三種:Standalone Mode(單機模式)、Pseudo-Distributed Mode(偽分散式模式)、Fully Distributed Mode(全
Hadoop 和 Hbase 的安裝與配置 (單機模式)
(一定要看最後我趟過的坑,如果安裝過程有問題,可參考最後我列出的問題及解決方法) 下載Hadoop安裝包 這裡安裝版本:hadoop-1.0.4.tar.gz 在安裝Hadoop之前,伺服器上一定要有安裝的jdk jdk安裝方式之一:在官網上下載Linux下的rpm
openshift/origin學習記錄(1)——基於二進位制檔案的安裝(單機版)
先決條件 開啟SELINUX 官方文件推薦開啟SELINUX,否則會導致安裝失敗。 修改/etc/selinux/config SELINUX=enforcing SELINUXTYPE=targeted 安裝docker # y
docker中安裝redis和zookeeper方法,親測可用(單機版)
比較好用的映象地址: docker pull daocloud.io/daocloud/zookeeper:3.4.10 安裝redis: ①通過pull命令下載映象,映象地址可以使用上面的 ②docker啟動redis映象容器,使用以下命令,親測好用 docker run
SQL Server安裝與使用(破解版)
【注】博主使用的是SQL Server2012 其他版本的安裝類似。 【第一步】下載軟體 連結:https://www.microsoft.com/zh-cn/download/details.aspx?id=29066 32位的Windows 7作業系統,只需下載列
【一】linux安裝redis(單機版)、3種啟動方式、及配置檔案介紹。
環境ubuntu16.04 解壓 tar -zxvf redis-3.2.6.tar.gz 修改資料夾名稱 mv redis-3.2.6 redis 編譯 cd /app/redis make 編譯好後會看到redis.conf和src檔案 安裝 cd
分散式--CentOS安裝zookeeper(單機版)
開啟我們的分散式之路吧!!! (1)官網下載zookeeper,我這裡使用的是3.4.5版本。下載地址 並且我們要給自己的伺服器配置hosts先 $ vi /etc/hosts (2)解壓唄,我們這就不解壓在usr/local那裡了,
數字圖像處理原理與實踐(MATLAB版)勘誤表
blog 核心 灰度變換 圖像復原 京東 .html href target 數字圖像處理 本文系《數字圖像處理原理與實踐(MATLAB版)》一書的勘誤表。【內容簡單介紹】本書全面系統地介紹了數字圖像處理技術的理論與方法,內容涉及幾何變換、灰度變換、圖像增強、圖像切割、
使用Spring Data Redis操作Redis(單機版)
nec one com list() 研究 enc keys wire 設置ip Jedis是一款Java連接Redis的客戶端,Spring基於Jedis進行了封裝,提供了簡潔的操作Redis的方法。那就是Spring Data Redis。其實後來研究發現,Spring
php-擴展編譯安裝擴展(通用版)
php編譯擴展 redis編譯安裝這裏以安裝redis擴展為例,其它擴展可以大體仿照如此過程: 1.到 pecl.php.net 搜索 redis 2.下載 stable 版(穩定版)擴展 3.解壓 4.執行 /php/path/bin/phpize (作用是檢測 php 的內核版本,並為擴展生成相應的編譯
白話windows server 2012 r2和windows 7創建ad域與配置(安全版)
關閉 logo 打不開 cmd 2012 r2 以太網 子網 int 180天 文章的可讀性非常重要,這裏提供的是一鍵式操作指南,即使之前完全沒有接觸,也可以配置完成。 ad域的創建是為了便於公司的集中化管理,提高公司運作效率和安全性。 我的操作環境,本機是kali lin
Visual Studio 2017 安裝使用教程(詳細版)
系統設置 -s 分享圖片 代碼 ++ 官網下載 studio 題解 微軟官網 Visual Studio 2017 安裝使用教程(詳細) 本人曾因無法使用vs編寫C語言程序痛苦一個月之久,實乃慚愧,後發現不少同學也同樣存在著相同問題,其原因歸結於網上的各種教程
《矩陣分析與應用(第二版)張賢達》PDF
image aid images db4 粘貼 proc com follow process 下載:https://pan.baidu.com/s/1fbhJ4I2MNKozlYkFiadCoA 《矩陣分析與應用(第二版)張賢達》PDF帶目錄和書簽,文字可以復制粘貼。經典
Ubuntu下Laravel的開發環境安裝及部署(Vagrant + Homestead)
2018-2-6 更新 注意! laravel/homestead box專案地址已經不再是原來的 https://atlas.hashicorp.com/laravel/boxes/homestead,而已經變更成 https://app.vagrantup.com/laravel/
Schedule 排程系統設計(單機版)
鑑於對Spring實現的@Scheduled的排程和SchedulerFactoryBean的研究發現,基於Spring的排程封裝雖滿足了大多需求,但為了簡化使用方式使得Job並不容易得到控制,導致開發對Job的控制和運維成本上升;下面是本人基於Quartz和Spring及Annotation開發的單機