Java 之 JDBC线程池(源码版)
一、目录
二、代码
PoolConstant
package cn.kgc.kb08.jdbc.dao3.impl;public interface PoolConstant {
String POOL_CORE_COUNT="coreCount";
String POOL_MAX_COUNT="maxCount";
String POOL_MAX_IDELE="maxIdel";
String POOL_MAX_WAIT="maxWait";
String POOL_RETRY_INTERVAL="retryInterval";
String POOL_MAX_RETRY_COUNT="maxRetryCount";
String POOL_EXIT_ON_ERR="exitOnErr";
String[] POOL={
POOL_CORE_COUNT,
POOL_MAX_COUNT,
POOL_MAX_IDELE,
POOL_MAX_WAIT,
POOL_RETRY_INTERVAL,
POOL_MAX_RETRY_COUNT,
POOL_EXIT_ON_ERR
};
String MYSQL_DRI="driver";
String MYSQL_URI="url";
String MYSQL_USER="username";
String MYSQL_PASS="password";
String[] MYSQL = {
MYSQL_DRI,
MYSQL_URI,
MYSQL_USER,
MYSQL_PASS
};
}
PoolUtil
package cn.kgc.kb08.jdbc.dao3.impl;import cn.kgc.kb08.jdbc.dao2.Dao;
import cn.kgc.kb08.jdbc.dao2.impl.BaseDao;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class PoolUtil {
private static Dao dao;
/**
* 解析数据源配置信息
* @param dataSource 数据源名称
* @return Map<String,String>
*/
protected static <T>Map<String,T> parse(Class<T> c,String dataSource, List<String> items){
// File config = new File("config/sys.properties");
// Properties pro = new Properties();
// try {
// pro.load(new FileInputStream(config));
// Map<String,T> map = new HashMap<>(items.size());
// for (String item : items) {
// String key = dataSource+"."+ item;
// if (!pro.containsKey(key)){
// throw new IOException("缺少配置项目"+item);
// }
// map.put(item,c.getConstructor(String.class).newInstance(pro.getProperty(key)));
// }
// } catch (Exception e) {
// e.printStackTrace();
// System.out.println("资源配置缺失,系统强制退出"+e.getMessage());
// System.exit(-1);
// }finally {
// if(null!=pro){
// pro.clear();
// pro = null;
//
// }
// }
//
// return null;
File config = new File("config/sys.properties");
Properties pro = new Properties();//Properties是一个文件
try {
pro.load(new FileInputStream(config));
//final String[] items = {"driver", "url", "username", "password"};
Map<String,T> map = new HashMap<>(items.size());
for (String item : items) {
String key = dataSource + "." + item;
if (!pro.containsKey(key)) {
throw new IOException("缺少配置项:" + item);//不包含,就是缺项了
}
map.put(item,c.getConstructor(String.class).newInstance(pro.getProperty(key)));
}
return map;
} catch (Exception e) {
System.err.println(dataSource+"数据源配置信息异常,系统强制退出:" + e.getMessage());
System.exit(-1);
} finally {
if (null != pro) {
pro.clear();
pro = null;
}
}
return null;
}
protected static void close(AutoCloseable...acs){
for (AutoCloseable ac : acs) {
if (null != ac) {
try {
ac.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}}
}
***重点类**** ConPool
package cn.kgc.kb08.jdbc.dao3.impl;import cn.kgc.kb08.jdbc.dao3.SelRtn;
import cn.kgc.kb08.jdbc.dao3.Dao;
import cn.kgc.kb08.jdbc.dao3.Pool;
import java.lang.reflect.Method;
import java.sql.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 弹性连接池:生产和管理对象的
*/
public final class ConPool implements Pool {
// pool.maxIdel=30是什么 => 请看文档,官方会写
// pool.retryInterval=50
// pool.maxRetryCount=8
/**
* 池中连接
*/
class PoolCon {
boolean free = true;
boolean core;
Connection con;
long idleBegin;
public PoolCon(boolean core, Connection con) {
this.core = core;
this.con = con;
restIdle();
}
public void restIdle() {
if (!core) {
this.idleBegin = System.currentTimeMillis();
}
}
}
private ConcurrentMap<Integer, PoolCon> pool;
private Map<String, Integer> cnfPool;
private Map<String, String> cnfCon;
/**
* 执行定期清理线程池
* 检查核心连接对象的有效性,无效则创建新核心连接对象覆盖
* 检查临时连接对象是否超时,超时则关闭并移除
*/
private ScheduledExecutorService schedule;
private ExecutorService service;
private Lock lock;
private Condition cond;
private boolean clearing;
public ConPool() {
initCnf();
initPool();
startClear();
}
// 塞进pool和mysql的配置:比如Map中driver:xxx的键值对
private void initCnf() {
cnfPool = PoolUtil.parse(Integer.class, "pool",
Arrays.asList(PoolConstant.POOL));
cnfCon = PoolUtil.parse(String.class, "mysql01",
Arrays.asList(PoolConstant.MYSQL));
}
// 初始化连接池
private void initPool() {
final int MAX_COUNT = cnfPool.get(PoolConstant.POOL_MAX_COUNT);
service = Executors.newFixedThreadPool(MAX_COUNT * 2);
schedule = Executors.newSingleThreadScheduledExecutor();
lock = new ReentrantLock(true);
cond = lock.newCondition();
//分段锁的集合
pool = new ConcurrentHashMap<>(MAX_COUNT);
// 池中连接
PoolCon pc;
final int CORE_COUNT = cnfPool.get(PoolConstant.POOL_CORE_COUNT);
for (Integer i = 0, j = 1; i <= CORE_COUNT; i++) {
pc = makePoolCon(true);
if (null != pc) {
// 给核心连接一个编号
pool.put(j++, pc);
}
}
if (pool.size() == 0) {
System.err.println("连接池初始化失败,系统强制退出");
System.exit(-1);
}
// 如果配置让你失败便退出,且核心池数量小于一半
if (cnfPool.get(PoolConstant.POOL_EXIT_ON_ERR) == 1
&& pool.size() <= CORE_COUNT / 2) {
System.err.println("连接池初始化过半异常,系统强制退出");
System.exit(-1);
}
}
/**
* 创建一个池中的连接对象
*
* @param core 池对象类型,true:核心对象,false:临时对象
* @return
*/
private PoolCon makePoolCon(boolean core) {
PoolCon pc = null;
// 最大重试次数,创建n次,创建出一个连接对象
for (int i = 0; i <= cnfPool.get(PoolConstant.POOL_MAX_RETRY_COUNT); i++) {
try {
Connection con = DriverManager.getConnection(
cnfCon.get(PoolConstant.MYSQL_URI),
cnfCon.get(PoolConstant.MYSQL_USER),
cnfCon.get(PoolConstant.MYSQL_PASS)
);
pc = new PoolCon(core, con);
} catch (SQLException e) {
try {
// 创建失败就休息片刻再创建(重试)
TimeUnit.SECONDS.sleep(cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL));
continue;
} catch (InterruptedException e1) {
e1.printStackTrace();
System.out.println("cuocuocuo");
}
e.printStackTrace();
}
}
return pc;
}
/**
* 验证核心连接对象是否有效
*
* @param pc
* @return
*/
private boolean isPCValid(PoolCon pc) {
try {
pc.con.createStatement().executeQuery("select 1");
return true;
} catch (SQLException e) {
return false;
}
}
/**
* 验证临时连接对象是否过期
*
* @param pc 池连接对象
* @return true:过期,false:没过期
*/
private boolean isExpired(PoolCon pc) {
return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - pc.idleBegin) >=
cnfPool.get(PoolConstant.POOL_MAX_IDELE);
}
/**
* 验证用户是否超出配置最大时限
* @param waitBegin 计算参考起点时间
* @return
*/
private boolean isWaitExpired(long waitBegin){
return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()-waitBegin)>=
cnfPool.get(PoolConstant.POOL_MAX_WAIT);
}
/**
* 开启定期清理任务
* maxIdle,最长闲置时间
*/
private void startClear() {
int delay = cnfPool.get(PoolConstant.POOL_MAX_IDELE);
schedule.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
lock.lock();
clearing = true;
for (Integer key : pool.keySet()) {
PoolCon pc = pool.get(key);
if (!pc.free) {
continue;
}
if (pc.core) {
if (!isPCValid(pc)) {
pool.put(key, makePoolCon(true));
}
} else {
if (isExpired(pc) || !isPCValid(pc)) {
pool.remove(key);
}
}
}
clearing = false;
cond.signalAll();
lock.unlock();
}
}, delay, delay, TimeUnit.SECONDS);
}
/**
* 连接池销毁
*/
@Override
public void destory() {
while (pool.size() > 0) {
for (Integer key : pool.keySet()) {
PoolCon pc = pool.get(key);
if (pc.free) {
pc.free = false;
PoolUtil.close(pc.con);
pool.remove(key);
}
}
try {
TimeUnit.MILLISECONDS.sleep(cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
*
* @return
*/
private PoolCon fetch() {
long waitBegin = System.currentTimeMillis();
for (Integer i = 0; i <= cnfPool.get(PoolConstant.POOL_MAX_RETRY_COUNT); i++) {
try {
lock.lock();
if (clearing) {
cond.await();
}
for (Integer key : pool.keySet()) {
PoolCon pc = pool.get(key);
if (pc.free && isPCValid(pc)) {
pc.free = false;
return pc;
}
}
if(isWaitExpired(waitBegin)){
return null;
}
TimeUnit.MILLISECONDS.sleep(cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL));
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
if(pool.size()< cnfPool.get(PoolConstant.POOL_MAX_COUNT)){
PoolCon pc = makePoolCon(false);
if (null != pc){
pc.free = false;
pool.put(pool.size()+1,pc);
return pc;
}
}
return null;
}
private void giveback(PoolCon pc){
if(null==pc){
return;
}
if(!pc.core){
pc.restIdle();
}
pc.free = true;
}
@Override
public Dao newDao(){
return new Dao() {
private PreparedStatement getPst(Connection con, final String SQL, Object... params) throws SQLException {
PreparedStatement pst = con.prepareStatement(SQL);
if (null != params && params.length > 0) {
for (int i = 0; i < params.length; i++) {
pst.setObject(i + 1, params[i]);
}
}
return pst;
}
private int update(PreparedStatement pst) throws SQLException {
return pst.executeUpdate();
}
private ResultSet query(PreparedStatement pst) throws SQLException {
return pst.executeQuery();
}
private Map<String, Method> parseMethod(Class c) {
Map<String, Method> mapMethod = new HashMap<>();
final String PREFIX = "set";
for (Method method : c.getDeclaredMethods()) {
String name = method.getName();
if (!name.startsWith(PREFIX)) {
continue;
}
name = name.substring(3);
name = name.substring(0, 1).toLowerCase() + name.substring(1);
mapMethod.put(name, method);
}
return mapMethod;
}
private String[] parseStruct(ResultSetMetaData md) throws SQLException {
String[] names = new String[md.getColumnCount()];
for (int i = 0; i < names.length; i++) {
names[i] = md.getColumnLabel(i + 1);
}
return names;
}
@Override
public int exeUpd(final String SQL, final Object... params) {
try {
return service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int rst = 0;
PoolCon pc = null;
// Connection con = null;
PreparedStatement pst = null;
try {
pc = fetch();
if (null != pc) {
pst = getPst(pc.con, SQL, params);
rst = update(pst);
}
} catch (SQLException e) {
rst = -1;
} finally {
PoolUtil.close(pst);
giveback(pc);
}
return rst;
}
}).get();
} catch (Exception e) {
return -1;
}
}
@Override
public <T> SelRtn exeSingle(final Class<T> c, final String SQL, final Object... params) {
try {
return service.submit(new Callable<SelRtn>() {
@Override
public SelRtn call() throws Exception {
PoolCon pc = null;
PreparedStatement pst = null;
ResultSet rst = null;
try {
pc = fetch();
pst = getPst(pc.con, SQL, params);
rst = query(pst);
if (null != rst && rst.next()) {
// 调用类型(非Character基本类型包装类)c的,带有唯一字符串参数的构造方法
// c.getConstructor(String.class)//基本类型创建对象
return SelRtn.succeed(
c.getConstructor(String.class).newInstance(rst.getObject(1).toString()));
} else {
return SelRtn.succeed(null);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// close(rst, pst, con);
PoolUtil.close(rst, pst);
giveback(pc);
}
return SelRtn.fail();
}
}).get();
} catch (Exception e) {
return SelRtn.fail();
}
}
@Override
public <T> SelRtn exeQuery(final Class<T> c, final String SQL, final Object... params) {
try {
return service.submit(new Callable<SelRtn>() {
@Override
public SelRtn call() throws Exception {
PoolCon pc = null;
PreparedStatement pst = null;
ResultSet rst = null;
try {
pst = getPst(pc.con, SQL, params);
rst = query(pst);
if (null != rst && rst.next()) {
List<T> list = new ArrayList<>();
Map<String, Method> map = parseMethod(c);
String[] names = parseStruct(rst.getMetaData());
do {
T t = c.newInstance();
for (String name : names) {
map.get(name).invoke(t, rst.getObject(name));
}
list.add(t);
} while (rst.next());
return SelRtn.succeed(list);
} else {
return SelRtn.succeed(null);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
PoolUtil.close(rst, pst);
giveback(pc);
}
return SelRtn.fail();
}
}).get();
} catch (Exception e) {
return SelRtn.fail();
}
}
};
}
}
Pool
public interface Pool {void destory();
Dao newDao();
}
PoolFactory
package cn.kgc.kb08.jdbc.dao3;import cn.kgc.kb08.jdbc.dao3.impl.ConPool;
public abstract class PoolFactory {
private static Dao dao;
private static synchronized void init(){
if(null==dao){
dao = new ConPool().newDao();
}
}
public static Dao get(){
if(null==dao){
init();
}
return dao;
}
}
SelRtn
package cn.kgc.kb08.jdbc.dao3;/**
* 完善查询操作返回类型,对于异常的缺失
*/
public final class SelRtn {
private boolean err = false;
private Object rtn;
public static SelRtn succeed(Object rtn){
return new SelRtn(rtn);
}
public static SelRtn fail(){
return new SelRtn();
}
private SelRtn(Object rtn) {
this.rtn = rtn;
}
private SelRtn() {
this.err = true;
}
public boolean isErr(){
return this.err;
}
public <T> T getRtn(){
return (T) rtn;
}
}
以上是 Java 之 JDBC线程池(源码版) 的全部内容, 来源链接: utcz.com/z/390289.html