Java 之 JDBC线程池(源码版)

java

一、目录

二、代码

PoolConstant

package cn.kgc.kb08.jdbc.dao3.impl;

public interface PoolConstant {

String POOL_CORE_COUNT="coreCount";

String POOL_MAX_COUNT="maxCount";

String POOL_MAX_IDELE="maxIdel";

String POOL_MAX_WAIT="maxWait";

String POOL_RETRY_INTERVAL="retryInterval";

String POOL_MAX_RETRY_COUNT="maxRetryCount";

String POOL_EXIT_ON_ERR="exitOnErr";

String[] POOL={

POOL_CORE_COUNT,

POOL_MAX_COUNT,

POOL_MAX_IDELE,

POOL_MAX_WAIT,

POOL_RETRY_INTERVAL,

POOL_MAX_RETRY_COUNT,

POOL_EXIT_ON_ERR

};

String MYSQL_DRI="driver";

String MYSQL_URI="url";

String MYSQL_USER="username";

String MYSQL_PASS="password";

String[] MYSQL = {

MYSQL_DRI,

MYSQL_URI,

MYSQL_USER,

MYSQL_PASS

};

}

PoolUtil

package cn.kgc.kb08.jdbc.dao3.impl;

import cn.kgc.kb08.jdbc.dao2.Dao;

import cn.kgc.kb08.jdbc.dao2.impl.BaseDao;

import java.io.File;

import java.io.FileInputStream;

import java.io.IOException;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

public class PoolUtil {

private static Dao dao;

/**

* 解析数据源配置信息

* @param dataSource 数据源名称

* @return Map<String,String>

*/

protected static <T>Map<String,T> parse(Class<T> c,String dataSource, List<String> items){

// File config = new File("config/sys.properties");

// Properties pro = new Properties();

// try {

// pro.load(new FileInputStream(config));

// Map<String,T> map = new HashMap<>(items.size());

// for (String item : items) {

// String key = dataSource+"."+ item;

// if (!pro.containsKey(key)){

// throw new IOException("缺少配置项目"+item);

// }

// map.put(item,c.getConstructor(String.class).newInstance(pro.getProperty(key)));

// }

// } catch (Exception e) {

// e.printStackTrace();

// System.out.println("资源配置缺失,系统强制退出"+e.getMessage());

// System.exit(-1);

// }finally {

// if(null!=pro){

// pro.clear();

// pro = null;

//

// }

// }

//

// return null;

File config = new File("config/sys.properties");

Properties pro = new Properties();//Properties是一个文件

try {

pro.load(new FileInputStream(config));

//final String[] items = {"driver", "url", "username", "password"};

Map<String,T> map = new HashMap<>(items.size());

for (String item : items) {

String key = dataSource + "." + item;

if (!pro.containsKey(key)) {

throw new IOException("缺少配置项:" + item);//不包含,就是缺项了

}

map.put(item,c.getConstructor(String.class).newInstance(pro.getProperty(key)));

}

return map;

} catch (Exception e) {

System.err.println(dataSource+"数据源配置信息异常,系统强制退出:" + e.getMessage());

System.exit(-1);

} finally {

if (null != pro) {

pro.clear();

pro = null;

}

}

return null;

}

protected static void close(AutoCloseable...acs){

for (AutoCloseable ac : acs) {

if (null != ac) {

try {

ac.close();

} catch (Exception e) {

e.printStackTrace();

}

}

}}

}

***重点类**** ConPool

package cn.kgc.kb08.jdbc.dao3.impl;

import cn.kgc.kb08.jdbc.dao3.SelRtn;

import cn.kgc.kb08.jdbc.dao3.Dao;

import cn.kgc.kb08.jdbc.dao3.Pool;

import java.lang.reflect.Method;

import java.sql.*;

import java.util.*;

import java.util.concurrent.*;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

/**

* 弹性连接池:生产和管理对象的

*/

