锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

SpringCloud(第 049 篇)Netflix Eureka 源码深入剖析(上)

时间:2023-03-07 06:30:00 046继电器

SpringCloud(第 049 篇)Netflix Eureka 深入分析源码(上)

-

一、大致介绍

1.鉴于一些朋友的问题,并提出解释eureka本章应运而生的源码分析; 2.所以我试着从自我理解的角度整理这篇文章Eureka分析源码,希望对大家有所帮助; 3、由于篇幅太长不能在一篇里面发布出来,所以拆分了上下篇;

二、基本原理

1、Eureka Server 每个节点启动后,提供服务注册服务Eureka Server注册,这样Eureka Server所有可用服务节点的信息都将存储在服务注册表中,服务节点的信息可以在界面中直观地看到。 2、Eureka Client 是一个Java 简化和使用客户端Eureka Server客户端还具有内置负载平衡器,采用轮询负载算法。 3、启动应用程序后,向前移动Eureka Server发送心跳(默认周期为30秒),如果Eureka Server心跳在多个心跳周期中没有收到节点,Eureka Server 该服务节点将从服务注册表中删除(默认)90秒)。 4、Eureka Server数据同步将通过复制完成; 5、Eureka Client即使是所有的缓存机制Eureka Server 如果全部挂断,客户端仍然可以利用缓存中的信息消耗其他服务API;

三、EurekaServer 启动过程分析

3.1 跑一下 springms-discovery-eureka 不难发现,我们会看到一些相关的代码 EurekaServer 启动流程日志;

2017-10-22 18:14:17.635  INFO 5288 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'environmentManager': registering with JMX server as MBean [org.springframework.cloud.context.environment:name=environmentManager,type=EnvironmentManager] 2017-10-22 18:14:17.650  INFO 5288 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'restartEndpoint': registering with JMX server as MBean [org.springframework.cloud.context.restart:name=restartEndpoint,type=RestartEndpoint] 2017-10-22 18:14:17.661  INFO 5288 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'refreshScope': registering with JMX server as MBean [org.springframework.cloud.context.scope.refresh:name=refreshScope,type=RefreshScope] 2017-10-22 18:14:17.674  INFO 5288 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'configurationPropertiesRebinder': registering with JMX server as MBean [org.springframework.cloud.context.properties:name=configurationPropertiesRebinder,context=335b5620,type=ConfigurationPropertiesRebinder] 2017-10-22 18:14:17.683  INFO span class="hljs-number">5288 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'refreshEndpoint': registering with JMX server as MBean [org.springframework.cloud.endpoint:name=refreshEndpoint,type=RefreshEndpoint]
2017-10-22 18:14:17.926  INFO 5288 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2017-10-22 18:14:17.927  INFO 5288 --- [           main] c.n.e.EurekaDiscoveryClientConfiguration : Registering application unknown with eureka with status UP
2017-10-22 18:14:17.927  INFO 5288 --- [      Thread-10] o.s.c.n.e.server.EurekaServerBootstrap   : Setting the eureka configuration..
2017-10-22 18:14:17.948  INFO 5288 --- [      Thread-10] o.s.c.n.e.server.EurekaServerBootstrap   : isAws returned false
2017-10-22 18:14:17.949  INFO 5288 --- [      Thread-10] o.s.c.n.e.server.EurekaServerBootstrap   : Initialized server context
2017-10-22 18:14:17.949  INFO 5288 --- [      Thread-10] c.n.e.r.PeerAwareInstanceRegistryImpl    : Got 1 instances from neighboring DS node
2017-10-22 18:14:17.949  INFO 5288 --- [      Thread-10] c.n.e.r.PeerAwareInstanceRegistryImpl    : Renew threshold is: 1
2017-10-22 18:14:17.949  INFO 5288 --- [      Thread-10] c.n.e.r.PeerAwareInstanceRegistryImpl    : Changing status to UP
2017-10-22 18:14:17.958  INFO 5288 --- [      Thread-10] e.s.EurekaServerInitializerConfiguration : Started Eureka Server
2017-10-22 18:14:18.019  INFO 5288 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8761 (http)
2017-10-22 18:14:18.020  INFO 5288 --- [           main] c.n.e.EurekaDiscoveryClientConfiguration : Updating port to 8761
2017-10-22 18:14:18.023  INFO 5288 --- [           main] c.s.cloud.EurekaServerApplication        : Started EurekaServerApplication in 8.299 seconds (JVM running for 8.886)
【【【【【【 Eureka微服务 】】】】】】已启动.

