spring-data-redis 动态切换数据源的方法

最近遇到了一个麻烦的需求,我们需要一个微服务应用同时访问两个不同的 Redis 集群。一般我们不会这么使用 Redis,但是这两个 Redis 本来是不同业务集群,现在需要一个微服务同时访问。

其实我们在实际业务开发的时候,可能还会遇到类似的场景。例如 Redis 读写分离,这个也是 spring-data-redis 没有提供的功能,底层连接池例如 Lettuce 或者 Jedis 都提供了获取只读连接的 API,但是缺陷有两个:

  • 上层 spring-data-redis 并没有封装这种接口
  • 基于 redis 的架构实现的,哨兵模式需要配置 sentinel 的地址,集群模式需要感知集群拓扑,在云原生环境中,这些都默认被云提供商隐藏了,暴露到外面的只有一个个动态 VIP 域名。

因此,我们需要在 spring-data-redis 的基础上实现一个动态切换 Redis 连接的机制。

spring-data-redis 的配置类为:org.springframework.boot.autoconfigure.data.redis.RedisProperties,可以配置单个 Redis 实例或者 Redis 集群的连接配置。根据这些配置,会生成统一的 Redis 连接工厂 RedisConnectionFactory

spring-data-redis 核心接口与背后的连接相关抽象关系为:

通过这个图,我们可以知道,我们实现一个可以动态返回不同 Redis 连接的 RedisConnectionFactory 即可,并且根据 spring-data-redis 的自动装载源码可以知道,框架内的所有 RedisConnectionFactory@ConditionalOnMissingBean 的,即我们可以使用我们自己实现的 RedisConnectionFactory 进行替换。

项目地址:https://github.com/JoJoTec/spring-boot-starter-redis-related

我们可以给 RedisProperties 配置外层封装一个多 Redis 连接的配置,即MultiRedisProperties

@Data

@NoArgsConstructor

@ConfigurationProperties(prefix = "spring.redis")

public class MultiRedisProperties {

/**

* 默认连接必须配置,配置 key 为 default

*/

public static final String DEFAULT = "default";

private boolean enableMulti = false;

private Map<String, RedisProperties> multi;

}

这个配置是在原有配置基础上的,也就是用户可以使用原有配置,也可以使用这种多 Redis 配置,就是需要配置 spring.redis.enable-multi=true。multi 这个 Map 中放入的 key 是数据源名称,用户可以在使用 RedisTemplate 或者 ReactiveRedisTemplate 之前,通过这个数据源名称指定用哪个 Redis。

接下来我们来实现 MultiRedisLettuceConnectionFactory,即可以动态切换 Redis 连接的 RedisConnectionFactory,我们的项目采用的 Redis 客户端是 Lettuce:

public class MultiRedisLettuceConnectionFactory

implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {

private final Map<String, LettuceConnectionFactory> connectionFactoryMap;

private static final ThreadLocal<String> currentRedis = new ThreadLocal<>();

public MultiRedisLettuceConnectionFactory(Map<String, LettuceConnectionFactory> connectionFactoryMap) {

this.connectionFactoryMap = connectionFactoryMap;

}

public void setCurrentRedis(String currentRedis) {

if (!connectionFactoryMap.containsKey(currentRedis)) {

throw new RedisRelatedException("invalid currentRedis: " + currentRedis + ", it does not exists in configuration");

}

MultiRedisLettuceConnectionFactory.currentRedis.set(currentRedis);

}

@Override

public void destroy() throws Exception {

connectionFactoryMap.values().forEach(LettuceConnectionFactory::destroy);

}

@Override

public void afterPropertiesSet() throws Exception {

connectionFactoryMap.values().forEach(LettuceConnectionFactory::afterPropertiesSet);

}

private LettuceConnectionFactory currentLettuceConnectionFactory() {

String currentRedis = MultiRedisLettuceConnectionFactory.currentRedis.get();

if (StringUtils.isNotBlank(currentRedis)) {

MultiRedisLettuceConnectionFactory.currentRedis.remove();

return connectionFactoryMap.get(currentRedis);

}

return connectionFactoryMap.get(MultiRedisProperties.DEFAULT);

}

@Override

public ReactiveRedisConnection getReactiveConnection() {

return currentLettuceConnectionFactory().getReactiveConnection();

}

@Override

public ReactiveRedisClusterConnection getReactiveClusterConnection() {

return currentLettuceConnectionFactory().getReactiveClusterConnection();

}

@Override

public RedisConnection getConnection() {

return currentLettuceConnectionFactory().getConnection();

}

@Override

public RedisClusterConnection getClusterConnection() {

return currentLettuceConnectionFactory().getClusterConnection();

}

@Override

public boolean getConvertPipelineAndTxResults() {

return currentLettuceConnectionFactory().getConvertPipelineAndTxResults();

}

@Override

public RedisSentinelConnection getSentinelConnection() {

return currentLettuceConnectionFactory().getSentinelConnection();

}

@Override

public DataAccessException translateExceptionIfPossible(RuntimeException ex) {

return currentLettuceConnectionFactory().translateExceptionIfPossible(ex);

}

}

