How do you combine or merge smaller ORC files into larger ORC files?

Most of the questions/answers on SO and the web discuss using Hive to combine a bunch of small ORC files into a larger one. However, my ORC files are log files that are separated by day, and I need to keep them separate. I only want to "roll up" each day's ORC files (each day is a directory in HDFS).

I will most likely need to write the solution in Java. I have come across OrcFileMergeOperator, which may be what I need to use, but it is still too early to tell.

What is the best approach to solving this issue?

Answer:

There are good answers here, but none of them let me run a cron job to do daily roll-ups. We have log files being written to HDFS every day, and I do not want to have to run a query in Hive every morning when I come in.

What I ended up doing seemed more straightforward to me. I wrote a Java program that uses the ORC libraries to scan all the files in a directory and build a list of them. It then opens a new Writer for the "combined" file (whose name starts with a ".", so it is hidden from Hive; otherwise Hive would fail). The program then opens each file in the list, reads its contents, and writes them out to the combined file. Once all the files have been read, it deletes them. I also added the ability to run against one directory at a time in case that is ever needed.

NOTE: You will need a schema file. journald logs can be output as JSON with "journalctl -o json", and you can then use the Apache ORC tools to generate a schema file from that, or you can write one by hand. Auto-generation by ORC is good, but doing it manually is always better.
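For illustration, the schema file is just an ORC type string that TypeDescription can parse. Below is a minimal sketch; the field names are hypothetical journald-style examples, not the schema from my actual setup:

import org.apache.orc.TypeDescription;

public class SchemaSketch {
    public static void main(String[] args) {
        //Hypothetical journald-style fields; use whatever fields your "journalctl -o json" output actually carries.
        TypeDescription schema = TypeDescription.fromString(
                "struct<realtime:bigint,hostname:string,priority:int,message:string>");
        System.out.println(schema.toJson()); //quick sanity check of the parsed schema
    }
}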

NOTE: To use this code as-is, you will need a valid keytab, and you must pass -Dkeytab=<path to keytab> on the java command line.
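For example, a run might look like this (the jar name and keytab path here are placeholders, not the values from my environment):

java -cp orc-rollup.jar:`hadoop classpath` -Dkeytab=/etc/security/keytabs/<userName>.keytab OrcFileRollUp [optional hdfs directory]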

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import com.cloudera.org.joda.time.LocalDate;

public class OrcFileRollUp {

    private final static String SCHEMA = "journald.schema";
    private final static String UTF_8 = "UTF-8";
    private final static String HDFS_BASE_LOGS_DIR = "/<baseDir>/logs";
    private static final String keytabLocation = System.getProperty("keytab");
    private static final String kerberosUser = "<userName>";
    private static Writer writer;

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "Kerberos");
        InetAddress myHost = InetAddress.getLocalHost();
        String kerberosPrincipal = String.format("%s/%s", kerberosUser, myHost.getHostName());
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, keytabLocation);

        int currentDay = LocalDate.now().getDayOfMonth();
        int currentMonth = LocalDate.now().getMonthOfYear();
        int currentYear = LocalDate.now().getYear();

        Path path = new Path(HDFS_BASE_LOGS_DIR);
        FileSystem fileSystem = path.getFileSystem(conf);
        System.out.println("The URI is: " + fileSystem.getUri());

        //Get Hosts:
        List<String> allHostsPath = getHosts(path, fileSystem);

        TypeDescription schema = TypeDescription.fromString(getSchema(SCHEMA)
                .replaceAll("\n", ""));

        //Open each file for reading and write contents
        for (int i = 0; i < allHostsPath.size(); i++) {
            String outFile = "." + currentYear + "_" + currentMonth + "_" + currentDay + ".orc.working"; //filename: .2018_04_24.orc.working

            //Create list of files from directory and today's date OR pass a directory in via the command line in format
            //hdfs://<namenode>:8020/HDFS_BASE_LOGS_DIR/<hostname>/2018/4/24/
            String directory = "";
            Path outFilePath;
            Path argsPath;
            List<String> orcFiles;

            if (args.length == 0) {
                directory = currentYear + "/" + currentMonth + "/" + currentDay;
                outFilePath = new Path(allHostsPath.get(i) + "/" + directory + "/" + outFile);
                try {
                    orcFiles = getAllFilePath(new Path(allHostsPath.get(i) + "/" + directory), fileSystem);
                } catch (Exception e) {
                    continue;
                }
            } else {
                outFilePath = new Path(args[0] + "/" + outFile);
                argsPath = new Path(args[0]);
                try {
                    orcFiles = getAllFilePath(argsPath, fileSystem);
                } catch (Exception e) {
                    continue;
                }
            }

            //Create List of files in the directory

            FileSystem fs = outFilePath.getFileSystem(conf);
            //Writer MUST be below ^^ or the combination file will be deleted as well.
            if (fs.exists(outFilePath)) {
                System.out.println(outFilePath + " exists, delete before continuing.");
                continue; //skip this directory; otherwise the loop below would run without a fresh writer
            }
            writer = OrcFile.createWriter(outFilePath, OrcFile.writerOptions(conf)
                    .setSchema(schema));

            for (int j = 0; j < orcFiles.size(); j++) {
                Reader reader = OrcFile.createReader(new Path(orcFiles.get(j)), OrcFile.readerOptions(conf));
                VectorizedRowBatch batch = reader.getSchema().createRowBatch();
                RecordReader rows = reader.rows();
                while (rows.nextBatch(batch)) {
                    writer.addRowBatch(batch);
                }
                rows.close();
                fs.delete(new Path(orcFiles.get(j)), false);
            }

            //Close File
            writer.close();

            //Remove leading "." from ORC file to make visible to Hive
            outFile = fileSystem.getFileStatus(outFilePath)
                    .getPath()
                    .getName();
            if (outFile.startsWith(".")) {
                outFile = outFile.substring(1);
                int lastIndexOf = outFile.lastIndexOf(".working");
                outFile = outFile.substring(0, lastIndexOf);
            }
            Path parent = outFilePath.getParent();
            fileSystem.rename(outFilePath, new Path(parent, outFile));
            if (args.length != 0)
                break;
        }
    }

    private static String getSchema(String resource) throws IOException {
        try (InputStream input = OrcFileRollUp.class.getResourceAsStream("/" + resource)) {
            return IOUtils.toString(input, UTF_8);
        }
    }

    public static List<String> getHosts(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
        List<String> hostsList = new ArrayList<String>();
        FileStatus[] fileStatus = fs.listStatus(filePath);
        for (FileStatus fileStat : fileStatus) {
            hostsList.add(fileStat.getPath().toString());
        }
        return hostsList;
    }

    private static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
        List<String> fileList = new ArrayList<String>();
        FileStatus[] fileStatus = fs.listStatus(filePath);
        for (FileStatus fileStat : fileStatus) {
            if (fileStat.isDirectory()) {
                fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
            } else {
                fileList.add(fileStat.getPath().toString());
            }
        }

        //Keep only .orc files. Iterate backwards: removing while walking forward
        //would skip the element that slides into the removed slot.
        for (int i = fileList.size() - 1; i >= 0; i--) {
            if (!fileList.get(i).endsWith(".orc"))
                fileList.remove(i);
        }
        return fileList;
    }
}
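Since the whole point was a daily roll-up, a hypothetical crontab entry could look like the line below. Note that the program builds its paths from the current date, so schedule it near the end of the day rather than just after midnight, or it will sweep the wrong directory:

55 23 * * * java -cp orc-rollup.jar:`hadoop classpath` -Dkeytab=/etc/security/keytabs/<userName>.keytab OrcFileRollUp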