【分析】:发现有这么一句日志打印“Setting the eureka configuration..”,eureka 开始进行配置,说不定也许就是Eureka Server 流程启动的开
始呢?我们抱着怀疑的心态进入这行日志打印的EurekaServerBootstrap类去看看。

3.2 进入 EurekaServerBootstrap 类看看,看这个类的名字,见名知意,应该就是 EurekaServer 的启动类了;

protected void initEurekaEnvironment() throws Exception {
    log.info("Setting the eureka configuration..");
    。。。
}

【分析一】:我们看到日志在 initEurekaEnvironment 方法中被打印出来,然后我顺着这个方法寻找该方法被调用的地方;

public void contextInitialized(ServletContext context) {
    try {
        initEurekaEnvironment();
        initEurekaServerContext();

        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    }
    catch (Throwable e) {
        log.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}

【分析二】:接着发现 contextInitialized 这个方法里面调用了 initEurekaEnvironment 方法,接着我们再往上层寻找被调用的地方;

【分析三】:接着我们看到 EurekaServerInitializerConfiguration 类中有个 start 方法,该方法创建了一个线程来后台执行 EurekaServer 的初始化流程;

3.3 进入 EurekaServerInitializerConfiguration 方法,看看这个所谓的 EurekaServer 初始化配置做了哪些事情?

@Override
public void start() { // 打上断点
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                //TODO: is this class even needed now?
                eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                log.info("Started Eureka Server");

                publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                EurekaServerInitializerConfiguration.this.running = true;
                publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
            }
            catch (Exception ex) {
                // Help!
                log.error("Could not initialize Eureka servlet context", ex);
            }
        }
    }).start();
}

【分析一】:看到 log.info("Started Eureka Server"); 这行代码,相信大家已经释然了,这里就是所谓的启动了 EurekaServer 了,其实也就是 
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext) 初始化了一些我们未知的东西;

【分析二】:当打印完启动Eureka Server日志后,调用了两次 publish 方法,该方法最终调用的是 this.applicationContext.publishEvent
(event) 方法,目的是利用Spring中ApplicationContext对事件传递性质,事件发布者(applicationContext)来发布事件(event),但是缺少的是监听
者,其实你仔细搜索下代码,发现好像没有地方对 EurekaServerStartedEvent、EurekaRegistryAvailableEvent 进行监听,奇了怪了,这是咋了呢?

【分析三】:然后找到 EurekaServerStartedEvent 所在的目录下,EurekaInstanceCanceledEvent、EurekaInstanceRegisteredEvent、
EurekaInstanceRenewedEvent、EurekaRegistryAvailableEvent、EurekaServerStartedEvent 有这么几个事件的类,服务下线事件、服务注册事
件、服务续约事件、注册中心启动事件、Eureka Server启动事件,这么几个事件都没有被监听,那么我们是不是给添加上监听,是不是就可以了呢?像这样
 @EventListener  public void listen(EurekaInstanceCanceledEvent event) { 。。。处下线逻辑 },添加 EventListener 监听注解,就可
以在我们自己的代码逻辑中收到这个事件的回调了,所以想想SpringCloud还是挺机制的,提供回调接口让我们自己实现自己的业务逻辑,真心不错;

【分析四】:那么反过来想想,为啥会无缘无故 start 方法就被调用了呢?那么反向继续向上找调用 start 方法的地方,结果找到了 
DefaultLifecycleProcessor类的doStart方法调用了 bean.start(); 这么一段代码;

