这篇文章主要介绍了RocketMQ中如何实现push consumer顺序消费,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
黔西ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为成都创新互联公司的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:028-86922220(备注:SSL证书合作)期待与您的合作!
顺序消费的逻辑实现在类ConsumeMessageOrderlyService中,为了实现消费的有序性需要对queue进行加锁,包括:
在broker对message queue加锁,保证当前client占有该队列
consumer端对MessageQueue加锁,保证当前线程占有该队列
consumer端对ProcessQueue加锁,保证当前线程占有该队列
对broker上message queue加锁是在ConsumeMessageOrderlyService中周期性调度执行的:
// ConsumeMessageOrderlySerivce public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } } public synchronized void lockMQPeriodically() { if (!this.stopped) { // 通过LOCK_BATCH_MQ请求在broker批量锁定mq this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll(); } }
ConsumeMessageOrderlyService中的消费请求提交:
// ConsumeMessageOrderlySerivce public void submitConsumeRequest( final Listmsgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume) { if (dispathToConsume) { // 提交ConsumeRequest,丢弃了入参的msgs,每次都从ProcessQueue中顺序获取 ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); } }
顺序处理了逻辑:
// ConsumeMessageOrderlyService.ConsumeRequest public void run() { if (this.processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } // 1.获取MessageQueue上的锁 final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { final long beginTime = System.currentTimeMillis(); for (boolean continueConsume = true; continueConsume; ) { // 循环处理 // ... // 单个ConsumeRequest最长处理时间默认60s long interval = System.currentTimeMillis() - beginTime; if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); break; } final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); // 2. 从ProcessQueue顺序获取batchSize个消息 Listmsgs = this.processQueue.takeMessags(consumeBatchSize); defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); if (!msgs.isEmpty()) { final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue); ConsumeOrderlyStatus status = null; ConsumeMessageContext consumeMessageContext = null; // .... long beginTimestamp = System.currentTimeMillis(); ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false; try { // 3. 获取ProcessQueue上的锁 this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; } // 4. 推给业务处理逻辑 status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); // 解锁 } // ... // 5. 处理消费结果 continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this); } else { continueConsume = false; // ProcessQueue为空,停止本次推送 } } } else { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); } } }
在processConsumeResult中主要会执行2步操作:
在ProcessQueue上执行commit(),将前一次takeMessages返回的msgs从缓存中删除
更新OffsetStore
感谢你能够认真阅读完这篇文章,希望小编分享的“RocketMQ中如何实现push consumer顺序消费”这篇文章对大家有帮助,同时也希望大家多多支持创新互联,关注创新互联行业资讯频道,更多相关知识等着你来学习!
本文名称:RocketMQ中如何实现pushconsumer顺序消费
文章起源:http://cqwzjz.cn/article/peihgi.html