亲宝软件园·资讯

展开

RocketMQ事务消息图文示例讲解

一个双子座的Java攻城狮 人气:0

RocketMQ 也允许我们像mysql 一样发送具有事务特征的消息

MQ 的事务流程(本地代码正常执行)

MQ 的消息补偿过程(当本地代码执行失败时)

MQ 消息的三种状态

注意:事务消息仅与生产者有关,与消费者无关

生产者代码(提交状态、回滚状态):

public class Producer {
    public static void main(String[] args) throws Exception{
        //事务消息使用的生产者是TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.23.127:9876");
        //添加本地事务对应的监听
        producer.setTransactionListener(new TransactionListener() {
            //正常事务过程
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                // 此处写本地事务处理业务
                // 如果成功,消息改为提交,如果失败改为 回滚,如果是多线程处理状态未知,就提交为未知等待事务补偿过程
                //事务提交状态
                return LocalTransactionState.COMMIT_MESSAGE;// 类似于msql 的 commit
                //return LocalTransactionState.ROLLBACK_MESSAGE;回滚状态
            }
            //事务补偿过程
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                return null;
            }
        });
        producer.start();
        Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF-8"));
        SendResult result = producer.sendMessageInTransaction(msg,null);
        System.out.println("返回结果:"+result);
        producer.shutdown();
    }
}

生产者(中间状态):

public class Producer {
    public static void main(String[] args) throws Exception{
        //事务消息使用的生产者是TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.23.127:9876");
        //添加本地事务对应的监听
        producer.setTransactionListener(new TransactionListener() {
            //正常事务过程
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                return LocalTransactionState.UNKNOW;
            }
            //事务补偿过程
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("事务补偿过程执行");
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();
        Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF-8"));
        SendResult result = producer.sendMessageInTransaction(msg,null);
        System.out.println("返回结果:"+result);
        //事务补偿过程必须保障服务器在运行过程中,否则将无法进行正常的事务补偿
        //producer.shutdown();
    }
}

加载全部内容

相关教程
猜你喜欢
用户评论