PySpark中的随机数生成

让我们从一个简单的函数开始,该函数总是返回一个随机整数:

import numpy as np

def f(x):

return np.random.randint(1000)

和RDD填充零并使用映射f

rdd = sc.parallelize([0] * 10).map(f)

由于以上RDD不会持久存在,我希望每次收集时都会得到不同的输出:

> rdd.collect()

[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]

如果我们忽略了值的分布并不是真正随机的事实,那就或多或少会发生什么。当我们只考虑第一个要素时,问题就开始了:

assert len(set(rdd.first() for _ in xrange(100))) == 1

要么

assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1

似乎每次都返回相同的数字。我已经能够在使用Spark

1.2、1.3和1.4的两台不同机器上重现此行为。我在这里使用,np.random.randint但与的行为相同random.randint

这个问题与的非完全随机结果相同collect,似乎是特定于Python的,我无法使用Scala重现它:

def f(x: Int) = scala.util.Random.nextInt(1000)

val rdd = sc.parallelize(List.fill(10)(0)).map(f)

(1 to 100).map(x => rdd.first).toSet.size

rdd.collect()

我想念这里明显的东西吗?

原来,问题的根源是Python RNG实现。引用官方文档:

该模块提供的功能实际上是random.Random类的隐藏实例的绑定方法。您可以实例化自己的Random实例,以获取不共享状态的生成器。

我假设NumPy的工作方式相同,并f使用RandomState实例重写如下

import os

import binascii

def f(x, seed=None):

seed = (

seed if seed is not None

else int(binascii.hexlify(os.urandom(4)), 16))

rs = np.random.RandomState(seed)

return rs.randint(1000)

使速度变慢,但可以解决问题。

虽然上述解释说,从收集我不是随机的结果还是不明白它是如何影响first/take(1)多个动作之间。

回答:

因此,这里的实际问题相对简单。Python中的每个子进程都从其父级继承其状态:

len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect()))

# 1

由于父状态没有理由在这种特定情况下更改,并且工作人员的寿命有限,因此每个孩子的状态在每次运行中都将完全相同。

以上是 PySpark中的随机数生成 的全部内容, 来源链接: utcz.com/qa/417557.html

回到顶部