1. 程式人生 > >spark 輸出結果壓縮(gz)

spark 輸出結果壓縮(gz)

     如果不想往下看,可以直接看結果:maxCallRdd.repartition(3).saveAsTextFile(path,GzipCodec.class); 恩,沒錯。就只這麼一行簡單的程式碼實現了gz壓縮,但是為什麼網上一直沒找到呢,這個我不太清楚, 可能是他們沒碰到吧。

    最近專案的需求,其實很簡單,就是將輸出結果以.gz的格式壓縮,每個壓縮包的大小不能超過100M,而且壓縮包的名稱也有要求,就是要以時間來命名,如下:

 


如 果一個檔案大小超過400m,小於500m,則分成5個.gz檔案來壓縮,如果這個是用java平常的專案和apache的壓縮工具api來弄,是一件非 常easy的事情,但是,需求一波三折,最終的最終要求是spark計算完,結果落地的時候,把資料壓縮好儲存到hdfs中。恩,領導的想法總是對的,我 一向這樣認為。需求定下來,剩下的事情就是實現了。

1)首先把結果壓縮落地
     其實spark是支援結果壓縮落地的,具體api是void

org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(String path, Class<? extends CompressionCodec> codec)
跟saveAsTextFile(String path)相比只是多了一個引數,這個引數就是指定壓縮演算法的。這裡有個問題,就是壓縮演算法類必須是CompressionCodec介面的實現類Class<? extends CompressionCodec>,那麼就要看看CompressionCodec是什麼以及有哪些實現類了。其實這裡有個坑,如果不檢視原始碼的話,說不定你會一直在哪裡問為什麼,而且一直死迴圈。下面說說這個坑,說完了再說具體實現。先上圖:

可 以看到這個介面在hadoop和spark-core都有。但是具體用的是那個呢?毫無疑問,當然是用spark自帶的了,但是如果真的這樣想也不怪你, 因為自身帶有的肯定是最好的,就好像幫親不幫理一樣。首先看看spark自身帶的是什麼形式的,直接上程式碼:想詳細瞭解這個類的,可以從頭看一遍,不想看 的可以直接略過程式碼,直接看我囉嗦幾句,其實就是定義了一個叫CompressionCodec,然後在裡面定義了一些實現類, 如:LZFCompressionCodec(conf: SparkConf) extends CompressionCodec,class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec,然後你會想:既然已經有了實現類了,那麼我直接在maxCallRdd.saveAsTextFile(path, SnappyCompressionCodec.class)就完事收工了,嘻嘻,如果真的是這樣,那我還寫這篇東東干嘛,你會看到下面的情況:

看 到沒,如果你使用spark-core自帶的CompressionCodec的實現類是錯誤的,錯誤的,錯誤的。重要的事情說三遍,然後肯定會想打死 spark的開發人員,什麼玩意啊,自己寫的方法,居然不相容自己的實現類。這個就是坑了,不知道是我說明還沒到家還是怎麼樣,反正我覺得吧,你自己框架 的方法引數居然不是自己的實現類。找打是吧,也有一個可能是因為我是用java寫的api,所以用的不是spark的實現類,因為我查了網上的很多文章都 是用spark的CompressionCodec 實現了,而且用的很歡,一點問題也沒有。好,說完了坑了,說說怎麼填吧,輕輕跳過下面的程式碼。
包名是package org.apache.spark.io
@DeveloperApi
trait CompressionCodec {

  def compressedOutputStream(s: OutputStream): OutputStream

  def compressedInputStream(s: InputStream): InputStream
}

private[spark] object CompressionCodec {

  private val configKey = "spark.io.compression.codec"

  private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
    (codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
      || codec.isInstanceOf[LZ4CompressionCodec])
  }

  private val shortCompressionCodecNames = Map(
    "lz4" -> classOf[LZ4CompressionCodec].getName,
    "lzf" -> classOf[LZFCompressionCodec].getName,
    "snappy" -> classOf[SnappyCompressionCodec].getName)

  def getCodecName(conf: SparkConf): String = {
    conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
  }

  def createCodec(conf: SparkConf): CompressionCodec = {
    createCodec(conf, getCodecName(conf))
  }

  def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
    val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
    val codec = try {
      val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf])
      Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
    } catch {
      case e: ClassNotFoundException => None
      case e: IllegalArgumentException => None
    }
    codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " +
      s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
  }

  /**
   * Return the short version of the given codec name.
   * If it is already a short name, just return it.
   */
  def getShortName(codecName: String): String = {
    if (shortCompressionCodecNames.contains(codecName)) {
      codecName
    } else {
      shortCompressionCodecNames
        .collectFirst { case (k, v) if v == codecName => k }
        .getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") }
    }
  }

  val FALLBACK_COMPRESSION_CODEC = "snappy"
  val DEFAULT_COMPRESSION_CODEC = "lz4"
  val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
}