逻辑非常简单,就是提供了设置 Redis 数据源的接口,并且放入了 ThreadLocal 中,并且仅对当前一次有效,读取后就清空。

然后,将 MultiRedisLettuceConnectionFactory 作为 Bean 注册到我们的 ApplicationContext 中:

@ConditionalOnProperty(prefix = "spring.redis", value = "enable-multi", matchIfMissing = false)

@Configuration(proxyBeanMethods = false)

public class RedisCustomizedConfiguration {

/**

* @param builderCustomizers

* @param clientResources

* @param multiRedisProperties

* @return

* @see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration

*/

@Bean

public MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory(

ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,

ClientResources clientResources,

MultiRedisProperties multiRedisProperties,

ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider,

ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider

) {

//读取配置

Map<String, LettuceConnectionFactory> connectionFactoryMap = Maps.newHashMap();

Map<String, RedisProperties> multi = multiRedisProperties.getMulti();

multi.forEach((k, v) -> {

//这个其实就是框架中原有的源码使用 RedisProperties 的方式,我们其实就是在 RedisProperties 外面包装了一层而已

LettuceConnectionConfiguration lettuceConnectionConfiguration = new LettuceConnectionConfiguration(

v,

sentinelConfigurationProvider,

clusterConfigurationProvider

);

LettuceConnectionFactory lettuceConnectionFactory = lettuceConnectionConfiguration.redisConnectionFactory(builderCustomizers, clientResources);

connectionFactoryMap.put(k, lettuceConnectionFactory);

});

return new MultiRedisLettuceConnectionFactory(connectionFactoryMap);

}

}

我们来测试下,使用 embedded-redis 来启动本地 redis,从而实现单元测试。我们启动两个 Redis,在两个 Redis 中放入不同的 Key,验证是否存在,并且测试同步接口,多线程调用同步接口,和多次异步接口无等待订阅从而测试有效性。:

import com.github.jojotech.spring.boot.starter.redis.related.lettuce.MultiRedisLettuceConnectionFactory;

import org.junit.jupiter.api.AfterAll;

import org.junit.jupiter.api.Assertions;

import org.junit.jupiter.api.BeforeAll;

import org.junit.jupiter.api.Test;

import org.junit.jupiter.api.extension.ExtendWith;

import org.redisson.api.RedissonClient;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.autoconfigure.EnableAutoConfiguration;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.data.redis.core.ReactiveStringRedisTemplate;

import org.springframework.data.redis.core.StringRedisTemplate;

import org.springframework.test.context.junit.jupiter.SpringExtension;

import reactor.core.publisher.Mono;

import redis.embedded.RedisServer;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicBoolean;

@ExtendWith(SpringExtension.class)

@SpringBootTest(properties = {

"spring.redis.enable-multi=true",

"spring.redis.multi.default.host=127.0.0.1",

"spring.redis.multi.default.port=6379",

"spring.redis.multi.test.host=127.0.0.1",

"spring.redis.multi.test.port=6380",

})

