redispub/sub使用redis完成发布订阅

编程

一、需要3.0以上版本的redis

二、redis-cli实现发布订阅

  1. 先开启一个redis-cli(S1),并监听着china这个channel

subcribe china

此时A1处于监听状态

  1. 然后再开启一个redis-cli(P1),并向china这个channel发布helloWorld消息

publish china helloWorld

这是我们会发现A1监听到了这个helloWorld消息

同样,我们开启多个监听窗口,这时我们会发现,他们都可以收到这个helloWorld消息,这和MQ中间件中的发布订阅相同,只有在发布的时刻监听的监听者可以消费到这条消息。

三、Jedis实现发布订阅

<dependency>

<groupId>redis.clients</groupId>

<artifactId>jedis</artifactId>

<version>2.8.0</version>

</dependency>

<dependency>

<groupId>log4j</groupId>

<artifactId>log4j</artifactId>

<version>1.2.17</version>

</dependency>

Jedis中的JedisPubSub抽象类提供了订阅和取消的功能。想处理订阅和取消订阅某些channel的相关事件,我们得扩展JedisPubSub类并实现相关的方法:

import org.apache.log4j.Logger;

import redis.clients.jedis.JedisPubSub;

public class Subscriber extends JedisPubSub {//注意这里继承了抽象类JedisPubSub

private static final Logger LOGGER = Logger.getLogger(Subscriber.class);

@Override

public void onMessage(String channel, String message) {

LOGGER.info(String.format("Message. Channel: %s, Msg: %s", channel, message));

}

@Override

public void onPMessage(String pattern, String channel, String message) {

LOGGER.info(String.format("PMessage. Pattern: %s, Channel: %s, Msg: %s",

pattern, channel, message));

}

@Override

public void onSubscribe(String channel, int subscribedChannels) {

LOGGER.info("onSubscribe");

}

@Override

public void onUnsubscribe(String channel, int subscribedChannels) {

LOGGER.info("onUnsubscribe");

}

@Override

public void onPUnsubscribe(String pattern, int subscribedChannels) {

LOGGER.info("onPUnsubscribe");

}

@Override

public void onPSubscribe(String pattern, int subscribedChannels) {

LOGGER.info("onPSubscribe");

}

}

有了订阅者,我们还需要一个发布者:

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import org.apache.log4j.Logger;

import redis.clients.jedis.Jedis;

public class Publisher {

private static final Logger LOGGER = Logger.getLogger(Publisher.class);

private final Jedis publisherJedis;

private final String channel;

public Publisher(Jedis publisherJedis, String channel) {

this.publisherJedis = publisherJedis;

this.channel = channel;

}

/**

* 不停的读取输入,然后发布到channel上面,遇到quit则停止发布。

*/

public void startPublish() {

LOGGER.info("Type your message (quit for terminate)");

try {

BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

while (true) {

String line = reader.readLine();

if (!"quit".equals(line)) {

publisherJedis.publish(channel, line);

} else {

break;

}

}

} catch (IOException e) {

LOGGER.error("IO failure while reading input", e);

}

}

}

为简单起见,这个发布者接收控制台的输入,然后将输入的消息发布到指定的channel上面,如果输入quit,则停止发布消息。

接下来是主函数:

import org.apache.log4j.Logger;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPoolConfig;

public class Program {

public static final String CHANNEL_NAME = "MyChannel";

//我这里的Redis是一个集群,192.168.56.101和192.168.56.102都可以使用

public static final String REDIS_HOST = "192.168.56.101";

public static final int REDIS_PORT = 7000;

private final static Logger LOGGER = Logger.getLogger(Program.class);

private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig();

private final static JedisPool JEDIS_POOL =

new JedisPool(POOL_CONFIG, REDIS_HOST, REDIS_PORT, 0);

public static void main(String[] args) throws Exception {

final Jedis subscriberJedis = JEDIS_POOL.getResource();

final Jedis publisherJedis = JEDIS_POOL.getResource();

final Subscriber subscriber = new Subscriber();

//订阅线程:接收消息

new Thread(new Runnable() {

public void run() {

try {

LOGGER.info("Subscribing to "MyChannel". This thread will be blocked.");

//使用subscriber订阅CHANNEL_NAME上的消息,这一句之后,线程进入订阅模式,阻塞。

subscriberJedis.subscribe(subscriber, CHANNEL_NAME);

//当unsubscribe()方法被调用时,才执行以下代码

LOGGER.info("Subscription ended.");

} catch (Exception e) {

LOGGER.error("Subscribing failed.", e);

}

}

}).start();

//主线程:发布消息到CHANNEL_NAME频道上

new Publisher(publisherJedis, CHANNEL_NAME).startPublish();

publisherJedis.close();

//Unsubscribe

subscriber.unsubscribe();

subscriberJedis.close();

}

}

打印出来的结果:

以上是 redispub/sub使用redis完成发布订阅 的全部内容, 来源链接: utcz.com/z/511095.html

回到顶部