
Deploying single-node Kafka on Alibaba Cloud ECS and exposing it externally (with authentication)

1. ZooKeeper configuration (standalone)

zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataLogDir=/data/zookeeper/log/
dataDir=/data/zookeeper/data
clientPort=2181
server.1=*.*.*.*:2888:3888

Only one server entry is configured, so ZooKeeper runs in standalone mode.
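
Start ZooKeeper from its bin directory before checking the status (the path matches the /data/zookeeper layout shown in the Using config line below):

./zkServer.sh start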

./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Mode: standalone
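
Optionally, confirm on the ECS host that the client port is listening (a minimal check):

ss -lntp | grep 2181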

2. Kafka configuration

cat server.properties |grep -v '#'|grep -v '^$'
broker.id=0
listeners=SASL_PLAINTEXT://<internal IP>:9092
advertised.listeners=SASL_PLAINTEXT://<externally mapped public IP>:<port>
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka270/datalog
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=<ZooKeeper address>:<port>
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
log.cleaner.enable=true
auto.create.topics.enable=true
default.replication.factor=1
auto.leader.rebalance.enable=true
request.required.acks=-1

Note: replace the angle-bracket placeholders with your own values; the listeners, advertised.listeners, security.inter.broker.protocol and sasl.* lines are the additions required for authentication.
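
For reference, a filled-in sketch of the two listener lines, using a hypothetical private ECS address of 172.16.0.10 and a hypothetical public address of 47.100.0.1 (both placeholders, not from the original setup):

# hypothetical addresses; replace with your own
listeners=SASL_PLAINTEXT://172.16.0.10:9092
advertised.listeners=SASL_PLAINTEXT://47.100.0.1:9092

listeners is the address the broker binds to inside the VPC; advertised.listeners is the address handed back to clients, so for external access it must be the public IP (or domain) and port reachable from outside, and that port must be opened in the ECS security group.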

Add two files under the config directory

cat kafka_server_jaas.conf
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin";
};

Note: the suffix after user_ is the username, and the value assigned in user_admin="admin" is that user's password; they must match the username and password above exactly.
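
Additional client accounts can be granted in the same way by adding further user_<name>="<password>" entries. The alice account in the sketch below is purely illustrative and not part of the original setup:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_alice="alice-secret";
};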

cat kafka_client_jaas.conf
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin";
};

This must match the server-side JAAS configuration above (same username and password).

Modify two configuration files

cat consumer.properties |grep -v '#'|grep -v '^$'
bootstrap.servers=<internal listener IP>:<port>
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=test-consumer-group

The security.protocol and sasl.mechanism lines are the ones newly added for authentication.

cat producer.properties |grep -v '#'|grep -v '^$'
bootstrap.servers=<internal listener IP>:<port>
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
compression.type=none

Same as above: security.protocol and sasl.mechanism are the new additions.
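
As an alternative to the kafka_client_jaas.conf file above (and the KAFKA_OPTS wiring below), newer clients can carry the JAAS settings directly inside producer.properties / consumer.properties via sasl.jaas.config. A sketch, not part of the original setup:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";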

Add an environment variable

cat /etc/profile

export KAFKA_OPTS=-Djava.security.auth.login.config=<path to your config>/kafka_server_jaas.conf
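
Reload the profile and start the broker so the JAAS setting takes effect (a minimal sketch; /data/kafka270 is assumed to be the Kafka install directory, matching the log.dirs path above):

source /etc/profile
# /data/kafka270 assumed to be the Kafka install dir; adjust to your layout
cd /data/kafka270/bin
./kafka-server-start.sh -daemon ../config/server.properties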

Add the client JAAS configuration to the console producer and consumer scripts

cat kafka-console-producer.sh |grep -v '#'|grep -v '^$'
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=你的配置檔案路徑/kafka_client_jaas.conf"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

The KAFKA_OPTS export is the part added for authentication.

cat kafka-console-consumer.sh |grep -v '#'|grep -v '^$'
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
if [ "x$KAFKA_OPTS" ]; then export KAFKA_OPTS="-Djava.security.auth.login.config=你的配置檔案路徑/kafka_client_jaas.conf"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

Same change as in the producer script.
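
If you prefer not to edit the shipped scripts, the same effect can be achieved by setting KAFKA_OPTS inline when invoking them (a sketch; fill in the path and address placeholders):

KAFKA_OPTS="-Djava.security.auth.login.config=<path to your config>/kafka_client_jaas.conf" ./kafka-console-producer.sh --bootstrap-server <internal listener IP>:<port> --topic test1 --producer.config ../config/producer.properties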

Test that everything works

Produce messages

./kafka-console-producer.sh --bootstrap-server <internal listener IP>:<port> --topic test1 --producer.config ../config/producer.properties

Consume messages

./kafka-console-consumer.sh --bootstrap-server <internal listener IP>:<port> --from-beginning --topic test1 --consumer.config ../config/consumer.properties

Type a message in the producer console; if it shows up in the consumer console, the setup is working.
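
As a further check that external SASL clients are accepted over the advertised listener, the topic list can be queried from outside; --command-config reuses the same SASL settings as producer.properties (a sketch, assuming the public port is reachable from your machine):

./kafka-topics.sh --bootstrap-server <externally mapped public IP>:<port> --list --command-config ../config/producer.properties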