Java jdbc批量多线程读取CVS文件入库
需求是这样的:现在需要测试一个内存数据库的入库性能,要求测试每线程准备一个文件,10个线程入库总计100w记录数的单表入库性能。
知识点:jdbc + 多线程 + 批处理 + 文件读取
先来看看我的代码结构
说明:
files: 存放即将要读取的文件。
lib: 存放第三方的jar文件,例如数据库驱动包。
MemSqlTestMain: 这是工程的入口,就是主程序。
DBUtil: 这个类是数据库帮助类,主要读取数据库配置信息获取连接关闭连接等操作。
InsertUtil: 主要做的是读取数据文件生成sql并批量入库的一个类。
TableDataInfo: 主要对要插入的数据表的对象的一个类。
XMLUtil: 读取XML配置文件
config.xml: 配置要插入的表信息以及文件的路径等信息
dbconfig.properties: 主要对数据库的连接信息进行存储,包括URL,用户名密码等等。
话不多说直接上代码:
import java.util.ArrayList;
/**
* @param
* @author wu.lin
* @description 程序入口,启用线程读取文件并入库
* @create 2016年09月01日 15:12
* @throws
*/
public class MemSqlTestMain {
public static void main(String[] args) {
//通过读取配置文件读取要插入数据的表名
String tableName = XMLUtil.getTableName();
System.out.println(tableName);
//通过配置文件读取数据存放的文件的路径
ArrayList<String> fileNameList = XMLUtil.getFileNameList();
int len = fileNameList.size();
//针对每一个文件开启一个进程去执行读取并入库的操作
for (int i = 0; i < len; i++) {
String fileName = fileNameList.get(i);
System.out.println(fileName);
new Thread(new InsertUtil(fileName, tableName)).start();
}
}
}
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.sql.*;
/**
* @param
* @author wu.lin
* @description InsertUtil是一个线程类,主要读取数据文件组装Sql并执行入库操作
* @create 2016年09月01日 14:10
* @throws
*/
public class InsertUtil implements Runnable {
//文件路径
private String filePath;
//表名
private String tableName;
//.cvs文件数据以","分隔
private static String DELIMITERS = ",";
//获取数据库帮助类
DBUtil dbutil = DBUtil.getInstance();
public InsertUtil() {}
public InsertUtil(String filePath, String tableName) {
this.filePath = filePath;
this.tableName = tableName;
}
public static String getDELIMITERS() {
return DELIMITERS;
}
public static void setDELIMITERS(String delimiters) {
DELIMITERS = delimiters;
}
public String getFilePath() {
return filePath;
}
public void setFilePath(String filePath) {
this.filePath = filePath;
}
//读取文件并且批处理入库的方法
public boolean insertDB(String tablename, long rc, String filePath) {
if(filePath == null || "".equals(filePath)) {
System.out.println("文件路径为空");
return false;
}
if (rc < 1) {
rc = 100;
}
Connection conn = null;
boolean flag = false;
Statement pre = null;
String sql = "";
TableDataInfo tableInfo = new TableDataInfo();
try {
if(conn == null) {
conn = dbutil.getConnection();
}
pre = conn.createStatement();
conn.setAutoCommit(false);
int colCount = tableInfo.getTableColNums(tablename, conn);
int rowCount = 0;
File file = new File(filePath);
BufferedReader buf = null;
buf = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
String line_record = buf.readLine();
long startTime = System.currentTimeMillis(); //开始计时
while (line_record != null) {
// 解析每一条记录
sql = "INSERT INTO " + tablename + " VALUES('";
String[] fields = line_record.split(DELIMITERS);
//对Insert语句的合法性进行判断
if(fields.length != colCount){
System.out.println("要插入的数据列数和表的数据列不相匹配,停止执行");
break;
}
for (int i = 0; i < fields.length; i++) {
sql += fields[i];
if (i < fields.length - 1) {
sql += "','";
}
}
sql += "');";
// 在控制台输出SQL语句
// System.out.println(sql);
//执行SQL语句
pre.addBatch(sql);
rowCount++;
line_record = buf.readLine();
if (rowCount >= rc) {
break;
}
}
pre.executeBatch();
conn.setAutoCommit(true);
pre.close();
System.out.println("共写入行数:" + rowCount);
long endTime = System.currentTimeMillis(); //停止计时
System.out.println("执行时间为:" + (endTime - startTime) + " ms");
} catch (Exception e) {
flag = false;
try {
//回滚
if(conn != null) {
conn.rollback();
}
} catch (SQLException e1) {
e1.printStackTrace();
}
e.printStackTrace();
} finally {
dbutil.close(null, pre, conn);
}
return flag;
}
public void run() {
this.insertDB(tableName, 500000, filePath);
}
}
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
* @param
* @author wu.lin
* @description 数据库表实体
* @create 2016年09月01日 14:19
* @throws
*/
public class TableDataInfo {
DBUtil dbutil = DBUtil.getInstance();
/**
*
* @param m_TableName
* @param m_Connection
* @return 该表的列数
*/
public int getTableColNums(String m_TableName, Connection m_Connection) {
int colCount = 0;
try {
if (m_Connection == null) {
m_Connection = dbutil.getConnection();
}
DatabaseMetaData m_DBMetaData = m_Connection.getMetaData();
ResultSet tableRet = m_DBMetaData.getTables(null, "%", m_TableName,
new String[] { "TABLE" });
while (tableRet.next()) {
System.out.println("Table name is:"
+ tableRet.getString("TABLE_NAME"));
}
String columnName;
String columnType;
ResultSet colRet = m_DBMetaData.getColumns(null, "%", m_TableName,"%");
while (colRet.next()) {
columnName = colRet.getString("COLUMN_NAME");
columnType = colRet.getString("TYPE_NAME");
int dataSize = colRet.getInt("COLUMN_SIZE");
int digits = colRet.getInt("DECIMAL_DIGITS");
int nullable = colRet.getInt("NULLABLE");
String nullFlag;
if (nullable == 1) {
nullFlag = "Null";
} else {
nullFlag = "Not Null";
}
System.out.println(columnName + " " + columnType + "("
+ dataSize + "," + digits + ") " + nullFlag);
colCount++;
}
} catch (SQLException e) {
e.printStackTrace();
}
System.out.println("The number of column is: " + colCount);
return colCount;
}
}
接下来就剩下读取配置文件的代码了,先来看看配置文件内容(这里配置了数据库配置文件路径表名以及文件存放的相对路径):
<?xml version="1.0" encoding="utf-8" ?>
<config>
<db_file>src/dbconfig.properties</db_file>
<tableName>memtest</tableName>
<files>
<filePath>files/memtest.csv</filePath>
<filePath>files/memtest_1.csv</filePath>
<filePath>files/memtest_2.csv</filePath>
<filePath>files/memtest_3.csv</filePath>
<filePath>files/memtest_4.csv</filePath>
<filePath>files/memtest_5.csv</filePath>
<filePath>files/memtest_6.csv</filePath>
<filePath>files/memtest_7.csv</filePath>
<filePath>files/memtest_8.csv</filePath>
<filePath>files/memtest_9.csv</filePath>
<filePath>files/memtest_10.csv</filePath>
</files>
</config>
接下来是读取这个配置文件的内容,比较简单,所以只贴部分代码:
import javax.xml.parsers.*;
import org.w3c.dom.*;
import java.io.*;
import java.util.ArrayList;
/**
* @param
* @author wu.lin
* @description 读取配置信息
* @create 2016年09月01日 15:45
* @throws
*/
public class XMLUtil {
//该方法用于从XML配置文件中提取要插入的表名称,并返回该表名称
public static String getTableName() {
return getXmlProperties("tableName");
}
public static String getDatabaseUrl() {
return getXmlProperties("dataBaseUrl");
}
public static String getDbFilePath() {
return getXmlProperties("db_file");
}
private static String getXmlProperties(String proName) {
try {
Document doc = getDoc();
//获取包含品牌名称的文本节点
NodeList nl = doc.getElementsByTagName(proName);
Node classNode=nl.item(0).getFirstChild();
String tableName=classNode.getNodeValue().trim();
return tableName;
} catch(Exception e)
{
e.printStackTrace();
return null;
}
}
private static Document getDoc() throws Exception {
//创建文档对象
DocumentBuilderFactory dFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = dFactory.newDocumentBuilder();
Document doc;
doc = builder.parse(new File("src/config.xml"));
return doc;
}
}
数据库配置信息文档:
db.used=mysql
# driver class
oracle.jdbc.driver_class=oracle.jdbc.driver.OracleDriver
# URL
oracle.jdbc.url=jdbc:oracle:thin:@localhost:1521:ORCL
# username
oracle.jdbc.username=scott
# pwd
oracle.jdbc.pwd=tiger
#mysql connect config
mysql.jdbc.driver_class=com.mysql.jdbc.Driver
mysql.jdbc.url=jdbc:mysql://localhost:3306/mysqldb
mysql.jdbc.username=root
mysql.jdbc.pwd=
最后是数据库帮助类,比较常见:
import java.io.FileInputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
/**
* @param
* @author wu.lin
* @description 数据库帮助类
* @create 2016年09月01日 18:56
* @throws
*/
public class DBUtil {
private static Properties env = new Properties();
private static DBUtil dbutil;
private static String dbname;
private static String driverClass_key;
private static String url_key;
private static String username_key;
private static String pwd_key;
private DBUtil(){}
// 单例模式
public static synchronized DBUtil getInstance() {
if (null == dbutil) {
dbutil = new DBUtil();
}
return dbutil;
}
/**
* 得到数据库连接
* @return
*/
public Connection getConnection() {
Connection conn = null;
try {
env.load(new FileInputStream(XMLUtil.getDbFilePath()));
dbname = env.getProperty("db.used").toLowerCase();
driverClass_key = dbname + ".jdbc.driver_class";
url_key = dbname + ".jdbc.url";
username_key = dbname + ".jdbc.username";
pwd_key = dbname + ".jdbc.pwd";
//加载连接数据库的驱动程序类文件
Class.forName(env.getProperty(driverClass_key));
conn = createConnection();
} catch (Exception e) {
e.printStackTrace();
}
return conn;
}
private Connection createConnection() throws SQLException {
Connection conn = null;
if ("oracle".equals(dbname)) {
conn = DriverManager.getConnection(env.getProperty(url_key), env.getProperty(username_key),
env.getProperty(pwd_key));
}
if ("sqlserver".equals(dbname)) {
conn = DriverManager.getConnection(env.getProperty(url_key), env.getProperty(username_key),
env.getProperty(pwd_key));
}
if ("mysql".equals(dbname)) {
// 其他数据库的连接语法
String url = env.getProperty(url_key);
String username = env.getProperty(username_key);
String pwd = env.getProperty(pwd_key);
if(username != null && !"".equals(username)) {
url += ("?user=" + username);
if(pwd != null && !"".equals(pwd)) {
url += ("&password=" + pwd);
}
}
conn = DriverManager.getConnection(url);
}
return conn;
}
//提供jdbc关闭连接的方法
public void close(ResultSet rs,Statement st,Connection conn){
try {
if(rs!=null)
rs.close();
if(st!=null)
st.close();
if(conn!=null)
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
最后的工作便是在文件目录存放相应的数据文件,然后通过配置文件配置好文件名、表名以及数据库连接的基本信息后,运行程序入口,便可以将程序跑起来啦。但是在这个过程中也遇到一些小问题,比如,我这边只有一个100w条数据的.csv格式的文件,但是要求读取十个文件,在这个时候我用到了一个小工具:
大家知道.csv格式的文件也可以用Excel软件打开,所以在这里转换一下用Excel分割器把文件分成十份,就完美的解决问题啦。
以上是 Java jdbc批量多线程读取CVS文件入库 的全部内容, 来源链接: utcz.com/z/349462.html