亲宝软件园·资讯

展开

借助Redis完成延时任务

Catcher8 人气:0
## 背景 相信我们或多或少的会遇到类似下面这样的需求: 第三方给了一批数据给我们处理,我们处理好之后就通知他们处理结果。 大概就是下面这个图说的。 ![](https://mmbiz.qpic.cn/mmbiz_png/yrY77YblI8LcWL5ECHd9nmh78icUYTRY9nm5nrzDiaYCZwq2aQSHM2pOcnkbsEabkBibwFGg28T0TGt1mIq2j84vA/0?wx_fmt=png) 本来在处理完数据之后,我们就会马上把处理结果返回给对方,但是对方要求我们处理速度不能过快,要有一种人为处理的效果。 换句话就是说,就算是处理好了,也要晚一点再执行通知操作。 这就是一个典型的延时任务。 延时,那还不简单,执行完之后,让它`Sleep`一下就好了,这样就达到目标了。 `Sleep`一下确定是最容易实现的一种方案,但是试想一下,数据的数量不断的增加,这样`Sleep`真的好吗?答案是否定的。 延时队列,是处理这个场景最为妥当的方案。 RabbitMQ,RocketMQ,Cmq等都可以直接或间接的达到相应的效果。 如果不具备队列条件,又要怎么处理呢?还可以借助Redis来完成这项工作。 MQ不一定每个公司都会用,但Redis应该80%以上的都会用吧。 ## 处理方案 Redis这边,可用的方案有两种,下面分别来介绍一下。 ### #1 键的过期时间 在设置缓存的时候,我们比较多情况下都会设置一个缓存的过期时间,这个时间过期后,会重新去数据源拿数据回来。 可以基于这个过期时间结合Redis的**keyspace notifications**共同完成。 keyspace notifications里面包含了非常多的事件,这里只关注`EXPIRE`,这个是和过期有关的。 只要订阅了`__keyevent@0__:expired`这个主题,当有key过期的时候,就会收到对应的信息。 >注:主题@后面的0,指的是db 0. 要想使用这个特性,必不可少的一步是修改Redis默认的配置,把`notify-keyspace-events`设置成`Ex`。 ```conf ############################# Event notification ############################## # Redis can notify Pub/Sub clients about events happening in the key space. # This feature is documented at http://redis.io/topics/notifications # # ......... # # By default all notifications are disabled because most users don't need # this feature and the feature has some overhead. Note that if you don't # specify at least one of K or E, no events will be delivered. notify-keyspace-events "Ex" ``` 其中 E 指的是键事件通知,x 指的是过期事件。 根据这个特性,重新调整一下流程图: ![](https://mmbiz.qpic.cn/mmbiz_png/yrY77YblI8LcWL5ECHd9nmh78icUYTRY9Qs3jcmaaOO4g76X7gMr7WVVZ1DUsZWaPxXiaQPXr0nsV4mNVNNwxicKQ/0?wx_fmt=png) 应该也比较好懂,下面通过简单的代码来实现一下这种方案。 首先是处理完数据及往Redis写数据。 ```cs public async Task DoTaskAsync() { // 数据处理 // ... // 后续操作要延时,把Id记录下来 var taskId = new Random().Next(1, 10000); // 要延迟的时间 int sec = new Random().Next(1, 5); // 可以加个重试机制,预防单次执行失败。 await RedisHelper.SetAsync($"task:{taskId}", "1", sec); } ``` 还需要回传结果的后台任务,这个任务就是去订阅上面说的键过期事件,然后回传结果。 这里可以借助`BackgroundService`来订阅处理。 ```cs public class SubscribeTaskBgTask : BackgroundService { protected override Task ExecuteAsync(CancellationToken stoppingToken) { stoppingToken.ThrowIfCancellationRequested(); var keyPrefix = "task:"; RedisHelper.Subscribe( ("__keyevent@0__:expired", arg => { var msg = arg.Body; Console.WriteLine($"recive {msg}"); if (msg.StartsWith(keyPrefix)) { // 取到任务Id var val = msg.Substring(keyPrefix.Length); Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}"); // 回传处理结果给第三方,这里可以考虑这个并发锁,避免多实例都处理了这个任务。 // .... } } )); return Task.CompletedTask; } } ``` 这里有一个要注意的地方,要在key里面包含任务的Id,因为订阅处理的时候,只能拿到一个key,后续能做的操作也只是基于这个key。 上面的例子,是用了`task:任务Id`的形式,所以在订阅处理的时候,只处理以`task:`开头的那些key。 效果如下: ![](https://mmbiz.qpic.cn/mmbiz_png/yrY77YblI8LcWL5ECHd9nmh78icUYTRY9pZBI4p5k1cXNNZDOiaJdfSJQSFJxZKJHL1n0Dz3HYibaibCCNrjAAV5yA/0?wx_fmt=png) 这种方案,直观上是非常简单的,不过这种方案会遇到一个小问题。 当一个key过期后,并不一定会马上收到通知,这个也是会有一定的延时的,取决于Redis的内部机制。 Redis Keyspace Notifications文档的最后一段也提到了这个问题。 ![](https://mmbiz.qpic.cn/mmbiz_png/yrY77YblI8LcWL5ECHd9nmh78icUYTRY9QLDLm6N9OaWUdfkIpKa8drEmBtKtL8xPIVPo3JQnxAf1nLJ1TOxzXQ/0?wx_fmt=png) 所以用这种方案的时候,要考虑一下,你的延时是不是要及时~~ ### #2 有序集合 有序集合是Redis中一种十分有用的数据结构,它的本质其实就是集合加了一个排序的功能,每个集合里面的元素还会有一个分值的属性。 它提供了一个可以获取指定分值范围内的元素,这个也就是我们的出发点。 在这个场景下,什么东西可能作为这个分值呢?现在只有一个处理任务的Id还有一个延迟的时间,Id肯定不行,那么也只能是延迟时间来作这个分值了。 延迟1秒,5秒,1分钟,这个都是比较大粒度的时间,这里要转化一下,用时间戳来代替这些延迟的时间。 假设现在的时间戳是 `1584171520`, 要延迟5秒执行,那么执行任务的时间就是 `1584171525`,在当前时间戳的基础上加个5秒,就是最终要执行的了。 到时有序集合中存的元素就会是这样的 ``` 任务Id-1 1584171525 任务Id-2 1584171528 任务Id-3 1584171530 ``` 接下来就是要怎么取出这些任务的问题了! 把当前时间戳当成是取数的最大分值,0作为最小分值,这个时候取出的元素就是应该要执行回传的任务了。 根据这个方案,重新调整一下流程图: ![](https://mmbiz.qpic.cn/mmbiz_png/yrY77YblI8LcWL5ECHd9nmh78icUYTRY9Mfj0ibwNcj2JIgCwfEcBBOicqib9ibedfibxOh0b18DFPoBg3QnOGOsiayJg/0?wx_fmt=png) 交代清楚了思路,再来点代码,加深一下理解。 首先还是处理完数据后往Redis写数据。 ```cs public async Task DoTaskAsync() { // 数据处理 // ... // 后续操作要延时,把Id记录下来 var taskId = new Random().Next(1, 10000); var cacheKey = "task:delay"; int sec = new Random().Next(1, 5); // 要执行这个任务的时间戳 var time = DateTimeOffset.Now.AddSeconds(sec).ToUnixTimeSeconds(); await RedisHelper.ZAddAsync(cacheKey, (time, taskId)); Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} done {taskId} here - {sec}"); } ``` 后面就是轮训有序集合里面的元素了,这里同样是借助`BackgroundService`来处理。 ```cs public class SubscribeTaskBgTask : BackgroundService { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { stoppingToken.ThrowIfCancellationRequested(); var cacheKey = "task:delay"; while (true) { // 先取,后删,不具备原子性,可考虑用lua脚本来保证原子性。 var vals = await RedisHelper.ZRangeByScoreAsync(cacheKey, -1, DateTimeOffset.Now.ToUnixTimeSeconds(), 1, 0); if (vals != null && vals.Length > 0) { var val = vals[0]; var rmCount = await RedisHelper.ZRemAsync(cacheKey, vals); if (rmCount > 0) { // 要把这个元素先删除成功了,再执行任务,不然会重复 Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}"); // 回传处理结果给第三方,这里可以考虑这个并发锁,避免多实例都处理了这个任务。 // .... } } else { // 没有数据,休眠500ms,避免CPU空转 await Task.Delay(500); } } } } ``` 效果如下: ![](https://mmbiz.qpic.cn/mmbiz_png/yrY77YblI8LcWL5ECHd9nmh78icUYTRY9Ke3fqWKcHzRFXglvR5ftqibDBDau9XIPm7ZgUeYJUXreJbatTO7gQmA/0?wx_fmt=png) ## 参考文章 [https://redis.io/topics/notifications](https://redis.io/topics/notifications) [https://zhuanlan.zhihu.com/p/87113913](https://zhuanlan.zhihu.com/p/87113913)

加载全部内容

相关教程
猜你喜欢
用户评论