Implementing Kafka Serialization and Deserialization with Avro
1 Add the dependencies to pom.xml
1.1 In <dependencies>, add:
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-tools</artifactId>
    <version>1.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-compiler</artifactId>
    <version>1.8.2</version>
</dependency>
1.2 In <build>, configure the plugins as follows. Note: do not wrap them in <pluginManagement></pluginManagement> (plugins declared only in pluginManagement are not bound to the build, so code generation would never run):
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.8.2</version>
            <executions>
                <execution>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
2 Define the Avro schema
2.1 An Avro schema is defined in JSON and conventionally has the .avsc extension (the Maven plugin scans the configured source directory for .avsc files and generates the corresponding Java files).
The content of stock.avsc is as follows:
namespace: the package of the generated Java class
type: record
name: the name of the generated class
fields: the field names and types
{
    "namespace": "org.sunny.avroDAO",
    "type": "record",
    "name": "StockAvroBean",
    "fields": [
        {"name": "stockCode", "type": "string"},
        {"name": "stockName", "type": "string"},
        {"name": "tradeTime", "type": "long"},
        {"name": "preclosePrice", "type": "float"},
        {"name": "openPrice", "type": "float"},
        {"name": "currentPrice", "type": "float"}
    ]
}
2.2 Generate the .java file from the *.avsc file: in IDEA, open Tool Window -> Maven Projects and run either compile or install; a Java file named after the schema's name field (StockAvroBean.java) is generated under the namespace package.
StockAvroBean.java is a subclass of org.apache.avro.specific.SpecificRecordBase.
Alternatively, Avro provides an avro-tools jar that can generate the Java files from the command line:
java -jar /path/to/avro-tools-1.8.2.jar compile schema <schema file> <destination>
(Here the Maven plugin does the code generation, which is why avro-tools-1.8.2.jar never needs to appear anywhere in the project itself.)
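The generated class follows Avro's standard specific-record API, including a newBuilder() builder. As a quick, self-contained check (the class name StockAvroBeanDemo and all field values below are made up for illustration):
import org.sunny.avroDAO.StockAvroBean;

public class StockAvroBeanDemo {
    public static void main(String[] args) {
        StockAvroBean stock = StockAvroBean.newBuilder()
                .setStockCode("600519")
                .setStockName("demo-stock")
                .setTradeTime(System.currentTimeMillis())
                .setPreclosePrice(10.0f)
                .setOpenPrice(10.2f)
                .setCurrentPrice(10.5f)
                .build();
        System.out.println(stock.getSchema()); // prints back the JSON schema defined above
    }
}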
3 Implement the serializer class and the deserializer class; for convenience, also define a TopicEnum enum that maps each topic to the class of its value.
3.1 The serializer:
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
/**
 * Serializer: encodes an Avro record into a byte array.
 * @param <T> the concrete generated record type
 */
public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {}

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        DatumWriter<T> writer = new SpecificDatumWriter<>(data.getSchema()); // use the schema carried by the record itself
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(outputStream, null);
        try {
            writer.write(data, encoder);
            encoder.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return outputStream.toByteArray();
    }

    @Override
    public void close() {}
}
3.2 The deserializer:
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.serialization.Deserializer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;
/**
 * Deserializer: decodes a byte array back into the record type registered for the topic.
 */
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {}

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            // look up the record type registered for this topic
            TopicEnum topicEnum = TopicEnum.getTopicEnum(topic);
            if (topicEnum == null) {
                return null;
            }
            SpecificRecordBase record = topicEnum.getRecord();
            DatumReader<T> datumReader = new SpecificDatumReader<>(record.getSchema());
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
            BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(byteArrayInputStream, null);
            return datumReader.read(null, decoder);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void close() {}
}
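The two classes can be exercised together without a broker. The sketch below assumes the topic name "avro-kafka" registered in TopicEnum (defined next in 3.3) and uses the all-args constructor that Avro generates alongside the builder; the class name and field values are illustrative:
import org.sunny.avroDAO.StockAvroBean;

public class RoundTripDemo {
    public static void main(String[] args) {
        StockAvroBean stock = new StockAvroBean("600000", "demo", System.currentTimeMillis(), 10.0f, 10.2f, 10.5f);
        // serialize, then deserialize through the topic-to-schema mapping
        byte[] bytes = new AvroSerializer<StockAvroBean>().serialize("avro-kafka", stock);
        StockAvroBean copy = new AvroDeserializer<StockAvroBean>().deserialize("avro-kafka", bytes);
        System.out.println(copy.getStockName() + " @ " + copy.getCurrentPrice()); // demo @ 10.5
    }
}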
3.3 TopicEnum, which ties each topic to its value class:
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.commons.lang.StringUtils;
/**
 * Maps each topic to the record type of its value.
 */
public enum TopicEnum {
    STOCK_AVRO("avro-kafka", new StockAvroBean()); // one instance per topic

    private String topic;
    private SpecificRecordBase record;

    TopicEnum(String topic, SpecificRecordBase record) {
        this.topic = topic;
        this.record = record;
    }

    // ... getters and setters ...

    public static TopicEnum getTopicEnum(String topicName) {
        if (StringUtils.isEmpty(topicName)) { // also guards against null
            return null;
        }
        for (TopicEnum topicEnum : values()) {
            if (StringUtils.equalsIgnoreCase(topicEnum.getTopic(), topicName)) {
                return topicEnum;
            }
        }
        return null;
    }
}
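With the mapping in place, recovering the right schema at runtime takes a single lookup, which is exactly what the deserializer above does. A two-line sketch, using the topic name from the enum:
import org.apache.avro.Schema;

Schema schema = TopicEnum.getTopicEnum("avro-kafka").getRecord().getSchema();
System.out.println(schema.getFullName()); // org.sunny.avroDAO.StockAvroBean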
4 Configure the properties in the producer and consumer
The consumer configuration:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;

public class AvroConsumer {
    private static final String BOOTSTRAP_SERVER = "localhost:9092";

    private Properties initConfig() {
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class.getName());
        return config;
    }
}
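A minimal poll loop built on top of initConfig() might look like the following sketch; the consume() method name is made up, the topic is the one registered in TopicEnum, and commitSync() is needed because auto-commit is disabled above:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;

// inside AvroConsumer:
public void consume() {
    KafkaConsumer<String, StockAvroBean> consumer = new KafkaConsumer<>(initConfig());
    consumer.subscribe(Collections.singletonList("avro-kafka"));
    try {
        while (true) {
            ConsumerRecords<String, StockAvroBean> records = consumer.poll(100);
            for (ConsumerRecord<String, StockAvroBean> record : records) {
                StockAvroBean stock = record.value();
                System.out.println(record.partition() + " | " + stock.getStockCode() + " : " + stock.getCurrentPrice());
            }
            consumer.commitSync(); // manual commit, since ENABLE_AUTO_COMMIT_CONFIG is false
        }
    } finally {
        consumer.close();
    }
}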
The producer configuration:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class AvroProducer {
    private static final String BROKER_LIST = "localhost:9092";
    private String[] topics;

    public AvroProducer(String[] topics) {
        this.topics = topics;
    }

    private static Properties initconfig() {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class.getName());
        config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, AvroPartition.class.getName()); // custom partitioning rule
        return config;
    }
}
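Likewise, a minimal send path on top of initconfig(); the send() method name is made up, and a real application would create the KafkaProducer once and reuse it rather than opening one per call:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// inside AvroProducer:
public void send(StockAvroBean stock) {
    KafkaProducer<String, StockAvroBean> producer = new KafkaProducer<>(initconfig());
    // the message key feeds AvroPartition below, so use the stock code
    producer.send(new ProducerRecord<>("avro-kafka", stock.getStockCode().toString(), stock));
    producer.close();
}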
As an aside, the custom partitioning rule above is built by implementing org.apache.kafka.clients.producer.Partitioner; one possible implementation is shown below:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
 * Custom partitioning logic.
 */
public class AvroPartition implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (key == null) {
            return 0;
        }
        String partitionKey = key.toString();
        try {
            // parse the last two characters of the key as a number and mod by
            // the partition count (assumed to be 6 for this topic)
            int partitionID = Integer.valueOf(partitionKey.substring(partitionKey.length() - 2)) % 6;
            System.out.println(partitionID);
            return partitionID;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> map) {}
}
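Since this partition() implementation never reads the Cluster argument, the rule can be sanity-checked locally with an empty cluster; the key below is made up:
AvroPartition partitioner = new AvroPartition();
int p = partitioner.partition("avro-kafka", "sh600015", null, null, null,
        org.apache.kafka.common.Cluster.empty());
System.out.println(p); // the key ends in "15", and 15 % 6 = 3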