Storm-Kafka: Offset lags for kafka not supported for older versions
阿新 · Published 2018-12-27
I recently had to migrate Hadoop again (the migration itself is not covered here). It matters for this post because I use Storm for real-time streaming: my previous version was 1.0.2 and the current release is 1.2.2. The old Storm/Kafka integration package is storm-kafka, while the new one is storm-kafka-client. When I deployed the topology built with the old jar to Storm, the UI showed a warning:
Offset lags for kafka not supported for older versions
In other words, the Kafka Spouts Lag section in the Storm UI shows no data. This does not affect message consumption, but switching to the new consumer API resolves it.
First add the dependency. Its default scope is provided, but in my tests the classes could not be found at runtime, so I changed it to compile so the dependency gets packaged into the jar at build time:
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>1.2.2</version>
    <scope>compile</scope>
</dependency>
Then change the spout code to use the new API:
TopologyBuilder builder = new TopologyBuilder();
KafkaSpoutConfig<String, String> config = KafkaSpoutConfig
        .builder("datanode01-ucloud.isesol.com:9092,datanode02-ucloud.isesol.com:9092,datanode03-ucloud.isesol.com:9092,hue-ucloud.isesol.com:9092", "2001")
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
        .setGroupId("test-2001")
        .build();
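One difference from the old StringScheme is worth noting: the old spout emitted a single field containing the message, while the new spout's default record translator emits five fields (topic, partition, offset, key, value), so a bolt that calls input.getString(0) now receives the topic name instead of the message body. A minimal sketch of restoring the old single-field behaviour with a custom translator (the field name "line" here is only an example, not part of my topology):

KafkaSpoutConfig<String, String> config = KafkaSpoutConfig
        .builder("datanode01-ucloud.isesol.com:9092", "2001")
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
        .setGroupId("test-2001")
        // emit only the message value as a single field, like SchemeAsMultiScheme(new StringScheme()) did
        .setRecordTranslator(record -> new Values(record.value()), new Fields("line"))
        .build();

The full code further below keeps the default translator and instead reads the message by its field name ("value") in the first bolt.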
The old spout code was:
String zkConnString = "datanode01-ucloud.isesol.com,datanode02-ucloud.isesol.com,datanode03-ucloud.isesol.com";
String topicName = "2001";
String zkRoot = "/storm";
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, "jlwang");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
As you can see, the old version stores offsets in ZooKeeper, while the new version stores them in the __consumer_offsets topic. You can inspect __consumer_offsets with the following command:
/opt/cloudera/parcels/KAFKA/bin/kafka-console-consumer --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config
[test-2001,2001,0]::[OffsetMetadata[783176,{"topologyId":"getkafka-70-1530781168","taskId":4,"threadName":"Thread-11-kafka-reader-executor[4 4]"}],CommitTime 1530785032518,ExpirationTime 1530871432518]
[com.isesol.Raphael.kafka-reader-5008version0.0.25,5008,0]::[OffsetMetadata[3946,{"topologyId":"Kafka2Hbase-5008-57-1530697548","taskId":7,"threadName":"Thread-5-kafka-reader-5008-executor[7 7]"}],CommitTime 1530785039407,ExpirationTime 1530871439407]
[com.isesol.Raphael.kafka-reader-2001trans0.0.36,2001,0]::[OffsetMetadata[783228,{"topologyId":"KafkaToHbase-2001-41-1530611039","taskId":6,"threadName":"Thread-7-kafka-reader-2001-executor[6 6]"}],CommitTime 1530785042006,ExpirationTime 1530871442006]
[test-2001,2001,0]::[OffsetMetadata[783262,{"topologyId":"getkafka-70-1530781168","taskId":4,"threadName":"Thread-11-kafka-reader-executor[4 4]"}],CommitTime 1530785062582,ExpirationTime 1530871462582]
[com.isesol.Raphael.kafka-reader-2001trans0.0.36,2001,0]::[OffsetMetadata[783313,{"topologyId":"KafkaToHbase-2001-41-1530611039","taskId":6,"threadName":"Thread-7-kafka-reader-2001-executor[6 6]"}],CommitTime 1530785072198,ExpirationTime 1530871472198]
After a short wait the committed offsets show up; the test-2001 at the front is the group.id I configured.
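Besides the console consumer, the committed offset (and from it the lag) can also be read directly with the new consumer API. A minimal sketch, assuming partition 0 of topic 2001 and a single broker in bootstrap.servers; the class name CheckCommittedOffset and the lag calculation against endOffsets are my own illustration, not part of the original topology:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CheckCommittedOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "datanode01-ucloud.isesol.com:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-2001");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
            TopicPartition tp = new TopicPartition("2001", 0);
            // offset the topology committed to __consumer_offsets for this group/partition
            OffsetAndMetadata committed = consumer.committed(tp);
            // latest offset in the partition, used to work out the lag
            long end = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
            System.out.println("committed = " + (committed == null ? "none" : committed.offset())
                    + ", lag = " + (committed == null ? end : end - committed.offset()));
        }
    }
}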
I have posted the complete Storm-Kafka-HBase code before; the only change this time is the switch to the new API:
package com.isesol.storm;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.*;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
import org.apache.storm.shade.com.google.common.collect.Maps;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.topology.InputDeclarer;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import java.security.MessageDigest;
import java.security.acl.Group;
public class getKafka {
public static void main(String[] args)
throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
/* String zkConnString = "datanode01-ucloud.isesol.com,datanode02-ucloud.isesol.com,datanode03-ucloud.isesol.com";
String topicName = "2001";
String zkRoot = "/storm";
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, "jlwang");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); */
List<String> fieldNameList = new ArrayList<String>();
// fieldNameList.add("rowkey");
// for (Field field : Topic2001.class.getDeclaredFields()) {
// fieldNameList.add(field.getName());
// }
TopologyBuilder builder = new TopologyBuilder();
KafkaSpoutConfig<String, String> config = KafkaSpoutConfig
.builder(
"datanode01-ucloud.isesol.com:9092,datanode02-ucloud.isesol.com:9092,datanode03-ucloud.isesol.com:9092,hue-ucloud.isesol.com:9092",
"2001")
.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
.setGroupId("test-2001").build();
fieldNameList.add("line");
builder.setSpout("kafka-reader", new KafkaSpout<>(config), 1);
// builder.setSpout("kafka-reader", kafkaSpout, 1).setNumTasks(1);
builder.setBolt("kafkaBolt", new kafkaBolt(), 1).shuffleGrouping("kafka-reader");
builder.setBolt("transferBolt", new transferBolt(), 1).shuffleGrouping("kafkaBolt");
Config conf = new Config();
Map<String, String> HBConfig = Maps.newHashMap();
HBConfig.put("hbase.zookeeper.property.clientPort", "2181");
HBConfig.put("hbase.zookeeper.quorum",
"datanode01-ucloud.isesol.com:2181,datanode02-ucloud.isesol.com:2181,datanode03-ucloud.isesol.com:2181");
HBConfig.put("zookeeper.znode.parent", "/hbase");
conf.put("HBCONFIG", HBConfig);
SimpleHBaseMapper mapper = new SimpleHBaseMapper();
mapper.withColumnFamily("cf");
mapper.withColumnFields(new Fields(fieldNameList));
mapper.withRowKeyField("rowkey");
HBaseBolt hBaseBolt = new HBaseBolt("test", mapper).withConfigKey("HBCONFIG");
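// batch writes: flush to HBase once 5000 puts have accumulated, or at the latest every 15 seconds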
hBaseBolt.withFlushIntervalSecs(15);
hBaseBolt.withBatchSize(5000);
builder.setBolt("hbasehandler", hBaseBolt, 1).shuffleGrouping("transferBolt");
String name = getKafka.class.getSimpleName();
conf.setNumWorkers(2);
// conf.setNumAckers(20);
// conf.setNumEventLoggers(1);
// conf.setMaxSpoutPending(20000);
conf.setMessageTimeoutSecs(90);
// LocalCluster localCluster = new LocalCluster();
// localCluster.submitTopology(name, conf, builder.createTopology());
StormSubmitter.submitTopology("getkafka", conf, builder.createTopology());
// Utils.sleep(9999999);
}
}
class transferBolt extends BaseRichBolt {
private Map conf;
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
// TODO Auto-generated method stub
this.conf = stormConf;
this.context = context;
this.collector = collector;
}
public void execute(Tuple input) {
// TODO Auto-generated method stub
try {
String line = input.getString(0);
collector.emit(input, new Values(UUID.randomUUID().toString(), line));
collector.ack(input);
} catch (Exception ex) {
collector.fail(input);
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields("rowkey", "line"));
}
}
class kafkaBolt extends BaseRichBolt {
private Map conf;
private TopologyContext context;
private OutputCollector collector;
public void execute(Tuple input) {
// TODO Auto-generated method stub
try {
// the new KafkaSpout's default translator emits (topic, partition, offset, key, value),
// so read the message by field name; position 0 would now be the topic, not the payload
String line = input.getStringByField("value");
collector.emit(input, new Values(line));
collector.ack(input);
} catch (Exception ex) {
collector.fail(input);
}
}
public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
// TODO Auto-generated method stub
this.conf = arg0;
this.context = arg1;
this.collector = arg2;
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields("line"));
}
}
class HandlerBolt extends BaseRichBolt {
ObjectMapper objectMapper;
List<String> fieldNameList;
private Map conf;
private TopologyContext context;
private OutputCollector collector;
public HandlerBolt(List<String> fieldNameList) {
this.fieldNameList = fieldNameList;
}
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.conf = stormConf;
this.context = context;
this.collector = collector;
this.objectMapper = new ObjectMapper();
if (this.fieldNameList == null) {
this.fieldNameList = new ArrayList<String>();
fieldNameList.add("rowkey");
for (Field field : Topic2001.class.getDeclaredFields()) {
this.fieldNameList.add(field.getName());
}
}
}
public static String getMD5(String message) {
String md5str = "";
try {
// 1. create a message-digest object initialized with the MD5 algorithm
MessageDigest md = MessageDigest.getInstance("MD5");
// 2. convert the message into a byte array
byte[] input = message.getBytes();
// 3. compute the digest; the resulting byte array is the 128-bit hash
byte[] buff = md.digest(input);
// 4. convert each byte (8 bits) to hex and join them into the MD5 string
md5str = bytesToHex(buff);
} catch (Exception e) {
e.printStackTrace();
}
return md5str;
}
public static String bytesToHex(byte[] bytes) {
StringBuffer md5str = new StringBuffer();
// convert each byte to hex and join into the MD5 string
int digital;
for (int i = 0; i < bytes.length; i++) {
digital = bytes[i];
if (digital < 0) {
digital += 256;
}
if (digital < 16) {
md5str.append("0");
}
md5str.append(Integer.toHexString(digital));
}
return md5str.toString().toUpperCase();
}
public void execute(Tuple input) {
try {
String jsonStr = input.getString(0);
Map<String, Object> objMap = null;
objMap = objectMapper.readValue(jsonStr, Map.class);
String rowKey = String.valueOf(objMap.get("rowKey"));
String contentStr = String.valueOf(objMap.get("messageContent"));
Map contentMap;
contentMap = objectMapper.readValue(contentStr, Map.class);
List<Object> content = new ArrayList<Object>();
for (String fieldName : fieldNameList) {
if ("rowkey".equals(fieldName)) {
content.add(rowKey);
} else {
Object fieldContent = contentMap.get(fieldName);
content.add(fieldContent == null ? "" : String.valueOf(fieldContent));
}
}
Values outPut = new Values();
outPut.addAll(content);
collector.emit(input, outPut);
collector.ack(input);
} catch (Exception e) {
e.printStackTrace();
collector.fail(input);
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
Fields fields = new Fields(this.fieldNameList);
declarer.declare(fields);
}
}
The complete pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<groupId>com.isesol</groupId>
<artifactId>storm</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>storm</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.2</version>
<scope>provided</scope>
</dependency>
<!-- 0.9.0-kafka-2.0.1 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0-kafka-2.0.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0-kafka-2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hbase -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hbase</artifactId>
<version>1.1.0</version>
<exclusions>
<exclusion>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0-kafka-3.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>1.2.2</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<archive>
<manifest>
<mainClass>com.isesol.storm.getKafka</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>