Apache Spark中的分层数据处理

我在Spark(v2.1.1)中有一个包含分层数据的3列(如下所示)的数据集。Apache Spark中的分层数据处理

  • 我的目标的目标是增量编号分配给基础上,父子层次的每一行。从图形上可以说,分层数据是一个树的集合。
  • 根据下表,我已经有基于'Global_ID'分组的行。现在我想以 的增量顺序生成'Value'列,但是基于 'Parent'和'Child'列的数据层次结构。


表格表示(数值是所需的输出):

+-----------+--------+-------+   +-----------+--------+-------+-------+ 

| Current Dataset | | Desired Dataset (Output) |

+-----------+--------+-------+ +-----------+--------+-------+-------+

| Global_ID | Parent | Child | | Global_ID | Parent | Child | Value |

+-----------+--------+-------+ +-----------+--------+-------+-------+

| 111 | 111 | 123 | | 111 | 111 | 111 | 1 |

| 111 | 135 | 246 | | 111 | 111 | 123 | 2 |

| 111 | 123 | 456 | | 111 | 123 | 789 | 3 |

| 111 | 123 | 789 | | 111 | 123 | 456 | 4 |

| 111 | 111 | 111 | | 111 | 111 | 135 | 5 |

| 111 | 135 | 468 | | 111 | 135 | 246 | 6 |

| 111 | 135 | 268 | | 111 | 135 | 468 | 7 |

| 111 | 268 | 321 | | 111 | 135 | 268 | 8 |

| 111 | 138 | 139 | | 111 | 268 | 321 | 9 |

| 111 | 111 | 135 | | 111 | 111 | 138 | 10 |

| 111 | 111 | 138 | | 111 | 138 | 139 | 11 |

| 222 | 222 | 654 | | 222 | 222 | 222 | 12 |

| 222 | 654 | 721 | | 222 | 222 | 987 | 13 |

| 222 | 222 | 222 | | 222 | 222 | 654 | 14 |

| 222 | 721 | 127 | | 222 | 654 | 721 | 15 |

| 222 | 222 | 987 | | 222 | 721 | 127 | 16 |

| 333 | 333 | 398 | | 333 | 333 | 333 | 17 |

| 333 | 333 | 498 | | 333 | 333 | 398 | 18 |

| 333 | 333 | 333 | | 333 | 333 | 498 | 19 |

| 333 | 333 | 598 | | 333 | 333 | 598 | 20 |

+-----------+--------+-------+ +-----------+--------+-------+-------+

树表示(期望值旁边的每个节点表示):

     +-----+           +-----+ 

1 | 111 | 17 | 333 |

+--+--+ +--+--+

| |

+---------------+--------+-----------------+ +----------+----------+

| | | | | |

+--v--+ +--v--+ +--v--+ +--v--+ +--v--+ +--v--+

2 | 123 | 5 | 135 | 10 | 138 | | 398 | | 498 | | 598 |

+--+--+ +--+--+ +--+--+ +--+--+ +--+--+ +--+--+

+-----+-----+ +--------+--------+ | 18 19 20

| | | | | |

+--v--+ +--v--+ +--v--+ +--v--+ +--v--+ +--v--+

| 789 | | 456 | | 246 | | 468 | | 268 | | 139 | +-----+

+-----+ +-----+ +-----+ +-----+ +--+--+ +-----+ 12 | 222 |

3 4 6 7 8 | 11 +--+--+

+--v--+ |

| 321 | +------+-------+

+--+--+ | |

9 +--v--+ +--v--+

13 | 987 | 14 | 654 |

+--+--+ +--+--+

|

+--v--+

15 | 721 |

+--+--+

|

+--v--+

16 | 127 |

+--+--+


代码段:

Dataset<Row> myDataset = spark 

.sql("select Global_ID, Parent, Child from RECORDS");

JavaPairRDD<Row,Long> finalDataset = myDataset.groupBy(new Column("Global_ID"))

.agg(functions.sort_array(functions.collect_list(new Column("Parent").as("parent_col"))),

functions.sort_array(functions.collect_list(new Column("Child").as("child_col"))))

.orderBy(new Column("Global_ID"))

.withColumn("vars", functions.explode(<Spark UDF>)

.select(new Column("vars"),new Column("parent_col"),new Column("child_col"))

.javaRDD().zipWithIndex();

// Sample UDF (TODO: Actual Implementation)

spark.udf().register("computeValue",

(<Column Names>) -> <functionality & implementation>,

DataTypes.<xxx>);


经过大量的调查研究,并通过博客,许多建议去,我曾尝试下面的方法,但无济于事,以实现我的方案的结果。

技术堆栈:

  • Apache的火花(V2.1。1)

  • 爪哇8

  • AWS EMR集群(火花应用部署)


数据量:

  • 大约〜Dataset中


20000000点方法下的行尝试:

  1. 星火GraphX + GraphFrames:

    • 使用这种组合,我只能实现顶点和边之间的关系,但它不适合我的用例。
      参考:https://graphframes.github.io/user-guide.html
  2. 星火GraphX预凝胶API:

    • 这是最接近我能得到实现预期的结果,但不幸的是我无法找到一个Java代码片断一样。 在其中一个博客中提供的示例是Scala,我不熟悉 。
      参考:https://dzone.com/articles/processing-hierarchical-data-using-spark-graphx-pr


替代品的任何建议(或)在当前的方法修改将是很有益的,因为我搞清楚这个用例的解决方案完全丢失。

感谢您的帮助!谢谢!

回答:

注意:下面的解决方案是scala spark。您可以轻松转换为Java代码。

检查了这一点。我试着用Spark Sql来做这件事,你可以得到一个想法。基本上的想法是在对它们进行聚合和分组的同时对孩子,父母和全球身份进行排序。一旦按globalid进行分组和排序,则展开其余部分。你会得到有序的结果表到以后你可以zipWithIndex添加等级(值)

import org.apache.spark.sql.SQLContext 

import org.apache.spark.sql.functions._

import org.apache.spark.sql.expressions.UserDefinedFunction

import org.apache.spark.sql.functions.udf

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

val t = Seq((111,111,123), (111,111,111), (111,123,789), (111,268,321), (222,222,654), (222,222,222), (222,721,127), (333,333,398), (333,333,333), (333,333,598))

val ddd = sc.parallelize(t).toDF

val zip = udf((xs: Seq[Int], ys: Seq[Int]) => xs zip ys)

val dd1 = ddd

.groupBy($"_1")

.agg(sort_array(collect_list($"_2")).as("v"),

sort_array(collect_list($"_3")).as("w"))

.orderBy(asc("_1"))

.withColumn("vars", explode(zip($"v", $"w")))

.select($"_1", $"vars._1", $"vars._2").rdd.zipWithIndex

dd1.collect

输出

res24: Array[(org.apache.spark.sql.Row, Long)] = Array(([111,111,111],0), ([111,111,123],1), ([111,123,321],2), 

([111,268,789],3), ([222,222,127],4), ([222,222,222],5), ([222,721,654],6),([333,333,333],7), ([333,333,398],8), ([333,333,598],9))

以上是 Apache Spark中的分层数据处理 的全部内容, 来源链接: utcz.com/qa/266528.html

回到顶部