+17 [2022-01-2320:28:38] content: Caused by: com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 2 DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 2006ms, size of queue: 681 +18 [2022-01-2320:28:38] content: For more information, please visit the url, http://rocketmq.apache.org/docs/faq/ +19 [2022-01-2320:28:38] content: at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:686) +20 [2022-01-2320:28:38] content: at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:461) +21 [2022-01-2320:28:38] content: at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:442) +22 [2022-01-2320:28:38] content: at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:404) +23 [2022-01-2320:28:38] content: at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:753) +24 [2022-01-2320:28:38] content: at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:1040) +25 [2022-01-2320:28:38] content: at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:949) +26 [2022-01-2320:28:38] content: at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:944) +27 [2022-01-2320:28:38] content: at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:442) +28 [2022-01-2320:28:38] content: at com.aliyun.openservices.ons.api.impl.rocketmq.OrderProducerImpl.send(OrderProducerImpl.java:119)
//清除事务队列中的失效请求 cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue()); }
voidcleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, finallong maxWaitTimeMillsInQueue){ while (true) { try { if (!blockingQueue.isEmpty()) { final Runnable runnable = blockingQueue.peek(); if (null == runnable) { break; } final RequestTask rt = castRunnable(runnable); if (rt == null || rt.isStopRun()) { break; }
finallong behind = System.currentTimeMillis() - rt.getCreateTimestamp(); if (behind >= maxWaitTimeMillsInQueue) { if (blockingQueue.remove(runnable)) { rt.setStopRun(true); rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size())); } } else { break; } } else { break; } } catch (Throwable ignored) { } } } //省略了一部分代码 }