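RocketMQ [TIMEOUT_CLEAN_QUEUE]broker busy Exception

Scenario

While sending partition-ordered messages through Alibaba Cloud RocketMQ recently, we occasionally hit the following exception: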

[2022-01-23 20:28:38]
Caused by: com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 2 DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 2006ms, size of queue: 681
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
    at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:686)
    at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:461)
    at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:442)
    at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:404)
    at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:753)
    at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:1040)
    at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:949)
    at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:944)
    at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:442)
    at com.aliyun.openservices.ons.api.impl.rocketmq.OrderProducerImpl.send(OrderProducerImpl.java:119)

Cause

Searching for the keyword [TIMEOUT_CLEAN_QUEUE] quickly leads to the class that produces the exception, BrokerFastFailure.java:

public class BrokerFastFailure {
    // Some code omitted
    public void start() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
                    cleanExpiredRequest();
                }
            }
        }, 1000, 10, TimeUnit.MILLISECONDS);
    }

    private void cleanExpiredRequest() {
        while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
            try {
                if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                    final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
                    if (null == runnable) {
                        break;
                    }

                    final RequestTask rt = castRunnable(runnable);
                    rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
                } else {
                    break;
                }
            } catch (Throwable ignored) {
            }
        }

        // Remove expired requests from the send queue
        cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
            this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());

        // Remove expired requests from the pull (consume) queue
        cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
            this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());

        // Remove expired requests from the heartbeat queue
        cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
            this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());

        // Remove expired requests from the transaction queue
        cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
            .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
    }

    void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
        while (true) {
            try {
                if (!blockingQueue.isEmpty()) {
                    final Runnable runnable = blockingQueue.peek();
                    if (null == runnable) {
                        break;
                    }
                    final RequestTask rt = castRunnable(runnable);
                    if (rt == null || rt.isStopRun()) {
                        break;
                    }

                    final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                    if (behind >= maxWaitTimeMillsInQueue) {
                        if (blockingQueue.remove(runnable)) {
                            rt.setStopRun(true);
                            rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
                        }
                    } else {
                        break;
                    }
                } else {
                    break;
                }
            } catch (Throwable ignored) {
            }
        }
    }
    // Some code omitted
}

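The code above starts a scheduled thread pool that, after an initial delay of 1000 ms, runs the cleanup task every 10 ms. If the head element of the send queue has waited for that queue's maximum wait time or longer (waitTimeMillsInSendQueue, 200 ms by default), the task is removed from the queue and the request is answered with the [TIMEOUT_CLEAN_QUEUE]broker busy error. This is the broker's fast-failure mechanism.

How Broker Fast Failure Works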


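When the broker receives a client request, it first puts the message into a queue (SendThreadPoolQueue, default capacity 10000). A dedicated thread pool (SendMessageExecutor) takes tasks from the queue and performs the message writes; to keep messages processed in order, this pool has one thread by default. To keep the queue from filling up with send requests that have already timed out on the client (the producer's default send timeout is 3 s), the broker uses a fast-failure mechanism: a scheduled thread checks the head of the send queue every 10 ms, and if the head element has waited more than 200 ms the broker fails fast, returning [TIMEOUT_CLEAN_QUEUE]broker busy so that the client can retry the send.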
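The head-of-queue check described above can be sketched in isolation. This is a simplified, single-threaded model rather than the broker's code: QueuedRequest and FastFailureSketch are hypothetical names, the current time is passed in as a parameter so the behavior is deterministic, and the rejection text only imitates the real message.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a queued send request; not a RocketMQ class.
final class QueuedRequest {
    final String id;
    final long createTimestamp;

    QueuedRequest(String id, long createTimestamp) {
        this.id = id;
        this.createTimestamp = createTimestamp;
    }
}

final class FastFailureSketch {
    /**
     * Removes requests from the head of the queue while they have waited at
     * least maxWaitMillis, and returns the rejection messages that would be
     * sent back to the clients. Stops at the first request that is still fresh.
     */
    static List<String> cleanExpired(Deque<QueuedRequest> queue, long nowMillis, long maxWaitMillis) {
        List<String> rejections = new ArrayList<>();
        while (!queue.isEmpty()) {
            QueuedRequest head = queue.peek();
            long behind = nowMillis - head.createTimestamp;
            if (behind < maxWaitMillis) {
                break; // FIFO order: every request behind the head is younger
            }
            queue.poll();
            rejections.add(String.format(
                "[TIMEOUT_CLEAN_QUEUE]broker busy, request %s, period in queue: %sms, size of queue: %d",
                head.id, behind, queue.size()));
        }
        return rejections;
    }
}
```

Because the queue is FIFO, inspecting only the head is enough: once the head is fresh, nothing behind it can have expired, which is why a pass every 10 ms stays cheap.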

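Why the Send-Message Thread Pool Defaults to One Thread

Since version 4.0.x, RocketMQ uses a spin lock by default when writing messages, which reduces CPU context switches and improves performance. A spin lock suits low concurrency; with multiple sender threads, that is, under high concurrency, a reentrant lock should be used instead.
Both settings can be adjusted with the following parameters: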

// Default
sendMessageThreadPoolNums=1
useReentrantLockWhenPutMessage=false

// After tuning
sendMessageThreadPoolNums=32
useReentrantLockWhenPutMessage=true

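If sendMessageThreadPoolNums > 1, message order can no longer be guaranteed for strictly ordered messages: several threads process messages from the same queue, so they can be written out of order.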
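To make the difference between the two locking strategies concrete, here is a minimal sketch. The names PutLock, SpinPutLock, ReentrantPutLock, and PutLockSketch are hypothetical and the broker's real lock classes may differ; the boolean passed to choose() only mirrors the useReentrantLockWhenPutMessage switch.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical names throughout; this only models the two locking strategies.
interface PutLock {
    void lock();
    void unlock();
}

// Spin lock: waiting threads busy-loop on a compare-and-set instead of parking.
final class SpinPutLock implements PutLock {
    private final AtomicBoolean free = new AtomicBoolean(true);

    @Override
    public void lock() {
        while (!free.compareAndSet(true, false)) {
            // spin until the holder releases the lock
        }
    }

    @Override
    public void unlock() {
        free.set(true);
    }
}

// Reentrant lock: contended threads are parked rather than burning CPU.
final class ReentrantPutLock implements PutLock {
    private final ReentrantLock delegate = new ReentrantLock();

    @Override
    public void lock() {
        delegate.lock();
    }

    @Override
    public void unlock() {
        delegate.unlock();
    }
}

final class PutLockSketch {
    // Mirrors the useReentrantLockWhenPutMessage switch described above.
    static PutLock choose(boolean useReentrantLock) {
        return useReentrantLock ? new ReentrantPutLock() : new SpinPutLock();
    }

    // Runs `threads` writers that each perform `appendsPerThread` guarded increments.
    static long appendConcurrently(PutLock lock, int threads, int appendsPerThread) {
        final long[] appended = {0L};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < appendsPerThread; j++) {
                    lock.lock();
                    try {
                        appended[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for writers", e);
            }
        }
        return appended[0];
    }
}
```

Both locks give the same result; what changes is how waiting threads behave. With one writer the spin lock never contends, while with many writers each spinning thread consumes CPU until the lock is released, which is the reason the two parameters are usually changed together.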

Solutions

Adjust Broker Parameters

Increase waitTimeMillsInSendQueue (unit: ms), the maximum time the broker lets a task wait in the send-message queue.
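When raising this value, one sanity check worth considering is to keep it below the producer's send timeout (3 s by default, as noted earlier), since a request that waits longer than that has already been abandoned by the client. That rule is an inference from the mechanism, not a documented requirement. The sketch below parses broker-style properties text; SendQueueWaitCheck and its methods are hypothetical helper names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper; not part of RocketMQ.
final class SendQueueWaitCheck {
    // Default quoted in the text above.
    static final long DEFAULT_WAIT_MILLIS = 200L;

    // Reads waitTimeMillsInSendQueue from properties-style text, falling back to the default.
    static long readWaitMillis(String brokerConf) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(brokerConf));
        } catch (IOException e) {
            throw new IllegalArgumentException("unreadable config", e);
        }
        String raw = props.getProperty("waitTimeMillsInSendQueue", String.valueOf(DEFAULT_WAIT_MILLIS));
        return Long.parseLong(raw.trim());
    }

    // Assumption: a queue wait at or above the client timeout means the client gives up first.
    static boolean isBelowClientTimeout(long waitMillis, long clientSendTimeoutMillis) {
        return waitMillis < clientSendTimeoutMillis;
    }
}
```

For example, a config containing waitTimeMillsInSendQueue=1000 passes the check against a 3000 ms client timeout, while 5000 would not.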

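Message Retry

The log line com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 2 shows the response code of the failure.
The business logic can catch MQ exceptions and, when the exception is an MQBrokerException with responseCode 2, resend the message. The retry has to be done manually, because for MQBrokerException only the following responseCodes are retried automatically: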

private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(
    ResponseCode.TOPIC_NOT_EXIST,
    ResponseCode.SERVICE_NOT_AVAILABLE,
    ResponseCode.SYSTEM_ERROR,
    ResponseCode.NO_PERMISSION,
    ResponseCode.NO_BUYER_ID,
    ResponseCode.NOT_IN_CURRENT_UNIT
));

The set above is defined in DefaultMQProducer.java.

Status Code Mapping

  • RemotingSysResponseCode.SYSTEM_BUSY: 2
  • ResponseCode.TOPIC_NOT_EXIST: 17
  • ResponseCode.SERVICE_NOT_AVAILABLE: 14
  • ResponseCode.SYSTEM_ERROR: 1
  • ResponseCode.NO_PERMISSION: 16
  • ResponseCode.NO_BUYER_ID: 204
  • ResponseCode.NOT_IN_CURRENT_UNIT: 205
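Plugging the numeric values from the mapping above into the retry set makes the gap visible. RetryCodeCheck is a hypothetical class written only for this check; the numbers are copied from the list above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper that rebuilds the retry set from the numeric codes listed above.
final class RetryCodeCheck {
    static final int SYSTEM_BUSY = 2;

    // TOPIC_NOT_EXIST, SERVICE_NOT_AVAILABLE, SYSTEM_ERROR, NO_PERMISSION, NO_BUYER_ID, NOT_IN_CURRENT_UNIT
    static final Set<Integer> RETRY_RESPONSE_CODES =
        new HashSet<>(Arrays.asList(17, 14, 1, 16, 204, 205));

    static boolean retriedAutomatically(int responseCode) {
        return RETRY_RESPONSE_CODES.contains(responseCode);
    }
}
```

Since 2 (SYSTEM_BUSY) is not among the six values, retriedAutomatically(RetryCodeCheck.SYSTEM_BUSY) is false, whereas a code such as 1 (SYSTEM_ERROR) is in the set.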

The handling of MQBrokerException can be seen in the sendDefaultImpl() method of DefaultMQProducerImpl:

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        ...
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    ...
                } catch (RemotingException e) {
                    ...
                    continue;
                } catch (MQClientException e) {
                    ...
                    continue;
                } catch (MQBrokerException e) {
                    ....
                    if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                        continue;
                    } else {
                        if (sendResult != null) {
                            return sendResult;
                        }

                        throw e;
                    }
                } catch (InterruptedException e) {
                    ...
                    throw e;
                }
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }

        ...
    }
}
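A manual retry around the send call could look like the sketch below. It is written without the client library so that it stands alone: SketchBrokerException is a hypothetical stand-in for MQBrokerException (unchecked here for brevity, whereas the real one is a checked exception), and SendAction, ManualRetry, maxAttempts, and backoffMillis are assumptions introduced for illustration.

```java
// Hypothetical stand-in for MQBrokerException so the sketch needs no client library.
final class SketchBrokerException extends RuntimeException {
    private final int responseCode;

    SketchBrokerException(int responseCode, String message) {
        super(message);
        this.responseCode = responseCode;
    }

    int getResponseCode() {
        return responseCode;
    }
}

// Wraps whatever call actually sends the message.
interface SendAction<T> {
    T send();
}

final class ManualRetry {
    static final int SYSTEM_BUSY = 2;

    /** Retries only when the broker answers with code 2; any other failure is rethrown at once. */
    static <T> T sendWithRetry(SendAction<T> action, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        SketchBrokerException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.send();
            } catch (SketchBrokerException e) {
                if (e.getResponseCode() != SYSTEM_BUSY) {
                    throw e;
                }
                last = e;
                if (attempt < maxAttempts) {
                    pause(backoffMillis * attempt); // linear backoff gives the broker time to drain
                }
            }
        }
        throw last; // every attempt hit broker busy
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during retry backoff", e);
        }
    }
}
```

In real code the lambda passed to sendWithRetry would call the producer's send method and the catch would test MQBrokerException.getResponseCode(). Bounding the attempts and backing off between them matters here, because an immediate retry loop would add load to a broker that is already applying flow control.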

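Persist + Scheduled Scan

The store here can be a database, Redis, or any other storage medium. Messages are stored only when the exception above occurs, so the data volume stays relatively small. A scheduled xxl-job task then scans the store periodically and resends the messages.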
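The persist-and-rescan idea can be sketched with an in-memory list standing in for the database or Redis. PendingMessage and FailedMessageStore are hypothetical names, and the resend predicate represents the real producer call; in production the rescan method would be invoked by the scheduled job.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical record of a message whose send failed with broker busy.
final class PendingMessage {
    final String shardingKey;
    final String body;

    PendingMessage(String shardingKey, String body) {
        this.shardingKey = shardingKey;
        this.body = body;
    }
}

// In-memory stand-in for the database or Redis store.
final class FailedMessageStore {
    private final List<PendingMessage> pending = new ArrayList<>();

    synchronized void save(PendingMessage message) {
        pending.add(message);
    }

    synchronized int size() {
        return pending.size();
    }

    /**
     * One scan pass: tries to resend every stored message in insertion order,
     * removes those that succeed, and keeps the rest for the next pass.
     * Returns the number of messages resent.
     */
    synchronized int rescan(Predicate<PendingMessage> resend) {
        int resent = 0;
        Iterator<PendingMessage> it = pending.iterator();
        while (it.hasNext()) {
            if (resend.test(it.next())) {
                it.remove();
                resent++;
            }
        }
        return resent;
    }
}
```

One caveat for ordered messages: a message resent by a later scan may arrive after newer messages with the same sharding key that were sent successfully in the meantime, so consumers that depend on strict order may need extra handling.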