PySpark中的随机数生成
让我们从一个简单的函数开始,该函数总是返回一个随机整数:
import numpy as npdef f(x):
return np.random.randint(1000)
和RDD填充零并使用映射f
:
rdd = sc.parallelize([0] * 10).map(f)
由于以上RDD不会持久存在,我希望每次收集时都会得到不同的输出:
> rdd.collect()[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]
如果我们忽略了值的分布并不是真正随机的事实,那就或多或少会发生什么。当我们只考虑第一个要素时,问题就开始了:
assert len(set(rdd.first() for _ in xrange(100))) == 1
要么
assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1
似乎每次都返回相同的数字。我已经能够在使用Spark
1.2、1.3和1.4的两台不同机器上重现此行为。我在这里使用,np.random.randint
但与的行为相同random.randint
。
这个问题与的非完全随机结果相同collect
,似乎是特定于Python的,我无法使用Scala重现它:
def f(x: Int) = scala.util.Random.nextInt(1000)val rdd = sc.parallelize(List.fill(10)(0)).map(f)
(1 to 100).map(x => rdd.first).toSet.size
rdd.collect()
我想念这里明显的东西吗?
:
原来,问题的根源是Python RNG实现。引用官方文档:
该模块提供的功能实际上是random.Random类的隐藏实例的绑定方法。您可以实例化自己的Random实例,以获取不共享状态的生成器。
我假设NumPy的工作方式相同,并f
使用RandomState
实例重写如下
import osimport binascii
def f(x, seed=None):
seed = (
seed if seed is not None
else int(binascii.hexlify(os.urandom(4)), 16))
rs = np.random.RandomState(seed)
return rs.randint(1000)
使速度变慢,但可以解决问题。
虽然上述解释说,从收集我不是随机的结果还是不明白它是如何影响first
/take(1)
多个动作之间。
回答:
因此,这里的实际问题相对简单。Python中的每个子进程都从其父级继承其状态:
len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect()))# 1
由于父状态没有理由在这种特定情况下更改,并且工作人员的寿命有限,因此每个孩子的状态在每次运行中都将完全相同。
以上是 PySpark中的随机数生成 的全部内容, 来源链接: utcz.com/qa/417557.html