前言
在学习过程中发现redis的zset还可以用来实现轻量级的延时消息队列功能,虽然可靠性还有待提高,但是对于一些对数据可靠性要求不那么高的功能要求完全可以实现。本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel来实现一个小demo。
提前准备 安装redis, redis-go
因为用的是macOS, 直接
$ brew install redis $ go get github.com/garyburd/redigo/redis
又因为比较懒,生成任务的唯一id时,直接采用了bson中的objectId,所以:
$ go get gopkg.in/mgo.v2/bson
唯一id不是必须有,但如果之后有实际应用需要携带,便于查找相应任务。
生产者
通过一个for循环生成10w个任务, 每一个任务有不同的时间
func producer() { count := 0 //生成100000个任务 for count < 100000 { count++ dealTime := int64(rand.Intn(5)) + time.Now().Unix() uuid := bson.NewObjectId().Hex() redis.Client.AddJob(&job.JobMessage{ Id: uuid, DealTime: dealTime, }, + int64(dealTime)) } }
其中AddJob函数在另一个包中, 将上一个函数中随机生成的时间作为需要处理的时间戳.
// 添加任务 func (client *RedisClient) AddJob(msg *job.JobMessage, dealTime int64) { conn := client.Get() defer conn.Close() key := "JOB_MESSAGE_QUEUE" conn.Do("zadd", key, dealTime, util.JsonEncode(msg)) }
消费者
消费者处理流程分为两个步骤:
- 获取小于等于当前时间戳的任务
- 通过删除当前任务来判断谁获得了当前任务
因为在获取小于等于当前时间戳的任务时,可能有多个go routine同时读到了当前任务,而只有一个任务可以来处理当前任务。因此我们需要通过一个方案来判断究竟由谁来处理这个任务(当然如果只有一个消费者可以读到就直接处理):这个时候可以通过redis的删除操作来获取,因为删除指定value时只有成功的操作才会返回不为0,所以我们可以认为删除当前队列成功的那个go routine拿到了当前的任务。
下面是代码:
// 消费者 func consumer() { // 启动10个go routine一起去拿 count := 0 for count < 10 { go func() { for { jobs := redis.Client.GetJob() if len(jobs) <= 0 { time.Sleep(time.Second * 1) continue } currentJob := jobs[0] // 如果当前抢redis队列成功, if redis.Client.DelJob(currentJob) > 0 { var jobMessage job.JobMessage util.JsonDecode(currentJob, &jobMessage) //自定义的json解析函数 handleMessage(&jobMessage) } } }() count++ } } // 处理任务用函数 func handleMessage(msg *job.JobMessage) { fmt.Printf("deal job: %s, require time: %d \n", msg.Id, msg.DealTime) go func() { countChan <- true }() }
redis部分的代码,获取任务和删除任务
// 获取任务 func (client *RedisClient) GetJob() []string { conn := client.Get() defer conn.Close() key := "JOB_MESSAGE_QUEUE" timeNow := time.Now().Unix() ret, err := redis.Strings(conn.Do("zrangebyscore", key, 0, timeNow, "limit", 0, 1)) if err != nil { panic(err) } return ret } // 删除当前任务, 用来判断是否抢到了当前任务 func (client *RedisClient) DelJob(value string) int { conn := client.Get() defer conn.Close() key := "JOB_MESSAGE_QUEUE" ret, err := redis.Int(conn.Do("zrem", key, value)) if err != nil { panic(err) } return ret }
代码大抵如此。最后跑起来之后,大概每3-4秒钟能够处理掉1w个任务,速度上确实是...
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
广告合作:本站广告合作请联系QQ:858582 申请时备注:广告合作(否则不回)
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
暂无评论...
更新日志
2024年05月20日
2024年05月20日
- 中唱唱片群星《好歌珍藏-士兵旋律》2CDWAV
- 出发吧麦芬各职业特点解析 玩什么职业好
- 《哈迪斯2》EA阶段至少持续到年底!此前将有重大更新
- 崩坏星穹铁道梦中之梦12平民满星攻略 梦中之梦12阵容搭配分享
- 钟志刚《淡淡君情》24K金限量头版[低速原抓WAV+CUE]
- 金山游戏封神再临视频首曝 预计年内上线
- IGN分享PC《对马岛之魂》28分钟实机:极致的画面表现
- 钟明秋《是时候HQ》头版限量编号[低速原抓WAV分轨]
- 蜀门手游五月大服龙城飞将开启 全新大逃杀玩法上线
- 崩坏星穹铁道平民神主日怎么打 神主日萌新通关攻略
- 赵传《我是一只小小鸟》日本东芝1A1版 [WAV+CUE][435M]
- 庄达菲《东张西望》[320K/MP3][40.28MB]
- 庄达菲《东张西望》[24bit 48kHz][FLAC/分轨][288.46MB]
- 金海心.-.[心感觉].专辑[原抓WAV+CUE]
- KOKIA心は?かり(2012K2HD2016Mora)[24bit96kHzFLAC]