3.4 进入 DefaultLifecycleProcessor 类看看,这个 EurekaServerInitializerConfiguration.start 方法是如何被触发的?

private void doStart(Map lifecycleBeans, String beanName, boolean autoStartupOnly) {
    // 打上断点
    Lifecycle bean = lifecycleBeans.remove(beanName);
    if (bean != null && !this.equals(bean)) {
        String[] dependenciesForBean = this.beanFactory.getDependenciesForBean(beanName);
        for (String dependency : dependenciesForBean) {
            doStart(lifecycleBeans, dependency, autoStartupOnly);
        }
        if (!bean.isRunning() &&
                (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
            if (logger.isDebugEnabled()) {
                logger.debug("Starting bean '" + beanName + "' of type [" + bean.getClass() + "]");
            }
            try {
                bean.start();
            }
            catch (Throwable ex) {
                throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Successfully started bean '" + beanName + "'");
            }
        }
    }
}

【分析一】:看到在 bean.isRunning 等一系列状态的判断下才去调用 bean.start() 方法的,我们再往上寻找被调用地方;

public void start() {
    // 打上断点
    if (this.members.isEmpty()) {
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Starting beans in phase " + this.phase);
    }
    Collections.sort(this.members);
    for (LifecycleGroupMember member : this.members) {
        if (this.lifecycleBeans.containsKey(member.name)) {
            doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);
        }
    }
}

【分析二】:该类是DefaultLifecycleProcessor中内部类LifecycleGroup的一个方法,再往上寻找调用方;

private void startBeans(boolean autoStartupOnly) {
    Map lifecycleBeans = getLifecycleBeans();
    Map phases = new HashMap();
    for (Map.Entry entry : lifecycleBeans.entrySet()) {
        Lifecycle bean = entry.getValue();
        if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
            int phase = getPhase(bean);
            LifecycleGroup group = phases.get(phase);
            if (group == null) {
                group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
                phases.put(phase, group);
            }
            group.add(entry.getKey(), bean);
        }
    }
    if (phases.size() > 0) {
        List keys = new ArrayList(phases.keySet());
        Collections.sort(keys);
        for (Integer key : keys) {
            phases.get(key).start();
        }
    }
}

【分析三】:startBeans 属于 DefaultLifecycleProcessor 类的一个私有方法,startBeans 方法第一行就是获取 getLifecycleBeans() 生命周期
Bean对象,由此可见似乎 Eureka Server 之所以会被启动,是不是实现了某个接口或者重写了某个方法,才会导致由于容易在初始化的过程中因调用某些特
殊方法或者某些类才启动的,因此我们回头去看看 EurekaServerInitializerConfiguration 这个类;

【分析四】:结果发现 EurekaServerInitializerConfiguration 这个类实现了 SmartLifecycle 这么样的一个接口,而 SmartLifecycle 接口又继
承了 Lifecycle 生命周期接口类,所以真想已经重见天日了,原来是实现了 Lifecycle 这样的一个接口,然后实现了 start 方法,因此 Eureka 
Server 就这么稀里糊涂的就被莫名其妙的启动起来了?

3.5 到这里难道就真的完了么?难道Eureka Server启动就干这么点点事情?不可能吧?

【分析一】:我们之前仅仅只是通过了日志来逆向分析,但是我们是不是忘了我们本应该标志是Eureka Server的这个注解了呢?没错,我们在分析的过程中
已经将 @EnableEurekaServer 这个注解遗忘了,那么我们现在先回到这个注解类来看看;

3.6 进入 EnableEurekaServer 类,看看究竟干了啥?

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerConfiguration.class)
public @interface EnableEurekaServer {

}

【分析一】:我们不难发现 EnableEurekaServer 类上有个 @Import 注解,引用了一个 class 文件,由此我们进入观察;

