0%

订阅 Redis 的 key 过期事件实现动态定时任务

一、需求

  1. 设置了生存时间的Key,在过期时能不能有所提示?
  2. 如果能对过期Key有个监听,如何对过期Key进行一个回调处理?
  3. 如何使用 Redis 来实现定时任务?

比如:

  • 处理订单过期自动取消,12306 购票系统超过30分钟没有成功支付的订单会被回收处理;
  • 购买商品15天后默认好评;
  • 外卖系统的送餐超时提醒;
  • 客服与顾客聊天,客服超过多长时间没回复,系统给客服发一个提醒消息;

这里的定时任务并不是 Crontab 这种如 0 0 23 * * ? (每日23点执行) 定死多长时间执行一次的, 而是某种特定动作触发创建的一个多长时间后执行的任务。比如有100个 用户触发了这个动作,那么就会创建100个定时任务,并且这100个任务由于触发创建的时间不同,执行的时间也很可能不在同一时间。

二、思路

在 Redis 的 2.8.0 版本之后,其推出了一个新的特性——键空间消息(Redis Keyspace Notifications),它配合 2.0.0 版本之后的 SUBSCRIBE 就能完成这个定时任务的操作了。

Redis 的键空间通知支持 订阅指定 Key 的所有事件 与 订阅指定事件 两种方式。

Keyspace notifications are implemented sending two distinct type of events for every operation affecting the Redis data space. For instance a DEL operation targeting the key named mykey in database 0 will trigger the delivering of two messages, exactly equivalent to the following two PUBLISH commands:

1
2
PUBLISH keyspace@0:mykey del
PUBLISH keyevent@0:del mykey

通过 Redis 的键空间通知(keyspace notification)可以做到:下单时将订单 id 写入 redis,设置过期时间30分钟,利用 redis 键过期回调提醒,30分钟后可以在回调函数里检查订单状态,如果未支付,则进行处理。

三、实现

1. 修改 redis.conf 开启redis key过期提醒

By default keyspace events notifications are disabled because while not very sensible the feature uses some CPU power. Notifications are enabled using the notify-keyspace-events of redis.conf or via the CONFIG SET.

由于键空间通知比较耗CPU, 所以 Redis默认是关闭键空间事件通知的, 需要手动开启 notify-keyspace-events 后才启作用。

1
2
3
4
5
6
7
8
9
10
11
K:keyspace事件,事件以__keyspace@<db>__为前缀进行发布;        
E:keyevent事件,事件以__keyevent@<db>__为前缀进行发布;        
g:一般性的,非特定类型的命令,比如del,expire,rename等;       
$:String 特定命令;        
l:List 特定命令;        
s:Set 特定命令;        
h:Hash 特定命令;        
z:Sorted 特定命令;        
x:过期事件,当某个键过期并删除时会产生该事件;        
e:驱逐事件,当某个键因maxmemore策略而被删除时,产生该事件;        
A:g$lshzxe的别名,因此”AKE”意味着所有事件。

notify-keyspace-events Ex 表示开启键过期事件提醒

2. 继承 JedisPubSub 实现一个消息监听器类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Component
public class RedisKeyExpiredListener extends JedisPubSub {

    private Logger logger = LoggerFactory.getLogger(RedisKeyExpiredListener.class);

    @Override
    public void onMessage(String channel, String message) {
    
       //message.toString()可以获取失效的key
      String expiredKey = message.toString();
      if(expiredKey.startsWith("key:prefix")){
            /**
             * TODO
             * 如果是自己想要监控的KEY, 则可以在这里处理业务
             */
        }
    }
}

由于每个key过期都会回调 onPMessage 方法, 所以不建议在 onPMessage 回调方法中直接处理业务, 这里可以通过 MQ 来做缓冲,在 onPMessage 中 把消息直接扔到 MQ 里, 然后再去监听队列消费消息处理具体的业务。

改进版如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class RedisKeyExpiredListener extends JedisPubSub {

    private Logger logger = LoggerFactory.getLogger(RedisKeyExpiredListener.class);

    @Resource
    private ICommonsMqService commonsMqService;

    @Override
    public void onMessage(String channel, String message) {

        try {
            commonsMqService.sendSingleMessageAsync("REDIS_TIMEOUT_KEY_QUEUE", message);
            logger.info("发送支付超时MQ消息成功:{}",message);
        }catch (Exception e){
            logger.error("发送支付超时MQ消息失败:{}",e.toString());
        }
    }
}

3. 订阅指定 db 的过期事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Component
@Order(value = 4)
public class SubscriberRedisKeyTimeout implements CommandLineRunner {

    private Logger logger = LoggerFactory.getLogger(SubscriberRedisKeyTimeout.class);

    @Resource
    RedisKeyExpiredListener redisKeyExpiredListener;

    @Override
    public void run(String... args) throws Exception {

        JedisPool pool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 8005);
        Jedis jedis = pool.getResource();

        /**
         * 订阅线程:接收消息
         * 由于订阅者(subscriber)在进入订阅状态后会阻塞线程,
         * 因此新起一个线程(new Thread())作为订阅线程
         */
        new Thread(new Runnable() {
            public void run() {
                try {
                    logger.info("Subscribing. This thread will be blocked.");
                    //使用subscriber订阅 db0上的key过期事件消息,这一句之后,线程进入订阅模式,阻塞。
                     jedis.subscribe(redisKeyExpiredListener, "__keyevent@0__:expired");
     
                    //当unsubscribe()方法被调用时,才执行以下代码
                    logger.info("Subscription ended.");
                } catch (Exception e) {
                    logger.error("Subscribing failed.", e);
                }
            }
        }).start();

    }
}

4. 测试

1
2
3
4
5
6
7
8
9
10
11
public class TestJedisExpipreNotice {

    public static void main(String[] args) {
        JedisPool pool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 8005);
        Jedis jedis = pool.getResource();

        jedis.setex("REDIS:EXPIPRE:NOTICE:TEST",5, "测试键过期事件回调");

    }
}

5秒后控制台打印如下:

1
2019-03-26 17:35:44.248  INFO 20464 --- [ Thread-127] c.p.c.r.b.r.RedisKeyExpiredListener  : 发送聊天会话超时MQ消息成功:REDIS:EXPIPRE:NOTICE:TEST

四、 subscribe/psubscibe 的区别

Redis 提供了 publish 和 subscribe/psubscibe 指令来实现发布/订阅模型,发布和订阅的目标称为通道(channel)。 subscribe/psubscribe 了一个或多个通道的客户端,可以收到其他客户端向这个通道publish的消息。subscribe和psubscribe的区别是,前者指定具体的通道名称,而后者可以指定一个正则表达式,匹配这个表达式的通道都被订阅。

上图展示了一个带有频道和模式的例子, 其中 tweet.shop.* 模式匹配了 tweet.shop.kindle 频道和 tweet.shop.ipad 频道, 并且有不同的客户端分别用 psubscibe 订阅它们三个:当有信息发送到 tweet.shop.kindle 频道时, 信息除了发送给 clientX 和 clientY 之外, 还会发送给订阅 tweet.shop.* 模式的 client123 和 client256。

五、参考文献

[1]Redis Pub/Sub

[2]Redis Keyspace Notifications

[3]Redis的Pub/Sub模式

[4]Redis设计与实现第一版-订阅与发布

[5]Redis实践操作之—— keyspace notification(键空间通知