Spark上的Redis:任务不可序列化

我们在Spark上使用Redis来缓存键值对,这是代码:

import com.redis.RedisClient

val r = new RedisClient("192.168.1.101", 6379)

val perhit = perhitFile.map(x => {

val arr = x.split(" ")

val readId = arr(0).toInt

val refId = arr(1).toInt

val start = arr(2).toInt

val end = arr(3).toInt

val refStr = r.hmget("refStr", refId).get(refId).split(",")(1)

val readStr = r.hmget("readStr", readId).get(readId)

val realend = if(end > refStr.length - 1) refStr.length - 1 else end

val refOneStr = refStr.substring(start, realend)

(readStr, refOneStr, refId, start, realend, readId)

})

但是编译器给了我这样的反馈:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)

at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)

at org.apache.spark.rdd.RDD.map(RDD.scala:270)

at com.ynu.App$.main(App.scala:511)

at com.ynu.App.main(App.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Caused by: java.io.NotSerializableException: com.redis.RedisClient

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)

at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)

at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)

at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)

at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)

at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)

at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)

... 12 more

有人可以告诉我如何序列化从Redis获得的数据。非常感谢。

回答:

在Spark中,RDDs(如此map处)上的函数被序列化并发送给执行程序进行处理。这意味着这些操作中包含的所有元素都应该可序列化。

Redis连接不可序列化,因为它打开了到目标DB的TCP连接,该TCP连接已绑定到创建它的机器。

解决方案是在本地执行上下文中的执行器上创建那些连接。做到这一点的方法很少。我想到的两个是:

  • rdd.mapPartitions:可让您一次处理整个分区,从而分摊创建连接的成本)
  • 单例连接管理器:每个执行者创建一次连接

mapPartitions 仅需对程序结构进行少量更改即可轻松实现:

val perhit = perhitFile.mapPartitions{partition => 

val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation

val res = partition.map{ x =>

...

val refStr = r.hmget(...) // use r to process the local data

}

r.close // take care of resources

res

}

可以使用持有对连接的延迟引用的对象对单例连接管理器进行建模(注意:可变引用也将起作用)。

object RedisConnection extends Serializable {

lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)

}

然后可以使用该对象实例化每个辅助JVM的1个连接,并用作Serializable操作闭包中的对象。

val perhit = perhitFile.map{x => 

val param = f(x)

val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data

}

}

使用单例对象的优点是开销较小,因为连接仅由JVM创建一次(而不是每个RDD分区1个)

还有一些缺点:

  • 连接的清理很棘手(关机挂钩/计时器)
  • 必须确保共享资源的线程安全

(*)代码用于说明目的。未经编译或测试。

以上是 Spark上的Redis:任务不可序列化 的全部内容, 来源链接: utcz.com/qa/425235.html

回到顶部