RocketMQ Broker消息处理流程剩余源码解析
创始人
2024-05-28 18:19:11
0

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年3月4日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • DefaultMessageStore
  • CommitLog
  • MappedFileQueue
  • MappedFile
  • submitFlushRequest

DefaultMessageStore

接上文,SendMessageProcessor对象接收到消息之后,会把消息变成存储对象DefaultStoreMessage实例:
在这里插入图片描述
DefaultMessageStore的默认存储消息的方法asyncPutMessage如下:

@Override
public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) {PutMessageStatus checkStoreStatus = this.checkStoreStatus();if (checkStoreStatus != PutMessageStatus.PUT_OK) {return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));}PutMessageStatus msgCheckStatus = this.checkMessage(msg);if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));}long beginTime = this.getSystemClock().now();CompletableFuture putResultFuture = this.commitLog.asyncPutMessage(msg);putResultFuture.thenAccept((result) -> {long elapsedTime = this.getSystemClock().now() - beginTime;if (elapsedTime > 500) {log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);}this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);if (null == result || !result.isOk()) {this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();}});return putResultFuture;
}

整个存储流程的代码还是比较清晰的:

  1. 先判断消息是否投放成功 及 检查消息的格式
  2. 上述检查都没问题时就会把消息存储在CommitLog
CompletableFuture putResultFuturethis.commitLog.asyncPutMessage(msg);

CommitLog

Broker接收到消息后,最终消息存储在Commitlog对象中,调用的是CommitLogputMessage方法

 public CompletableFuture asyncPutMessage(final MessageExtBrokerInner msg) {// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();int queueId = msg.getQueueId();// 延时消息处理,事务为TRANSACTION_PREPARED_TYPE 和 TRANSACTION_ROLLBACK_TYPE 消息不支持延时投递final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}// 存储消息时,延时消息进入SCHEDULE_TOPIC_XXXX的主题下topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;// 消息队列编号 = 延迟级别 - 1queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();if (bornSocketAddress.getAddress() instanceof Inet6Address) {msg.setBornHostV6Flag();}InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();if (storeSocketAddress.getAddress() instanceof Inet6Address) {msg.setStoreHostAddressV6Flag();}PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);if (encodeResult != null) {return CompletableFuture.completedFuture(encodeResult);}msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));long elapsedTimeInLock = 0;MappedFile unlockMappedFile = null;// 加锁,同一时刻只能有一个线程put消息putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// globalmsg.setStoreTimestamp(beginLockTimestamp);// 只有不存在映射文件或文件已存满,才进行创建if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); }if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));}// 追加消息至MappedFile的缓存中,更新写入位置wrotePositionresult = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);switch (result.getStatus()) {case PUT_OK:break;// 当文件剩余空间不足时,创建新的MappedFile并写入case END_OF_FILE: unlockMappedFile = mappedFile;// Create a new file, re-write the messagemappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));}result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default:beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));}elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;beginTimeInLock = 0;} finally {// 释放锁putMessageLock.unlock();}/** log **/if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());CompletableFuture flushResultFuture = submitFlushRequest(result, msg);CompletableFuture replicaResultFuture = submitReplicaRequest(result, msg);// 异步刷盘流程return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",msg.getTopic(), msg.getTags(), msg.getBornHostNameString());}}return putMessageResult;});}

消息在CommitLog文件中是顺序存储的

RocketMQ消息存储在CommitLog文件里,最终落盘,对应的类为MappedFile,它是从MappedFileQueue中获取的,如果对象不存在,就会创建:

MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();// 只有不存在映射文件或文件已存满,才进行创建
if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); 
}

创建(获取)完成对象之后就会把消息插入到mappedFile里,如果文件放不下了,则会重新创建一个mappedFile来对其进行写入,最后就是使用异步Future的方式把消息持久化到磁盘上。


MappedFileQueue

上面的源码里首先从MappedFileQueue映射队列尾部中获取MappedFile对象:

public MappedFile getLastMappedFile() {MappedFile mappedFileLast = null;while (!this.mappedFiles.isEmpty()) {try {mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);break;} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error("getLastMappedFile has exception.", e);break;}}return mappedFileLast;}

MappedFile对象为空时,表示MappedFile对象不存在,此时就需要重新创建一个MappedFile对象,相应的方法在MappedFileQueue

public class MappedFileQueue {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);// 批量删除文件上限private static final int DELETE_FILES_BATCH_MAX = 10;// 目录private final String storePath;// 每个映射文件的大小private final int mappedFileSize;// 映射文件数组private final CopyOnWriteArrayList mappedFiles = new CopyOnWriteArrayList();// 分配MappedFile服务private final AllocateMappedFileService allocateMappedFileService;// 最后flush到的offsetprivate long flushedWhere = 0;// 最后commit到的offsetprivate long committedWhere = 0;// 最后保存的时间戳private volatile long storeTimestamp = 0;/***	获取最后一个可写入的映射文件*	当最后一个文件已经满的时候,创建一个新的文件*/public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {long createOffset = -1;MappedFile mappedFileLast = getLastMappedFile();// 不存在映射文件if (mappedFileLast == null) {// 计算从哪个offset开始createOffset = startOffset - (startOffset % this.mappedFileSize);}// 最后一个文件已满if (mappedFileLast != null && mappedFileLast.isFull()) {createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;}// 创建文件if (createOffset != -1 && needCreate) {String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);String nextNextFilePath = this.storePath + File.separator+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);MappedFile mappedFile = null;if (this.allocateMappedFileService != null) {mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);} else {try {mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);} catch (IOException e) {log.error("create mappedFile exception", e);}}if (mappedFile != null) {if (this.mappedFiles.isEmpty()) {mappedFile.setFirstCreateInQueue(true);}this.mappedFiles.add(mappedFile);}return mappedFile;}return mappedFileLast;}public MappedFile getLastMappedFile(final long startOffset) {return getLastMappedFile(startOffset, true);}
}

当获取到的“最后一个”MappedFile不存在或文件已满时,则新建一个,并计算新文件的createOffset

MappedFile

CommitLog的代码中,最终把消息追加到MappedFile文件的缓冲区中,同时更新其写入位置writePosition,但是还没有刷盘:

result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);

