Kafka訊息序列化和反序列化(下)
有序列化就會有反序列化,反序列化的操作是在Kafka Consumer中完成的,使用起來只需要配置一下key.deserializer和value.deseriaizer。對應上面自定義的Company型別的Deserializer就需要實現org.apache.kafka.common.serialization.Deserializer介面,這個介面同樣有三個方法:
- public void configure(Map<String, ?> configs, boolean isKey):用來配置當前類。
- public byte[] serialize(String topic, T data):用來執行反序列化。如果data為null建議處理的時候直接返回null而不是丟擲一個異常。
- public void close():用來關閉當前序列化器。
下面就來看一下DemoSerializer對應的反序列化的DemoDeserializer,詳細程式碼如下:
public class DemoDeserializer implements Deserializer<Company> {
public void configure(Map<String, ?> configs, boolean isKey) {}
public Company deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
if (data.length < 8) {
throw new SerializationException("Size of data received by DemoDeserializer is shorter than expected!");
}
ByteBuffer buffer = ByteBuffer.wrap(data);
int nameLen, addressLen;
String name, address;
nameLen = buffer.getInt();
byte [] nameBytes = new byte[nameLen];
buffer.get(nameBytes);
addressLen = buffer.getInt();
byte[] addressBytes = new byte[addressLen];
buffer.get(addressLen);
try {
name = new String(nameBytes, "UTF-8");
address = new String(addressBytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error occur when deserializing!");
}
return new Company(name,address);
}
public void close() {}
}
有些讀者可能對新版的Consumer不是很熟悉,這裡順帶著舉一個完整的消費示例,並以DemoDeserializer作為訊息Value的反序列化器。
Properties properties = new Properties();
properties.put("bootstrap.servers", brokerList);
properties.put("group.id", consumerGroup);
properties.put("session.timeout.ms", 10000);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "com.hidden.client.DemoDeserializer");
properties.put("client.id", "hidden-consumer-client-id-zzh-2");
KafkaConsumer<String, Company> consumer = new KafkaConsumer<String, Company>(properties);
consumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<String, Company> records = consumer.poll(100);
for (ConsumerRecord<String, Company> record : records) {
String info = String.format("topic=%s, partition=%s, offset=%d, consumer=%s, country=%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
System.out.println(info);
}
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
String error = String.format("Commit failed for offsets {}", offsets, exception);
System.out.println(error);
}
}
});
}
} finally {
consumer.close();
}
有些時候自定義的型別還可以和Avro、ProtoBuf等聯合使用,而且這樣更加的方便快捷,比如我們將前面Company的Serializer和Deserializer用Protostuff包裝一下,由於篇幅限制,筆者這裡只羅列出對應的serialize和deserialize方法,詳細參考如下:
public byte[] serialize(String topic, Company data) {
if (data == null) {
return null;
}
Schema schema = (Schema) RuntimeSchema.getSchema(data.getClass());
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
byte[] protostuff = null;
try {
protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
return protostuff;
}
public Company deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
Schema schema = RuntimeSchema.getSchema(Company.class);
Company ans = new Company();
ProtostuffIOUtil.mergeFrom(data, ans, schema);
return ans;
}
如果Company的欄位很多,我們使用Protostuff進一步封裝一下的方式就顯得簡潔很多。不過這個不是最主要的,而最主要的是經過Protostuff包裝之後,這個Serializer和Deserializer可以向前相容(新加欄位採用預設值)和向後相容(忽略新加欄位),這個特性Avro和Protobuf也都具備。
自定義的型別有一個不得不面對的問題就是Kafka Producer和Kafka Consumer之間的序列化和反序列化的相容性,試想對於StringSerializer來說,Kafka Consumer可以順其自然的採用StringDeserializer,不過對於Company這種專用型別,某個服務使用DemoSerializer進行了序列化之後,那麼下游的消費者服務必須也要實現對應的DemoDeserializer。再者,如果上游的Company型別改變,下游也需要跟著重新實現一個新的DemoSerializer,這個後面所面臨的難題可想而知。所以,如無特殊需要,筆者不建議使用自定義的序列化和反序列化器;如有業務需要,也要使用通用的Avro、Protobuf、Protostuff等序列化工具包裝,儘可能的實現得更加通用且向前後相容。
題外話,對於Kafka的“深耕者”Confluent來說,還有其自身的一套序列化和反序列化解決方案(io.confluent.kafka.serializer.KafkaAvroSerializer),GitHub上有相關資料,讀者如有興趣可以自行擴充套件學習。
歡迎支援《RabbitMQ實戰指南》以及關注微信公眾號:朱小廝的部落格。