佳木斯湛栽影视文化发展公司

主頁 > 知識(shí)庫 > 基于Redis延遲隊(duì)列的實(shí)現(xiàn)代碼

基于Redis延遲隊(duì)列的實(shí)現(xiàn)代碼

熱門標(biāo)簽:網(wǎng)站排名優(yōu)化 地方門戶網(wǎng)站 服務(wù)外包 AI電銷 鐵路電話系統(tǒng) 百度競(jìng)價(jià)排名 呼叫中心市場(chǎng)需求 Linux服務(wù)器

使用場(chǎng)景

工作中大家往往會(huì)遇到類似的場(chǎng)景:

1.對(duì)于紅包場(chǎng)景,賬戶 A 對(duì)賬戶 B 發(fā)出紅包通常在 1 天后會(huì)自動(dòng)歸還到原賬戶。

2.對(duì)于實(shí)時(shí)支付場(chǎng)景,如果賬戶 A 對(duì)商戶 S 付款 100 元,5秒后沒有收到支付方回調(diào)將自動(dòng)取消訂單。

解決方案分析

方案一:

采用通過定時(shí)任務(wù)采用數(shù)據(jù)庫/非關(guān)系型數(shù)據(jù)庫輪詢方案。

優(yōu)點(diǎn):

1. 實(shí)現(xiàn)簡單,對(duì)于項(xiàng)目前期這樣是最容易的解決方案。

缺點(diǎn):

1. DB 有效使用率低,需要將一部分的數(shù)據(jù)庫的QPS分配給 JOB 的無效輪詢。

2. 服務(wù)資源浪費(fèi),因?yàn)檩喸冃枰獙?duì)所有的數(shù)據(jù)做一次 SCAN 掃描 JOB 服務(wù)的資源開銷很大。

方案二:

采用延遲隊(duì)列:

優(yōu)點(diǎn):

1. 服務(wù)的資源使用率較高,能夠精確的實(shí)現(xiàn)超時(shí)任務(wù)的執(zhí)行。

2. 減少 DB 的查詢次數(shù),能夠降低數(shù)據(jù)庫的壓力

缺點(diǎn):

1. 對(duì)于延遲隊(duì)列來說本身設(shè)計(jì)比較復(fù)雜,目前沒有通用的比較好過的方案。

基于 Redis 的延遲隊(duì)列實(shí)現(xiàn)

基于以上的分析,我決定通過 Redis 來實(shí)現(xiàn)分布式隊(duì)列。

設(shè)計(jì)思路:

1. 第一步將需要發(fā)放的消息發(fā)送到延遲隊(duì)列中。

2. 延遲隊(duì)列將數(shù)據(jù)存入 Redis 的 ZSet 有序集合中score 為當(dāng)前時(shí)間戳,member 存入需要發(fā)送的數(shù)據(jù)。

3. 添加一個(gè) schedule 來進(jìn)行對(duì) Redis 有序隊(duì)列的輪詢。

4. 如果到達(dá)達(dá)到消息的執(zhí)行時(shí)間,那么就進(jìn)行業(yè)務(wù)的執(zhí)行。

5. 如果沒有達(dá)到消息的執(zhí)行是將,那么消息等待下輪執(zhí)行。

實(shí)現(xiàn)步驟:

由于本處篇幅有限,所以只列舉部分代碼,完整的代碼可以在本文最后訪問 GitHub 獲取。由于本人閱歷/水平有限,如有建議/或更正歡迎留言或提問。先在此謝謝大家駐足閱讀 👏 👏 👏。

需要注意的問題:

單個(gè) Redis 命令的執(zhí)行是原子性的,但 Redis 沒有在事務(wù)上增加任何維持原子性的機(jī)制,所以 Redis 事務(wù)的執(zhí)行并不是原子性的。

事務(wù)可以理解為一個(gè)打包的批量執(zhí)行腳本,但批量指令并非原子化的操作,中間某條指令的失敗不會(huì)導(dǎo)致前面已做指令的回滾,也不會(huì)造成后續(xù)的指令不做。

我們可以通過 Redis 的 eval 命令來執(zhí)行 lua 腳本來保證原子性實(shí)現(xiàn)Redis的事務(wù)。

