tongchenkeji 发表于:2023-6-11 23:02:170次点击 已关注取消关注 关注 私信 请问RocketMQ Spring , 如何 让日志里携带 traceId, spanId ?[阿里云消息队列MQ] 暂停朗读为您朗读 请问RocketMQ Spring , 如何 让日志里携带 traceId, spanId ? 「点点赞赏,手留余香」 赞赏 还没有人赞赏,快来当第一个赞赏的人吧! 海报 消息队列 MQ# Java948# RocketMQ973# Spring48# 云消息队列 MQ1430# 日志服务1139# 消息中间件1371
安然ARAM 2023-11-28 0:14:26 1 如果您想要在RocketMQ Spring的日志中携带TraceId和SpanId,则可以使用MDC(Mapped Diagnostic Context)功能。MDC是log4j框架提供的一种上下文信息传递的机制,可以将一些变量绑定到当前线程,使得这些变量在整个线程生命周期内都可以访问。 以下是一个示例配置,可用于设置MDC并在日志中携带TraceId和SpanId: %d{yyyy-MM-dd HH:mm:ss.SSS} ${PID:- } [%thread] %-5level %logger{36} – traceId:%X{traceId}, spanId:%X{spanId} – %msg%n 在上面的示例中,我们使用了logback作为日志框架,并且定义了一个名为“CONSOLE”的输出器,输出内容包括时间、线程、日志级别、Logger名称、TraceId和SpanId。然后我们将“org.apache.rocketmq”Logger与CONSOLE输出器关联,并通过MDC将TraceId和SpanId设置为全局属性。 接着,在代码中需要生成TraceId和SpanId时,您可以使用Spring Cloud Sleuth库来创建TraceId和SpanId: import org.springframework.cloud.sleuth.*; // … @Autowired(required = false) private Tracer tracer; public void sendMessage(String message) { if (tracer != null) { Span newSpan = tracer.nextSpan().name(“sendMessage”).start(); try (Tracer.SpanInScope ws = tracer.withSpan(newSpan.start())) { // Do something with the message // … } finally { newSpan.tag(“message”, message); newSpan.finish(); } } else { // Fallback behavior when Tracer is not available // … } } 在上面的示例中,我们注入了一个名为tracer的Tracer对象,并使用nextSpan()方法创建一个新的Span。然后,我们使用withSpan()方法将当前线程与新的Span关联,并执行需要进行跟踪的操作(例如发送消息)。最后,我们通过tag()方法将消息内容添加到Span中,并调用finish()方法完成Span。 这样,当执行sendMessage()方法时,MDC会自动将TraceId和SpanId设置为全局属性,并在日志中携带它们。
vohelonAM 2023-11-28 0:14:26 2 在RocketMQ Spring中,可以通过配置RocketMQ的ProducerInterceptor和ConsumerInterceptor来实现在日志中携带traceId和spanId。 在ProducerInterceptor中,你可以通过MessageExt的putUserProperty方法将traceId和spanId添加到消息的用户属性中,例如: public class TraceProducerInterceptor implements ProducerInterceptor { private static final String TRACE_ID = "traceId"; private static final String SPAN_ID = "spanId"; @Override public Message> beforeConvert(Message> message) { String traceId = TraceContext.getTraceId(); String spanId = TraceContext.getSpanId(); if (traceId != null && spanId != null) { MessageHeaders headers = message.getHeaders(); headers.put(TRACE_ID, traceId); headers.put(SPAN_ID, spanId); if (message instanceof ErrorMessage) { ErrorMessage errorMessage = (ErrorMessage) message; Message> originalMessage = errorMessage.getOriginalMessage(); if (originalMessage != null) { originalMessage = beforeConvert(originalMessage); errorMessage = new ErrorMessage( errorMessage.getPayload(), originalMessage, errorMessage.getHeaders(), errorMessage.getCause()); } return errorMessage; } } return message; }} 在ConsumerInterceptor中,你可以通过MessageExt的getUserProperty方法获取到消息中的traceId和spanId,并将其添加到日志中,例如: public class TraceConsumerInterceptor implements ConsumerInterceptor { private static final String TRACE_ID = "traceId"; private static final String SPAN_ID = "spanId"; @Override public void afterReceive(Message> message) { String traceId = message.getHeaders().get(TRACE_ID, String.class); String spanId = message.getHeaders().get(SPAN_ID, String.class); if (traceId != null && spanId != null) { MDC.put(TRACE_ID, traceId); MDC.put(SPAN_ID, spanId); } }} 在配置文件中添加ProducerInterceptor和ConsumerInterceptor: rocketmq: producer: interceptor: - com.example.TraceProducerInterceptor consumer: interceptor: - com.example.TraceConsumerInterceptor 这样就可以在RocketMQ Spring的日志中携带traceId和spanId了。
算精通AM 2023-11-28 0:14:26 3 RocketMQ Spring 支持使用 OpenTracing(或 Jaeger)来实现分布式链路追踪,可以通过在消息发送和消费时设置 traceId 和 spanId 来携带这些信息。 具体来说,可以通过在消息体中设置 traceId 和 spanId,然后在消息发送前将这些信息设置到消息的属性中。例如,可以使用 Tracer 接口来生成 traceId 和 spanId,然后将这些信息设置到消息的属性中,例如: java Copy @Autowired private Tracer tracer; @Autowired private RocketMQTemplate rocketMQTemplate; public void sendMessage(String message) { String traceId = tracer.activeSpan().context().toTraceId(); String spanId = tracer.activeSpan().context().toSpanId(); Message msg = MessageBuilder.withPayload(message) .setHeader("TRACE_ID", traceId) .setHeader("SPAN_ID", spanId) .build();rocketMQTemplate.syncSend("topic", msg); } 在消息消费时,可以通过订阅消息时设置 MessageListenerConcurrently 或 MessageListenerOrderly 接口的实现类,然后在 consumeMessage 方法中获取消息属性中的 traceId 和 spanId。例如: java Copy @Component @RocketMQMessageListener(topic = “topic”, consumerGroup = “consumer-group”) public class MyMessageListener implements MessageListenerConcurrently { @Overridepublic ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { String traceId = msg.getProperty("TRACE_ID"); String spanId = msg.getProperty("SPAN_ID"); // 处理消息 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} } 需要注意的是,使用 OpenTracing(或 Jaeger)进行分布式链路追踪需要在应用中引入相应的依赖包,以及配置相应的参数。具体的配置方式可以参考 OpenTracing 或 Jaeger 的官方文档。
穿过生命散发芬芳AM 2023-11-28 0:14:26 4 可以通过RocketMQ的消息拦截器(MessageInterceptor)来实现在发送和接收消息时携带traceId和spanId,然后通过日志框架将其记录下来。 具体实现步骤如下: 1、定义一个自定义的MessageInterceptor,实现beforeSend和afterConsume方法,在这两个方法中可以获取traceId和spanId,并将其添加到消息的属性中。 public class TraceMessageInterceptor implements MessageInterceptor { @Override public Message> beforeSend(Message> message, org.apache.rocketmq.common.message.Message rocketmqMessage, SendCallback callback) { TraceContext traceContext = TraceContext.get(); if (traceContext != null) { rocketmqMessage.putUserProperty("traceId", traceContext.getTraceId()); rocketmqMessage.putUserProperty("spanId", traceContext.getSpanId()); } return message; } @Override public void afterConsume(MessageExt messageExt) { String traceId = messageExt.getUserProperty("traceId"); String spanId = messageExt.getUserProperty("spanId"); TraceContext.set(new TraceContext(traceId, spanId)); }} 2、在Spring Boot中配置RocketMQ的消息拦截器 @Configurationpublic class RocketMQConfig { @Autowired private TraceMessageInterceptor traceMessageInterceptor; @Bean public DefaultMQProducer defaultMQProducer() throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr("localhost:9876"); producer.setSendMsgTimeout(10000); producer.setVipChannelEnabled(false); producer.setRetryTimesWhenSendFailed(3); producer.setRetryTimesWhenSendAsyncFailed(3); producer.setRetryAnotherBrokerWhenNotStoreOK(true); producer.setSendMessageWithVIPChannel(false); producer.setSendLatencyFaultEnable(true); producer.setUnitMode(false); producer.setCompressMsgBodyOverHowmuch(1024 * 4); producer.setInterceptors(Arrays.asList(traceMessageInterceptor)); producer.start(); return producer; } @Bean public DefaultMQPushConsumer defaultMQPushConsumer(MQListener mqListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group"); consumer.setNamesrvAddr("localhost:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("topic", "*"); consumer.setConsumeThreadMax(1); consumer.setConsumeThreadMin(1); consumer.setPullBatchSize(32); consumer.setPullInterval(0); consumer.setConsumeMessageBatchMaxSize(1); consumer.setConsumeTimeout(15); consumer.setInterceptors(Arrays.asList(traceMessageInterceptor)); consumer.registerMessageListener(mqListener); consumer.start(); return consumer; }} 3、在日志框架中添加traceId和spanId的日志输出 @Slf4j@Componentpublic class TraceLogFilter extends OncePerRequestFilter { @Override protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException { TraceContext traceContext = TraceContext.get(); if (traceContext != null) { MDC.put("traceId", traceContext.getTraceId()); MDC.put("spanId", traceContext.getSpanId()); } try { filterChain.doFilter(request, response); } finally { MDC.clear(); } }} 通过这种方式,就可以在RocketMQ的消息中携带traceId和spanId,并通过日志框架将其输出到日志中,从而方便地跟踪分布式调用链路。
如果您想要在RocketMQ Spring的日志中携带TraceId和SpanId,则可以使用MDC(Mapped Diagnostic Context)功能。MDC是log4j框架提供的一种上下文信息传递的机制,可以将一些变量绑定到当前线程,使得这些变量在整个线程生命周期内都可以访问。
以下是一个示例配置,可用于设置MDC并在日志中携带TraceId和SpanId:
%d{yyyy-MM-dd HH:mm:ss.SSS} ${PID:- } [%thread] %-5level %logger{36} – traceId:%X{traceId}, spanId:%X{spanId} – %msg%n
在上面的示例中,我们使用了logback作为日志框架,并且定义了一个名为“CONSOLE”的输出器,输出内容包括时间、线程、日志级别、Logger名称、TraceId和SpanId。然后我们将“org.apache.rocketmq”Logger与CONSOLE输出器关联,并通过MDC将TraceId和SpanId设置为全局属性。
接着,在代码中需要生成TraceId和SpanId时,您可以使用Spring Cloud Sleuth库来创建TraceId和SpanId:
import org.springframework.cloud.sleuth.*; // …
@Autowired(required = false) private Tracer tracer;
public void sendMessage(String message) { if (tracer != null) { Span newSpan = tracer.nextSpan().name(“sendMessage”).start(); try (Tracer.SpanInScope ws = tracer.withSpan(newSpan.start())) { // Do something with the message // … } finally { newSpan.tag(“message”, message); newSpan.finish(); } } else { // Fallback behavior when Tracer is not available // … } } 在上面的示例中,我们注入了一个名为tracer的Tracer对象,并使用nextSpan()方法创建一个新的Span。然后,我们使用withSpan()方法将当前线程与新的Span关联,并执行需要进行跟踪的操作(例如发送消息)。最后,我们通过tag()方法将消息内容添加到Span中,并调用finish()方法完成Span。
这样,当执行sendMessage()方法时,MDC会自动将TraceId和SpanId设置为全局属性,并在日志中携带它们。
在RocketMQ Spring中,可以通过配置RocketMQ的
ProducerInterceptor
和ConsumerInterceptor
来实现在日志中携带traceId
和spanId
。在
ProducerInterceptor
中,你可以通过MessageExt
的putUserProperty
方法将traceId
和spanId
添加到消息的用户属性中,例如:在
ConsumerInterceptor
中,你可以通过MessageExt
的getUserProperty
方法获取到消息中的traceId
和spanId
,并将其添加到日志中,例如:在配置文件中添加
ProducerInterceptor
和ConsumerInterceptor
:这样就可以在RocketMQ Spring的日志中携带
traceId
和spanId
了。RocketMQ Spring 支持使用 OpenTracing(或 Jaeger)来实现分布式链路追踪,可以通过在消息发送和消费时设置 traceId 和 spanId 来携带这些信息。
具体来说,可以通过在消息体中设置 traceId 和 spanId,然后在消息发送前将这些信息设置到消息的属性中。例如,可以使用 Tracer 接口来生成 traceId 和 spanId,然后将这些信息设置到消息的属性中,例如:
java Copy @Autowired private Tracer tracer;
@Autowired private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String message) { String traceId = tracer.activeSpan().context().toTraceId(); String spanId = tracer.activeSpan().context().toSpanId();
} 在消息消费时,可以通过订阅消息时设置 MessageListenerConcurrently 或 MessageListenerOrderly 接口的实现类,然后在 consumeMessage 方法中获取消息属性中的 traceId 和 spanId。例如:
java Copy @Component @RocketMQMessageListener(topic = “topic”, consumerGroup = “consumer-group”) public class MyMessageListener implements MessageListenerConcurrently {
} 需要注意的是,使用 OpenTracing(或 Jaeger)进行分布式链路追踪需要在应用中引入相应的依赖包,以及配置相应的参数。具体的配置方式可以参考 OpenTracing 或 Jaeger 的官方文档。
可以通过RocketMQ的消息拦截器(MessageInterceptor)来实现在发送和接收消息时携带traceId和spanId,然后通过日志框架将其记录下来。
具体实现步骤如下:
1、定义一个自定义的MessageInterceptor,实现beforeSend和afterConsume方法,在这两个方法中可以获取traceId和spanId,并将其添加到消息的属性中。
2、在Spring Boot中配置RocketMQ的消息拦截器
3、在日志框架中添加traceId和spanId的日志输出
通过这种方式,就可以在RocketMQ的消息中携带traceId和spanId,并通过日志框架将其输出到日志中,从而方便地跟踪分布式调用链路。