RocketMQ Broker消息处理流程剩余源码解析
创始人
2024-05-28 18:19:11
0

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年3月4日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • DefaultMessageStore
  • CommitLog
  • MappedFileQueue
  • MappedFile
  • submitFlushRequest

DefaultMessageStore

接上文,SendMessageProcessor对象接收到消息之后,会把消息变成存储对象DefaultStoreMessage实例:
在这里插入图片描述
DefaultMessageStore的默认存储消息的方法asyncPutMessage如下:

@Override
public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) {PutMessageStatus checkStoreStatus = this.checkStoreStatus();if (checkStoreStatus != PutMessageStatus.PUT_OK) {return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));}PutMessageStatus msgCheckStatus = this.checkMessage(msg);if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));}long beginTime = this.getSystemClock().now();CompletableFuture putResultFuture = this.commitLog.asyncPutMessage(msg);putResultFuture.thenAccept((result) -> {long elapsedTime = this.getSystemClock().now() - beginTime;if (elapsedTime > 500) {log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);}this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);if (null == result || !result.isOk()) {this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();}});return putResultFuture;
}

整个存储流程的代码还是比较清晰的:

  1. 先判断消息是否投放成功 及 检查消息的格式
  2. 上述检查都没问题时就会把消息存储在CommitLog
CompletableFuture putResultFuturethis.commitLog.asyncPutMessage(msg);

CommitLog

Broker接收到消息后,最终消息存储在Commitlog对象中,调用的是CommitLogputMessage方法

 public CompletableFuture asyncPutMessage(final MessageExtBrokerInner msg) {// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();int queueId = msg.getQueueId();// 延时消息处理,事务为TRANSACTION_PREPARED_TYPE 和 TRANSACTION_ROLLBACK_TYPE 消息不支持延时投递final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}// 存储消息时,延时消息进入SCHEDULE_TOPIC_XXXX的主题下topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;// 消息队列编号 = 延迟级别 - 1queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();if (bornSocketAddress.getAddress() instanceof Inet6Address) {msg.setBornHostV6Flag();}InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();if (storeSocketAddress.getAddress() instanceof Inet6Address) {msg.setStoreHostAddressV6Flag();}PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);if (encodeResult != null) {return CompletableFuture.completedFuture(encodeResult);}msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));long elapsedTimeInLock = 0;MappedFile unlockMappedFile = null;// 加锁,同一时刻只能有一个线程put消息putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// globalmsg.setStoreTimestamp(beginLockTimestamp);// 只有不存在映射文件或文件已存满,才进行创建if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); }if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));}// 追加消息至MappedFile的缓存中,更新写入位置wrotePositionresult = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);switch (result.getStatus()) {case PUT_OK:break;// 当文件剩余空间不足时,创建新的MappedFile并写入case END_OF_FILE: unlockMappedFile = mappedFile;// Create a new file, re-write the messagemappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));}result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default:beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));}elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;beginTimeInLock = 0;} finally {// 释放锁putMessageLock.unlock();}/** log **/if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());CompletableFuture flushResultFuture = submitFlushRequest(result, msg);CompletableFuture replicaResultFuture = submitReplicaRequest(result, msg);// 异步刷盘流程return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",msg.getTopic(), msg.getTags(), msg.getBornHostNameString());}}return putMessageResult;});}

消息在CommitLog文件中是顺序存储的

RocketMQ消息存储在CommitLog文件里,最终落盘,对应的类为MappedFile,它是从MappedFileQueue中获取的,如果对象不存在,就会创建:

MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();// 只有不存在映射文件或文件已存满,才进行创建
if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); 
}

创建(获取)完成对象之后就会把消息插入到mappedFile里,如果文件放不下了,则会重新创建一个mappedFile来对其进行写入,最后就是使用异步Future的方式把消息持久化到磁盘上。


MappedFileQueue

上面的源码里首先从MappedFileQueue映射队列尾部中获取MappedFile对象:

public MappedFile getLastMappedFile() {MappedFile mappedFileLast = null;while (!this.mappedFiles.isEmpty()) {try {mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);break;} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error("getLastMappedFile has exception.", e);break;}}return mappedFileLast;}

MappedFile对象为空时,表示MappedFile对象不存在,此时就需要重新创建一个MappedFile对象,相应的方法在MappedFileQueue

public class MappedFileQueue {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);// 批量删除文件上限private static final int DELETE_FILES_BATCH_MAX = 10;// 目录private final String storePath;// 每个映射文件的大小private final int mappedFileSize;// 映射文件数组private final CopyOnWriteArrayList mappedFiles = new CopyOnWriteArrayList();// 分配MappedFile服务private final AllocateMappedFileService allocateMappedFileService;// 最后flush到的offsetprivate long flushedWhere = 0;// 最后commit到的offsetprivate long committedWhere = 0;// 最后保存的时间戳private volatile long storeTimestamp = 0;/***	获取最后一个可写入的映射文件*	当最后一个文件已经满的时候,创建一个新的文件*/public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {long createOffset = -1;MappedFile mappedFileLast = getLastMappedFile();// 不存在映射文件if (mappedFileLast == null) {// 计算从哪个offset开始createOffset = startOffset - (startOffset % this.mappedFileSize);}// 最后一个文件已满if (mappedFileLast != null && mappedFileLast.isFull()) {createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;}// 创建文件if (createOffset != -1 && needCreate) {String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);String nextNextFilePath = this.storePath + File.separator+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);MappedFile mappedFile = null;if (this.allocateMappedFileService != null) {mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);} else {try {mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);} catch (IOException e) {log.error("create mappedFile exception", e);}}if (mappedFile != null) {if (this.mappedFiles.isEmpty()) {mappedFile.setFirstCreateInQueue(true);}this.mappedFiles.add(mappedFile);}return mappedFile;}return mappedFileLast;}public MappedFile getLastMappedFile(final long startOffset) {return getLastMappedFile(startOffset, true);}
}

当获取到的“最后一个”MappedFile不存在或文件已满时,则新建一个,并计算新文件的createOffset

MappedFile

CommitLog的代码中,最终把消息追加到MappedFile文件的缓冲区中,同时更新其写入位置writePosition,但是还没有刷盘:

result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);

