基礎-Tuple和Fields結構
阿新 • • 發佈:2019-02-03
Fields和Tuple
概述
路徑:storm-core/src/jvm/org/apache/storm/tuple/
Tuple和Fields是Storm用來傳輸資料(即流)的結構體。其中Tuple是Storm的資料傳輸單位,每一個tuple由一組資料組成。Tuple可以理解為由key-value組成的Map,不過,key會在傳送發預先進行定義(即使用Fields來進行定義),Tuple只需要按序進行賦值即可,接收方在接收到Tuple後,根據下標來獲取所需的值。以下是一個簡單的例子。
@Override
public void declare OutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("from", "to", "duration"));
}
在上述的程式碼段中,使用new Fields(“from”, “to”, “duration”)來對元件所傳送的Tuple的格式進行定義,即當向外傳送Tuple時,tuple由三個欄位組成。例如:
this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration))
對Tuple的各個欄位進行賦值後向外發射。Values類用來例項化一個Tuple物件。
Fields
Fields資料結構用於儲存(即前文提及的“定義”)Tuple的欄位名列表。
public class Fields implements Iterable<String>, Serializable {
private static final long serialVersionUID = -3377931843059975424L;
//儲存欄位名
private List<String> _fields;
//儲存從欄位名到它在欄位名列表中下標的對映
private Map<String, Integer> _index = new HashMap<>();
public Fields(String... fields) {
this(Arrays.asList(fields));
}
public Fields(List<String> fields) {
_fields = new ArrayList<>(fields.size());
for (String field : fields) {
if (_fields.contains(field))
throw new IllegalArgumentException(
String.format("duplicate field '%s'", field)
);
_fields.add(field);
}
index();
}
private void index() {
for(int i=0; i<_fields.size(); i++) {
_index.put(_fields.get(i), i);
}
}
...
}
Tuple
Tuple用來實現訊息的傳輸,在Storm中,每一條訊息都是一個Tuple物件。首先看Tuple介面的定義:
public interface Tuple extends ITuple{
//返回tuple物件的global stream id,即(component + stream)
@Deprecated
public GlobalStreamId getSourceGlobalStreamid();
public GlobalStreamId getSourceGlobalStreamId();
//返回傳送這個Tuple的元件,即傳送方
public String getSourceComponent();
//返回傳送這個Tuple的Task Id
public int getSourceTask();
//返回tuple的StreamId,可以理解為流的標籤或者分類
public String getSourceStreamId();
//返回訊息序號,用於跟蹤訊息是否成功處理等
public MessageId getMessageId();
}
Tuple介面繼承自ITuple,ITuple主要聲明瞭一些對tuple物件的屬性進行操作的函式,例如獲取欄位長度、取值等。
接下來看Tuple介面的實現類:
public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, IMeta, Tuple {
//屬性自上而下分別為訊息的值、傳送Task的Id、流Id、Topology資訊、訊息序號、(未知,來自coljure,待續)
private List<Object> values;
private int taskId;
private String streamId;
private GeneralTopologyContext context;
private MessageId id;
private IPersistentMap _meta;
public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
this.values = values;
this.taskId = taskId;
this.streamId = streamId;
this.id = id;
this.context = context;
String componentId = context.getComponentId(taskId);
Fields schema = context.getComponentOutputFields(componentId, streamId);
if(values.size()!=schema.size()) {
throw new IllegalArgumentException(
"Tuple created with wrong number of fields. " +
"Expected " + schema.size() + " fields but got " +
values.size() + " fields");
}
}
public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) {
this(context, values, taskId, streamId, MessageId.makeUnanchored());
}
...
}
TupleImpl實現類繼承的類或者介面有一部分來自Coljure程式碼,其目的是為了能夠在Coljure程式碼中更好地操作這些Tuple物件。
Values
用來快速地例項化一個Tuple物件。