3.7 进入 EurekaServerConfiguration 类看看,看名称的话,理解的意思大概就是 Eureka Server 配置类;

【分析一】:果不其然,这个类有很多 @Bean@Configuration 注解过的方法,那是不是我们可以认为刚才 3.1~3.4 的推论是不是就是由于被实例化了这么一个 Bean,然后就慢慢的调用到了 start 方法了呢?

【分析二】:搜索 “Bootstrap” 字样,还真发现了有这么一个方法;

@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
        EurekaServerContext serverContext) {
    return new EurekaServerBootstrap(this.applicationInfoManager,
            this.eurekaClientConfig, this.eurekaServerConfig, registry,
            serverContext);
}

【分析三】:既然有这么一个 Bean,那么是不是和刚开始顺着日志逆向分析也是有一定道理的,没有这么一个Bean的存在,那么 DefaultLifecycleProcessor.startBeans 方法中 getLifecycleBeans 的这个也就没那么顺畅被找到了呢?不过我的猜想是这样的,本人没有将源码下载下来,将 eurekaServerBootstrap 方法中的 @Bean 注解注释掉试试,不过推理起来也八九不离十,这个疑问悬念就留给大家尝试尝试吧;

【分析四】:既然找到了一个 @Bean 注解过的方法,那我们再找找其他的一些被注解过的方法,比如一些通用全局用的类似词眼,比如 Context,Bean,Init、Server 之类的;

@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
        PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
    return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
            registry, peerEurekaNodes, this.applicationInfoManager);
}

@Bean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
        ServerCodecs serverCodecs) {
    return new PeerEurekaNodes(registry, this.eurekaServerConfig,
            this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
}

@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
        ServerCodecs serverCodecs) {
    this.eurekaClient.getApplications(); // force initialization
    return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
            serverCodecs, this.eurekaClient,
            this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
            this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}

@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
public EurekaController eurekaController() {
    return new EurekaController(this.applicationInfoManager);
}

【分析五】:DefaultEurekaServerContext.initialize 初始化了一些东西,现在还不知道干啥用的,先放这里,打上断点;

【分析六】:PeerEurekaNodes.start 方法,又是一个 start 方法,但是该类没有实现任何类,姑且先放这里,打上断点;

【分析七】:InstanceRegistry.register 方法,而且还有几个呢,可能是客户端注册用的,也先放这里,都打上断点,或者将 这个类的所有方法都断点上,断点打完后发现有注册的,有续约的,有注销的;

【分析八】:打完这些断点后,感觉没有思路了,索性就断点跑一把,看看有什么新的发现点;

3.8 停止服务,Debug 跑一下 springms-discovery-eureka 代码;

【分析一】:DefaultEurekaServerContext.initialize 方法被调用了,证实了刚才想法,EurekaServerConfiguration 不是白写的,还是有它的作用的;

@PostConstruct
@Override
public void initialize() throws Exception {
    logger.info("Initializing ...");
    peerEurekaNodes.start();
    registry.init(peerEurekaNodes);
    logger.info("Initialized");
}

【分析二】:进入 initialize 方法中 peerEurekaNodes.start();

public void start() {
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
        updatePeerEurekaNodes(resolvePeerUrls());
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }

            }
        };
        // 注释:间隔 600000 毫秒,即 10分钟 间隔执行一次服务集群数据同步;
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL: " + node.getServiceUrl());
    }
}

【分析三】: start 方法中会看到一个定时调度的任务,updatePeerEurekaNodes(resolvePeerUrls()); 间隔 600000 毫秒,即 10分钟 间隔执行一次服务集群数据同步;

【分析四】: 然后断点放走放下走,进入 initialize 方法中 registry.init(peerEurekaNodes);

@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    this.numberOfReplicationsLastMin.start();
    this.peerEurekaNodes = peerEurekaNodes;
    // 注释:初始化 Eureka Server 响应缓存,默认缓存时间为30s
    initializedResponseCache();
    // 注释:定时任务,多久重置一下心跳阈值,900000 毫秒,即 15分钟 的间隔时间,会重置心跳阈值
    scheduleRenewalThresholdUpdateTask();
    // 注释:初始化远端注册
    initRemoteRegionRegistry();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
    }
}