其中调用了MappedFileappendMessage方法实现添加消息到消息映射文件:

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,PutMessageContext putMessageContext) {assert messageExt != null;assert cb != null;int currentPos = this.wrotePosition.get();if (currentPos < this.fileSize) {// 缓冲区ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();byteBuffer.position(currentPos);AppendMessageResult result;// 判断是否是批量操作if (messageExt instanceof MessageExtBrokerInner) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBrokerInner) messageExt, putMessageContext);} else if (messageExt instanceof MessageExtBatch) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBatch) messageExt, putMessageContext);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}// 更新写入位置 + 写入偏移量this.wrotePosition.addAndGet(result.getWroteBytes());this.storeTimestamp = result.getStoreTimestamp();return result;}log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}

通过源码可以看到,实际上是吧消息放入ByteBuffer,同时更新写入位置和偏移量


submitFlushRequest

提交刷盘请求的源码如下:

public CompletableFuture submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {// 异步flushif (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());service.putRequest(request);return request.future();} else {service.wakeup();return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}}// 同步flushelse {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup();	// 异步,使用MappedByteBuffer(默认策略)} else  {commitLogService.wakeup();	// 异步,使用字节缓冲区 + FileChannel}return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}}

CommitLog文件刷盘是由FlushCommitLogService服务具体执行,一共有两种刷盘策略:

  1. FlushDiskType.SYNC_FLUSH:同步刷盘
  2. FlushDiskType.ASYNC_FLUSH:异步刷盘

同步刷盘的情况下,必须要等到数据刷盘成功后才会有返回结果;如果是异步刷盘,只需要把消息放入内存之后即可返回。

上述策略可以在broker.conf配置文件中进行配置

相关内容

热门资讯

苹果系统安卓爱思助手,系统兼容... 你有没有发现,手机的世界里,苹果系统和安卓系统就像是一对欢喜冤家,总是各有各的粉丝,各有各的拥趸。而...
安卓系统占用很大内存,揭秘内存... 手机里的安卓系统是不是让你感觉内存不够用,就像你的房间堆满了杂物,总是找不到地方放新东西?别急,今天...
安卓系统p30,安卓系统下的摄... 你有没有发现,最近安卓系统P30在手机圈里可是火得一塌糊涂呢!这不,我就来给你好好扒一扒这款手机的那...
siri被安卓系统进入了,智能... 你知道吗?最近科技圈可是炸开了锅,因为一个大家伙——Siri,竟然悄悄地溜进了安卓系统!这可不是什么...
最强挂机系统和安卓区别,揭秘安... 亲爱的读者,你是否曾在游戏中遇到过这样的困扰:一边想要享受游戏带来的乐趣,一边又不想放弃手中的零食或...
安卓系统为什么设系统盘,保障稳... 你有没有想过,为什么安卓系统里会有一个叫做“系统盘”的东西呢?这可不是随便设置的,背后可是有大学问的...
王者怎么加安卓系统的,轻松提升... 你有没有想过,你的手机里那款超酷的王者荣耀,怎么才能让它更好地在你的安卓系统上运行呢?别急,今天就来...
安卓手机系统怎么开热点,共享网... 你有没有想过,当你身处一个没有Wi-Fi信号的地方,而你的安卓手机里却存满了精彩视频和游戏时,是不是...
安卓系统11的平板电脑,性能升... 你有没有发现,最近平板电脑市场又热闹起来了?没错,安卓系统11的新一代平板电脑正在悄悄地走进我们的生...
安卓手机系统创始人,安卓手机系... 你有没有想过,那些陪伴我们每天生活的安卓手机,它们的灵魂是谁赋予的呢?没错,就是那位神秘而又传奇的安...
安卓11系统速度提升,体验再升... 你知道吗?最近安卓系统又升级啦!这次可是直接跳到了安卓11,听说速度提升了不少呢!是不是很心动?那就...
安卓5.1原生系统设置apk,... 你有没有想过,你的安卓手机里那些看似普通的设置,其实隐藏着不少小秘密呢?今天,就让我带你一探究竟,揭...
手机安卓系统玩音游,畅享指尖音... 你有没有发现,现在手机上的游戏种类越来越丰富,尤其是音游,简直让人爱不释手!今天,就让我来给你详细介...
安卓系统与win10,系统融合... 你有没有想过,为什么你的手机里装的是安卓系统,而电脑上却是Windows 10呢?这两种操作系统,就...
苹果系统王者安卓系统可以登吗,... 你有没有想过,为什么苹果系统的手机那么受欢迎,而安卓系统的手机却也能在市场上占有一席之地呢?今天,咱...
安卓系统怎么重制系统还原,安卓... 手机用久了是不是感觉卡得要命,想给它来个大变身?别急,今天就来教你怎么给安卓手机重置系统,让它焕然一...
安卓9系统怎样应用分身,轻松实... 你有没有发现,手机里的APP越来越多,有时候一个APP里还要处理好多任务,分身功能简直就是救星啊!今...
获取安卓系统的ip地址,轻松获... 你有没有想过,你的安卓手机里隐藏着一个神秘的IP地址?没错,就是那个能让你在网络世界里找到自己的小秘...
LG彩电安卓系统升级,畅享智能... 你家的LG彩电是不是最近有点儿“闹别扭”,屏幕上时不时地跳出个升级提示?别急,今天就来给你详细说说这...
阴阳师安卓苹果系统,安卓与苹果... 亲爱的玩家们,你是否曾在深夜里,手握手机,沉浸在阴阳师的神秘世界?今天,就让我带你一起探索这款风靡全...