實(shí)現(xiàn)步驟如下:

1. 延遲隊(duì)列接口

/**
 * 延遲隊(duì)列
 *
 * @author zhengsh
 * @date 2020-03-27
 */
public interface RedisDelayQueueE extends DelayMessage> {

    String META_TOPIC_WAIT = "delay:meta:topic:wait";
    String META_TOPIC_ACTIVE = "delay:meta:topic:active";
    String TOPIC_ACTIVE = "delay:active:9999";
    /**
     * 拉取消息
     */
    void poll();

    /**
     * 推送延遲消息
     *
     * @param e
     */
    void push(E e);
}

2. 延遲隊(duì)列消息

/**
 * 消息體
 *
 * @author zhengsh
 * @date 2020-03-27
 */
@Setter
@Getter
public class DelayMessage {
    /**
     * 消息唯一標(biāo)識(shí)
     */
    private String id;
    /**
     * 消息主題
     */
    private String topic = "default";
    /**
     * 具體消息 json
     */
    private String body;
    /**
     * 延時(shí)時(shí)間, 格式為時(shí)間戳: 當(dāng)前時(shí)間戳 + 實(shí)際延遲毫秒數(shù)
     */
    private Long delayTime = System.currentTimeMillis() + 30000L;
    /**
     * 消息發(fā)送時(shí)間
     */
    private LocalDateTime createTime;
}

3. 延遲隊(duì)列實(shí)現(xiàn)

/**
 * 延遲隊(duì)列實(shí)現(xiàn)
 *
 * @author zhengsh
 * @date 2020-03-27
 */
@Component
public class RedisDelayQueueImplE extends DelayMessage> implements RedisDelayQueueE> {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public void poll() {
        // todo
    }

    /**
     * 發(fā)送消息
     *
     * @param e
     */
    @SneakyThrows
    @Override
    public void push(E e) {
        try {
            String jsonStr = JSON.toJSONString(e);
            String topic = e.getTopic();
            String zkey = String.format("delay:wait:%s", topic);
            String u =
                    "redis.call('sadd', KEYS[1], ARGV[1])\n" +
                            "redis.call('zadd', KEYS[2], ARGV[2], ARGV[3])\n" +
                            "return 1";

            Object[] keys = new Object[]{serialize(META_TOPIC_WAIT), serialize(zkey)};
            Object[] values = new Object[]{ serialize(zkey), serialize(String.valueOf(e.getDelayTime())),serialize(jsonStr)};

            Long result = redisTemplate.execute((RedisCallbackLong>) connection -> {
                Object nativeConnection = connection.getNativeConnection();

                if (nativeConnection instanceof RedisAsyncCommands) {
                    RedisAsyncCommands commands = (RedisAsyncCommands) nativeConnection;
                    return (Long) commands.getStatefulConnection().sync().eval(u, ScriptOutputType.INTEGER, keys, values);
                } else if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) {
                    RedisAdvancedClusterAsyncCommands commands = (RedisAdvancedClusterAsyncCommands) nativeConnection;
                    return (Long) commands.getStatefulConnection().sync().eval(u, ScriptOutputType.INTEGER, keys, values);
                }
                return 0L;
            });
            logger.info("延遲隊(duì)列[1],消息推送成功進(jìn)入等待隊(duì)列({}), topic: {}", result != null  result > 0, e.getTopic());
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    private byte[] serialize(String key) {
        RedisSerializerString> stringRedisSerializer =
                (RedisSerializerString>) redisTemplate.getKeySerializer();
        //lettuce連接包下序列化鍵值,否則無法用默認(rèn)的ByteArrayCodec解析
        return stringRedisSerializer.serialize(key);
    }
}

4. 定時(shí)任務(wù)

/**
 * 分發(fā)任務(wù)
 */
@Component
public class DistributeTask {

    private static final String LUA_SCRIPT;
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired
    private StringRedisTemplate redisTemplate;