/**
 * :: DeveloperApi ::
 * LZ4 implementation of [[org.apache.spark.io.CompressionCodec]].
 * Block size can be configured by `spark.io.compression.lz4.blockSize`.
 *
 * Note: The wire protocol for this codec is not guaranteed to be compatible across versions
 *       of Spark. This is intended for use as an internal compression utility within a single Spark
 *       application.
 */
@DeveloperApi
class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {

  override def compressedOutputStream(s: OutputStream): OutputStream = {
    val blockSize = conf.getSizeAsBytes("spark.io.compression.lz4.blockSize", "32k").toInt
    new LZ4BlockOutputStream(s, blockSize)
  }

  override def compressedInputStream(s: InputStream): InputStream = new LZ4BlockInputStream(s)
}


/**
 * :: DeveloperApi ::
 * LZF implementation of [[org.apache.spark.io.CompressionCodec]].
 *
 * Note: The wire protocol for this codec is not guaranteed to be compatible across versions
 *       of Spark. This is intended for use as an internal compression utility within a single Spark
 *       application.
 */
@DeveloperApi
class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {

  override def compressedOutputStream(s: OutputStream): OutputStream = {
    new LZFOutputStream(s).setFinishBlockOnFlush(true)
  }

  override def compressedInputStream(s: InputStream): InputStream = new LZFInputStream(s)
}


/**
 * :: DeveloperApi ::
 * Snappy implementation of [[org.apache.spark.io.CompressionCodec]].
 * Block size can be configured by `spark.io.compression.snappy.blockSize`.
 *
 * Note: The wire protocol for this codec is not guaranteed to be compatible across versions
 *       of Spark. This is intended for use as an internal compression utility within a single Spark
 *       application.
 */
@DeveloperApi
class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
  val version = SnappyCompressionCodec.version

  override def compressedOutputStream(s: OutputStream): OutputStream = {
    val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
    new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
  }

  override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
}

/**
 * Object guards against memory leak bug in snappy-java library:
 * (https://github.com/xerial/snappy-java/issues/131).
 * Before a new version of the library, we only call the method once and cache the result.
 */
private final object SnappyCompressionCodec {
  private lazy val version: String = try {
    Snappy.getNativeLibraryVersion
  } catch {
    case e: Error => throw new IllegalArgumentException(e)
  }
}

/**
 * Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close
 * issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version
 * of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107.
 */
private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream {

  private[this] var closed: Boolean = false

  override def write(b: Int): Unit = {
    if (closed) {
      throw new IOException("Stream is closed")
    }
    os.write(b)
  }

  override def write(b: Array[Byte]): Unit = {
    if (closed) {
      throw new IOException("Stream is closed")
    }
    os.write(b)
  }

  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    if (closed) {
      throw new IOException("Stream is closed")
    }
    os.write(b, off, len)
  }

  override def flush(): Unit = {
    if (closed) {
      throw new IOException("Stream is closed")
    }
    os.flush()
  }

  override def close(): Unit = {
    if (!closed) {
      closed = true
      os.close()
    }
  }
}

生活就是從一個坑跳出來,正要準備大笑兩聲以抒懷的時候發現自己又跳進了另外一個更大的坑,這時只想說一個字,***。嗯,跑題了。。。。。直接看hadoop包裡面的介面和實現:
public interface CompressionCodec
{

    public abstract CompressionOutputStream createOutputStream(OutputStream outputstream)
        throws IOException;

    public abstract CompressionOutputStream createOutputStream(OutputStream outputstream, Compressor compressor)
        throws IOException;

    public abstract Class getCompressorType();

    public abstract Compressor createCompressor();

    public abstract CompressionInputStream createInputStream(InputStream inputstream)
        throws IOException;

    public abstract CompressionInputStream createInputStream(InputStream inputstream, Decompressor decompressor)
        throws IOException;

    public abstract Class getDecompressorType();

    public abstract Decompressor createDecompressor();

    public abstract String getDefaultExtension();
}
實現類有:

 

 可以看到是沒有GzipCodec實現的,所以這個真的只能靠經驗啊,不然不會知道GzipCodec的存在的

2)重新命名

    其實一開始接觸spark的時候,就覺得輸出內容以park-00001命名就覺得醜,想自定義,但是卻發現無從下手。所以只能另闢捷徑了,使用hadoop自帶的api來操作

   FileStatus[] listStatus = fileSystem.listStatus(new Path(path));

    for(......)

    fileSystem.rename(fileStatus.getPath(), new Path(path+"/"+fileName)); 

   ok,搞定。