How to convert a nested Avro GenericRecord to a Row

I have code that converts my Avro records to a Row using the function avroToRowConverter():

directKafkaStream.foreachRDD(rdd -> {
    JavaRDD<Row> newRDD = rdd.map(x -> {
        // Build a decoder for the latest registered schema, then decode the Kafka message value
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(SchemaRegstryClient.getLatestSchema("poc2"));
        return avroToRowConverter(recordInjection.invert(x._2).get());
    });
    // ... use newRDD here
});

This function does not work for nested schemas (TYPE = UNION):

private static Row avroToRowConverter(GenericRecord avroRecord) {
    if (null == avroRecord) {
        return null;
    }
    //GenericData
    Object[] objectArray = new Object[avroRecord.getSchema().getFields().size()];
    StructType structType = (StructType) SchemaConverters.toSqlType(avroRecord.getSchema()).dataType();
    for (Schema.Field field : avroRecord.getSchema().getFields()) {
        Schema.Type type = field.schema().getType();
        if (type == Schema.Type.STRING || type == Schema.Type.ENUM) {
            // Avro strings and enum symbols are not java.lang.String, so convert them explicitly
            objectArray[field.pos()] = String.valueOf(avroRecord.get(field.pos()));
        } else {
            // Every other type (including RECORD and UNION) is copied as-is, with no recursion
            objectArray[field.pos()] = avroRecord.get(field.pos());
        }
    }
    return new GenericRowWithSchema(objectArray, structType);
}

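Can anyone suggest how to convert a complex schema to a Row?

Answer:

There is SchemaConverters.createConverterToSQL, but unfortunately it is private. There were some PRs to make it public, but they were never merged: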

  • https://github.com/databricks/spark-avro/pull/89
  • https://github.com/databricks/spark-avro/pull/132

There is a workaround, though, which we used.

You can expose it by creating a class in the com.databricks.spark.avro package:

package com.databricks.spark.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataType

object MySchemaConversions {
  def createConverterToSQL(avroSchema: Schema, sparkSchema: DataType): (GenericRecord) => Row =
    SchemaConverters.createConverterToSQL(avroSchema, sparkSchema).asInstanceOf[(GenericRecord) => Row]
}

Then you can use it in your code as follows:

final DataType myAvroType = SchemaConverters.toSqlType(MyAvroRecord.getClassSchema()).dataType();

final Function1<GenericRecord, Row> myAvroRecordConverter =
    MySchemaConversions.createConverterToSQL(MyAvroRecord.getClassSchema(), myAvroType);

Row[] convertAvroRecordsToRows(List<GenericRecord> records) {
    return records.stream().map(myAvroRecordConverter::apply).toArray(Row[]::new);
}

For a single record, you can call it like this:

final Row row = myAvroRecordConverter.apply(record);
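
For context on why the flat loop in the question breaks on nested schemas: it copies each field value as-is, so a nested record is never turned into a nested Row. A converter has to recurse into the values. The sketch below shows only that recursive shape, using plain JDK types so it runs without Avro or Spark. The stand-ins are assumptions for illustration: a Map plays the role of a GenericRecord, an Object[] plays the role of a Row, and NestedRowSketch, convert and toRow are hypothetical names, not part of spark-avro.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in sketch: Map = record, Object[] = row. Not the spark-avro implementation.
final class NestedRowSketch {

    // Converts one field value, recursing into nested structures.
    static Object convert(Object value) {
        if (value == null) {
            // The "null" branch of a nullable union passes through unchanged
            return null;
        }
        if (value instanceof Map) {
            // Nested record: convert it to a nested row
            return toRow((Map<?, ?>) value);
        }
        if (value instanceof List) {
            // Array: convert element by element
            List<Object> out = new ArrayList<>();
            for (Object element : (List<?>) value) {
                out.add(convert(element));
            }
            return out;
        }
        if (value instanceof CharSequence) {
            // String-like values (Avro uses its own CharSequence type) become java.lang.String
            return value.toString();
        }
        // Primitives and anything else are kept as-is
        return value;
    }

    // Converts a record to a row, keeping the field order of the map.
    static Object[] toRow(Map<?, ?> record) {
        Object[] row = new Object[record.size()];
        int pos = 0;
        for (Object fieldValue : record.values()) {
            row[pos++] = convert(fieldValue);
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> address = new LinkedHashMap<>();
        address.put("city", new StringBuilder("Pune"));
        address.put("zip", null);

        Map<String, Object> user = new LinkedHashMap<>();
        user.put("name", "alice");
        user.put("address", address);

        Object[] row = toRow(user);
        System.out.println(row[0]);                  // alice
        System.out.println(((Object[]) row[1])[0]);  // Pune
    }
}
```

A real converter would dispatch on the Avro schema type of each field rather than on the runtime class of the value, which is the job the createConverterToSQL workaround above delegates to the library.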

