← Home

RocketMQ 3.3.4 Broker

13 October, 2020

差不多可以看消息队列的源码了。 在下从gitee上找到了rocketmq的早期版本(3.2.2), 坏消息是这个2014年的项目里没有单元测试极少, 调试会比较困难. 好消息是这个时候的RocketMQ还没开源多久,里面有很多中文注释。看起来会很舒服。

我们从Broker开始涂鸦。关于RocketMQ中每个角色的作用这里不再陈述:

先从初始化开始:

    public static void main(String[] args) {
        start(createBrokerController(args));
    }

rocketmq是从commandline启动的,createBrokerController函数比较长, 会有很多额外的逻辑干扰你,我这里直接说重点:

服务控制对象: Broker各个服务控制器,包括存储层配置,配置文件版本号,消费进度存储,Consumer连接、订阅关系管理等等。

以上就是createBrokerController的内容,函数虽然长,但是并不复杂。

下面为start函数的内容, 在main中的start函数实际上是去委托了BrokerController去执行.

    public void start() throws Exception {

        // 启动Broker的各层服务

        if (this.messageStore != null) {
            this.messageStore.start();
        }

        if (this.remotingServer != null) {
            this.remotingServer.start();
        }

        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }

        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }

        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }

        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }

        // 启动时,注册该Broker的信息到所有的NameServer
        this.registerBrokerAll(true);

        // 定时注册Broker到Name Server
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                this.registerBrokerAll(true);
            } catch (Exception e) {
                log.error("registerBrokerAll Exception", e);
            }
        }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);

        if (this.brokerStatsManager != null) {
            // 看起来就是一些数据统计线程
            this.brokerStatsManager.start();
        }

        // 删除多余的Topic
        this.addDeleteTopicTask();
    }

整个Borker的流程差不多就是这样.代码里并没有什么亮点说实话.