从S3并行读取多个文件(Spark,Java)

我对此进行了一些讨论,但还不太了解正确的解决方案:我想将S3中的数百个文件加载到RDD中。这是我现在的做法:

ObjectListing objectListing = s3.listObjects(new ListObjectsRequest().

withBucketName(...).

withPrefix(...));

List<String> keys = new LinkedList<>();

objectListing.getObjectSummaries().forEach(summery -> keys.add(summery.getKey())); // repeat while objectListing.isTruncated()

JavaRDD<String> events = sc.parallelize(keys).flatMap(new ReadFromS3Function(clusterProps));

ReadFromS3Function不使用实际的阅读AmazonS3客户端:

    public Iterator<String> call(String s) throws Exception {

AmazonS3 s3Client = getAmazonS3Client(properties);

S3Object object = s3Client.getObject(new GetObjectRequest(...));

InputStream is = object.getObjectContent();

List<String> lines = new LinkedList<>();

String str;

try {

BufferedReader reader = new BufferedReader(new InputStreamReader(is));

if (is != null) {

while ((str = reader.readLine()) != null) {

lines.add(str);

}

} else {

...

}

} finally {

...

}

return lines.iterator();

我从在Scala中针对相同问题看到的答案中“翻译”了一下。我认为也可以将整个路径列表传递给sc.textFile(...),但是我不确定哪种是最佳做法。

回答:

根本的问题是,在s3中列出对象的速度确实很慢,并且每当执行树遍历时,看起来像目录树的方式都会降低性能(就像路径的通配符模式处理一样)。

帖子中的代码正在列出所有子对象,这些方法可提供更好的性能,本质上是Hadoop 2.8和s3a

listFiles(路径,递归)附带的内容,请参见HADOOP-13208。

获得该清单后,您将获得对象路径的字符串,然后可以将其映射到s3a / s3n路径以将spark用作文本文件输入,然后将其应用于

val files = keys.map(key -> s"s3a://$bucket/$key").mkString(",")

sc.textFile(files).map(...)

并按要求提供以下Java代码。

String prefix = "s3a://" + properties.get("s3.source.bucket") + "/";

objectListing.getObjectSummaries().forEach(summary -> keys.add(prefix+summary.getKey()));

// repeat while objectListing truncated

JavaRDD<String> events = sc.textFile(String.join(",", keys))

请注意,我已将s3n切换为s3a,因为只要在CP上具有hadoop-awsamazon-

sdkJAR,s3a连接器就是您应该使用的连接器。更好,它是一种针对人员(我)针对火花工作负载进行维护和测试的工具。请参阅Hadoop

S3连接器的历史。

以上是 从S3并行读取多个文件(Spark,Java) 的全部内容, 来源链接: utcz.com/qa/415608.html

回到顶部