public final class ConPool implements Pool {

// pool.maxIdel=30是什么 => 请看文档,官方会写

// pool.retryInterval=50

// pool.maxRetryCount=8

/**

* 池中连接

*/

class PoolCon {

boolean free = true;

boolean core;

Connection con;

long idleBegin;

public PoolCon(boolean core, Connection con) {

this.core = core;

this.con = con;

restIdle();

}

public void restIdle() {

if (!core) {

this.idleBegin = System.currentTimeMillis();

}

}

}

private ConcurrentMap<Integer, PoolCon> pool;

private Map<String, Integer> cnfPool;

private Map<String, String> cnfCon;

/**

* 执行定期清理线程池

* 检查核心连接对象的有效性,无效则创建新核心连接对象覆盖

* 检查临时连接对象是否超时,超时则关闭并移除

*/

private ScheduledExecutorService schedule;

private ExecutorService service;

private Lock lock;

private Condition cond;

private boolean clearing;

public ConPool() {

initCnf();

initPool();

startClear();

}

// 塞进pool和mysql的配置:比如Map中driver:xxx的键值对

private void initCnf() {

cnfPool = PoolUtil.parse(Integer.class, "pool",

Arrays.asList(PoolConstant.POOL));

cnfCon = PoolUtil.parse(String.class, "mysql01",

Arrays.asList(PoolConstant.MYSQL));

}

// 初始化连接池

private void initPool() {

final int MAX_COUNT = cnfPool.get(PoolConstant.POOL_MAX_COUNT);

service = Executors.newFixedThreadPool(MAX_COUNT * 2);

schedule = Executors.newSingleThreadScheduledExecutor();

lock = new ReentrantLock(true);

cond = lock.newCondition();

//分段锁的集合

pool = new ConcurrentHashMap<>(MAX_COUNT);

// 池中连接

PoolCon pc;

final int CORE_COUNT = cnfPool.get(PoolConstant.POOL_CORE_COUNT);

for (Integer i = 0, j = 1; i <= CORE_COUNT; i++) {

pc = makePoolCon(true);

if (null != pc) {

// 给核心连接一个编号

pool.put(j++, pc);

}

}

if (pool.size() == 0) {

System.err.println("连接池初始化失败,系统强制退出");

System.exit(-1);

}

// 如果配置让你失败便退出,且核心池数量小于一半

if (cnfPool.get(PoolConstant.POOL_EXIT_ON_ERR) == 1

&& pool.size() <= CORE_COUNT / 2) {

System.err.println("连接池初始化过半异常,系统强制退出");

System.exit(-1);

}

}

/**

* 创建一个池中的连接对象

*

* @param core 池对象类型,true:核心对象,false:临时对象

* @return

*/

private PoolCon makePoolCon(boolean core) {

PoolCon pc = null;

// 最大重试次数,创建n次,创建出一个连接对象

for (int i = 0; i <= cnfPool.get(PoolConstant.POOL_MAX_RETRY_COUNT); i++) {

try {

Connection con = DriverManager.getConnection(

cnfCon.get(PoolConstant.MYSQL_URI),

cnfCon.get(PoolConstant.MYSQL_USER),

cnfCon.get(PoolConstant.MYSQL_PASS)

);

pc = new PoolCon(core, con);

} catch (SQLException e) {

try {

// 创建失败就休息片刻再创建(重试)

TimeUnit.SECONDS.sleep(cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL));

continue;

} catch (InterruptedException e1) {

e1.printStackTrace();

System.out.println("cuocuocuo");

}

e.printStackTrace();

}

}

return pc;

}

/**

* 验证核心连接对象是否有效

*

* @param pc

* @return

*/

private boolean isPCValid(PoolCon pc) {

try {

pc.con.createStatement().executeQuery("select 1");

return true;

} catch (SQLException e) {

return false;

}

}

/**

* 验证临时连接对象是否过期

*

* @param pc 池连接对象

* @return true:过期,false:没过期

*/

