请问RocketMQ Spring , 如何 让日志里携带 traceId, spanId ?[阿里云消息队列MQ]

请问RocketMQ Spring , 如何 让日志里携带 traceId, spanId ?

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
=====这是一个广告位,招租中,联系qq 78315851====
3 条回复 A 作者 M 管理员
  1. 如果您想要在RocketMQ Spring的日志中携带TraceId和SpanId,则可以使用MDC(Mapped Diagnostic Context)功能。MDC是log4j框架提供的一种上下文信息传递的机制,可以将一些变量绑定到当前线程,使得这些变量在整个线程生命周期内都可以访问。

    以下是一个示例配置,可用于设置MDC并在日志中携带TraceId和SpanId:

    %d{yyyy-MM-dd HH:mm:ss.SSS} ${PID:- } [%thread] %-5level %logger{36} – traceId:%X{traceId}, spanId:%X{spanId} – %msg%n

    在上面的示例中,我们使用了logback作为日志框架,并且定义了一个名为“CONSOLE”的输出器,输出内容包括时间、线程、日志级别、Logger名称、TraceId和SpanId。然后我们将“org.apache.rocketmq”Logger与CONSOLE输出器关联,并通过MDC将TraceId和SpanId设置为全局属性。

    接着,在代码中需要生成TraceId和SpanId时,您可以使用Spring Cloud Sleuth库来创建TraceId和SpanId:

    import org.springframework.cloud.sleuth.*; // …

    @Autowired(required = false) private Tracer tracer;

    public void sendMessage(String message) { if (tracer != null) { Span newSpan = tracer.nextSpan().name(“sendMessage”).start(); try (Tracer.SpanInScope ws = tracer.withSpan(newSpan.start())) { // Do something with the message // … } finally { newSpan.tag(“message”, message); newSpan.finish(); } } else { // Fallback behavior when Tracer is not available // … } } 在上面的示例中,我们注入了一个名为tracer的Tracer对象,并使用nextSpan()方法创建一个新的Span。然后,我们使用withSpan()方法将当前线程与新的Span关联,并执行需要进行跟踪的操作(例如发送消息)。最后,我们通过tag()方法将消息内容添加到Span中,并调用finish()方法完成Span。

    这样,当执行sendMessage()方法时,MDC会自动将TraceId和SpanId设置为全局属性,并在日志中携带它们。

  2. 在RocketMQ Spring中,可以通过配置RocketMQ的ProducerInterceptorConsumerInterceptor来实现在日志中携带traceIdspanId

    ProducerInterceptor中,你可以通过MessageExtputUserProperty方法将traceIdspanId添加到消息的用户属性中,例如:

    public class TraceProducerInterceptor implements ProducerInterceptor {    private static final String TRACE_ID = "traceId";    private static final String SPAN_ID = "spanId";    @Override    public Message beforeConvert(Message message) {        String traceId = TraceContext.getTraceId();        String spanId = TraceContext.getSpanId();        if (traceId != null && spanId != null) {            MessageHeaders headers = message.getHeaders();            headers.put(TRACE_ID, traceId);            headers.put(SPAN_ID, spanId);            if (message instanceof ErrorMessage) {                ErrorMessage errorMessage = (ErrorMessage) message;                Message originalMessage = errorMessage.getOriginalMessage();                if (originalMessage != null) {                    originalMessage = beforeConvert(originalMessage);                    errorMessage = new ErrorMessage(                            errorMessage.getPayload(), originalMessage, errorMessage.getHeaders(), errorMessage.getCause());                }                return errorMessage;            }        }        return message;    }}

    ConsumerInterceptor中,你可以通过MessageExtgetUserProperty方法获取到消息中的traceIdspanId,并将其添加到日志中,例如:

    public class TraceConsumerInterceptor implements ConsumerInterceptor {    private static final String TRACE_ID = "traceId";    private static final String SPAN_ID = "spanId";    @Override    public void afterReceive(Message message) {        String traceId = message.getHeaders().get(TRACE_ID, String.class);        String spanId = message.getHeaders().get(SPAN_ID, String.class);        if (traceId != null && spanId != null) {            MDC.put(TRACE_ID, traceId);            MDC.put(SPAN_ID, spanId);        }    }}

    在配置文件中添加ProducerInterceptorConsumerInterceptor

    rocketmq:  producer:    interceptor:      - com.example.TraceProducerInterceptor  consumer:    interceptor:      - com.example.TraceConsumerInterceptor

    这样就可以在RocketMQ Spring的日志中携带traceIdspanId了。

  3. RocketMQ Spring 支持使用 OpenTracing(或 Jaeger)来实现分布式链路追踪,可以通过在消息发送和消费时设置 traceId 和 spanId 来携带这些信息。

    具体来说,可以通过在消息体中设置 traceId 和 spanId,然后在消息发送前将这些信息设置到消息的属性中。例如,可以使用 Tracer 接口来生成 traceId 和 spanId,然后将这些信息设置到消息的属性中,例如:

    java Copy @Autowired private Tracer tracer;

    @Autowired private RocketMQTemplate rocketMQTemplate;

    public void sendMessage(String message) { String traceId = tracer.activeSpan().context().toTraceId(); String spanId = tracer.activeSpan().context().toSpanId();

    Message msg = MessageBuilder.withPayload(message)        .setHeader("TRACE_ID", traceId)        .setHeader("SPAN_ID", spanId)        .build();rocketMQTemplate.syncSend("topic", msg);

    } 在消息消费时,可以通过订阅消息时设置 MessageListenerConcurrently 或 MessageListenerOrderly 接口的实现类,然后在 consumeMessage 方法中获取消息属性中的 traceId 和 spanId。例如:

    java Copy @Component @RocketMQMessageListener(topic = “topic”, consumerGroup = “consumer-group”) public class MyMessageListener implements MessageListenerConcurrently {

    @Overridepublic ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {    for (MessageExt msg : msgs) {        String traceId = msg.getProperty("TRACE_ID");        String spanId = msg.getProperty("SPAN_ID");        // 处理消息    }    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}

    } 需要注意的是,使用 OpenTracing(或 Jaeger)进行分布式链路追踪需要在应用中引入相应的依赖包,以及配置相应的参数。具体的配置方式可以参考 OpenTracing 或 Jaeger 的官方文档。

  4. 可以通过RocketMQ的消息拦截器(MessageInterceptor)来实现在发送和接收消息时携带traceId和spanId,然后通过日志框架将其记录下来。

    具体实现步骤如下:

    1、定义一个自定义的MessageInterceptor,实现beforeSend和afterConsume方法,在这两个方法中可以获取traceId和spanId,并将其添加到消息的属性中。

    public class TraceMessageInterceptor implements MessageInterceptor {    @Override    public Message beforeSend(Message message, org.apache.rocketmq.common.message.Message rocketmqMessage, SendCallback callback) {        TraceContext traceContext = TraceContext.get();        if (traceContext != null) {            rocketmqMessage.putUserProperty("traceId", traceContext.getTraceId());            rocketmqMessage.putUserProperty("spanId", traceContext.getSpanId());        }        return message;    }    @Override    public void afterConsume(MessageExt messageExt) {        String traceId = messageExt.getUserProperty("traceId");        String spanId = messageExt.getUserProperty("spanId");        TraceContext.set(new TraceContext(traceId, spanId));    }}

    2、在Spring Boot中配置RocketMQ的消息拦截器

    @Configurationpublic class RocketMQConfig {    @Autowired    private TraceMessageInterceptor traceMessageInterceptor;    @Bean    public DefaultMQProducer defaultMQProducer() throws MQClientException {        DefaultMQProducer producer = new DefaultMQProducer("producer_group");        producer.setNamesrvAddr("localhost:9876");        producer.setSendMsgTimeout(10000);        producer.setVipChannelEnabled(false);        producer.setRetryTimesWhenSendFailed(3);        producer.setRetryTimesWhenSendAsyncFailed(3);        producer.setRetryAnotherBrokerWhenNotStoreOK(true);        producer.setSendMessageWithVIPChannel(false);        producer.setSendLatencyFaultEnable(true);        producer.setUnitMode(false);        producer.setCompressMsgBodyOverHowmuch(1024 * 4);        producer.setInterceptors(Arrays.asList(traceMessageInterceptor));        producer.start();        return producer;    }    @Bean    public DefaultMQPushConsumer defaultMQPushConsumer(MQListener mqListener) throws MQClientException {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");        consumer.setNamesrvAddr("localhost:9876");        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);        consumer.subscribe("topic", "*");        consumer.setConsumeThreadMax(1);        consumer.setConsumeThreadMin(1);        consumer.setPullBatchSize(32);        consumer.setPullInterval(0);        consumer.setConsumeMessageBatchMaxSize(1);        consumer.setConsumeTimeout(15);        consumer.setInterceptors(Arrays.asList(traceMessageInterceptor));        consumer.registerMessageListener(mqListener);        consumer.start();        return consumer;    }}

    3、在日志框架中添加traceId和spanId的日志输出

    @Slf4j@Componentpublic class TraceLogFilter extends OncePerRequestFilter {    @Override    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {        TraceContext traceContext = TraceContext.get();        if (traceContext != null) {            MDC.put("traceId", traceContext.getTraceId());            MDC.put("spanId", traceContext.getSpanId());        }        try {            filterChain.doFilter(request, response);        } finally {            MDC.clear();        }    }}

    通过这种方式,就可以在RocketMQ的消息中携带traceId和spanId,并通过日志框架将其输出到日志中,从而方便地跟踪分布式调用链路。