    static {
        StringBuilder sb = new StringBuilder(128);
        sb.append("local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1], 'limit', 0, 1)\n");
        sb.append("if(next(val) ~= nil) then\n");
        sb.append("    redis.call('sadd', KEYS[2], ARGV[2])\n");
        sb.append("    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)\n");
        sb.append("    for i = 1, #val, 100 do\n");
        sb.append("        redis.call('rpush', KEYS[3], unpack(val, i, math.min(i+99, #val)))\n");
        sb.append("    end\n");
        sb.append("    return 1\n");
        sb.append("end\n");
        sb.append("return 0");
        LUA_SCRIPT = sb.toString();
    }

    /**
     * 2秒鐘掃描一次執(zhí)行隊(duì)列
     */
    @Scheduled(cron = "0/5 * * * * ?")
    public void scheduledTaskByCorn() {
        try {
            SetString> members = redisTemplate.opsForSet().members(META_TOPIC_WAIT);
            assert members != null;
            for (String k : members) {
                if (!redisTemplate.hasKey(k)) {
                    // 如果 KEY 不存在元數(shù)據(jù)中刪除
                    redisTemplate.opsForSet().remove(META_TOPIC_WAIT, k);
                    continue;
                }

                String lk = k.replace("delay:wait", "delay:active");
                Object[] keys = new Object[]{serialize(k), serialize(META_TOPIC_ACTIVE), serialize(lk)};
                Object[] values = new Object[]{serialize(String.valueOf(System.currentTimeMillis())), serialize(lk)};
                Long result = redisTemplate.execute((RedisCallbackLong>) connection -> {
                    Object nativeConnection = connection.getNativeConnection();

                    if (nativeConnection instanceof RedisAsyncCommands) {
                        RedisAsyncCommands commands = (RedisAsyncCommands) nativeConnection;
                        return (Long) commands.getStatefulConnection().sync().eval(LUA_SCRIPT, ScriptOutputType.INTEGER, keys, values);
                    } else if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) {
                        RedisAdvancedClusterAsyncCommands commands = (RedisAdvancedClusterAsyncCommands) nativeConnection;
                        return (Long) commands.getStatefulConnection().sync().eval(LUA_SCRIPT, ScriptOutputType.INTEGER, keys, values);
                    }
                    return 0L;
                });
                logger.info("延遲隊(duì)列[2],消息到期進(jìn)入執(zhí)行隊(duì)列({}): {}", result != null  result > 0, TOPIC_ACTIVE);
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    private byte[] serialize(String key) {
        RedisSerializerString> stringRedisSerializer =
                (RedisSerializerString>) redisTemplate.getKeySerializer();
        //lettuce連接包下序列化鍵值,否則無法用默認(rèn)的ByteArrayCodec解析
        return stringRedisSerializer.serialize(key);
    }
}

GitHub 地址

https://github.com/zhengsh/redis-delay-queue

參考地址

1.https://www.runoob.com/redis/redis-transactions.html

到此這篇關(guān)于基于Redis延遲隊(duì)列的實(shí)現(xiàn)代碼的文章就介紹到這了,更多相關(guān)Redis 延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:
  • SpringBoot集成Redisson實(shí)現(xiàn)延遲隊(duì)列的場(chǎng)景分析
  • php使用redis的有序集合zset實(shí)現(xiàn)延遲隊(duì)列應(yīng)用示例
  • Redis延遲隊(duì)列和分布式延遲隊(duì)列的簡答實(shí)現(xiàn)

標(biāo)簽:湘潭 銅川 黃山 衡水 仙桃 湖南 蘭州 崇左

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《基于Redis延遲隊(duì)列的實(shí)現(xiàn)代碼》,本文關(guān)鍵詞  ;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。
  • 相關(guān)文章
  • 收縮
    • 微信客服
    • 微信二維碼
    • 電話咨詢

    • 400-1100-266
    华容县| 顺昌县| 黎城县| 资兴市| 陇川县| 屯昌县| 明溪县| 绵竹市| 嵊州市| 平泉县| 防城港市| 东山县| 潞西市| 荔波县| 琼中| 平邑县| 措美县| 达孜县| 肥乡县| 延庆县| 聊城市| 英德市| 都匀市| 罗山县| 正蓝旗| 临汾市| 儋州市| 韶关市| 扎赉特旗| 台中县| 当涂县| 鹿邑县| 西乌珠穆沁旗| 广水市| 延安市| 木里| 巴林右旗| 临沭县| 玉田县| 威海市| 鹤岗市|