Serializing and Deserializing SearchRequest and SearchResponse in the Elasticsearch Java Client
阿新 • Published: 2018-12-22
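Elasticsearch officially provides the elasticsearch-rest-high-level-client. While working with it (or, as I like to say, "writing bugs"), I needed to serialize and deserialize its ActionRequest (whose subclasses include SearchRequest, IndexRequest, UpdateRequest, and so on) and SearchResponse classes.
First, neither class implements the Serializable interface, and neither can be serialized directly to a JSON string with a generic JSON library. However, Elasticsearch provides other mechanisms, which I have tested and found to work. The notes below record them:
ActionRequest (using SearchRequest as the example):
Elasticsearch provides the readFrom(StreamInput in) and writeTo(StreamOutput out) methods for this purpose: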
public abstract class ActionRequest extends TransportRequest {
    ...
    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
    }
    ...
}
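So the core of the serialization code looks like this:
BytesStreamOutput output = new BytesStreamOutput();
actionRequest.writeTo(output);
// Encode with Base64 (without this step, deserialization fails).
// BytesReference.toBytes copies only the bytes that were actually written;
// toBytesRef().bytes exposes the whole backing array, which can be longer.
String s = BaseCoder.encryptBASE64(BytesReference.toBytes(output.bytes()));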
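The Base64 step matters because the stream output is arbitrary binary data, and binary data does not survive being turned into a String with a text charset. The sketch below illustrates this with the JDK's java.util.Base64 as a stand-in for the BaseCoder helper used above; the class name Base64RoundTrip and its methods are hypothetical and exist only for this illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

// Hypothetical demo class; java.util.Base64 stands in for the BaseCoder helper.
class Base64RoundTrip {

    // Encode the bytes as Base64 text, then decode the text back to bytes.
    static byte[] viaBase64(byte[] raw) {
        String text = Base64.getEncoder().encodeToString(raw);
        return Base64.getDecoder().decode(text);
    }

    // Treat the bytes as UTF-8 text and convert back.
    // Byte sequences that are not valid UTF-8 are replaced, so data is lost.
    static byte[] viaPlainString(byte[] raw) {
        String text = new String(raw, StandardCharsets.UTF_8);
        return text.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // 0xFF and a lone 0x80 are not valid UTF-8 sequences.
        byte[] raw = {0x00, (byte) 0xFF, (byte) 0x80, 0x7F};

        System.out.println("Base64 round trip intact: "
                + Arrays.equals(raw, viaBase64(raw)));
        System.out.println("Plain String round trip intact: "
                + Arrays.equals(raw, viaPlainString(raw)));
    }
}
```

Any encoding that maps every byte sequence to text losslessly would do; Base64 is simply the most common choice when the result has to be stored or sent as a string.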
Deserialization:
// Decode the Base64 string
byte[] s1 = BaseCoder.decryptBASE64(s);
BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
bytesStreamOutput.write(s1);
// Set up the parsers (the registry of named writeables)
IndicesModule indicesModule = new IndicesModule(Collections.emptyList());
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
entries.addAll(indicesModule.getNamedWriteables());
entries.addAll(searchModule.getNamedWriteables());
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
// Convert to an input stream
StreamInput in = new NamedWriteableAwareStreamInput(bytesStreamOutput.bytes().streamInput(), namedWriteableRegistry);
// Restore the request by reading into the newly created instance
SearchRequest request = new SearchRequest();
request.readFrom(in);
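The writeTo/readFrom pair relies on a symmetric contract: whatever writeTo puts on the stream, readFrom must consume in exactly the same order, on a freshly created instance. The following is a minimal sketch of that contract using only JDK classes. SimpleRequest is a hypothetical class, and DataOutputStream/DataInputStream stand in for Elasticsearch's StreamOutput/StreamInput; it is not how Elasticsearch itself is implemented.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for a request class; not an Elasticsearch type.
class SimpleRequest {
    String index = "";
    int size;

    // Write the fields to the stream in a fixed order.
    void writeTo(DataOutputStream out) throws IOException {
        out.writeUTF(index);
        out.writeInt(size);
    }

    // Read the fields back in exactly the order they were written.
    void readFrom(DataInputStream in) throws IOException {
        index = in.readUTF();
        size = in.readInt();
    }

    // Serialize a request to a byte array.
    static byte[] toBytes(SimpleRequest request) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            request.writeTo(out);
        }
        return buffer.toByteArray();
    }

    // Create an empty instance and populate it from the bytes.
    static SimpleRequest fromBytes(byte[] bytes) throws IOException {
        SimpleRequest request = new SimpleRequest();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            request.readFrom(in);
        }
        return request;
    }

    public static void main(String[] args) throws IOException {
        SimpleRequest original = new SimpleRequest();
        original.index = "logs";
        original.size = 10;

        SimpleRequest copy = fromBytes(toBytes(original));
        System.out.println(copy.index + " " + copy.size);
    }
}
```

Note that readFrom is called on the new instance, not on the original object. Calling it on the original would overwrite the source object and leave the new one empty.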
SearchResponse:
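When I tried to serialize SearchResponse, I found that the approach above did not work, so I switched to a different one. Elasticsearch offers a JSON-based solution (XContent).
Serialization: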
XContentBuilder builder = XContentFactory.jsonBuilder();
XContentBuilder xContent = searchResponse.toXContent(builder, ToXContent.EMPTY_PARAMS);
String jsonResponse = xContent.string();
Deserialization:
NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(getDefaultNamedXContents());
XContentParser parser = JsonXContent.jsonXContent.createParser(namedXContentRegistry, jsonResponse);
SearchResponse response= SearchResponse.fromXContent(parser);
private static List<NamedXContentRegistry.Entry> getDefaultNamedXContents() {
    Map<String, ContextParser<Object, ? extends Aggregation>> map = new HashMap<>();
    // Register a parser for each aggregation type that may appear in the response
    map.put(TopHitsAggregationBuilder.NAME, (p, c) -> ParsedTopHits.fromXContent(p, (String) c));
    map.put(DateRangeAggregationBuilder.NAME, (p, c) -> ParsedDateRange.fromXContent(p, (String) c));
    map.put(FilterAggregationBuilder.NAME, (p, c) -> ParsedFilter.fromXContent(p, (String) c));
    map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c));
    map.put(LongTerms.NAME, (p, c) -> ParsedLongTerms.fromXContent(p, (String) c));
    map.put(DoubleTerms.NAME, (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c));
    List<NamedXContentRegistry.Entry> entries = map.entrySet().stream()
            .map(entry -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(entry.getKey()), entry.getValue()))
            .collect(Collectors.toList());
    SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
    // Regular (non-aggregation) fields
    entries.addAll(searchModule.getNamedXContents());
    return entries;
}