如何计算pyspark中每行某些列的最大值

我在pyspark中使用sqlContext.sql函数读取了一个数据框。它包含4个数字列,每个客户都有信息(这是键ID)。我需要计算每个客户端的最大值并将此值加入数据框:

+--------+-------+-------+-------+-------+

|ClientId|m_ant21|m_ant22|m_ant23|m_ant24|

+--------+-------+-------+-------+-------+

| 0| null| null| null| null|

| 1| null| null| null| null|

| 2| null| null| null| null|

| 3| null| null| null| null|

| 4| null| null| null| null|

| 5| null| null| null| null|

| 6| 23| 13| 17| 8|

| 7| null| null| null| null|

| 8| null| null| null| null|

| 9| null| null| null| null|

| 10| 34| 2| 4| 0|

| 11| 0| 0| 0| 0|

| 12| 0| 0| 0| 0|

| 13| 0| 0| 30| 0|

| 14| null| null| null| null|

| 15| null| null| null| null|

| 16| 37| 29| 29| 29|

| 17| 0| 0| 16| 0|

| 18| 0| 0| 0| 0|

| 19| null| null| null| null|

+--------+-------+-------+-------+-------+

在这种情况下,客户端“ six”的最大值为23,而客户端“ ten”的最大值为30。“ null”在新列中自然为null。

请帮助我显示如何执行此操作。

回答:

我认为将值组合到列表中而不是找到最大值将是最简单的方法。

from pyspark.sql.types import *

schema = StructType([

StructField("ClientId", IntegerType(), True),

StructField("m_ant21", IntegerType(), True),

StructField("m_ant22", IntegerType(), True),

StructField("m_ant23", IntegerType(), True),

StructField("m_ant24", IntegerType(), True)

])

df = spark\

.createDataFrame(

data=[(0, None, None, None, None),

(1, 23, 13, 17, 99),

(2, 0, 0, 0, 1),

(3, 0, None, 1, 0)],

schema=schema)

import pyspark.sql.functions as F

def agg_to_list(m21,m22,m23,m24):

return [m21,m22,m23,m24]

u_agg_to_list = F.udf(agg_to_list, ArrayType(IntegerType()))

df2 = df.withColumn('all_values', u_agg_to_list('m_ant21', 'm_ant22', 'm_ant23', 'm_ant24'))\

.withColumn('max', F.sort_array("all_values", False)[0])\

.select('ClientId', 'max')

df2.show()

输出:

+--------+----+

|ClientId|max |

+--------+----+

|0 |null|

|1 |99 |

|2 |1 |

|3 |1 |

+--------+----+

以上是 如何计算pyspark中每行某些列的最大值 的全部内容, 来源链接: utcz.com/qa/428602.html

回到顶部