private boolean isExpired(PoolCon pc) {

return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - pc.idleBegin) >=

cnfPool.get(PoolConstant.POOL_MAX_IDELE);

}

/**

* 验证用户是否超出配置最大时限

* @param waitBegin 计算参考起点时间

* @return

*/

private boolean isWaitExpired(long waitBegin){

return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()-waitBegin)>=

cnfPool.get(PoolConstant.POOL_MAX_WAIT);

}

/**

* 开启定期清理任务

* maxIdle,最长闲置时间

*/

private void startClear() {

int delay = cnfPool.get(PoolConstant.POOL_MAX_IDELE);

schedule.scheduleWithFixedDelay(new Runnable() {

@Override

public void run() {

lock.lock();

clearing = true;

for (Integer key : pool.keySet()) {

PoolCon pc = pool.get(key);

if (!pc.free) {

continue;

}

if (pc.core) {

if (!isPCValid(pc)) {

pool.put(key, makePoolCon(true));

}

} else {

if (isExpired(pc) || !isPCValid(pc)) {

pool.remove(key);

}

}

}

clearing = false;

cond.signalAll();

lock.unlock();

}

}, delay, delay, TimeUnit.SECONDS);

}

/**

* 连接池销毁

*/

@Override

public void destory() {

while (pool.size() > 0) {

for (Integer key : pool.keySet()) {

PoolCon pc = pool.get(key);

if (pc.free) {

pc.free = false;

PoolUtil.close(pc.con);

pool.remove(key);

}

}

try {

TimeUnit.MILLISECONDS.sleep(cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL));

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

/**

*

* @return

*/

private PoolCon fetch() {

long waitBegin = System.currentTimeMillis();

for (Integer i = 0; i <= cnfPool.get(PoolConstant.POOL_MAX_RETRY_COUNT); i++) {

try {

lock.lock();

if (clearing) {

cond.await();

}

for (Integer key : pool.keySet()) {

PoolCon pc = pool.get(key);

if (pc.free && isPCValid(pc)) {

pc.free = false;

return pc;

}

}

if(isWaitExpired(waitBegin)){

return null;

}

TimeUnit.MILLISECONDS.sleep(cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL));

} catch (Exception e) {

e.printStackTrace();

} finally {

lock.unlock();

}

}

if(pool.size()< cnfPool.get(PoolConstant.POOL_MAX_COUNT)){

PoolCon pc = makePoolCon(false);

if (null != pc){

pc.free = false;

pool.put(pool.size()+1,pc);

return pc;

}

}

return null;

}

private void giveback(PoolCon pc){

if(null==pc){

return;

}

if(!pc.core){

pc.restIdle();

}

pc.free = true;

}

@Override

public Dao newDao(){

return new Dao() {

private PreparedStatement getPst(Connection con, final String SQL, Object... params) throws SQLException {

PreparedStatement pst = con.prepareStatement(SQL);

if (null != params && params.length > 0) {

for (int i = 0; i < params.length; i++) {

pst.setObject(i + 1, params[i]);

}

}

return pst;

}

private int update(PreparedStatement pst) throws SQLException {

return pst.executeUpdate();

}

private ResultSet query(PreparedStatement pst) throws SQLException {

return pst.executeQuery();

}

private Map<String, Method> parseMethod(Class c) {

Map<String, Method> mapMethod = new HashMap<>();

final String PREFIX = "set";

for (Method method : c.getDeclaredMethods()) {

String name = method.getName();

if (!name.startsWith(PREFIX)) {

continue;

}

name = name.substring(3);

name = name.substring(0, 1).toLowerCase() + name.substring(1);

mapMethod.put(name, method);

}

return mapMethod;

}

private String[] parseStruct(ResultSetMetaData md) throws SQLException {

String[] names = new String[md.getColumnCount()];

for (int i = 0; i < names.length; i++) {

names[i] = md.getColumnLabel(i + 1);

}

return names;

}

@Override

public int exeUpd(final String SQL, final Object... params) {

try {

return service.submit(new Callable<Integer>() {

@Override

public Integer call() throws Exception {

int rst = 0;

PoolCon pc = null;

// Connection con = null;

PreparedStatement pst = null;

try {

pc = fetch();

if (null != pc) {

pst = getPst(pc.con, SQL, params);

rst = update(pst);

}

} catch (SQLException e) {

rst = -1;

} finally {

PoolUtil.close(pst);

giveback(pc);

}

return rst;

}

}).get();

} catch (Exception e) {

return -1;

}

}

@Override

public <T> SelRtn exeSingle(final Class<T> c, final String SQL, final Object... params) {

try {

return service.submit(new Callable<SelRtn>() {

@Override

public SelRtn call() throws Exception {

PoolCon pc = null;

PreparedStatement pst = null;

ResultSet rst = null;

try {

pc = fetch();

pst = getPst(pc.con, SQL, params);

rst = query(pst);

if (null != rst && rst.next()) {

// 调用类型(非Character基本类型包装类)c的,带有唯一字符串参数的构造方法

// c.getConstructor(String.class)//基本类型创建对象

return SelRtn.succeed(

c.getConstructor(String.class).newInstance(rst.getObject(1).toString()));

} else {

return SelRtn.succeed(null);

}

} catch (Exception e) {

e.printStackTrace();

} finally {

// close(rst, pst, con);

PoolUtil.close(rst, pst);

giveback(pc);

}

return SelRtn.fail();

}

}).get();

} catch (Exception e) {

return SelRtn.fail();

}

}

@Override

public <T> SelRtn exeQuery(final Class<T> c, final String SQL, final Object... params) {

try {

return service.submit(new Callable<SelRtn>() {

@Override

public SelRtn call() throws Exception {

PoolCon pc = null;

PreparedStatement pst = null;

ResultSet rst = null;

try {

pst = getPst(pc.con, SQL, params);

rst = query(pst);

if (null != rst && rst.next()) {

List<T> list = new ArrayList<>();

Map<String, Method> map = parseMethod(c);

String[] names = parseStruct(rst.getMetaData());

do {

T t = c.newInstance();

for (String name : names) {

map.get(name).invoke(t, rst.getObject(name));

}

list.add(t);

} while (rst.next());

return SelRtn.succeed(list);

} else {

return SelRtn.succeed(null);

}

} catch (Exception e) {

e.printStackTrace();

} finally {

PoolUtil.close(rst, pst);

giveback(pc);

}

return SelRtn.fail();

}

}).get();

} catch (Exception e) {

return SelRtn.fail();

}

}

};

}

}

Pool

public interface Pool {

void destory();

Dao newDao();

}

PoolFactory

package cn.kgc.kb08.jdbc.dao3;

import cn.kgc.kb08.jdbc.dao3.impl.ConPool;

public abstract class PoolFactory {

private static Dao dao;

private static synchronized void init(){

if(null==dao){

dao = new ConPool().newDao();

}

}

public static Dao get(){

if(null==dao){

init();

}

return dao;

}

}

SelRtn

package cn.kgc.kb08.jdbc.dao3;

/**

* 完善查询操作返回类型,对于异常的缺失

*/

public final class SelRtn {

private boolean err = false;

private Object rtn;

public static SelRtn succeed(Object rtn){

return new SelRtn(rtn);

}

public static SelRtn fail(){

return new SelRtn();

}

private SelRtn(Object rtn) {

this.rtn = rtn;

}

private SelRtn() {

this.err = true;

}

public boolean isErr(){

return this.err;

}

public <T> T getRtn(){

return (T) rtn;

}

}

以上是 Java 之 JDBC线程池(源码版) 的全部内容, 来源链接: utcz.com/z/390289.html

回到顶部