public class MultiRedisTest {

//启动两个 redis

private static RedisServer redisServer;

private static RedisServer redisServer2;

@BeforeAll

public static void setUp() throws Exception {

System.out.println("start redis");

redisServer = RedisServer.builder().port(6379).setting("maxheap 200m").build();

redisServer2 = RedisServer.builder().port(6380).setting("maxheap 200m").build();

redisServer.start();

redisServer2.start();

System.out.println("redis started");

}

@AfterAll

public static void tearDown() throws Exception {

System.out.println("stop redis");

redisServer.stop();

redisServer2.stop();

System.out.println("redis stopped");

}

@EnableAutoConfiguration

@Configuration

public static class App {

}

@Autowired

private StringRedisTemplate redisTemplate;

@Autowired

private ReactiveStringRedisTemplate reactiveRedisTemplate;

@Autowired

private MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory;

private void testMulti(String suffix) {

//使用默认连接,设置 "testDefault" + suffix, "testDefault" 键值对

redisTemplate.opsForValue().set("testDefault" + suffix, "testDefault");

//使用 test 连接,设置 "testSecond" + suffix, "testDefault" 键值对

multiRedisLettuceConnectionFactory.setCurrentRedis("test");

redisTemplate.opsForValue().set("testSecond" + suffix, "testSecond");

//使用默认连接,验证 "testDefault" + suffix 存在,"testSecond" + suffix 不存在

Assertions.assertTrue(redisTemplate.hasKey("testDefault" + suffix));

Assertions.assertFalse(redisTemplate.hasKey("testSecond" + suffix));

//使用 test 连接,验证 "testDefault" + suffix 不存在,"testSecond" + suffix 存在

multiRedisLettuceConnectionFactory.setCurrentRedis("test");

Assertions.assertFalse(redisTemplate.hasKey("testDefault" + suffix));

multiRedisLettuceConnectionFactory.setCurrentRedis("test");

Assertions.assertTrue(redisTemplate.hasKey("testSecond" + suffix));

}

//单次验证

@Test

public void testMultiBlock() {

testMulti("");

}

//多线程验证

@Test

public void testMultiBlockMultiThread() throws InterruptedException {

Thread thread[] = new Thread[50];

AtomicBoolean result = new AtomicBoolean(true);

for (int i = 0; i < thread.length; i++) {

int finalI = i;

thread[i] = new Thread(() -> {

try {

testMulti("" + finalI);

} catch (Exception e) {

e.printStackTrace();

result.set(false);

}

});

}

for (int i = 0; i < thread.length; i++) {

thread[i].start();

}

for (int i = 0; i < thread.length; i++) {

thread[i].join();

}

Assertions.assertTrue(result.get());

}

//reactive 接口验证

private Mono<Boolean> reactiveMulti(String suffix) {

return reactiveRedisTemplate.opsForValue().set("testReactiveDefault" + suffix, "testReactiveDefault")

.flatMap(b -> {

multiRedisLettuceConnectionFactory.setCurrentRedis("test");

return reactiveRedisTemplate.opsForValue().set("testReactiveSecond" + suffix, "testReactiveSecond");

}).flatMap(b -> {

return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);

}).map(b -> {

Assertions.assertTrue(b);

System.out.println(Thread.currentThread().getName());

return b;

}).flatMap(b -> {

return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);

}).map(b -> {

Assertions.assertFalse(b);

System.out.println(Thread.currentThread().getName());

return b;

}).flatMap(b -> {

multiRedisLettuceConnectionFactory.setCurrentRedis("test");

return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);

}).map(b -> {

Assertions.assertFalse(b);

System.out.println(Thread.currentThread().getName());

return b;

}).flatMap(b -> {

multiRedisLettuceConnectionFactory.setCurrentRedis("test");

return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);

}).map(b -> {

Assertions.assertTrue(b);

return b;

});

}

//多次调用 reactive 验证,并且 subscribe,这本身就是多线程的

@Test

public void testMultiReactive() throws InterruptedException {

for (int i = 0; i < 10000; i++) {

reactiveMulti("" + i).subscribe(System.out::println);

}

TimeUnit.SECONDS.sleep(10);

}

}

运行测试,通过。

到此这篇关于spring-data-redis 动态切换数据源的文章就介绍到这了,更多相关spring-data-redis 动态切换数据源内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

以上是 spring-data-redis 动态切换数据源的方法 的全部内容, 来源链接: utcz.com/p/248361.html

回到顶部