Springboot配置redis发布/订阅模式

Spring Data Redis包中封装了一系列Redis的操作,其中有部分是与发布订阅模式有关的操作。最近在业务上需要使用消息队列来实现消息异步推送,因此研究了下在Springboot里如何配置Redis发布/订阅模式。

创建StringRedisPublisher

StringRedisPublisher主要实现发布消息。从类名可以看出主要发布String类型的消息。

package com.study.redis;

import org.springframework.data.redis.core.StringRedisTemplate;

public class StringRedisPublisher implements MessagePublisher {

	private StringRedisTemplate stringRedisTemplate;

	public StringRedisPublisher(StringRedisTemplate redisTemplate) {
		this.stringRedisTemplate = redisTemplate;
	}
	
	@Override
	public void publish(String topic, String message) {
		stringRedisTemplate.convertAndSend(topic, message);
	}
}

创建 MessageReceiver

MessageReceiver主要实现订阅者获取到数据后的处理逻辑。该类定义了一个receiveMessage方法,接收String类型的消息,并对其进行处理。

package com.study.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageReceiver {
    private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);

    public void receiveMessage(String message) {
        logger.info("received message from " + message);
    }
}
注册监听器
package com.study.redis;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

@Configuration
public class RedisConfiguration {
	
    @Bean
    public MessageReceiver messageReceiver() {
        return new MessageReceiver();
    }
    
    /**
     * 创建channel listener
     */
	@Bean
	public MessageListener messageListener(MessageReceiver messageReceiver) {
		MessageListenerAdapter messageListener = new MessageListenerAdapter(messageReceiver, "receiveMessage");
		
		return messageListener;
	}
	
    /**
     * 注册监听器至redisContainer
     */
	@Bean
	public RedisMessageListenerContainer redisContainer(RedisConnectionFactory redisConnectionFactory,
			MessageListener messageListener) {
		RedisMessageListenerContainer redisContainer = new RedisMessageListenerContainer();
		redisContainer.setConnectionFactory(redisConnectionFactory);
		
		redisContainer.addMessageListener(messageListener, new ChannelTopic("test"));
		return redisContainer;
	}
	
    /**
     * 创建redisPublisher
     */
	@Bean
	public StringRedisPublisher redisPublisher(StringRedisTemplate stringRedisTemplate) {
		return new StringRedisPublisher(stringRedisTemplate);
	}
}

完成上述配置后,即可使用redisPublisher发布消息。一有消息发布至Redis后,messageReceiver会处理对应的数据。发布消息示例代码如下。

    @Test
	void testRedisPublisher() {
		for(int i = 0; i < 1000; i++) {
			redisPublisher.publish("test", "message " + i);
		}
	}
参考资料
  1. Spring Data Redis docs.
  2. Getting Started | Messaging with Redis.

留下评论