A few small pitfalls from integrating akka with spark
阿新 • Published: 2019-01-05
A multithreading pitfall
error:
ERROR (com.ximalaya.xqlserver.xql.engine.adapter.BatchSqlRunnerEngine:74) - executor result throw
java.lang.IllegalArgumentException: spark.sql.execution.id is already set
Searching on this error led me to the relevant JIRA issue, and after tracing the source code I found the root cause: the ThreadLocal that Spark uses to track spark.sql.execution.id does not play well with ForkJoinPool. Since I wrap Spark's execution logic inside akka, and akka actors run on a ForkJoinPool-backed dispatcher by default, the fix is straightforward: stop using the default thread pool.
Approach 1:
Wrap the execution logic in a Future and replace the Future's default thread pool:
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
private val executor = Executors.newFixedThreadPool(poolSize)
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
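Put together, approach 1 looks roughly like the sketch below. The object name SparkJobRunner, the poolSize value, and the submit helper are illustrative names I made up for the example, not from the original code:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

object SparkJobRunner {
  // Assumption: pool sized to the expected number of concurrent jobs.
  private val poolSize = 8
  private val executor = Executors.newFixedThreadPool(poolSize)
  // Replace the implicit ExecutionContext so the Future body runs on a
  // plain fixed thread pool instead of akka's default fork-join pool.
  private implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutor(executor)

  // The Spark execution logic would go inside the Future body.
  def submit[T](body: => T): Future[T] = Future(body)
}
```

Any Spark call wrapped via submit now runs on the fixed thread pool, sidestepping the ThreadLocal/ForkJoinPool incompatibility.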
Approach 2:
Configure an extra dispatcher (thread pool) in the akka conf, and create the Actor on that custom dispatcher instead of the default one.
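Approach 2 can be sketched as follows. This is a config-centric example under my own naming assumptions: the dispatcher name spark-dispatcher, the pool size, and the actor class are all hypothetical, and the HOCON block belongs in application.conf:

```scala
// application.conf (hypothetical):
//   spark-dispatcher {
//     type = Dispatcher
//     executor = "thread-pool-executor"
//     thread-pool-executor { fixed-pool-size = 8 }
//   }
import akka.actor.{Actor, ActorSystem, Props}

class SparkRunnerActor extends Actor {
  def receive = {
    case job => // run the Spark execution logic here
  }
}

val system = ActorSystem("engine")
// Bind the actor to the thread-pool-executor dispatcher instead of the
// default fork-join dispatcher, avoiding the ThreadLocal problem.
val runner = system.actorOf(
  Props[SparkRunnerActor].withDispatcher("spark-dispatcher"))
```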
akka remote payload size limit
I need to send the finished DataFrame back from YARN to the local process, and the communication goes over akka remote. akka remote has a default limit on message (frame) size, so any message exceeding it is simply dropped:
17:18:34 691 INFO (com.ximalaya.xql.communication.engine.RemoteSystemDriver$:44) - remote config is Config(SimpleConfigObject({"akka":{"actor":{"provider":"akka.remote.RemoteActorRefProvider","remote":{"enabled-transports":["akka.remote.netty.tcp"],"netty":{"tcp":{"maximum-frame-size":"30000000b","message-frame-size" :"30000000b","receive-buffer-size":"30000000b","send-buffer-size":"30000000b"}}},"serialization-bindings":{"akka.actor.ActorIdentity":"kryo","akka.actor.Identify":"kryo","akka.remote.RemoteWatcher$Rewatch":"kryo","com.ximalaya.xql.communication.common.bean.ResultXQL":"kryo"},"serializers":{"kryo":"com.twitter.chill.akka.AkkaSerializer"}},"daemonic":"off","loglevel":"INFO"}}))
[ERROR] [10/26/2016 17:19:13.613] [remoteSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://remoteSystem@192.168.17.75:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FlocalSystem%40192.168.3.228%3A2222-0/endpointWriter] Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://localSystem@192.168.3.228:2222/user/localMaster#-1650884510]: max allowed size 128000 bytes, actual size of encoded class c was 133699 bytes.
I traced the source code and then looked at the configuration; setting the options below solved the problem. The key one is maximum-frame-size, whose default (128000b) matches the "max allowed size 128000 bytes" in the error above:
akka {
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      message-frame-size = 30000000b
      send-buffer-size = 30000000b
      receive-buffer-size = 30000000b
      maximum-frame-size = 30000000b
    }
  }
}
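For reference, a config block like the one above (placed in application.conf, or merged in from another source) takes effect when it is passed to the ActorSystem at creation time; the system name here is illustrative:

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// ConfigFactory.load() picks up application.conf from the classpath,
// including the enlarged netty.tcp frame-size settings.
val config = ConfigFactory.load()
val system = ActorSystem("remoteSystem", config)
```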