【分析五】: 缓存也配置好了,定时任务也配置好了,似乎应该没啥了,那么我们把断点放开,看看下一步会走到哪里?

3.9 EurekaServerInitializerConfiguration.start 也进断点了。

【分析一】:先是 DefaultLifecycleProcessor.doStart 方法进断点,然后才是 EurekaServerInitializerConfiguration.start 方法进断点;

【分析二】:再一次证明刚刚的逆向分析仅仅只是缺了个从头EnableEurekaServer分析罢了,但是最终方法论分析思路还是对的,由于开始分析过这里,然而我们就跳过,继续放开断点向后继续看看;

3.10 InstanceRegistry.openForTraffic 也进断点了。

【分析一】:这不就是我们刚才在 “步骤3.7之分析七” 打的断点么?看下堆栈信息,正是 “步骤3.2之分析一” 中 initEurekaServerContext 方法中有
这么一句 this.registry.openForTraffic(this.applicationInfoManager, registryCount); 调用到了,因果轮回,代码千变万化,打上断点还有有好处的,结果还是回到了开始日志逆向分析的地方。

【分析二】:进入 super.openForTraffic 方法;

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    // 注释:每30秒续约一次,那么每分钟续约就是2次,所以才是 count * 2 的结果;
    this.expectedNumberOfRenewsPerMin = count * 2;
    this.numberOfRenewsPerMinThreshold =
            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
    logger.info("Got " + count + " instances from neighboring DS node");
    logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // 注释:修改 Eureka Server 为上电状态,就是说设置 Eureka Server 已经处于活跃状态了,那就是意味着 EurekaServer 基本上说可以正常使用了;
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    // 注释:定时任务,60000 毫秒,即 1分钟 的间隔时间,Eureke Server定期进行失效节点的清理
    super.postInit();
}

【分析三】:这里主要设置了服务状态,以及开启了定时清理失效节点的定时任务,每分钟扫描一次;

3.11 继续放开断点,来到了日志打印 “main] c.n.e.EurekaDiscoveryClientConfiguration : Updating port to 8761” 的EurekaDiscoveryClientConfiguration 类中 onApplicationEvent 方法。

@EventListener(EmbeddedServletContainerInitializedEvent.class)
public void onApplicationEvent(EmbeddedServletContainerInitializedEvent event) {
    // TODO: take SSL into account when Spring Boot 1.2 is available
    int localPort = event.getEmbeddedServletContainer().getPort();
    if (this.port.get() == 0) {
        log.info("Updating port to " + localPort);
        this.port.compareAndSet(0, localPort);
        start();
    }
}

【分析一】:设置端口,当看到 Updating port to 8761 这样的日志打印出来的话,说明 Eureka Server 整个启动也就差不多Over了。现在回头看看,
发现分析了不少的方法和流程,有种感觉被掏空的感觉了。

3.12 总结 EurekaServer 启动时候大概干了哪些事情?

1、初始化Eureka环境,Eureka上下文;
2、初始化EurekaServer的缓存
3、启动了一些定时任务,比如充值心跳阈值定时任务,清理失效节点定时任务;
4、更新EurekaServer上电状态,更新EurekaServer端口;

虽然我从列举的流程里面大概总结了这么几点,但是还是有些是我没关注到的,如果大家有关注到的,可以和我共同讨论分析分析。

四、EurekaServer 处理服务注册、集群数据复制

4.1 EurekaClient 是如何注册到 EurekaServer 的?

【分析一】:由于我们刚才在 org.springframework.cloud.netflix.eureka.server.InstanceRegistry 的每个方法都打了一个断点,而且现在 
EurekaServer 已经处于 Debug 运行状态,那么我们就随便找一个被 @EnableEurekaClient 的微服务启动试试,要么就拿 springms-provider-user
微服务来试试吧,直接 Run。

