什么时候合并发生在用户定义的聚合函数中Spark中的UDAF

我想知道Spark将在哪些情况下作为UDAF函数的一部分执行合并。什么时候合并发生在用户定义的聚合函数中Spark中的UDAF

动机: 我在Spark项目的窗口中使用了很多UDAF函数。我经常想回答这样一个问题:

信用卡交易在同一个国家与30天窗口中的当前交易进行了多少次?

该窗口将从当前事务开始,但不会将其包括在计数中。它需要通过当前交易的价值来了解过去30天内哪个国家/地区的数量。

val rollingWindow = Window 

.partitionBy(partitionByColumn)

.orderBy(orderByColumn.desc)

.rangeBetween(0, windowSize)

df.withColumn(

outputColumnName,

customUDAF(inputColumn, orderByColumn).over(rollingWindow))

我写了我的customUDAF来做计数。我总是使用.orderBy(orderByColumn.desc)并感谢.desc当前交易在计算期间在窗口中首先显示。

UDAF函数需要实现merge函数,该函数在并行计算中合并两个中间聚合缓冲区。如果发生任何合并,我的current transaction对于不同的缓冲区可能会不同,并且UDAF的结果将不正确。

我写了一个UDAF函数来计算我的数据集合并的数量,并且只保留窗口中的第一个事务与当前事务进行比较。

class FirstUDAF() extends UserDefinedAggregateFunction { 

def inputSchema = new StructType().add("x", StringType)

.add("y", StringType)

def bufferSchema = new StructType()

.add("first", StringType)

.add("numMerge", IntegerType)

def dataType = new StructType()

.add("firstCode", StringType)

.add("numMerge", IntegerType)

def deterministic = true

def initialize(buffer: MutableAggregationBuffer) = {

buffer(0) = ""

buffer(1) = 1

}

def update(buffer: MutableAggregationBuffer, input: Row): Unit = {

if (buffer.getString(0) == "")

buffer(0) = input.getString(0)

}

def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {

buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)

}

def evaluate(buffer: Row) = buffer

}

当我在本地主用16个CPU与火花2.0.1运行它,也有从未在窗口中的任何兼并和第一笔交易始终是当前事务。这就是我要的。在不久的将来,我将在x100更大的数据集和真正的分布式Spark群集上运行我的代码,并想知道是否可以在那里发生合并。

问题:

  • 在何种情况下/ conditons兼并发生在UDAF?
  • 使用orderBy执行Windows有没有兼并?
  • 可以告诉Spark不要做兼并吗?

回答:

在UDAF中发生哪些情况/条件合并?

merge在聚合函数(“map side aggregation”)的部分应用程序在shuffle(“reduce side aggregation”)之后合并时调用。

使用orderBy执行Windows有没有兼并?

当前执行从来没有。至于窗口函数只是看中了groupByKey,并没有部分聚合。这当然是实施细节,将来可能会在没有进一步通知的情况下进行更改。

是否有可能告诉Spark不做兼并?

不是。但是,如果数据已通过聚合密钥分区,则不需要merge,只使用combine

最后:

多少次信用卡交易是在同一个国家提出在30天的窗口中的当前交易?

不要求UDAFs或窗口函数。我可能会用o.a.s.sql.functions.window创建滚动窗口,按用户,国家和窗口进行汇总,然后返回输入。

以上是 什么时候合并发生在用户定义的聚合函数中Spark中的UDAF 的全部内容, 来源链接: utcz.com/qa/262382.html

回到顶部