spark 批量寫入redis (pipeline + 分批提交)
spark 和 redis 互動的文章網上太多,涉及批量寫 redis 大都用 pipeline 模式。總感覺美中不足,因為 pipeline 是將命令進行打包處理,大資料量下 pipeline 慢且容易導致記憶體不足。
花了一點時間實現了redis批量提交,做個筆記備忘。
package com.hooray.spark.sass2.program
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger
import com.hooray.spark.sass2.utils.InternalRedisClient
import org.apache.spark.sql.{Row, SparkSession}
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks
/**
*
* Redis 批量提交
* @author liujianjun
* @Description:
* @Date : Created in 1:35 2017/9/28
*
*/
/**
 * Demonstrates writing rows to Redis in batches: one pipeline is filled
 * with up to `batchSize` commands, then flushed with sync() and the
 * connection is returned to the pool.
 *
 * NOTE(review): the original pasted source used typographic quotes and a
 * collapsed loop body; restored here. The original also created a fresh
 * pipeline for EVERY element, so commands buffered in earlier pipelines
 * were never sync()'d — fixed by creating one pipeline per batch.
 */
object RedisBathWrite extends App {

  val spark = SparkSession.builder().appName("ProgramUser")
    // .master(CommonProperties.master)
    .master("local[*]")
    .config("spark.sql.warehouse.dir", "file:///tmp/redis")
    .enableHiveSupport() // 使用 hive
    .getOrCreate()

  import spark.implicits._

  // Sample data: one Row per number, single column "name".
  val data = "1,2,3,4,5,6,7,8,9,10,11,12,13".split(",").toSeq.toDF("name").collect()

  // Thread-safe queue so rows can be drained batch by batch.
  val queue = new LinkedBlockingQueue[Row]()
  data.par.foreach(queue.offer(_))

  // Number of commands buffered in one pipeline before sync() flushes them.
  val batchSize = 2
  val break = new Breaks

  // One outer iteration per batch (ceiling division covers the partial tail).
  for (i <- 1 to Math.ceil(data.size / batchSize.toFloat).toInt) {
    println("i==" + i)
    break.breakable {
      var count: AtomicInteger = new AtomicInteger(0)
      val jedis = InternalRedisClient.getRedis()
      jedis.select(1)
      // FIX: one pipeline per batch. The original built a new pipeline per
      // element inside the while loop, so only the last command reached
      // Redis when sync() finally ran.
      val pipeline = jedis.pipelined()
      while (!queue.isEmpty) {
        count.getAndIncrement()
        println("j==" + queue.size)
        val row = queue.poll()
        pipeline.lpush("liu_test5", row.getString(0))
        // Flush when the batch is full, or when the queue can no longer
        // fill another complete batch (tail of the data).
        if (count.get() == batchSize || queue.size < batchSize) {
          pipeline.sync()
          InternalRedisClient.release(jedis)
          count.set(0)
          break.break()
        }
      }
    }
  }
}
package com.hooray.spark.sass2.utils
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis._
/**
*
*Redis 工具類
* @author liujianjun
* @Description:
* @Date : Created in 1:41 2017/9/28
*
*/
/**
 * Pooled Jedis client helper. Serializable so the object can be shipped to
 * Spark executors; the pool itself is @transient and rebuilt lazily per JVM.
 */
object InternalRedisClient extends Serializable {

  private val pro = PropertiUtils.init("redis.properties")

  // Never serialized; created on first getRedis() call in each JVM.
  @transient private var pool: JedisPool = null

  /**
   * Return a connection to the pool for reuse.
   *
   * FIX: the original called returnBrokenResource(), which discards the
   * underlying socket every time. returnResource() hands a healthy
   * connection back so it can be borrowed again.
   */
  def release(jedis: Jedis): Unit = {
    assert(pool != null)
    pool.returnResource(jedis)
  }

  /** Borrow a connection, creating the pool on first use. */
  def getRedis(): Jedis = {
    makePool()
    assert(pool != null)
    pool.getResource
  }

  /**
   * Lazily build the pool from redis.properties.
   * Synchronized because concurrent Spark tasks in the same JVM may race
   * through the null check and create two pools (leaking one).
   */
  private def makePool(): Unit = synchronized {
    if (pool == null) {
      val poolConfig = new GenericObjectPoolConfig()
      poolConfig.setMaxTotal(pro.getProperty("max_total").toInt)
      poolConfig.setMaxIdle(pro.getProperty("max_idle").toInt)
      poolConfig.setMinIdle(pro.getProperty("min_idle").toInt)
      poolConfig.setTestOnBorrow(true)
      poolConfig.setTestOnReturn(false)
      poolConfig.setMaxWaitMillis(pro.getProperty("max_wait_millis").toLong)
      pool = new JedisPool(poolConfig,
        pro.getProperty("redis_host"),
        pro.getProperty("redis_port").toInt)
      // Destroy the pool on JVM exit. sys.addShutdownHook already wraps the
      // body in a Thread; the original's extra Thread wrapper (whose run()
      // was invoked directly) was redundant.
      sys.addShutdownHook(pool.destroy())
    }
  }
}