【分析二】:猜测,如果如我们分析所想,当 springms-provider-user 启动后,就一定会调用注册register方法,那么就接着往下看,拭目以待;

4.2 InstanceRegistry.register(final InstanceInfo info, final boolean isReplication) 方法进断点了。

【分析一】:由于 InstanceRegistry.register 是我们刚刚打断点的地方,那么我们顺着堆栈信息往上看,原来是 ApplicationResource.addInstance 方法被调用了,那么我们就看看 addInstance 这个方法,并在 addInstance 这里打上断点;接着我们重新杀死 springms-provider-user 服务,然后再重启 springms-provider-user 服务;

4.2 断点再次来到了 ApplicationResource 类,这个类呢,主要是处理接收 Http 的服务请求。

@POST
@Consumes({
    
      "application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    // validate that the instanceinfo contains all the necessary required fields
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }

    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }

    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build(); // 204 to be backwards compatible
}

【分析一】:这里的写法貌似看起来和我们之前 Controller 的 RESTFUL 写法有点不一样,仔细一看,原来是Jersey RESTful 框架,是一个产品级的 
RESTful service 和 client 框架。与Struts类似,它同样可以和hibernate,spring框架整合。

【分析二】:紧接着,我们看到 registry.register(info, "true".equals(isReplication)); 这么一段代码,注册啊,原来EurekaClient客户端启
动后会调用会通过Http(s)请求,直接调到 ApplicationResource.addInstance 方法,那么总算明白了,只要是和注册有关的,都会调用这个方法。

【分析三】:接着我们深入 registry.register(info, "true".equals(isReplication)) 查看;

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
    super.register(info, isReplication);
}

【分析四】:看看 handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication) 方法;

private void handleRegistration(InstanceInfo info, int leaseDuration,
        boolean isReplication) {
    log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
            + ", leaseDuration " + leaseDuration + ", isReplication "
            + isReplication);
    publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
            isReplication));
}

【分析五】:该方法仅仅只是打了一个日志,然后通过 ApplicationContext 发布了一个事件 EurekaInstanceRegisteredEvent 服务注册事件,正如
“步骤3.3之分析三” 所提到的,用户可以给 EurekaInstanceRegisteredEvent 添加监听事件,那么用户就可以在此刻实现自己想要的一些业务逻辑。
然后我们再来看看 super.register(info, isReplication) 方法,该方法是 InstanceRegistry 的父类 PeerAwareInstanceRegistryImpl 的方法。

4.3 进入 PeerAwareInstanceRegistryImpl 类的 register(final InstanceInfo info, final boolean isReplication) 方法;

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // 注释:续约时间,默认时间是常量值 90 秒
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    // 注释:续约时间,当然也可以从配置文件中取出来,所以说续约时间值也是可以让我们自己自定义配置的
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // 注释:将注册方的信息写入 EurekaServer 的注册表,父类为 AbstractInstanceRegistry
    super.register(info, leaseDuration, isReplication);
    // 注释:EurekaServer 节点之间的数据同步,复制到其他Peer
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

【分析一】:进入 super.register(info, leaseDuration, isReplication) 看看是如何写入 EurekaServer 的注册表的,即进入 AbstractInstanceRegistry.register(InstanceInfo registrant, int leaseDuration, boolean isReplication) 方法。

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        // 注释:registry 这个变量,就是我们所谓的注册表,注册表是保存在内存中的;
        Map> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap> gNewMap = new ConcurrentHashMap>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold
                    // (1
                    // for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease lease = new Lease(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}

【分析二】:发现这个方法有点长,大致阅读,主要更新了注册表的时间之外,还更新了缓存等其它东西,大家有兴趣的可以深究阅读该方法;

4.4 跳出来我们接着看上面的 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isR

锐单商城拥有海量元器件数据手册IC替代型号,打造电子元器件IC百科大全!

相关文章