java使用多线程读取超大文件

接上次写的“JAVA读取超大文件”。在读取超过10G的文件时会发现一次读一行的速度实在是不能接受,想到使用多线程+FileChannel来做一个使用多线程版本。

基本思路如下:

1.计算出文件总大小

2.分段处理,计算出每个线程读取文件的开始与结束位置

  (文件大小/线程数)*N,N是指第几个线程,这样能得到每个线程在读该文件的大概起始位置

使用"大概起始位置",作为读文件的开始偏移量(fileChannel.position("大概起始位置")),来读取该文件,直到读到第一个换行符,记录下这个换行符的位置,作为该线程的准确起 始位置.同时它也是上一个线程的结束位置.最后一个线程的结束位置也直接设置为-1

3.启动线程,每个线程从开始位置读取到结束位置为止

代码如下:

读文件工具类

import java.io.*;

import java.nio.ByteBuffer;

import java.nio.channels.FileChannel;

import java.util.Observable;

/**

* Created with IntelliJ IDEA.

* User: okey

* Date: 14-4-2

* Time: 下午3:12

* 读取文件

*/

public class ReadFile extends Observable {

private int bufSize = 1024;

// 换行符

private byte key = "\n".getBytes()[0];

// 当前行数

private long lineNum = 0;

// 文件编码,默认为gb2312

private String encode = "gb2312";

// 具体业务逻辑监听器

private ReaderFileListener readerListener;

public void setEncode(String encode) {

this.encode = encode;

}

public void setReaderListener(ReaderFileListener readerListener) {

this.readerListener = readerListener;

}

/**

* 获取准确开始位置

* @param file

* @param position

* @return

* @throws Exception

*/

public long getStartNum(File file, long position) throws Exception {

long startNum = position;

FileChannel fcin = new RandomAccessFile(file, "r").getChannel();

fcin.position(position);

try {

int cache = 1024;

ByteBuffer rBuffer = ByteBuffer.allocate(cache);

// 每次读取的内容

byte[] bs = new byte[cache];

// 缓存

byte[] tempBs = new byte[0];

String line = "";

while (fcin.read(rBuffer) != -1) {

int rSize = rBuffer.position();

rBuffer.rewind();

rBuffer.get(bs);

rBuffer.clear();

byte[] newStrByte = bs;

// 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面

if (null != tempBs) {

int tL = tempBs.length;

newStrByte = new byte[rSize + tL];

System.arraycopy(tempBs, 0, newStrByte, 0, tL);

System.arraycopy(bs, 0, newStrByte, tL, rSize);

}

// 获取开始位置之后的第一个换行符

int endIndex = indexOf(newStrByte, 0);

if (endIndex != -1) {

return startNum + endIndex;

}

tempBs = substring(newStrByte, 0, newStrByte.length);

startNum += 1024;

}

} catch (Exception e) {

e.printStackTrace();

} finally {

fcin.close();

}

return position;

}

/**

* 从设置的开始位置读取文件,一直到结束为止。如果 end设置为负数,刚读取到文件末尾

* @param fullPath

* @param start

* @param end

* @throws Exception

*/

public void readFileByLine(String fullPath, long start, long end) throws Exception {

File fin = new File(fullPath);

if (fin.exists()) {

FileChannel fcin = new RandomAccessFile(fin, "r").getChannel();

fcin.position(start);

try {

ByteBuffer rBuffer = ByteBuffer.allocate(bufSize);

// 每次读取的内容

byte[] bs = new byte[bufSize];

// 缓存

byte[] tempBs = new byte[0];

String line = "";

// 当前读取文件位置

long nowCur = start;

while (fcin.read(rBuffer) != -1) {

nowCur += bufSize;

int rSize = rBuffer.position();

rBuffer.rewind();

rBuffer.get(bs);

rBuffer.clear();

byte[] newStrByte = bs;

// 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面

if (null != tempBs) {

int tL = tempBs.length;

newStrByte = new byte[rSize + tL];

System.arraycopy(tempBs, 0, newStrByte, 0, tL);

System.arraycopy(bs, 0, newStrByte, tL, rSize);

}

// 是否已经读到最后一位

boolean isEnd = false;

// 如果当前读取的位数已经比设置的结束位置大的时候,将读取的内容截取到设置的结束位置

if (end > 0 && nowCur > end) {

// 缓存长度 - 当前已经读取位数 - 最后位数

int l = newStrByte.length - (int) (nowCur - end);

newStrByte = substring(newStrByte, 0, l);

isEnd = true;

}

int fromIndex = 0;

int endIndex = 0;

// 每次读一行内容,以 key(默认为\n) 作为结束符

while ((endIndex = indexOf(newStrByte, fromIndex)) != -1) {

byte[] bLine = substring(newStrByte, fromIndex, endIndex);

line = new String(bLine, 0, bLine.length, encode);

lineNum++;

// 输出一行内容,处理方式由调用方提供

readerListener.outLine(line.trim(), lineNum, false);

fromIndex = endIndex + 1;

}

// 将未读取完成的内容放到缓存中

tempBs = substring(newStrByte, fromIndex, newStrByte.length);

if (isEnd) {

break;

}

}

// 将剩下的最后内容作为一行,输出,并指明这是最后一行

String lineStr = new String(tempBs, 0, tempBs.length, encode);

readerListener.outLine(lineStr.trim(), lineNum, true);

} catch (Exception e) {

e.printStackTrace();

} finally {

fcin.close();

}

} else {

throw new FileNotFoundException("没有找到文件:" + fullPath);

}

// 通知观察者,当前工作已经完成

setChanged();

notifyObservers(start+"-"+end);

}

/**

* 查找一个byte[]从指定位置之后的一个换行符位置

*

* @param src

* @param fromIndex

* @return

* @throws Exception

*/

private int indexOf(byte[] src, int fromIndex) throws Exception {

for (int i = fromIndex; i < src.length; i++) {

if (src[i] == key) {

return i;

}

}

return -1;

}

/**

* 从指定开始位置读取一个byte[]直到指定结束位置为止生成一个全新的byte[]

*

* @param src

* @param fromIndex

* @param endIndex

* @return

* @throws Exception

*/

private byte[] substring(byte[] src, int fromIndex, int endIndex) throws Exception {

int size = endIndex - fromIndex;

byte[] ret = new byte[size];

System.arraycopy(src, fromIndex, ret, 0, size);

return ret;

}

}

