如何使用JAVA在Spark DataFrame上调用UDF?

与此处类似的问题,但在此处没有足够的评论要点。

根据最新的Spark 文档,udf可以两种不同的方式使用,一种用于SQL,另一种用于DataFrame。我找到了多个如何udf与sql

一起使用的示例,但还没有找到有关如何udf直接在DataFrame上使用a的任何示例。

op所提供的解决方案,在上面链接的问题上使用__callUDF()___deprecated_根据Spark Java

API文档,该解决方案将在Spark 2.0中删除。在那里,它说:

“因为它在udf()中是多余的”

因此,这意味着我应该能够使用__udf()__my进行校准udf,但是我不知道该怎么做。我没有发现任何说明Java-

Spark程序语法的内容。我想念什么?

import org.apache.spark.sql.api.java.UDF1;

.

.

UDF1 mode = new UDF1<String[], String>() {

public String call(final String[] types) throws Exception {

return types[0];

}

};

sqlContext.udf().register("mode", mode, DataTypes.StringType);

df.???????? how do I call my udf (mode) on a given column of my DataFrame df?

回答:

Scala风格udf可以直接调用:

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.expressions.UserDefinedFunction;

UserDefinedFunction mode = udf(

(Seq<String> ss) -> ss.headOption(), DataTypes.StringType

);

df.select(mode.apply(col("vs"))).show();

即使我们假设您的UDF有用并且不能被简单的getItem调用替换,它的签名也不正确。数组列是使用Scala

WrappedArray而不是普通Java数组公开的,因此您必须调整签名:

UDF1 mode = new UDF1<Seq<String>, String>() {

public String call(final Seq<String> types) throws Exception {

return types.headOption();

}

};

如果已经注册了UDF:

sqlContext.udf().register("mode", mode, DataTypes.StringType);

您可以简单地使用callUDF(这是1.5中引入的新功能)按名称进行调用:

df.select(callUDF("mode", col("vs"))).show();

您也可以在中使用它selectExprs

df.selectExpr("mode(vs)").show();

以上是 如何使用JAVA在Spark DataFrame上调用UDF? 的全部内容, 来源链接: utcz.com/qa/405737.html

回到顶部