Spark - 为什么ArrayBuffer似乎获取尚未遍历的元素

为什么MapPartition中的ArrayBuffer似乎具有尚未遍历的元素?Spark - 为什么ArrayBuffer似乎获取尚未遍历的元素

例如,我看这段代码的方式,第一项应该有1个元素,第二个2,第三个3等等。第一个ArrayBuffer输出可能有9个项目。这似乎意味着在第一次输出之前有9次迭代,但收益计数清楚地表明这是第一次迭代。

val a = ArrayBuffer[Int]() 

for(i <- 1 to 9) a += i

for(i <- 1 to 9) a += 9-i

val rdd1 = sc.parallelize(a.toArray())

def timePivotWithLoss(iter: Iterator[Int]) : Iterator[Row] = {

val currentArray = ArrayBuffer[Int]()

var loss = 0

var yields = 0

for (item <- iter) yield {

currentArray += item

//var left : Int = -1

yields += 1

Row(yields, item.toString(), currentArray)

}

}

rdd1.mapPartitions(it => timePivotWithLoss(it)).collect()

输出 -

[1,1,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)] 

[2,2,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]

[3,3,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]

[4,4,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]

[5,5,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]

[6,6,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]

[7,7,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]

[8,8,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]

[9,9,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9)]

[1,8,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]

[2,7,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]

[3,6,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]

[4,5,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]

[5,4,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]

[6,3,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]

[7,2,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]

[8,1,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]

[9,0,ArrayBuffer(8, 7, 6, 5, 4, 3, 2, 1, 0)]

回答:

这是因为在分隔使用参照同可变对象的所有行。溢出到光盘可能进一步使它不确定,某些对象被序列化并且不能反映这些变化。

可以使用可变引用和不可变对象:

def timePivotWithLoss(iter: Iterator[Int]) : Iterator[Row] = { 

var currentArray = Vector[Int]()

var loss = 0

var yields = 0

for (item <- iter) yield {

currentArray = currentArray :+ item

yields += 1

Row(yields, item.toString(), currentArray)

}

}

但总体可变状态和火花都没有很好的搭配。

以上是 Spark - 为什么ArrayBuffer似乎获取尚未遍历的元素 的全部内容, 来源链接: utcz.com/qa/266404.html

回到顶部