ShardingSphere分库分表利器
1. 分库分表利器 Sharding Sphere 介绍
功能:
- 数据分片
- 分库分表
- 读写分离
- 分片策略定制
- 无中心化分布式主键
- 分布式事务
- 标准化事务接口
- XA 强一致性事务
- 柔性事务
- 数据库治理
- 分布式治理
- 弹性伸缩
- 可视化链路追踪
- 数据加密
组件:
- sharding jdbc: 应用本地数据库驱动增强版,可直接理解为数据库驱动
- sharding proxy: 类似Mycat数据库服务器代理,服务端通过逻辑数据库实现分库分表操作
- sharding sidecar: K8S生态以sidecar形式提供数据库服务器代理
2。 Sharding jdbc案例
DatabaseConfig: 数据库配置类
@Datapublic class DatabaseConfig {
/** 数据库名称 **/
private String name;
/** 数据库驱动名 **/
private String driverClassName;
/** 数据库连接地址 **/
private String url;
/** 用户名 **/
private String username;
/** 密码 **/
private String password;
}
Order: 订单
@Datapublic class Order {
/**
* 订单号
*/
private Long orderId;
/**
* 用户id
*/
private Long userId;
/**
* 订单状态
*/
private String status;
}
OrderItem: 订单详情
@Data@NoArgsConstructor
@AllArgsConstructor
public class OrderItem {
/**
* 订单详情ID
*/
private Long orderItemId;
/**
* 订单号
*/
private Long orderId;
/**
* 用户id
*/
private Long userId;
}
ShardingDataSourceProvider: 工具类
public class ShardingDataSourceProvider { /**
* 构建数据源
* @param databaseConfigs
* @param tableRuleConfigurations
* @return
* @throws SQLException
*/
public static DataSource createShardingDatasource(List<DatabaseConfig> databaseConfigs, List<TableRuleConfiguration> tableRuleConfigurations) throws SQLException {
if(CollectionUtils.isEmpty(databaseConfigs)){
return null;
}
Map<String, DataSource> dataSourceMap = new HashMap<>();
databaseConfigs.forEach(databaseConfig -> {
BasicDataSource basicDataSource = new BasicDataSource();
basicDataSource.setDriverClassName(databaseConfig.getDriverClassName());
basicDataSource.setUrl(databaseConfig.getUrl());
basicDataSource.setUsername(databaseConfig.getUsername());
basicDataSource.setPassword(databaseConfig.getPassword());
dataSourceMap.put(databaseConfig.getName(), basicDataSource);
});
// 配置分片规则
ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
if(CollectionUtils.isNotEmpty(tableRuleConfigurations)){
tableRuleConfigurations.forEach(tableRuleConfiguration -> shardingRuleConfig.getTableRuleConfigs().add(tableRuleConfiguration));
}
// 获取数据源对象
return ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig, new Properties());
}
}
ShardingJdbcTest: 测试类
@Slf4jpublic class ShardingJdbcTest {
private static final String className = "com.mysql.cj.jdbc.Driver";
private static final String jdbcUrl01 = "jdbc:mysql://172.25.87.200:3306/db0?createDatabaseIfNotExist=true&serverTimezone=GMT%2B8&useSSL=false&allowPublicKeyRetrieval=true";
private static final String jdbcUrl02 = "jdbc:mysql://172.25.87.200:3306/db1?createDatabaseIfNotExist=true&serverTimezone=GMT%2B8&useSSL=false&allowPublicKeyRetrieval=true";
private static final String username = "sample";
private static final String password = "sample";
private DataSource shardingDataSource;
private Connection connection;
private PreparedStatement preparedStatement;
private ResultSet resultSet;
private Statement statement;
/**
* 数据源初始化
* @throws SQLException
*/
@Before
public void init() throws SQLException {
//-------------------------------- 数据源配置 --------------------------------------
List<DatabaseConfig> databaseConfigs = new ArrayList<>();
DatabaseConfig databaseConfig1 = new DatabaseConfig();
databaseConfig1.setName("db0");
databaseConfig1.setDriverClassName(className);
databaseConfig1.setUrl(jdbcUrl01);
databaseConfig1.setUsername(username);
databaseConfig1.setPassword(password);
databaseConfigs.add(databaseConfig1);
DatabaseConfig databaseConfig2 = new DatabaseConfig();
databaseConfig2.setName("db1");
databaseConfig2.setDriverClassName(className);
databaseConfig2.setUrl(jdbcUrl02);
databaseConfig2.setUsername(username);
databaseConfig2.setPassword(password);
databaseConfigs.add(databaseConfig2);
//--------------------------------- 准备分片规则 --------------------------------------------
List<TableRuleConfiguration> tableRuleConfigurations = new ArrayList<>(2);
// 配置 t_order 表规则: 逻辑表名称 + 区域范围(行表达式: ${begin..end}:表示范围区间)
TableRuleConfiguration orderTableRuleConfig = new TableRuleConfiguration("t_order", "db${0..1}.t_order${0..1}");
// 配置分库 + 分表策略
orderTableRuleConfig.setDatabaseShardingStrategyConfig(new InlineShardingStrategyConfiguration("user_id", "db${user_id % 2}"));
orderTableRuleConfig.setTableShardingStrategyConfig(new InlineShardingStrategyConfiguration("order_id", "t_order${order_id % 2}"));
tableRuleConfigurations.add(orderTableRuleConfig);
// 配置 t_order_item 表规则...
TableRuleConfiguration orderItemTableRuleConfig = new TableRuleConfiguration("t_order_item", "db${0..1}.t_order_item${0..1}");
orderItemTableRuleConfig.setDatabaseShardingStrategyConfig(new InlineShardingStrategyConfiguration("user_id", "db${user_id % 2}"));
orderItemTableRuleConfig.setTableShardingStrategyConfig(new InlineShardingStrategyConfiguration("order_item_id", "t_order_item${order_item_id % 2}"));
tableRuleConfigurations.add(orderItemTableRuleConfig);
shardingDataSource = ShardingDataSourceProvider.createShardingDatasource(databaseConfigs, tableRuleConfigurations);
connection = shardingDataSource.getConnection();
}
/**
* 创建数据库表
* @throws SQLException
*/
@Test
public void createDatabase() throws SQLException {
execute("CREATE TABLE IF NOT EXISTS t_order (order_id BIGINT NOT NULL AUTO_INCREMENT, user_id INT NOT NULL, status VARCHAR(50), PRIMARY KEY (order_id))");
execute("CREATE TABLE IF NOT EXISTS t_order_item (order_item_id BIGINT NOT NULL AUTO_INCREMENT, order_id BIGINT NOT NULL, user_id INT NOT NULL, PRIMARY KEY (order_item_id))");
}
/**
* 插入数据
* @throws SQLException
*/
@Test
public void insertData() throws SQLException {
for (int i = 1; i <= 10; i++) {
Integer orderId = new Random().nextInt(10000);
Integer orderItemId = new Random().nextInt(10000);
execute(String.format("INSERT INTO t_order (order_id, user_id, status) VALUES (%d, %d, "INIT")", orderId, i));
execute(String.format("INSERT INTO t_order_item (order_item_id, order_id, user_id) VALUES (%d, %d, %d)", orderItemId, orderId, i));
}
}
/**
* 数据查询
* @throws SQLException
*/
@Test
public void queryWithEqual() throws SQLException {
String sql = "SELECT i.* FROM t_order o JOIN t_order_item i ON o.order_id=i.order_id WHERE o.user_id=?";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
preparedStatement.setInt(1, 1);
executeQuery(preparedStatement);
}
/**
* 数据分页
* @throws SQLException
*/
@Test
public void queryByPage() throws SQLException {
String sql = "SELECT i.* FROM t_order o JOIN t_order_item i ON o.order_id=i.order_id limit ?, ?";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
preparedStatement.setInt(1, 0);
preparedStatement.setInt(2, 5);
executeQuery(preparedStatement);
}
/**
* 数据查询
*
* @throws SQLException
*/
@Test
public void queryWithIn() throws SQLException {
String sql = "SELECT i.* FROM t_order o JOIN t_order_item i ON o.order_id=i.order_id WHERE o.user_id IN (?, ?)";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
preparedStatement.setInt(1, 1);
preparedStatement.setInt(2, 2);
executeQuery(preparedStatement);
}
/**
* 删除表
*
* @throws SQLException
*/
@Test
public void dropTable() throws SQLException {
execute("DROP TABLE t_order_item");
execute("DROP TABLE t_order");
}
/**
* 执行sql
* @param sql
* @throws SQLException
*/
private void execute(final String sql) throws SQLException {
statement = connection.createStatement();
statement.execute(sql);
}
/**
* 执行sql
* @param preparedStatement
* @throws SQLException
*/
private void executeQuery(final PreparedStatement preparedStatement) throws SQLException {
resultSet = preparedStatement.executeQuery();
while (resultSet.next()) {
OrderItem orderItem = new OrderItem(resultSet.getLong(1), resultSet.getLong(2), resultSet.getLong(3));
log.info("t_order_item --> {}", orderItem.toString());
}
}
@After
public void close() throws SQLException {
if(null != resultSet){
resultSet.close();
}
if(null != preparedStatement){
preparedStatement.close();
}
if(null != statement){
statement.close();
}
if(null != connection){
connection.close();
}
}
}
3. sharding proxy
定位:实现数据库二进制通讯协议,作为应用与数据库服务的翻译层,相当于创建新的数据库负责处理真正的分区分表业务(逻辑),类似产品:阿里Mycat
3.1 sharding proxy 安装
- 下载sharding proxy 下载
- 修改配置:conf/server.yaml(通用配置) + config-xxx 表示参考案例(例如:分片、影子数据库、主从读写分离等)
- 启动:bin/start.sh
Sharding Proxy 代理数据库访问:
- 将mysql驱动拷贝到 ext/lib/
- 启动访问:mysql -u root -h 127.0.0.1 -P 3307 (连接sharding proxy 代理的逻辑数据库:SchemaName)
3.2 sharding proxy 配置
conf/server.yaml
authentication: users:
sample:
password: sample
authorizedSchemas: sharding_db # 定义用户sample授权使用的schema
props:
max.connections.size.per.query: 1
acceptor.size: 16
executor.size: 16
proxy.frontend.flush.threshold: 128
# LOCAL: Proxy will run with LOCAL transaction. 本地事务
# XA: Proxy will run with XA transaction. XA分布式事务:基于Atomikos实现
# BASE: Proxy will run with B.A.S.E transaction. 柔性事务:seata 实现
proxy.transaction.type: LOCAL
proxy.opentracing.enabled: false
proxy.hint.enabled: false
query.with.cipher.column: true
sql.show: true
allow.range.query.with.inline.sharding: false
config-sharding.yaml
# sharding proxy 代理 逻辑数据库schemaName: sharding_db
dataSources:
db0:
url: jdbc:mysql://localhost:3306/db0?createDatabaseIfNotExist=true&serverTimezone=GMT%2B8&useSSL=false&allowPublicKeyRetrieval=true
username: sample
password: sample
connectionTimeoutMilliseconds: 30000
idleTimeoutMilliseconds: 60000
maxLifetimeMilliseconds: 1800000
maxPoolSize: 50
db1:
url: jdbc:mysql://localhost:3306/db1?createDatabaseIfNotExist=true&serverTimezone=GMT%2B8&useSSL=false&allowPublicKeyRetrieval=true
username: sample
password: sample
connectionTimeoutMilliseconds: 30000
idleTimeoutMilliseconds: 60000
maxLifetimeMilliseconds: 1800000
maxPoolSize: 50
shardingRule: # 分片规则
tables: # 作用表范围
t_order: # 作用表
actualDataNodes: db${0..1}.t_order${0..1}
databaseStrategy: # 分库规则
inline:
shardingColumn: user_id
algorithmExpression: db${user_id % 2}
tableStrategy: # 分表规则
inline:
shardingColumn: order_id
algorithmExpression: t_order${order_id % 2}
bindingTables:
- t_order
效果:
sharding_db 数据库代理db0db1 里面的表t_order, 只是这里是逻辑表表示而已
以上是 ShardingSphere分库分表利器 的全部内容, 来源链接: utcz.com/z/534773.html