
A few small pitfalls when integrating Akka with Spark

A multithreading pitfall

Error:


 ERROR (com.ximalaya.xqlserver.xql.engine.adapter.BatchSqlRunnerEngine:74) - executor result throw 
java.lang.IllegalArgumentException: spark.sql.execution.id is already set

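Searching for this error led me to a related JIRA issue. I then traced the source code and found that the cause is an incompatibility between ThreadLocal and ForkJoinPool. In my setup the Spark execution logic is wrapped inside Akka, and the default thread pool used by Akka actors is a ForkJoinPool, so the fix is straightforward: run the Spark logic on a different thread pool.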

Approach 1:

Wrap the execution logic in a Future and replace the Future's default thread pool with our own:

  import java.util.concurrent.Executors
  import scala.concurrent.ExecutionContext
  private val executor = Executors.newFixedThreadPool(poolSize)
  private implicit val ex: ExecutionContext = ExecutionContext.fromExecutor(executor)
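
To make the idea concrete, here is a minimal, self-contained sketch of Approach 1 that needs neither Spark nor Akka. The object name `FixedPoolSketch`, the method `runQuery`, and the pool size of 4 are assumptions made for illustration; the body of the Future is only a placeholder for the real Spark call and simply reports which thread ran it.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

object FixedPoolSketch {
  // Assumed value for illustration; size it to the expected number of concurrent queries.
  private val poolSize = 4

  // A plain fixed thread pool, used instead of a ForkJoinPool-backed context.
  private val executor = Executors.newFixedThreadPool(poolSize)
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)

  // Placeholder for the Spark execution logic (hypothetical).
  // It returns the name of the worker thread so the pool in use can be checked.
  def runQuery(sql: String): Future[String] = Future {
    Thread.currentThread().getName
  }

  // The pool threads are non-daemon, so shut the pool down when finished.
  def shutdown(): Unit = executor.shutdown()
}
```

Inside an actor, the Future returned by `runQuery` could be piped back to the sender, so the actor's own ForkJoinPool thread never runs the Spark code directly.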

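Approach 2:
Configure an additional thread pool (dispatcher) in the Akka configuration, and select that dispatcher when creating the Actor instead of the default one.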
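
A rough sketch of Approach 2, assuming classic Akka actors on Akka 2.4 or later (for `terminate()`). The dispatcher name `spark-dispatcher`, the actor class `SqlRunnerActor`, and the pool size of 8 are hypothetical and do not come from the original setup.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

// Hypothetical actor standing in for the one that runs the Spark logic.
class SqlRunnerActor extends Actor {
  def receive: Receive = {
    case sql: String =>
      // Print the handling thread so the dispatcher in use can be verified.
      println(s"running '$sql' on ${Thread.currentThread().getName}")
      context.system.terminate() // end the demo after one message
  }
}

object DedicatedDispatcherSketch extends App {
  // An extra dispatcher backed by a thread-pool executor instead of a fork-join pool.
  val config = ConfigFactory.parseString(
    """
      |spark-dispatcher {
      |  type = Dispatcher
      |  executor = "thread-pool-executor"
      |  thread-pool-executor {
      |    core-pool-size-min = 8
      |    core-pool-size-max = 8
      |  }
      |}
    """.stripMargin).withFallback(ConfigFactory.load())

  val system = ActorSystem("sketch", config)

  // Select the custom dispatcher when creating the actor.
  val runner = system.actorOf(
    Props(new SqlRunnerActor).withDispatcher("spark-dispatcher"), "sqlRunner")

  runner ! "select 1"
}
```

Only actors created with `withDispatcher` move to the new pool; every other actor stays on the default dispatcher.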

Akka remote message size limit

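I need to send the DataFrame produced by the job on YARN back to the local machine, and the communication goes through Akka remote. Akka remote has a default limit on message size, so any message that exceeds it is discarded: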

17:18:34 691  INFO (com.ximalaya.xql.communication.engine.RemoteSystemDriver$:44) - remote config is Config(SimpleConfigObject({"akka":{"actor":{"provider":"akka.remote.RemoteActorRefProvider","remote":{"enabled-transports":["akka.remote.netty.tcp"],"netty":{"tcp":{"maximum-frame-size":"30000000b","message-frame-size":"30000000b","receive-buffer-size":"30000000b","send-buffer-size":"30000000b"}}},"serialization-bindings":{"akka.actor.ActorIdentity":"kryo","akka.actor.Identify":"kryo","akka.remote.RemoteWatcher$Rewatch":"kryo","com.ximalaya.xql.communication.common.bean.ResultXQL":"kryo"},"serializers":{"kryo":"com.twitter.chill.akka.AkkaSerializer"}},"daemonic":"off","loglevel":"INFO"}}))
[ERROR] [10/26/2016 17:19:13.613] [remoteSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://remoteSystem@192.168.17.75:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FlocalSystem%40192.168.3.228%3A2222-0/endpointWriter] Transient association error (association remains live) akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://localSystem@192.168.3.228:2222/user/localMaster#-1650884510]: max allowed size 128000 bytes, actual size of encoded class c was 133699 bytes.

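I traced the source code and then reviewed the configuration. In the logged config above, the remote block appears nested under akka.actor, which would explain why the default limit of 128000 bytes was still in effect. Using the following configuration, with remote placed directly under akka, solved the problem: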

akka {
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      message-frame-size = 30000000b
      send-buffer-size = 30000000b
      receive-buffer-size = 30000000b
      maximum-frame-size = 30000000b
    }
  }
}
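
One way to confirm that the size settings end up where classic Akka remoting reads them is to query the path `akka.remote.netty.tcp.maximum-frame-size` on the parsed configuration. This is a minimal sketch, assuming the Typesafe Config library (which Akka depends on) is on the classpath. The object name `FrameSizeCheck` is hypothetical, and the inline config is a trimmed copy of the one above.

```scala
import com.typesafe.config.ConfigFactory

object FrameSizeCheck extends App {
  // Trimmed copy of the remote settings, parsed from a string for illustration.
  val config = ConfigFactory.parseString(
    """
      |akka {
      |  remote {
      |    netty.tcp {
      |      maximum-frame-size = 30000000b
      |    }
      |  }
      |}
    """.stripMargin)

  // The path classic Akka remoting uses for the frame size limit.
  val path = "akka.remote.netty.tcp.maximum-frame-size"

  if (config.hasPath(path))
    println(s"$path = ${config.getBytes(path)} bytes")
  else
    println(s"$path is not set, so the default limit applies")
}
```

Running the same check against the configuration object that is actually passed to the ActorSystem would show whether the block was placed at the wrong nesting level.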