Kafka Message Serialization and Deserialization (Part 1)
First, let's look at a sample program that shows how a Kafka Producer is normally written:
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerJavaDemo {
    public static final String brokerList = "192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092";
    public static final String topic = "hidden-topic";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("client.id", "hidden-producer-client-id-1");
        properties.put("bootstrap.servers", brokerList);
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        while (true) {
            String message = "kafka_message-" + new Date().getTime() + "-edited by hidden.zhu";
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, message);
            try {
                // send() is asynchronous; the callback runs once the send succeeds or fails
                Future<RecordMetadata> future = producer.send(producerRecord, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            // on failure the metadata cannot be relied on, so report the error instead
                            exception.printStackTrace();
                            return;
                        }
                        System.out.print(metadata.offset() + " ");
                        System.out.print(metadata.topic() + " ");
                        System.out.println(metadata.partition());
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
The client used here is not the Scala producer from the 0.8.x.x era but the new Kafka Producer written in Java. The corresponding Maven dependency is:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
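The program above uses org.apache.kafka.common.serialization.StringSerializer, which ships with the Kafka client. Besides this serializer for String, the client also provides serializers for ByteArray, ByteBuffer, Bytes, Double, Integer and Long. All of them implement the org.apache.kafka.common.serialization.Serializer interface, which has three methods:
public void configure(Map<String, ?> configs, boolean isKey): configures the serializer.
public byte[] serialize(String topic, T data): performs the serialization.
public void close(): closes the serializer. This is usually an empty method; if you do implement it, you must make it idempotent, because KafkaProducer may well call it more than once.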
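A common way to satisfy the idempotency requirement on close() is to guard the release logic with an atomic flag, so only the first call does any work. The sketch below shows that pattern under a few assumptions: the class IdempotentCloseDemo and its releaseCount counter are hypothetical, and the class deliberately does not implement Kafka's Serializer interface so it runs without kafka-clients on the classpath.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical serializer-like class that owns some resource.
// It does NOT implement org.apache.kafka.common.serialization.Serializer;
// it only illustrates an idempotent close() pattern.
public class IdempotentCloseDemo implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // Counts how many times the (simulated) resource was actually released.
    final AtomicInteger releaseCount = new AtomicInteger();

    @Override
    public void close() {
        // compareAndSet succeeds only for the first caller; every later call is a no-op.
        if (closed.compareAndSet(false, true)) {
            releaseCount.incrementAndGet(); // stand-in for releasing a real resource
        }
    }

    public static void main(String[] args) {
        IdempotentCloseDemo s = new IdempotentCloseDemo();
        s.close();
        s.close(); // a repeated call must be harmless
        System.out.println(s.releaseCount.get()); // 1
    }
}
```

The AtomicBoolean also keeps close() safe if it happens to be called from different threads, which a plain boolean field would not guarantee.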
Next, let's look at how org.apache.kafka.common.serialization.StringSerializer is implemented in Kafka. The source is as follows:
public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}
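Start with configure() in StringSerializer. When the serializer is given by class name in the producer config, as in the example above, KafkaProducer calls configure() while it is being constructed. Its only job here is to choose the character encoding: depending on isKey it reads key.serializer.encoding or value.serializer.encoding, falls back to serializer.encoding if that is absent, and otherwise keeps the default "UTF8". serialize() is then straightforward: it returns null for a null input and otherwise converts the String to a byte[] with the chosen encoding.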
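To make the lookup order concrete, here is a standalone sketch that copies the selection logic of configure() into a plain static method, so it can be run without kafka-clients. The class name EncodingLookup and the method resolveEncoding are hypothetical; only the property names and the "UTF8" default come from the source above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone copy of the encoding lookup in StringSerializer.configure().
public class EncodingLookup {

    static String resolveEncoding(Map<String, ?> configs, boolean isKey) {
        String encoding = "UTF8"; // same default as StringSerializer
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding"); // shared fallback for key and value
        if (encodingValue instanceof String) // instanceof is false for null
            encoding = (String) encodingValue;
        return encoding;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("serializer.encoding", "UTF-16");
        configs.put("key.serializer.encoding", "ISO-8859-1");

        System.out.println(resolveEncoding(configs, true));                 // ISO-8859-1
        System.out.println(resolveEncoding(configs, false));                // UTF-16
        System.out.println(resolveEncoding(new HashMap<String, Object>(), false)); // UTF8

        // serialize() then boils down to String.getBytes(encoding):
        // Java's "UTF-16" charset writes a 2-byte BOM, so "hello" becomes 2 + 5 * 2 bytes.
        System.out.println("hello".getBytes(resolveEncoding(configs, false)).length); // 12
    }
}
```

In practice these encoding properties are rarely set, so both key and value strings end up encoded as UTF-8.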
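If none of the built-in serializers fits your data, you can implement your own. Take the following Company class as an example:
public class Company {
    private String name;
    private String address;
    // Getters, setters, constructor and toString omitted
}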
Next, we implement a Serializer for the Company type, named DemoSerializer in the code below.
package com.hidden.client;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

// Wire layout: [int nameLen][name bytes][int addressLen][address bytes], UTF-8 text.
// A null field is written with length 0.
public class DemoSerializer implements Serializer<Company> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        // StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException,
        // so no error is silently turned into an empty byte[]
        byte[] name = data.getName() != null ? data.getName().getBytes(StandardCharsets.UTF_8) : new byte[0];
        byte[] address = data.getAddress() != null ? data.getAddress().getBytes(StandardCharsets.UTF_8) : new byte[0];
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);
        buffer.putInt(name.length);
        buffer.put(name);
        buffer.putInt(address.length);
        buffer.put(address);
        return buffer.array();
    }

    @Override
    public void close() {
        // nothing to release
    }
}
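DemoSerializer writes each field as a 4-byte big-endian length (ByteBuffer's default byte order) followed by the UTF-8 bytes. To check that layout, and to preview what a matching deserializer would have to do, here is a standalone sketch. CompanyCodec, encode and decode are hypothetical names, and the sketch works on two plain Strings rather than the Company class so it runs without kafka-clients or the omitted getters and setters.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical codec mirroring DemoSerializer's layout:
// [int nameLen][name bytes][int addressLen][address bytes]
public class CompanyCodec {

    static byte[] encode(String name, String address) {
        byte[] n = name == null ? new byte[0] : name.getBytes(StandardCharsets.UTF_8);
        byte[] a = address == null ? new byte[0] : address.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + n.length + a.length);
        buffer.putInt(n.length).put(n).putInt(a.length).put(a);
        return buffer.array();
    }

    // Reverses encode(): returns {name, address}; a null field comes back as "".
    static String[] decode(byte[] bytes) {
        if (bytes == null) {
            return null; // mirror the serializer's null-in, null-out convention
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        byte[] n = new byte[buffer.getInt()]; // read the length prefix, then the payload
        buffer.get(n);
        byte[] a = new byte[buffer.getInt()];
        buffer.get(a);
        return new String[] {
            new String(n, StandardCharsets.UTF_8),
            new String(a, StandardCharsets.UTF_8)
        };
    }

    public static void main(String[] args) {
        byte[] bytes = encode("ab", "c");
        System.out.println(Arrays.toString(bytes)); // [0, 0, 0, 2, 97, 98, 0, 0, 0, 1, 99]
        System.out.println(String.join("|", decode(bytes))); // ab|c
    }
}
```

Note one consequence of DemoSerializer's design: because a null field is written with length 0, the reader cannot tell a null name or address from an empty string. If that distinction matters, a sentinel length such as -1 for null would be needed on both sides.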
To use it, just change the value.serializer property in the Kafka Producer config, for example:
properties.put("value.serializer", "com.hidden.client.DemoSerializer");
// Remember to also change the corresponding String type parameters to Company, e.g.:
// Producer<String, Company> producer = new KafkaProducer<String, Company>(properties);
// Company company = new Company();
// company.setName("hidden.cooperation-" + new Date().getTime());
// company.setAddress("Shanghai, China");
// ProducerRecord<String, Company> producerRecord = new ProducerRecord<String, Company>(topic, company);