如何使用JAVA在Spark DataFrame上调用UDF?
与此处类似的问题,但在此处没有足够的评论要点。
根据最新的Spark 文档,udf
可以两种不同的方式使用,一种用于SQL,另一种用于DataFrame。我找到了多个如何udf
与sql
一起使用的示例,但还没有找到有关如何udf
直接在DataFrame上使用a的任何示例。
op所提供的解决方案,在上面链接的问题上使用__callUDF()__
,_deprecated_
根据Spark Java
API文档,该解决方案将在Spark 2.0中删除。在那里,它说:
“因为它在udf()中是多余的”
因此,这意味着我应该能够使用__udf()__
my进行校准udf
,但是我不知道该怎么做。我没有发现任何说明Java-
Spark程序语法的内容。我想念什么?
import org.apache.spark.sql.api.java.UDF1;.
.
UDF1 mode = new UDF1<String[], String>() {
public String call(final String[] types) throws Exception {
return types[0];
}
};
sqlContext.udf().register("mode", mode, DataTypes.StringType);
df.???????? how do I call my udf (mode) on a given column of my DataFrame df?
回答:
Scala风格udf
可以直接调用:
import static org.apache.spark.sql.functions.*;import org.apache.spark.sql.expressions.UserDefinedFunction;
UserDefinedFunction mode = udf(
(Seq<String> ss) -> ss.headOption(), DataTypes.StringType
);
df.select(mode.apply(col("vs"))).show();
即使我们假设您的UDF有用并且不能被简单的getItem
调用替换,它的签名也不正确。数组列是使用Scala
WrappedArray
而不是普通Java数组公开的,因此您必须调整签名:
UDF1 mode = new UDF1<Seq<String>, String>() { public String call(final Seq<String> types) throws Exception {
return types.headOption();
}
};
如果已经注册了UDF:
sqlContext.udf().register("mode", mode, DataTypes.StringType);
您可以简单地使用callUDF
(这是1.5中引入的新功能)按名称进行调用:
df.select(callUDF("mode", col("vs"))).show();
您也可以在中使用它selectExprs
:
df.selectExpr("mode(vs)").show();
以上是 如何使用JAVA在Spark DataFrame上调用UDF? 的全部内容, 来源链接: utcz.com/qa/405737.html