Upgrading from Flink 1.3.2 to 1.4.0: Hadoop FileSystem and Path problems

I recently tried to upgrade from Flink 1.3.2 to 1.4.0 and ran into a problem: I can no longer import org.apache.hadoop.fs.{FileSystem, Path}. The problem shows up in two places:

ParquetWriter:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

class AvroWriter[T <: GenericRecord]() extends Writer[T] {

  @transient private var writer: ParquetWriter[T] = _
  @transient private var schema: Schema = _

  override def write(element: T): Unit = {
    schema = element.getSchema
    writer.write(element)
  }

  override def duplicate(): AvroWriter[T] = new AvroWriter[T]()

  override def close(): Unit = writer.close()

  override def getPos: Long = writer.getDataSize

  override def flush(): Long = writer.getDataSize

  override def open(fs: FileSystem, path: Path): Unit = {
    writer = AvroParquetWriter.builder[T](path)
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
  }
}

CustomBucketer:

import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer
import org.apache.flink.streaming.connectors.fs.Clock
import org.apache.hadoop.fs.{FileSystem, Path}
import java.io.ObjectInputStream
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.avro.generic.GenericRecord
import scala.reflect.ClassTag

class RecordFieldBucketer[T <: GenericRecord: ClassTag](dateField: String = null, dateFieldFormat: String = null, bucketOrder: Seq[String]) extends Bucketer[T] {

  @transient var dateFormatter: SimpleDateFormat = _

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    if (dateField != null && dateFieldFormat != null) {
      dateFormatter = new SimpleDateFormat(dateFieldFormat)
    }
  }

  override def getBucketPath(clock: Clock, basePath: Path, element: T): Path = {
    val partitions = bucketOrder.map(field => {
      if (field == dateField) {
        field + "=" + dateFormatter.format(new Date(element.get(field).asInstanceOf[Long]))
      } else {
        field + "=" + element.get(field)
      }
    }).mkString("/")
    new Path(basePath + "/" + partitions)
  }
}

I noticed that Flink now has:

import org.apache.flink.core.fs.{FileSystem, Path} 

But the new Path doesn't seem to work with AvroParquetWriter or with my getBucketPath method. I know Flink's FileSystem and Hadoop dependencies have changed; I'm just not sure what I need to import to get my code working again.

Do I even need the Hadoop dependencies anymore, or is there now a different way to write Parquet files, partitioned, to S3?

build.sbt:

val flinkVersion = "1.4.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
  "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
  "org.apache.flink" % "flink-metrics-core" % flinkVersion,
  "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
  "org.apache.kafka" %% "kafka" % "0.10.0.1",
  "org.apache.avro" % "avro" % "1.7.7",
  "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
  "org.apache.parquet" % "parquet-avro" % "1.8.1",
  "io.confluent" % "kafka-avro-serializer" % "3.2.2",
  "com.fasterxml.jackson.core" % "jackson-core" % "2.9.2"
)

Answer:

The necessary org.apache.hadoop.fs.{FileSystem, Path} classes can be found in the hadoop-common project.
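For example, a minimal sketch of the build.sbt addition, assuming Hadoop 2.7.x (the version is an assumption; pick the one that matches your cluster, and mark it Provided if your Flink distribution already bundles Hadoop):

// Adds the Hadoop classes (org.apache.hadoop.fs.FileSystem, Path) that
// Flink 1.4 no longer pulls in transitively. The 2.7.3 version is an
// assumption; use whatever matches your cluster / Flink distribution.
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.7.3"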

Answer:

Building a "Hadoop-free Flink" was one of the major features of the 1.4 release. All you need to do is include the Hadoop dependencies on your classpath, or, quoting the changelog:

...this also means that in cases where you used connectors to HDFS, such as the BucketingSink or RollingSink, you now have to ensure that you either use a Flink distribution with bundled Hadoop dependencies, or make sure to include Hadoop dependencies when building a jar file for your application.
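In build.sbt terms that means either relying on a Flink distribution that bundles Hadoop (and marking the Hadoop artifacts Provided), or shipping them in your application's fat jar. A rough sketch, assuming Hadoop 2.7.x and an S3 (s3a) target; the versions and the hadoop-aws artifact are assumptions, not something stated in the question:

// When the Flink distribution does NOT bundle Hadoop, ship these in the
// application jar so the BucketingSink and the Parquet writer can find
// org.apache.hadoop.fs.{FileSystem, Path} at runtime.
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-common" % "2.7.3",
  // Only needed when writing to S3 via the s3a:// filesystem (assumption).
  "org.apache.hadoop" % "hadoop-aws" % "2.7.3"
)

// When the distribution already bundles Hadoop, mark them Provided instead,
// e.g. "org.apache.hadoop" % "hadoop-common" % "2.7.3" % Provided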

