Spring Data Redis封装了一系列Redis操作,其中一部分与发布/订阅模式有关。最近业务上需要使用消息队列来实现消息的异步推送,因此研究了一下如何在Spring Boot中配置Redis的发布/订阅模式。
创建StringRedisPublisher
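StringRedisPublisher主要负责发布消息,从类名可以看出它发布的是String类型的消息。它所实现的MessagePublisher是一个自定义接口(本文未列出其代码),从下面代码中的@Override可以看出,该接口声明了publish(String topic, String message)方法。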
package com.study.redis;
import org.springframework.data.redis.core.StringRedisTemplate;
/**
 * {@link MessagePublisher} implementation that publishes {@code String}
 * messages to a Redis channel through a {@link StringRedisTemplate}.
 */
public class StringRedisPublisher implements MessagePublisher {

    /** Template used to send messages; assigned once in the constructor. */
    private final StringRedisTemplate stringRedisTemplate;

    /**
     * Creates a publisher backed by the given template.
     *
     * @param redisTemplate the template every {@link #publish} call delegates to
     */
    public StringRedisPublisher(StringRedisTemplate redisTemplate) {
        this.stringRedisTemplate = redisTemplate;
    }

    /**
     * Sends {@code message} to the channel named {@code topic} by delegating to
     * {@link StringRedisTemplate#convertAndSend(String, Object)}.
     *
     * @param topic   name of the Redis channel to publish to
     * @param message the text payload to publish
     */
    @Override
    public void publish(String topic, String message) {
        stringRedisTemplate.convertAndSend(topic, message);
    }
}
创建 MessageReceiver
MessageReceiver主要实现订阅者获取到数据后的处理逻辑。该类定义了一个receiveMessage方法,接收String类型的消息,并对其进行处理。
package com.study.redis;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Handles messages delivered to a subscriber. The only processing done here
 * is writing the received text to the log.
 */
public class MessageReceiver {

    private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);

    /**
     * Logs the received message at INFO level.
     *
     * @param message the text payload that was received
     */
    public void receiveMessage(String message) {
        // Parameterized logging: the message is only formatted when INFO is enabled,
        // and the rendered text is the same as the former string concatenation.
        logger.info("received message from {}", message);
    }
}
注册监听器
package com.study.redis;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
 * Spring configuration declaring the beans used for Redis publish/subscribe:
 * the message receiver, the listener wrapping it, the listener container
 * subscribed to the "test" channel, and the publisher.
 */
@Configuration
public class RedisConfiguration {

    /**
     * Declares the {@link MessageReceiver} bean.
     *
     * @return a new receiver instance
     */
    @Bean
    public MessageReceiver messageReceiver() {
        MessageReceiver receiver = new MessageReceiver();
        return receiver;
    }

    /**
     * Creates the channel listener: a {@link MessageListenerAdapter} built
     * around the receiver with "receiveMessage" as the listener method name.
     *
     * @param messageReceiver the delegate object handed to the adapter
     * @return the adapter, exposed as a {@link MessageListener}
     */
    @Bean
    public MessageListener messageListener(MessageReceiver messageReceiver) {
        return new MessageListenerAdapter(messageReceiver, "receiveMessage");
    }

    /**
     * Registers the listener with a {@link RedisMessageListenerContainer}
     * for the channel topic "test".
     *
     * @param redisConnectionFactory connection factory set on the container
     * @param messageListener        listener added for the "test" topic
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory redisConnectionFactory,
            MessageListener messageListener) {
        ChannelTopic topic = new ChannelTopic("test");
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        container.addMessageListener(messageListener, topic);
        return container;
    }

    /**
     * Creates the Redis publisher.
     *
     * @param stringRedisTemplate template passed to the publisher's constructor
     * @return a new {@link StringRedisPublisher}
     */
    @Bean
    public StringRedisPublisher redisPublisher(StringRedisTemplate stringRedisTemplate) {
        StringRedisPublisher publisher = new StringRedisPublisher(stringRedisTemplate);
        return publisher;
    }
}
完成上述配置后,即可使用redisPublisher发布消息。一旦有消息发布至Redis的test频道,messageReceiver就会收到并处理对应的数据。发布消息的示例代码如下:
/**
 * Publishes 1000 messages, "message 0" through "message 999", to the
 * "test" channel via {@code redisPublisher}.
 */
@Test
void testRedisPublisher() {
    final int total = 1000;
    int index = 0;
    while (index < total) {
        redisPublisher.publish("test", "message " + index);
        index++;
    }
}