Storm-Kafka使用筆記(一):Scheme和Mapper
一、背景
因為是專案驅動的去了解Storm-kafka,所以會由於分工去先了解某一部分,順序有點雜亂。
二、Storm-Kafka介紹
瞭解Storm的都知道,Storm拓撲中比較基本的就是Spout和Bolt,Spout作為資料發射源,可以從資料庫或者其他地方取得資料然後發射出去,Bolt就是中間一個個運算處理的元件,最後一個Bolt可以擔任資料處理結果彙總或者資料落地的角色。
三、Storm-Kafka為我們提供了什麼
最先了解的是KafkaSpout和KafkaBolt,顯而易見就是把我們上面說的Spout和最後一個Bolt的功用具體化,具體到KafkaSpout就是從Kafka取資料來源的Spout,而KafkaBolt就是把資料處理結果轉化為一定格式,傳送到Kafka的Bolt。
但是,在這之中我們需要有一些規約,就是Spout從Kafka拿到資料後我要怎麼處理轉換成Storm中的資料格式–Tuple,還有Bolt要怎麼把接收到的Tuple轉換成Kafka的格式傳送到Kafka,這就涉及到另外兩個基礎的概念,Scheme和Mapper,它們分別說明了我們上面的規約,並把資料進行轉換再返回。
Scheme就實現了從byte[]到其他格式的轉換(預設提供的是從位元組流到字串的轉換)。Mapper就實現了從Tuple到其他格式的轉換(預設提供的是從Tuple取Field為key的作為key返回,取Field為message的作為message轉換),也讓我們可以個性化實現(當然Storm-Kafka也提供了預設的)。
四、Mapper
Mapper介面:
package org.apache.storm.kafka.bolt.mapper;
import org.apache.storm.tuple.Tuple;
import java.io.Serializable;
/**
* as the really verbose name suggests this interface mapps a storm tuple to kafka key and message.
* @param <K> type of key.
* @param <V> type of value.
*/
public interface TupleToKafkaMapper<K,V> extends Serializable {
K getKeyFromTuple(Tuple tuple);
V getMessageFromTuple(Tuple tuple);
}
可以看到Mapper接口裡面只有兩個方法,分別是獲取Key和Message,對應的就是傳送給Kafka的資訊裡的Key(可選)和Message。
Storm-Kafka提供預設的FieldNameBasedTupleToKafkaMapper是這樣的:
//返回Key和Msg,預設的Field是"key" "message",在new的時候可以自定義,也可以在方法裡面做封裝和拼接
package org.apache.storm.kafka.bolt.mapper;
import org.apache.storm.tuple.Tuple;
public class FieldNameBasedTupleToKafkaMapper<K,V> implements TupleToKafkaMapper<K, V> {
public static final String BOLT_KEY = "key";
public static final String BOLT_MESSAGE = "message";
public String boltKeyField;
public String boltMessageField;
public FieldNameBasedTupleToKafkaMapper() {
this(BOLT_KEY, BOLT_MESSAGE);
}
public FieldNameBasedTupleToKafkaMapper(String boltKeyField, String boltMessageField) {
this.boltKeyField = boltKeyField;
this.boltMessageField = boltMessageField;
}
@Override
public K getKeyFromTuple(Tuple tuple) {
//for backward compatibility, we return null when key is not present.
return tuple.contains(boltKeyField) ? (K) tuple.getValueByField(boltKeyField) : null;
}
@Override
public V getMessageFromTuple(Tuple tuple) {
return (V) tuple.getValueByField(boltMessageField);
}
}
然後,其實不難想到KafkaBolt裡面的操作,就是
//取出值然後封裝發出,非常簡單
key = mapper.getKeyFromTuple(input);
message = mapper.getMessageFromTuple(input);
producer.send(new KeyedMessage(topic, key, message));
五、Scheme
Scheme主要負責定義如何從訊息流中解析所需資料。
Scheme介面:
public interface Scheme extends Serializable {
public List<Object> deserialize(byte[] ser);
public Fields getOutputFields();
}
包括反序列化的方法和輸出的欄位宣告。
Storm-Kafka自帶的StringScheme的實現:
public class StringScheme implements Scheme {
private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
public static final String STRING_SCHEME_KEY = "str";
public List<Object> deserialize(ByteBuffer bytes) {
return new Values(deserializeString(bytes));
}
public static String deserializeString(ByteBuffer string) {
if (string.hasArray()) {
int base = string.arrayOffset();
return new String(string.array(), base + string.position(), string.remaining());
} else {
return new String(Utils.toByteArray(string), UTF8_CHARSET);
}
}
public Fields getOutputFields() {
return new Fields(STRING_SCHEME_KEY);
}
}
其實就是返回了String和聲明瞭欄位”str”,只要在方法裡自定義對位元組流的操作,並聲明瞭欄位,就可以自定義自己的Scheme了。