1. 程式人生 > >基礎-Tuple和Fields結構

基礎-Tuple和Fields結構

Fields和Tuple

概述

路徑:storm-core/src/jvm/org/apache/storm/tuple/

Tuple和Fields是Storm用來傳輸資料(即流)的結構體。其中Tuple是Storm的資料傳輸單位,每一個tuple由一組資料組成。Tuple可以理解為由key-value組成的Map,不過,key會在傳送發預先進行定義(即使用Fields來進行定義),Tuple只需要按序進行賦值即可,接收方在接收到Tuple後,根據下標來獲取所需的值。以下是一個簡單的例子。

@Override
 public void declare OutputFields(OutputFieldsDeclarer declarer) {
     declarer.declare(new
Fields("from", "to", "duration")); }

在上述的程式碼段中,使用new Fields(“from”, “to”, “duration”)來對元件所傳送的Tuple的格式進行定義,即當向外傳送Tuple時,tuple由三個欄位組成。例如:

this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration))

對Tuple的各個欄位進行賦值後向外發射。Values類用來例項化一個Tuple物件。

Fields

Fields資料結構用於儲存(即前文提及的“定義”)Tuple的欄位名列表。

public class Fields implements Iterable<String>, Serializable {
    private static final long serialVersionUID = -3377931843059975424L;
    //儲存欄位名
    private List<String> _fields;
    //儲存從欄位名到它在欄位名列表中下標的對映
    private Map<String, Integer> _index = new HashMap<>();

    public
Fields(String... fields) { this(Arrays.asList(fields)); } public Fields(List<String> fields) { _fields = new ArrayList<>(fields.size()); for (String field : fields) { if (_fields.contains(field)) throw new IllegalArgumentException( String.format("duplicate field '%s'", field) ); _fields.add(field); } index(); } private void index() { for(int i=0; i<_fields.size(); i++) { _index.put(_fields.get(i), i); } } ... }

Tuple

Tuple用來實現訊息的傳輸,在Storm中,每一條訊息都是一個Tuple物件。首先看Tuple介面的定義:

public interface Tuple extends ITuple{
    //返回tuple物件的global stream id,即(component + stream)
    @Deprecated
    public GlobalStreamId getSourceGlobalStreamid();

    public GlobalStreamId getSourceGlobalStreamId();

    //返回傳送這個Tuple的元件,即傳送方
    public String getSourceComponent();

    //返回傳送這個Tuple的Task Id
    public int getSourceTask();

    //返回tuple的StreamId,可以理解為流的標籤或者分類
    public String getSourceStreamId();

    //返回訊息序號,用於跟蹤訊息是否成功處理等
    public MessageId getMessageId();
}

Tuple介面繼承自ITuple,ITuple主要聲明瞭一些對tuple物件的屬性進行操作的函式,例如獲取欄位長度、取值等。

接下來看Tuple介面的實現類:

public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, IMeta, Tuple {
     //屬性自上而下分別為訊息的值、傳送Task的Id、流Id、Topology資訊、訊息序號、(未知,來自coljure,待續)
    private List<Object> values;
    private int taskId;
    private String streamId;
    private GeneralTopologyContext context;
    private MessageId id;
    private IPersistentMap _meta;

    public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
        this.values = values;
        this.taskId = taskId;
        this.streamId = streamId;
        this.id = id;
        this.context = context;

        String componentId = context.getComponentId(taskId);
        Fields schema = context.getComponentOutputFields(componentId, streamId);
        if(values.size()!=schema.size()) {
            throw new IllegalArgumentException(
                    "Tuple created with wrong number of fields. " +
                    "Expected " + schema.size() + " fields but got " +
                    values.size() + " fields");
        }
    }

    public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) {
        this(context, values, taskId, streamId, MessageId.makeUnanchored());
    }    
    ...
}

TupleImpl實現類繼承的類或者介面有一部分來自Coljure程式碼,其目的是為了能夠在Coljure程式碼中更好地操作這些Tuple物件。

Values

用來快速地例項化一個Tuple物件。