1. 程式人生 > >java jackson avro kryo等幾種序列化與反序列化工具的使用

java jackson avro kryo等幾種序列化與反序列化工具的使用

最近由於工作需要,需要研究常用的集中序列化方式,主要目的是物件序列化後佔用空間會大大減少,便於儲存和傳輸,下面是幾種序列化方式的使用demo

1. Java自帶的Serialize

依賴jar包:無

程式碼示意:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object JavaSerialize {
  def serialize(obj: Object): Array[Byte] = {
    var oos: ObjectOutputStream = null
    var baos: ByteArrayOutputStream = null
    try {
      baos = new ByteArrayOutputStream()
      oos = new ObjectOutputStream(baos)
      oos.writeObject(obj)
      baos.toByteArray()
    }catch {
      case e: Exception =>
        println(e.getLocalizedMessage + e.getStackTraceString)
        null
    }
  }

  def deserialize(bytes: Array[Byte]): Object = {
    var bais: ByteArrayInputStream  = null
    try {
      bais = new ByteArrayInputStream(bytes)
      val ois = new ObjectInputStream(bais)
      ois.readObject()
    }catch {
      case e: Exception =>
        println(e.getLocalizedMessage + e.getStackTraceString)
        null
    }
  }
}

2. Jackson序列化方式

依賴jar包:json4s-jackson_2.10-3.2.11.jar、jackson-annotations-2.3.0.jar、jackson-core-2.3.1.jar、jackson-databind-2.3.1.jar(均可在maven上下載)

程式碼示意:
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization._

object JacksonSerialize {
  def serialize[T <: Serializable with AnyRef : Manifest](obj: T): String = {
    implicit val formats = Serialization.formats(NoTypeHints)
    write(obj)
  }

  def deserialize[T: Manifest](objStr: String): T = {
    implicit val formats = Serialization.formats(NoTypeHints)
    read[T](objStr)
  }
}

程式碼也是非常簡單,好處是序列化後的結果是以json格式顯示,可以直接閱讀,更人性化,但是缺點是序列化耗時較久,並且序列化後大小也不小

3. Avro序列化方式

依賴jar包:avro-tools-1.7.7.jar(用於編譯生成類)、avro-1.7.7.jar

第一步:定義資料結構scheme檔案user.avsc,如下:
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
第二步:通過工具生成類
(1)avro-tools-1.7.7.jar 包和user.avsc 放置在同一個路徑下
(2)執行 java -jar avro-tools-1.7.7.jar compile schema user.avsc java.
(3)會在當前目錄下,自動生成User.java檔案,然後在程式碼中引用此類
第三步:程式碼示意
import java.io.ByteArrayOutputStream
import example.avro.User
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter}

object AvroSerialize {
  //將序列化的結果返回為位元組陣列
  def serialize(user: User): Array[Byte] ={
    val bos = new ByteArrayOutputStream()
    val writer = new SpecificDatumWriter[User](User.getClassSchema)
    val encoder = EncoderFactory.get().binaryEncoder(bos, null)
    writer.write(user, encoder)
    encoder.flush()
    bos.close()
    bos.toByteArray
  }

  //將序列化後的位元組陣列反序列化為物件
  def deserialize(bytes: Array[Byte]): Any = {
    val reader = new SpecificDatumReader[User](User.getClassSchema)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    var user: User = null
    user = reader.read(null, decoder)
    user
  }

  //將序列化的結果存入到檔案
  def serialize(user: User, path: String): Unit ={
    val userDatumWriter = new SpecificDatumWriter[User](User.getClassSchema)
    val dataFileWriter = new DataFileWriter[User](userDatumWriter)
    dataFileWriter.create(user.getSchema(),  new java.io.File(path))
    dataFileWriter.append(user)
    dataFileWriter.close()
  }

  //從檔案中反序列化為物件
  def deserialize(path: String): List[User] = {
    val  reader = new SpecificDatumReader[User](User.getClassSchema)
    val  dataFileReader = new DataFileReader[User](new java.io.File(path), reader)
    var users: List[User] = List[User]()
    while (dataFileReader.hasNext()) {
      users :+= dataFileReader.next()
    }
    users
  }
}

這裡提供了兩種方式,一種是通過二進位制,另一種是通過檔案。方法相對上面兩種有點複雜,在hadoop RPC中使用了這種序列化方式

4. Kryo序列化方式

依賴jar包:kryo-4.0.0.jar、minlog-1.2.jar、objenesis-2.6.jar、commons-codec-1.8.jar

程式碼示意:
import java.io.{ByteArrayOutputStream}
import com.esotericsoftware.kryo.{Kryo}
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.objenesis.strategy.StdInstantiatorStrategy

object KryoSerialize {
  val kryo = new ThreadLocal[Kryo]() {
    override def initialValue(): Kryo = {
      val kryoInstance = new Kryo()
      kryoInstance.setReferences(false)
      kryoInstance.setRegistrationRequired(false)
      kryoInstance.setInstantiatorStrategy(new StdInstantiatorStrategy())
      kryoInstance.register(classOf[Serializable], new JavaSerializer())
      kryoInstance
    }
  }

  def serialize[T <: Serializable with AnyRef : Manifest](t: T): Array[Byte] = {
    val baos = new ByteArrayOutputStream()
    val output = new Output(baos)
    output.clear()

    try {
      kryo.get().writeClassAndObject(output, t)
    } catch {
      case e: Exception =>
        e.printStackTrace()
    } finally {

    }
   output.toBytes
  }

  def deserialize[T <: Serializable with AnyRef : Manifest](bytes: Array[Byte]): T = {

    val input = new Input()
    try {
      input.setBuffer(bytes)
      kryo.get().readClassAndObject(input).asInstanceOf[T]
    } finally {
    }
  }
}

這種方式經過我本地測試,速度是最快的,關鍵是做好對kryo物件的複用,因為大量建立會非常耗時,在這裡要處理好多執行緒情況下對kryo物件的使用,spark中也會使用到kryo

其實還有其他的序列化方式,比如protobuf、thrify,操作上也有一定複雜性,由於環境問題暫時未搞定,搞定了再發出來。