RocketMQ5.0中的mqadmin在创建延时消息的主题时,大家有遇到过么?怎么解决的呢?[阿里云消息队列MQ]

RocketMQ5.0中的mqadmin在创建延时消息的主题时不支持官方文档中说的添加额外属性指定主题类型。大家有遇到过么?怎么解决的呢?

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
=====这是一个广告位,招租中,联系qq 78315851====
1 条回复 A 作者 M 管理员
  1. 在RocketMQ 5.0中,确实存在无法通过mqadmin添加额外属性指定主题类型的问题。这是因为在5.0版本中,延时消息的主题类型默认为DELAY,无法通过mqadmin命令行工具进行修改。

    如果您需要创建其他类型的主题(如NORMAL),可以通过编程方式使用RocketMQ提供的Java SDK来创建主题,并在创建主题时指定类型。

    以下是使用Java SDK创建主题并指定类型的示例代码:

    import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.admin.TopicStatus;import org.apache.rocketmq.common.protocol.ResponseCode;import org.apache.rocketmq.remoting.exception.RemotingException;import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;import org.apache.rocketmq.tools.admin.MQAdminExt;import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;public class CreateTopicExample {    public static void main(String[] args) throws RemotingException, MQClientException, InterruptedException {        // 创建MQAdminExt对象        MQAdminExt admin = new DefaultMQAdminExt();        admin.start();        // 创建主题        String topic = "your_topic"; // 替换成您的主题名称        String brokerAddr = "127.0.0.1:9876"; // 替换成您的Broker地址        int queueNum = 4; // 主题队列数        int topicType = TopicStatus.NORMAL_TOPIC; // 主题类型,NORMAL_TOPIC为普通类型        try {            // 创建主题            admin.createAndUpdateTopicConfig(brokerAddr, topic, queueNum, topicType);            // 更新主题配置            UpdateTopicSubCommand cmd = new UpdateTopicSubCommand();            cmd.setAdminTool(admin);            cmd.execute(new String[]{"-b", brokerAddr, "-t", topic});        } catch (Exception e) {            e.printStackTrace();        } finally {            // 关闭MQAdminExt对象            admin.shutdown();        }    }}

    上述代码示例中,通过创建DefaultMQAdminExt对象来与RocketMQ进行交互。首先使用createAndUpdateTopicConfig方法创建主题,并指定主题类型为NORMAL_TOPIC。然后使用UpdateTopicSubCommand类的execute方法更新主题配置。

    您需要将代码中的brokerAddrtopic替换为您实际使用的Broker地址和主题名称。

    通过编程方式创建主题并指定类型可以解决mqadmin无法添加额外属性指定主题类型的问题。但是需要注意,这种方式需要您具备Java编程的基础,并且需要在您的项目中引入RocketMQ的Java SDK。

  2. 消息仅支持在MessageType为Delay的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致。

    https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/scheduled-and-delayed-messages?spm=a2c4g.11186623.0.i27#concept-2217704

    使用示例
    和普通消息相比,定时消费发送时,必须设置定时触发的目标时间戳。

    以Java语言为例,使用定时消息示例参考如下:

            MessageBuilder messageBuilder = new MessageBuilder();        //以下示例表示:延迟时间为10分钟之后的Unix时间戳。        Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;        Message message = messageBuilder.setTopic("topic")                //设置消息索引键,可根据关键字精确查找某条消息。                .setKeys("messageKey")                //设置消息Tag,用于消费端根据指定Tag过滤消息。                .setTag("messageTag")                .setDeliveryTimestamp(deliverTimeStamp)                //消息体                .setBody("messageBody".getBytes())                .build();        try {            //发送消息,需要关注发送结果,并捕获失败等异常。            SendReceipt sendReceipt = producer.send(message);            System.out.println(sendReceipt.getMessageId());        } catch (ClientException e) {            e.printStackTrace();        }        //消费示例一:使用PushConsumer消费定时消息,只需要在消费监听器处理即可。        MessageListener messageListener = new MessageListener() {            @Override            public ConsumeResult consume(MessageView messageView) {                System.out.println(messageView.getDeliveryTimestamp());                //根据消费结果返回状态。                return ConsumeResult.SUCCESS;            }        };        //消费示例二:使用SimpleConsumer消费定时消息,主动获取消息进行消费处理并提交消费结果。        List messageViewList = null;        try {            messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));            messageViewList.forEach(messageView -> {                System.out.println(messageView);                //消费处理完成后,需要主动调用ACK提交消费结果。                try {                    simpleConsumer.ack(messageView);                } catch (ClientException e) {                    e.printStackTrace();                }            });        } catch (ClientException e) {            //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。            e.printStackTrace();        }