读文件线程

/**

* Created with IntelliJ IDEA.

* User: okey

* Date: 14-4-2

* Time: 下午4:50

* To change this template use File | Settings | File Templates.

*/

public class ReadFileThread extends Thread {

private ReaderFileListener processPoiDataListeners;

private String filePath;

private long start;

private long end;

public ReadFileThread(ReaderFileListener processPoiDataListeners,long start,long end,String file) {

this.setName(this.getName()+"-ReadFileThread");

this.start = start;

this.end = end;

this.filePath = file;

this.processPoiDataListeners = processPoiDataListeners;

}

@Override

public void run() {

ReadFile readFile = new ReadFile();

readFile.setReaderListener(processPoiDataListeners);

readFile.setEncode(processPoiDataListeners.getEncode());

// readFile.addObserver();

try {

readFile.readFileByLine(filePath, start, end + 1);

} catch (Exception e) {

e.printStackTrace();

}

}

}

具体业务逻辑监听

/**

* Created with Okey

* User: Okey

* Date: 13-3-14

* Time: 下午3:19

* NIO逐行读数据回调方法

*/

public abstract class ReaderFileListener {

// 一次读取行数,默认为500

private int readColNum = 500;

private String encode;

private List<String> list = new ArrayList<String>();

/**

* 设置一次读取行数

* @param readColNum

*/

protected void setReadColNum(int readColNum) {

this.readColNum = readColNum;

}

public String getEncode() {

return encode;

}

public void setEncode(String encode) {

this.encode = encode;

}

/**

* 每读取到一行数据,添加到缓存中

* @param lineStr 读取到的数据

* @param lineNum 行号

* @param over 是否读取完成

* @throws Exception

*/

public void outLine(String lineStr, long lineNum, boolean over) throws Exception {

if(null != lineStr)

list.add(lineStr);

if (!over && (lineNum % readColNum == 0)) {

output(list);

list.clear();

} else if (over) {

output(list);

list.clear();

}

}

/**

* 批量输出

*

* @param stringList

* @throws Exception

*/

public abstract void output(List<String> stringList) throws Exception;

}

线程调度

import java.io.File;

import java.io.FileInputStream;

import java.io.IOException;

/**

* Created with IntelliJ IDEA.

* User: okey

* Date: 14-4-1

* Time: 下午6:03

* To change this template use File | Settings | File Templates.

*/

public class BuildData {

public static void main(String[] args) throws Exception {

File file = new File("E:\\1396341974289.csv");

FileInputStream fis = null;

try {

ReadFile readFile = new ReadFile();

fis = new FileInputStream(file);

int available = fis.available();

int maxThreadNum = 50;

// 线程粗略开始位置

int i = available / maxThreadNum;

for (int j = 0; j < maxThreadNum; j++) {

// 计算精确开始位置

long startNum = j == 0 ? 0 : readFile.getStartNum(file, i * j);

long endNum = j + 1 < maxThreadNum ? readFile.getStartNum(file, i * (j + 1)) : -2;

// 具体监听实现

ProcessDataByPostgisListeners listeners = new ProcessDataByPostgisListeners("gbk");

new ReadFileThread(listeners, startNum, endNum, file.getPath()).start();

}

} catch (IOException e) {

e.printStackTrace();

} catch (Exception e) {

e.printStackTrace();

}

}

}

现在就可以尽情的调整 maxThreadNum来享受风一般的速度吧!

以上是 java使用多线程读取超大文件 的全部内容, 来源链接: utcz.com/z/339337.html

回到顶部