其中调用了MappedFileappendMessage方法实现添加消息到消息映射文件:

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,PutMessageContext putMessageContext) {assert messageExt != null;assert cb != null;int currentPos = this.wrotePosition.get();if (currentPos < this.fileSize) {// 缓冲区ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();byteBuffer.position(currentPos);AppendMessageResult result;// 判断是否是批量操作if (messageExt instanceof MessageExtBrokerInner) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBrokerInner) messageExt, putMessageContext);} else if (messageExt instanceof MessageExtBatch) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBatch) messageExt, putMessageContext);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}// 更新写入位置 + 写入偏移量this.wrotePosition.addAndGet(result.getWroteBytes());this.storeTimestamp = result.getStoreTimestamp();return result;}log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}

通过源码可以看到,实际上是吧消息放入ByteBuffer,同时更新写入位置和偏移量


submitFlushRequest

提交刷盘请求的源码如下:

public CompletableFuture submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {// 异步flushif (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());service.putRequest(request);return request.future();} else {service.wakeup();return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}}// 同步flushelse {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup();	// 异步,使用MappedByteBuffer(默认策略)} else  {commitLogService.wakeup();	// 异步,使用字节缓冲区 + FileChannel}return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}}

CommitLog文件刷盘是由FlushCommitLogService服务具体执行,一共有两种刷盘策略:

  1. FlushDiskType.SYNC_FLUSH:同步刷盘
  2. FlushDiskType.ASYNC_FLUSH:异步刷盘

同步刷盘的情况下,必须要等到数据刷盘成功后才会有返回结果;如果是异步刷盘,只需要把消息放入内存之后即可返回。

上述策略可以在broker.conf配置文件中进行配置

相关内容

热门资讯

nova 4刷安卓系统,体验全... 最近手机界可是热闹非凡呢!听说华为nova 4要刷安卓系统了,这可真是让人兴奋不已。你有没有想过,你...
如果当初没有安卓系统,科技世界... 想象如果没有安卓系统,我们的生活会是怎样的呢?是不是觉得有点不可思议?别急,让我们一起穿越时空,探索...
安卓电视装win系统,系统转换... 亲爱的读者们,你是否曾想过,在你的安卓电视上装一个Windows系统,让它瞬间变身成为一台功能强大的...
安卓手机还原系统好处,重拾流畅... 你有没有遇到过安卓手机卡顿、运行缓慢的情况?别急,今天就来给你揭秘一下安卓手机还原系统的那些好处,让...
安卓系统能跑win吗,探索跨平... 你有没有想过,你的安卓手机里能不能装上Windows系统呢?这听起来是不是有点像科幻电影里的情节?别...
安卓车载系统蓝牙设置,畅享智能... 你有没有发现,现在开车的时候,手机和车载系统之间的互动越来越频繁了呢?这不,今天就来给你详细说说安卓...
奥利奥安卓系统,探索新一代智能... 你有没有想过,一块小小的奥利奥饼干竟然能和强大的安卓系统扯上关系?没错,今天就要来聊聊这个跨界组合,...
微信使用安卓系统,功能解析与操... 你有没有发现,现在用微信的人越来越多了呢?尤其是安卓系统的用户,简直就像潮水一样涌来。今天,就让我带...
体验最新原生安卓系统,极致体验... 你有没有想过,手机系统就像是我们生活的调味品,有时候换一种口味,生活都会变得有趣起来呢?最近,我体验...
安卓系统能玩原神,尽享奇幻冒险... 你有没有想过,在安卓系统上也能畅玩《原神》这样的热门游戏呢?没错,就是那个画面精美、角色丰富、玩法多...
安卓写手机银行系统,基于安卓平... 你有没有想过,手机银行系统在我们日常生活中扮演了多么重要的角色呢?每天刷刷手机,就能轻松管理账户,转...
僵尸之夜恐怖安卓系统,揭秘恐怖... 僵尸之夜,恐怖安卓系统来袭!想象一个寂静的夜晚,你正沉浸在美梦中,突然,一阵诡异的铃声打破了夜的宁静...
谷歌框架和安卓系统,构建智能移... 你有没有想过,为什么你的手机那么聪明,能帮你找到路线,还能帮你拍出美美的照片呢?这都要归功于一个超级...
安卓系统和oppo系统哪个流畅... 你有没有想过,手机系统哪个更流畅呢?安卓系统和OPPO系统,这两个名字听起来就让人心动。今天,咱们就...
安卓怎么用微软系统,利用微软系... 你是不是也和我一样,对安卓手机上的微软系统充满了好奇?想象那熟悉的Windows界面在你的安卓手机上...
安卓系统如何安装nfc,安卓系... 你有没有想过,用手机刷公交卡、支付账单,是不是比掏出钱包来得酷炫多了?这就得归功于NFC技术啦!今天...
ios系统可以转安卓,跨平台应... 你有没有想过,你的iPhone手机里的那些宝贝应用,能不能搬到安卓手机上继续使用呢?没错,今天就要来...
iOSapp移植到安卓系统,i... 你有没有想过,那些在iOS上让你爱不释手的app,是不是也能在安卓系统上大放异彩呢?今天,就让我带你...
现在安卓随便换系统,探索个性化... 你知道吗?现在安卓手机换系统简直就像换衣服一样简单!没错,就是那种随时随地、随心所欲的感觉。今天,就...
安卓系统安装按钮灰色,探究原因... 最近发现了一个让人头疼的小问题,那就是安卓手机的安装按钮突然变成了灰色,这可真是让人摸不着头脑。你知...