
Storm-Kafka: Offset lags for kafka not supported for older versions

We recently had to migrate Hadoop again (I won't describe the migration itself here). It matters because I use Storm for real-time streaming: the previous Storm version was 1.0.2, and the current latest is 1.2.2. In the old versions the Storm/Kafka integration package is storm-kafka; in the new versions it is storm-kafka-client. When I deployed my old JAR to Storm, the UI showed a warning:

Offset lags for kafka not supported for older versions


In other words, the Kafka Spouts Lag section of the Storm UI shows no data, only the warning above. This does not affect consuming data, but it can be fixed by switching to the new consumer API.

First, add the dependency. Its default scope is provided, but in my tests the classes could not be found at runtime, so I changed the scope to compile by hand so that the dependency is packaged into the JAR at build time.

		<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client -->
		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-kafka-client</artifactId>
			<version>1.2.2</version>
			<scope>compile</scope>
		</dependency>

Modify the spout code to use the new API:

	TopologyBuilder builder = new TopologyBuilder();
	KafkaSpoutConfig<String, String> config = KafkaSpoutConfig
			.builder("datanode01-ucloud.isesol.com:9092,datanode02-ucloud.isesol.com:9092,datanode03-ucloud.isesol.com:9092,hue-ucloud.isesol.com:9092",
					"2001")
			.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
			.setGroupId("test-2001")
			.build();

The old spout code was:

		String zkConnString = "datanode01-ucloud.isesol.com,datanode02-ucloud.isesol.com,datanode03-ucloud.isesol.com";
		String topicName = "2001";
		String zkRoot = "/storm";
		BrokerHosts hosts = new ZkHosts(zkConnString);
		SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, "jlwang");
		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
		spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
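
One behavioral difference to watch when switching: the old StringScheme emits the message body as a single field ("str"), whereas the default record translator of storm-kafka-client emits the fields (topic, partition, offset, key, value), with String keys and values when the builder is created as above. UNCOMMITTED_EARLIEST means the spout starts from the earliest offset only if the group has no committed offset yet; otherwise it resumes from the committed offset. A minimal sketch of a downstream bolt's execute() reading the message body by field name (assuming the default translator, i.e. no setRecordTranslator call):

	public void execute(Tuple input) {
		// with the default record translator the tuple fields are
		// (topic, partition, offset, key, value); the message body is the "value" field
		String line = input.getStringByField("value");
		collector.emit(input, new Values(line));
		collector.ack(input);
	}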

As for offsets, the old version stores them in ZooKeeper, while the new version stores them in Kafka's __consumer_offsets topic. You can inspect __consumer_offsets with the following command:

/opt/cloudera/parcels/KAFKA/bin/kafka-console-consumer   --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config
[test-2001,2001,0]::[OffsetMetadata[783176,{"topologyId":"getkafka-70-1530781168","taskId":4,"threadName":"Thread-11-kafka-reader-executor[4 4]"}],CommitTime 1530785032518,ExpirationTime 1530871432518]
[com.isesol.Raphael.kafka-reader-5008version0.0.25,5008,0]::[OffsetMetadata[3946,{"topologyId":"Kafka2Hbase-5008-57-1530697548","taskId":7,"threadName":"Thread-5-kafka-reader-5008-executor[7 7]"}],CommitTime 1530785039407,ExpirationTime 1530871439407]
[com.isesol.Raphael.kafka-reader-2001trans0.0.36,2001,0]::[OffsetMetadata[783228,{"topologyId":"KafkaToHbase-2001-41-1530611039","taskId":6,"threadName":"Thread-7-kafka-reader-2001-executor[6 6]"}],CommitTime 1530785042006,ExpirationTime 1530871442006]
[test-2001,2001,0]::[OffsetMetadata[783262,{"topologyId":"getkafka-70-1530781168","taskId":4,"threadName":"Thread-11-kafka-reader-executor[4 4]"}],CommitTime 1530785062582,ExpirationTime 1530871462582]
[com.isesol.Raphael.kafka-reader-2001trans0.0.36,2001,0]::[OffsetMetadata[783313,{"topologyId":"KafkaToHbase-2001-41-1530611039","taskId":6,"threadName":"Thread-7-kafka-reader-2001-executor[6 6]"}],CommitTime 1530785072198,ExpirationTime 1530871472198]

Wait a moment and the committed offsets show up; the leading test-2001 is the group.id I configured.
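
To verify the committed offset (and the lag that the Storm UI now displays) programmatically instead of reading __consumer_offsets directly, a small standalone sketch using the kafka-clients consumer API also works. The class name, broker and partition below are only illustrative; committed() reads the offset stored in __consumer_offsets for the group, and endOffsets() (available since kafka-clients 0.10.1) returns the log end offset, so the difference between the two is the lag:

package com.isesol.storm;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CheckOffset {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "datanode01-ucloud.isesol.com:9092");
		props.put("group.id", "test-2001");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
		try {
			TopicPartition tp = new TopicPartition("2001", 0);
			// committed offset of group test-2001 for this partition
			OffsetAndMetadata committed = consumer.committed(tp);
			// log end offset of the partition; end offset minus committed offset is the lag
			long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
			long lag = committed == null ? endOffset : endOffset - committed.offset();
			System.out.println("committed=" + committed + ", endOffset=" + endOffset + ", lag=" + lag);
		} finally {
			consumer.close();
		}
	}
}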

I have posted the complete Storm-Kafka-HBase code before; the only change this time is that it uses the new API:

package com.isesol.storm;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.*;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
import org.apache.storm.shade.com.google.common.collect.Maps;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.topology.InputDeclarer;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

import java.security.MessageDigest;
import java.security.acl.Group;

public class getKafka {

	public static void main(String[] args)
			throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
	/*	String zkConnString = "datanode01-ucloud.isesol.com,datanode02-ucloud.isesol.com,datanode03-ucloud.isesol.com";
		String topicName = "2001";
		String zkRoot = "/storm";
		BrokerHosts hosts = new ZkHosts(zkConnString);
		SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, "jlwang");
		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
		spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); */

		List<String> fieldNameList = new ArrayList<String>();
		// fieldNameList.add("rowkey");
		// for (Field field : Topic2001.class.getDeclaredFields()) {
		// fieldNameList.add(field.getName());
		// }

		TopologyBuilder builder = new TopologyBuilder();
		KafkaSpoutConfig<String, String> config = KafkaSpoutConfig
				.builder(
						"datanode01-ucloud.isesol.com:9092,datanode02-ucloud.isesol.com:9092,datanode03-ucloud.isesol.com:9092,hue-ucloud.isesol.com:9092",
						"2001")
				.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
				.setGroupId("test-2001").build();

		fieldNameList.add("line");

		builder.setSpout("kafka-reader", new KafkaSpout<>(config), 1);
		// builder.setSpout("kafka-reader", kafkaSpout, 1).setNumTasks(1);
		builder.setBolt("kafkaBolt", new kafkaBolt(), 1).shuffleGrouping("kafka-reader");
		builder.setBolt("transferBolt", new transferBolt(), 1).shuffleGrouping("kafkaBolt");
		Config conf = new Config();
		Map<String, String> HBConfig = Maps.newHashMap();
		HBConfig.put("hbase.zookeeper.property.clientPort", "2181");
		HBConfig.put("hbase.zookeeper.quorum",
				"datanode01-ucloud.isesol.com:2181,datanode02-ucloud.isesol.com:2181,datanode03-ucloud.isesol.com:2181");
		HBConfig.put("zookeeper.znode.parent", "/hbase");

		conf.put("HBCONFIG", HBConfig);
		SimpleHBaseMapper mapper = new SimpleHBaseMapper();
		mapper.withColumnFamily("cf");
		mapper.withColumnFields(new Fields(fieldNameList));
		mapper.withRowKeyField("rowkey");
		HBaseBolt hBaseBolt = new HBaseBolt("test", mapper).withConfigKey("HBCONFIG");
		hBaseBolt.withFlushIntervalSecs(15);
		hBaseBolt.withBatchSize(5000);
		builder.setBolt("hbasehandler", hBaseBolt, 1).shuffleGrouping("transferBolt");
		String name = getKafka.class.getSimpleName();
		conf.setNumWorkers(2);
		// conf.setNumAckers(20);
		// conf.setNumEventLoggers(1);
		// conf.setMaxSpoutPending(20000);
		conf.setMessageTimeoutSecs(90);
		// LocalCluster localCluster = new LocalCluster();
		// localCluster.submitTopology(name, conf, builder.createTopology());
		StormSubmitter.submitTopology("getkafka", conf, builder.createTopology());
		// Utils.sleep(9999999);

	}
}

class transferBolt extends BaseRichBolt {

	private Map conf;
	private TopologyContext context;
	private OutputCollector collector;

	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		// keep the configuration, topology context and output collector for later use

		this.conf = stormConf;
		this.context = context;
		this.collector = collector;

	}

	public void execute(Tuple input) {
		// attach a random UUID row key to every incoming line and pass it on

		try {
			String line = input.getString(0);
			collector.emit(input, new Values(UUID.randomUUID().toString(), line));
			collector.ack(input);
		} catch (Exception ex) {
			collector.fail(input);
		}

	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// this bolt emits the generated row key plus the original message line
		declarer.declare(new Fields("rowkey", "line"));
	}

}

class kafkaBolt extends BaseRichBolt {

	private Map conf;
	private TopologyContext context;
	private OutputCollector collector;

	public void execute(Tuple input) {
		// the new spout's default record translator emits (topic, partition, offset, key, value),
		// so read the message body by field name instead of by position
		try {
			String line = input.getStringByField("value");
			collector.emit(input, new Values(line));
			collector.ack(input);
		} catch (Exception ex) {
			collector.fail(input);
		}

	}

	public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
		// keep the collector so execute() can emit, ack and fail tuples
		this.conf = arg0;
		this.context = arg1;
		this.collector = arg2;
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// the message body is forwarded as the single field "line"

		declarer.declare(new Fields("line"));

	}
}

class HandlerBolt extends BaseRichBolt {

	ObjectMapper objectMapper;
	List<String> fieldNameList;
	private Map conf;
	private TopologyContext context;
	private OutputCollector collector;

	public HandlerBolt(List<String> fieldNameList) {
		this.fieldNameList = fieldNameList;

	}

	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.conf = stormConf;
		this.context = context;
		this.collector = collector;
		this.objectMapper = new ObjectMapper();
		if (this.fieldNameList == null) {
			this.fieldNameList = new ArrayList<String>();
			fieldNameList.add("rowkey");
			for (Field field : Topic2001.class.getDeclaredFields()) {
				this.fieldNameList.add(field.getName());
			}
		}

	}

	public static String getMD5(String message) {
		String md5str = "";
		try {
			// 1. Create a MessageDigest object initialized with the MD5 algorithm
			MessageDigest md = MessageDigest.getInstance("MD5");

			// 2. Convert the message into a byte array
			byte[] input = message.getBytes();

			// 3. Compute the digest; the resulting byte array holds the 128-bit hash
			byte[] buff = md.digest(input);

			// 4. Convert each byte (8 bits) into hex and join them into the MD5 string
			md5str = bytesToHex(buff);

		} catch (Exception e) {
			e.printStackTrace();
		}
		return md5str;
	}

	public static String bytesToHex(byte[] bytes) {
		StringBuffer md5str = new StringBuffer();
		// convert each byte into hex and join them into the MD5 string
		int digital;
		for (int i = 0; i < bytes.length; i++) {
			digital = bytes[i];

			if (digital < 0) {
				digital += 256;
			}
			if (digital < 16) {
				md5str.append("0");
			}
			md5str.append(Integer.toHexString(digital));
		}
		return md5str.toString().toUpperCase();
	}

	public void execute(Tuple input) {
		try {
			String jsonStr = input.getString(0);
			Map<String, Object> objMap = null;
			objMap = objectMapper.readValue(jsonStr, Map.class);

			String rowKey = String.valueOf(objMap.get("rowKey"));
			String contentStr = String.valueOf(objMap.get("messageContent"));
			Map contentMap;
			contentMap = objectMapper.readValue(contentStr, Map.class);

			List<Object> content = new ArrayList<Object>();

			for (String fieldName : fieldNameList) {
				if ("rowkey".equals(fieldName)) {
					content.add(rowKey);
				} else {
					Object fieldContent = contentMap.get(fieldName);
					content.add(fieldContent == null ? "" : String.valueOf(fieldContent));
				}
			}
			Values outPut = new Values();
			outPut.addAll(content);
			collector.emit(input, outPut);
			collector.ack(input);
		} catch (Exception e) {
			e.printStackTrace();
			collector.fail(input);
		}
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {

		Fields fields = new Fields(this.fieldNameList);
		declarer.declare(fields);
	}

}

The complete pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<repositories>
		<repository>
			<id>cloudera</id>
			<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
		</repository>
	</repositories>
	<groupId>com.isesol</groupId>
	<artifactId>storm</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>storm</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-core</artifactId>
			<version>1.0.2</version>
			<scope>provided</scope>
		</dependency>
		<!-- 0.9.0-kafka-2.0.1 -->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<version>0.9.0-kafka-2.0.1</version>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-log4j12</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka -->

		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-kafka</artifactId>
			<version>1.0.0</version>
		</dependency>


		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.9.0-kafka-2.0.0</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hbase -->
		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-hbase</artifactId>
			<version>1.1.0</version>
			<exclusions>
				<exclusion>
					<groupId>jdk.tools</groupId>
					<artifactId>jdk.tools</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.11.0-kafka-3.0.0</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client -->
		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-kafka-client</artifactId>
			<version>1.2.2</version>
			<scope>compile</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>2.6</version>
				<configuration>
					<archive>
						<manifest>
							<mainClass>com.isesol.storm.getKafka</mainClass>
						</manifest>
					</archive>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>assembly</goal>
						</goals>
					</execution>
				</executions>
			</plugin>


			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.1</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
					<encoding>UTF-8</encoding>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>