
(4) storm-kafka Source Code Walkthrough: Writing a Custom Scheme

This article is original; please credit the source when reposting:

To use KafkaSpout you need a class that implements Scheme; storm-kafka ships implementations such as StringScheme and StringKeyValueScheme that you can use directly.

These Scheme implementations are responsible for parsing the data you need out of the raw message stream.

public interface Scheme extends Serializable {
    public List<Object> deserialize(byte[] ser);
    public Fields getOutputFields();
}

A Scheme implements the deserialization method and declares the names of the output fields. Here is the simple StringScheme implementation:
public class StringScheme implements Scheme {

    public static final String STRING_SCHEME_KEY = "str";

    public List<Object> deserialize(byte[] bytes) {
        return new Values(deserializeString(bytes));
    }

    public static String deserializeString(byte[] string) {
        try {
            return new String(string, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    public Fields getOutputFields() {
        return new Fields(STRING_SCHEME_KEY);
    }
}
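The decode step can be exercised on its own. A minimal self-contained sketch (the class name `StringSchemeDemo` is mine, not part of storm-kafka):

```java
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.List;

public class StringSchemeDemo {
    // Mirrors StringScheme.deserializeString: UTF-8 decode, with the
    // checked exception rewrapped as a RuntimeException.
    static String deserializeString(byte[] bytes) {
        try {
            return new String(bytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = "hello kafka".getBytes();
        // The scheme wraps this single value as a one-field tuple.
        List<Object> tuple = Arrays.asList((Object) deserializeString(payload));
        System.out.println(tuple);  // [hello kafka]
    }
}
```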

It simply decodes the bytes into a String, so the spout emits a single field downstream, named "str". When using StringScheme, in a Bolt you can call
tuple.getStringByField("str")

to retrieve the value. So why did we earlier write new SchemeAsMultiScheme(new StringScheme())? Look at the SchemeAsMultiScheme code:
public class SchemeAsMultiScheme implements MultiScheme {
  public final Scheme scheme;

  public SchemeAsMultiScheme(Scheme scheme) {
    this.scheme = scheme;
  }

  @Override public Iterable<List<Object>> deserialize(final byte[] ser) {
    List<Object> o = scheme.deserialize(ser);
    if(o == null) return null;
    else return Arrays.asList(o);
  }

  @Override public Fields getOutputFields() {
    return scheme.getOutputFields();
  }
}

public interface MultiScheme extends Serializable {
  public Iterable<List<Object>> deserialize(byte[] ser);
  public Fields getOutputFields();
}
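The wrapping behaviour can be sketched without Storm at all; `MultiSchemeDemo` below is my stand-in, not storm-kafka code:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MultiSchemeDemo {
    // What SchemeAsMultiScheme does: wrap the single tuple produced by the
    // underlying Scheme into a one-element Iterable, so the spout can treat
    // every scheme as potentially producing several tuples per message.
    static Iterable<List<Object>> wrap(List<Object> tuple) {
        if (tuple == null) return null;
        return Arrays.asList(tuple);
    }

    public static void main(String[] args) {
        Iterable<List<Object>> wrapped = wrap(Arrays.asList((Object) "str-value"));
        Iterator<List<Object>> it = wrapped.iterator();
        System.out.println(it.next());   // [str-value]
        System.out.println(it.hasNext()); // false
    }
}
```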

It still just calls the wrapped Scheme; the only difference is that the resulting tuple is wrapped into a one-element list. Strictly speaking you could manage without the wrapper, but storm-kafka requires a MultiScheme by default: KafkaUtils invokes the scheme when parsing each message:

public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {
        Iterable<List<Object>> tups;
        ByteBuffer payload = msg.payload();
        if (payload == null) {
            return null;
        }
        ByteBuffer key = msg.key();
        if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
            tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
        } else {
            tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
        }
        return tups;
    }

So unless you have special requirements, stick with the storm-kafka defaults.

Examples

The messages arriving from Kafka take many forms, and so do the tuples emitted downstream, so you will often need to write your own Scheme. Two examples follow.

Example 1

First: by default a single field is emitted, but what if we need to emit more than one? Let's emit two. Others have already worked out how to add the Kafka offset to the emitted tuple; the analysis goes as follows:

//returns false if it's reached the end of current batch
    public EmitState next(SpoutOutputCollector collector) {
        if (_waitingToEmit.isEmpty()) {
            fill();
        }
        while (true) {
            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
            if (toEmit == null) {
                return EmitState.NO_EMITTED;
            }
            Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
            if (tups != null) {
                for (List<Object> tup : tups) {
                    collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
                }
                break;
            } else {
                ack(toEmit.offset);
            }
        }
        if (!_waitingToEmit.isEmpty()) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }

From the code above, the offset is already attached when the tuple is emitted, as part of the message ID, so you might expect that a Bolt receiving the tuple could recover the offset from the messageId. But look at the backtype.storm.daemon.executor code:
(log-message "Opening spout " component-id ":" (keys task-datas))
        (doseq [[task-id task-data] task-datas
                :let [^ISpout spout-obj (:object task-data)
                      tasks-fn (:tasks-fn task-data)
                      send-spout-msg (fn [out-stream-id values message-id out-task-id]
                                       (.increment emitted-count)
                                       (let [out-tasks (if out-task-id
                                                         (tasks-fn out-task-id out-stream-id values)
                                                         (tasks-fn out-stream-id values))
                                             rooted? (and message-id has-ackers?)
                                             root-id (if rooted? (MessageId/generateId rand))
                                             out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]


This shows that the tuple IDs used downstream are randomly generated and have nothing to do with the new KafkaMessageId(_partition, toEmit.offset) that KafkaSpout anchored earlier. So we have to add the offset to the emitted tuple ourselves, which means implementing our own Scheme:

public class KafkaOffsetWrapperScheme implements Scheme {

    public static final String SCHEME_OFFSET_KEY = "offset";

    private String _offsetTupleKeyName;
    private Scheme _localScheme;

    public KafkaOffsetWrapperScheme() {
        _localScheme = new StringScheme();
        _offsetTupleKeyName = SCHEME_OFFSET_KEY;
    }

    public KafkaOffsetWrapperScheme(Scheme localScheme,
                                    String offsetTupleKeyName) {
        _localScheme = localScheme;
        _offsetTupleKeyName = offsetTupleKeyName;
    }

    public KafkaOffsetWrapperScheme(Scheme localScheme) {
        this(localScheme, SCHEME_OFFSET_KEY);
    }

    public List<Object> deserialize(byte[] bytes) {
        return _localScheme.deserialize(bytes);
    }

    public Fields getOutputFields() {
        List<String> outputFields = _localScheme
                        .getOutputFields()
                        .toList();
        outputFields.add(_offsetTupleKeyName);
        return new Fields(outputFields);
    }
}
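The field-list concatenation in getOutputFields() boils down to plain list handling. A self-contained sketch (class and method names are mine):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OutputFieldsDemo {
    // The concatenation KafkaOffsetWrapperScheme performs: the wrapped
    // scheme's field names followed by the offset field name.
    static List<String> withOffsetField(List<String> localFields, String offsetKey) {
        List<String> out = new ArrayList<String>(localFields);
        out.add(offsetKey);
        return out;
    }

    public static void main(String[] args) {
        // StringScheme declares a single field named "str".
        System.out.println(withOffsetField(Arrays.asList("str"), "offset"));  // [str, offset]
    }
}
```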



This scheme outputs two fields: "str", deserialized by StringScheme (or any other scheme you supply), and "offset". But how does the offset value actually get into the emitted tuple? We find the emitted tuple in PartitionManager:

public EmitState next(SpoutOutputCollector collector) {
    if (_waitingToEmit.isEmpty()) {
        fill();
    }
    while (true) {
        MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
        if (toEmit == null) {
            return EmitState.NO_EMITTED;
        }
        Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
        if (tups != null) {
            for (List<Object> tup : tups) {
                // Copy first: SchemeAsMultiScheme builds each tuple with
                // Arrays.asList, whose fixed-size result makes tup.add()
                // throw UnsupportedOperationException.
                List<Object> tupWithOffset = new ArrayList<Object>(tup);
                tupWithOffset.add(toEmit.offset);
                collector.emit(tupWithOffset, new KafkaMessageId(_partition, toEmit.offset));
            }
            break;
        } else {
            ack(toEmit.offset);
        }
    }
    if (!_waitingToEmit.isEmpty()) {
        return EmitState.EMITTED_MORE_LEFT;
    } else {
        return EmitState.EMITTED_END;
    }
}
and KafkaUtils.generateTuples (shown earlier) is where the scheme actually produces the tuple list that next() iterates over.
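One detail deserves care when appending the offset: with SchemeAsMultiScheme the tuple comes from Arrays.asList, whose result is fixed-size, so calling add() directly on it throws UnsupportedOperationException; copy into a mutable list first. A self-contained sketch (class and method names are mine):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FixedSizeListPitfall {
    // Copy the fixed-size tuple into a mutable list before appending the offset.
    static List<Object> appendOffset(List<Object> tup, long offset) {
        List<Object> out = new ArrayList<Object>(tup);
        out.add(offset);
        return out;
    }

    public static void main(String[] args) {
        // SchemeAsMultiScheme builds the tuple with Arrays.asList, which is fixed-size:
        List<Object> tup = Arrays.asList((Object) "payload");
        try {
            tup.add(12345L);  // what a naive tup.add(toEmit.offset) would do
        } catch (UnsupportedOperationException e) {
            System.out.println("add() on an Arrays.asList result throws");
        }
        System.out.println(appendOffset(tup, 12345L));  // [payload, 12345]
    }
}
```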
With that, the offset is part of the emitted tuple. In a Bolt you can read it via tuple.getValue(1) or tuple.getValueByField("offset") (note the offset is a long, not a String).

The only remaining step is to set the scheme to KafkaOffsetWrapperScheme when building the SpoutConfig.
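Wiring it up might look like the following sketch against the storm-kafka API; the ZooKeeper address, topic, zkRoot, and consumer id are placeholder values:

```java
// All connection values below are placeholders.
BrokerHosts hosts = new ZkHosts("zkserver:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/kafka-spout", "my-consumer-id");
// storm-kafka expects a MultiScheme, so wrap our Scheme as before:
spoutConfig.scheme = new SchemeAsMultiScheme(new KafkaOffsetWrapperScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig));
```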

Example 2

Second: the messages stored in Kafka may be in some other format, such as Thrift, Avro, or Protocol Buffers, in which case you have to implement the deserialization yourself.

Let's take an Avro scheme as the example (no Avro primer here; consult the Avro documentation if needed).

Suppose Kafka holds Avro-encoded messages with the following Avro schema:

{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}

Then we implement the Scheme interface:
public class AvroMessageScheme implements Scheme {

    private final static Logger logger = LoggerFactory.getLogger(AvroMessageScheme.class);

    // Parsed lazily (and kept transient) so the scheme stays serializable
    // and the schema file is not re-read for every message.
    private transient Schema schema;

    @Override
    public List<Object> deserialize(byte[] bytes) {
        try {
            if (schema == null) {
                InputStream is = Thread.currentThread().getContextClassLoader()
                        .getResourceAsStream("examples.avsc");
                schema = new Schema.Parser().parse(is);
            }
            DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
            GenericRecord record = datumReader.read(null, decoder);
            return new Values(new AvroRecord(record));
        } catch (Exception e) {
            logger.error("could not deserialize avro message", e);
            // Returning null (rather than a tuple holding null) lets
            // SchemeAsMultiScheme skip the bad message.
            return null;
        }
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}
Here a POJO is emitted downstream; you could just as well emit a String, which would be a little more efficient.
The AvroRecord POJO looks like this:
public class AvroRecord implements Serializable {

    private static final Logger logger = LoggerFactory.getLogger(AvroRecord.class);

    private String name;
    private int favorite_number;
    private String favorite_color;

    public AvroRecord(GenericRecord gr) {
        try {
            // GenericRecord.get() returns Object (Utf8 for strings),
            // so convert via String.valueOf before parsing.
            this.name = String.valueOf(gr.get("name"));
            this.favorite_number = Integer.parseInt(String.valueOf(gr.get("favorite_number")));
            this.favorite_color = String.valueOf(gr.get("favorite_color"));
        } catch (Exception e) {
            logger.error("read AvroRecord error!", e);
        }
    }

    @Override
    public String toString() {
        return "AvroRecord{" +
                "name='" + name + '\'' +
                ", favorite_number=" + favorite_number +
                ", favorite_color='" + favorite_color + '\'' +
                '}';
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getFavorite_color() {
        return favorite_color;
    }

    public void setFavorite_color(String favorite_color) {
        this.favorite_color = favorite_color;
    }

    public int getFavorite_number() {
        return favorite_number;
    }

    public void setFavorite_number(int favorite_number) {
        this.favorite_number = favorite_number;
    }
}
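A note on pulling typed fields out of a GenericRecord: get() returns Object (and real Avro hands back Utf8 rather than String for string fields), so a numeric field needs a conversion such as Integer.parseInt(String.valueOf(...)). A stand-in sketch without the Avro dependency (class and method names are mine):

```java
import java.util.HashMap;
import java.util.Map;

public class GenericRecordFieldDemo {
    // Simulates extracting a typed field from a record API that only
    // exposes Object, as GenericRecord.get() does.
    static int toInt(Object field) {
        return Integer.parseInt(String.valueOf(field));
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<String, Object>();
        record.put("favorite_number", 7);  // Avro would hand back an Integer here
        System.out.println(toInt(record.get("favorite_number")));  // 7
    }
}
```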

I have not tested this example myself, so use it with care.

Reference

https://blog.deck36.de/no-more-over-counting-making-counters-in-apache-storm-idempotent-using-redis-hyperloglog/