镇平县建设局网站,做化工的外贸网站都有什么意思,上海工商信息查询网,信息流推广渠道有哪些
背景
在使用dubbo的时候，发现当消费者启动的时候，如果提供者没有启动，即使提供者后来启动了，消费者也调不通提供者提供的接口了。
注册中心使用都是nacos
dubbo版本是3.0.4
例子
接口
public interface DemoService {String…
背景
在使用dubbo的时候，发现当消费者启动的时候，如果提供者没有启动，即使提供者后来启动了，消费者也调不通提供者提供的接口了。
注册中心使用都是nacos
dubbo版本是3.0.4
例子
接口
/**
 * Service contract shared by the provider and the consumer in this example.
 */
public interface DemoService {
    /**
     * @return the greeting produced by the provider
     */
    String sayHello();
}
提供者
/**
 * Provider-side implementation of {@link DemoService}, exported through {@code @DubboService}.
 */
@DubboService
public class DemoServiceImpl implements DemoService {
    @Override
    public String sayHello() {
        return "hello";
    }
}

/**
 * Boots the provider as a non-web Spring Boot application.
 */
@EnableDubbo
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class ReferenceCheckProviderStarter {
    public static void main(String[] args) {
        new SpringApplicationBuilder(ReferenceCheckProviderStarter.class)
                .web(WebApplicationType.NONE) // .REACTIVE, .SERVLET
                .run(args);
        System.out.println("dubbo service started");
    }
}
消费者
EnableDubbo
RestController
SpringBootApplication(exclude {DataSourceAutoConfiguration.class})
public class ReferenceCheckConsumerStarter {DubboReferenceprivate DemoService demoService;GetMapping(/dubbo/nacos/test)public Object test() {return demoService.sayHello();}public static void main(String[] args) {SpringApplication.run(ReferenceCheckConsumerStarter.class, args);}
}1. 先启动provider再启动consumer
a. 启动provider nacos出现provider的服务
b. 启动consumer nacos出现consumer的服务
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回 hello
c. 终止provider nacos上provider的服务消失了
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回 No provider available from registry
d. 重新启动provider nacos出现provider的服务
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回 hello
可以看出，先启动provider再启动consumer，整个过程是没问题的。
2. 先启动consumer再启动provider
a. 启动consumer nacos出现consumer的服务但立即又消失了
b. 启动provider nacos出现provider的服务
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回 Directory already destroyed .
可以看出，当consumer先启动时，如果provider此时没有启动，consumer就再也访问不到provider的服务了。
3. 先启动consumer再启动provider (check=false)
修改一下注解 @DubboReference 的参数
DubboReference(check false)
private DemoService demoService;a. 启动consumer nacos出现consumer的服务
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回
No provider available from registry
b. 启动provider nacos出现provider的服务
访问 http://127.0.0.1:8080/dubbo/nacos/test 后返回
hello
可以看出，即使是consumer先启动，当provider启动后，consumer还是能够访问到provider的服务的。
关于报错
org.apache.dubbo.rpc.RpcException: No provider available from registry
public class RegistryDirectoryT extends DynamicDirectoryT {
Overridepublic ListInvokerT doList(Invocation invocation) {if (forbidden) {// 1. No service provider 2. Service providers are disabledthrow new RpcException(RpcException.FORBIDDEN_EXCEPTION, No provider available from registry getUrl().getAddress() for service getConsumerUrl().getServiceKey() on consumer NetUtils.getLocalHost() use dubbo version Version.getVersion() , please check status of providers(disabled, not registered or in blacklist).);}// ......}
}public class RegistryDirectoryT extends DynamicDirectoryT {String EMPTY_PROTOCOL empty;private void refreshInvoker(ListURL invokerUrls) {Assert.notNull(invokerUrls, invokerUrls should not be null);if (invokerUrls.size() 1 invokerUrls.get(0) ! null EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {this.forbidden true; // Forbid to accessthis.invokers Collections.emptyList();routerChain.setInvokers(this.invokers);destroyAllInvokers(); // Close all invokers} else {this.forbidden false; // Allow to accessif (invokerUrls Collections.URLemptyList()) {invokerUrls new ArrayList();}if (invokerUrls.isEmpty() this.cachedInvokerUrls ! null) {invokerUrls.addAll(this.cachedInvokerUrls);} else {this.cachedInvokerUrls new HashSet();this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison}if (invokerUrls.isEmpty()) {return;}// cant use local reference because this.urlInvokerMap might be accessed at isAvailable() by main thread concurrently.MapURL, InvokerT oldUrlInvokerMap null;if (this.urlInvokerMap ! null) {// the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.oldUrlInvokerMap new LinkedHashMap(Math.round(1 this.urlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));this.urlInvokerMap.forEach(oldUrlInvokerMap::put);}MapURL, InvokerT newUrlInvokerMap toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map/*** If the calculation is wrong, it is not processed.** 1. The protocol configured by the client is inconsistent with the protocol of the server.* eg: consumer protocol dubbo, provider only has other protocol services(rest).* 2. The registration center is not robust and pushes illegal specification data.**/if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {logger.error(new IllegalStateException(urls to invokers error .invokerUrls.size : invokerUrls.size() , invoker.size :0. 
urls : invokerUrls.toString()));return;}ListInvokerT newInvokers Collections.unmodifiableList(new ArrayList(newUrlInvokerMap.values()));// pre-route and build cache, notice that route cache should build on original Invoker list.// toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.routerChain.setInvokers(newInvokers);this.invokers multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;this.urlInvokerMap newUrlInvokerMap;try {destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker} catch (Exception e) {logger.warn(destroyUnusedInvokers error. , e);}// notify invokers refreshedthis.invokersChanged();}}private synchronized void refreshOverrideAndInvoker(ListURL urls) {// mock zookeeper://xxx?mockreturn nulloverrideDirectoryUrl();refreshInvoker(urls);}Overridepublic synchronized void notify(ListURL urls) {if (isDestroyed()) {return;}MapString, ListURL categoryUrls urls.stream().filter(Objects::nonNull).filter(this::isValidCategory).filter(this::isNotCompatibleFor26x).collect(Collectors.groupingBy(this::judgeCategory));ListURL configuratorURLs categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());this.configurators Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);ListURL routerURLs categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());toRouters(routerURLs).ifPresent(this::addRouters);// providersListURL providerURLs categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());// 3.x added for extend URL addressExtensionLoaderAddressListener addressListenerExtensionLoader getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);ListAddressListener supportedListeners addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);if (supportedListeners ! 
null !supportedListeners.isEmpty()) {for (AddressListener addressListener : supportedListeners) {providerURLs addressListener.notify(providerURLs, getConsumerUrl(),this);}}refreshOverrideAndInvoker(providerURLs); // 这里}}public abstract class AbstractRegistry implements Registry {/*** Notify changes from the Provider side.** param url consumer side url* param listener listener* param urls provider latest urls*/protected void notify(URL url, NotifyListener listener, ListURL urls) {if (url null) {throw new IllegalArgumentException(notify url null);}if (listener null) {throw new IllegalArgumentException(notify listener null);}if ((CollectionUtils.isEmpty(urls)) !ANY_VALUE.equals(url.getServiceInterface())) {logger.warn(Ignore empty notify urls for subscribe url url);return;}if (logger.isInfoEnabled()) {logger.info(Notify urls for subscribe url url , url size: urls.size());}// keep every providers category.MapString, ListURL result new HashMap(); // 这里for (URL u : urls) {if (UrlUtils.isMatch(url, u)) {String category u.getCategory(DEFAULT_CATEGORY);ListURL categoryList result.computeIfAbsent(category, k - new ArrayList()); // 这里categoryList.add(u); // 这里}}if (result.size() 0) {return;}MapString, ListURL categoryNotified notified.computeIfAbsent(url, u - new ConcurrentHashMap());for (Map.EntryString, ListURL entry : result.entrySet()) {String category entry.getKey();ListURL categoryList entry.getValue();categoryNotified.put(category, categoryList);listener.notify(categoryList); // 这里// We will update our cache file after each notification.// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.if (localCacheEnabled) {saveProperties(url);}}}
}public class NacosRegistry extends FailbackRegistry {private void notifySubscriber(URL url, NotifyListener listener, CollectionInstance instances) {ListInstance enabledInstances new LinkedList(instances);if (enabledInstances.size() 0) {// InstancesfilterEnabledInstances(enabledInstances);}ListURL urls toUrlWithEmpty(url, enabledInstances);NacosRegistry.this.notify(url, listener, urls); // 这里}String EMPTY_PROTOCOL empty;private ListURL toUrlWithEmpty(URL consumerURL, CollectionInstance instances) {ListURL urls buildURLs(consumerURL, instances);if (urls.size() 0) { // 这里URL empty URLBuilder.from(consumerURL).setProtocol(EMPTY_PROTOCOL).addParameter(CATEGORY_KEY, DEFAULT_CATEGORY).build();urls.add(empty);}return urls;}
}当没有可用的服务时instances是空的 当有可用的服务时instances是不为空的 是怎么通知的
public class ServiceInfoHolder implements Closeable {public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {String serviceKey serviceInfo.getKey();if (serviceKey null) {return null;}ServiceInfo oldService serviceInfoMap.get(serviceInfo.getKey());if (isEmptyOrErrorPush(serviceInfo)) {//empty or error push, just ignorereturn oldService;}serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);boolean changed isChangedServiceInfo(oldService, serviceInfo);if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());if (changed) { // 这里NAMING_LOGGER.info(current ips:( serviceInfo.ipCount() ) service: serviceInfo.getKey() - JacksonUtils.toJson(serviceInfo.getHosts()));NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts())); // 这里DiskCache.write(serviceInfo, cacheDir);}return serviceInfo;}
}public class DefaultPublisher extends Thread implements EventPublisher {private BlockingQueueEvent queue;Overridepublic void init(Class? extends Event type, int bufferSize) {setDaemon(true);setName(nacos.publisher- type.getName());this.eventType type;this.queueMaxSize bufferSize;this.queue new ArrayBlockingQueue(bufferSize); // 这里start();}Overridepublic boolean publish(Event event) {checkIsStart();boolean success this.queue.offer(event); // 这里if (!success) {LOGGER.warn(Unable to plug in due to interruption, synchronize sending time, event : {}, event);receiveEvent(event);return true;}return true;}Overridepublic void run() {openEventHandler();}void openEventHandler() {try {// This variable is defined to resolve the problem which message overstock in the queue.int waitTimes 60;// To ensure that messages are not lost, enable EventHandler when// waiting for the first Subscriber to registerfor (; ; ) {if (shutdown || hasSubscriber() || waitTimes 0) {break;}ThreadUtils.sleep(1000L);waitTimes--;}for (; ; ) {if (shutdown) {break;}final Event event queue.take(); // 这里receiveEvent(event); // 这里UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));}} catch (Throwable ex) {LOGGER.error(Event listener exception : , ex);}}void receiveEvent(Event event) {final long currentEventSequence event.sequence();if (!hasSubscriber()) {LOGGER.warn([NotifyCenter] the {} is lost, because there is no subscriber.);return;}// Notification single event listenerfor (Subscriber subscriber : subscribers) {// Whether to ignore expiration eventsif (subscriber.ignoreExpireEvent() lastEventSequence currentEventSequence) {LOGGER.debug([NotifyCenter] the {} is unacceptable to this subscriber, because had expire,event.getClass());continue;}// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.// Remove original judge part of codes.notifySubscriber(subscriber, event); // 这里}}Overridepublic void notifySubscriber(final Subscriber 
subscriber, final Event event) {LOGGER.debug([NotifyCenter] the {} will received by {}, event, subscriber);final Runnable job () - subscriber.onEvent(event);final Executor executor subscriber.executor(); if (executor ! null) {executor.execute(job); // 这里} else {try {job.run(); // 这里} catch (Throwable e) {LOGGER.error(Event callback exception: , e);}}}
}public class InstancesChangeNotifier extends SubscriberInstancesChangeEvent {Overridepublic void onEvent(InstancesChangeEvent event) {String key ServiceInfo.getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());ConcurrentHashSetEventListener eventListeners listenerMap.get(key);if (CollectionUtils.isEmpty(eventListeners)) {return;}for (final EventListener listener : eventListeners) {final com.alibaba.nacos.api.naming.listener.Event namingEvent transferToNamingEvent(event);if (listener instanceof AbstractEventListener ((AbstractEventListener) listener).getExecutor() ! null) {((AbstractEventListener) listener).getExecutor().execute(() - listener.onEvent(namingEvent)); // 这里} else {listener.onEvent(namingEvent); // 这里}}}
}public class NacosRegistry extends FailbackRegistry {Overridepublic void onEvent(Event event) {if (event instanceof NamingEvent) {NamingEvent e (NamingEvent) event;notifier.notify(e.getInstances()); // 这里}}
}public abstract class RegistryNotifier {public synchronized void notify(Object rawAddresses) {this.rawAddresses rawAddresses;long notifyTime System.currentTimeMillis();this.lastEventTime notifyTime;long delta (System.currentTimeMillis() - lastExecuteTime) - delayTime;// more than 10 calls next execute time is in the futureboolean delay shouldDelay.get() delta 0;if (delay) {scheduler.schedule(new NotificationTask(this, notifyTime), -delta, TimeUnit.MILLISECONDS); // 这里} else {// check if more than 10 callsif (!shouldDelay.get() executeTime.incrementAndGet() DEFAULT_DELAY_EXECUTE_TIMES) {shouldDelay.set(true);}scheduler.submit(new NotificationTask(this, notifyTime)); // 这里}}public static class NotificationTask implements Runnable {private final RegistryNotifier listener;private final long time;public NotificationTask(RegistryNotifier listener, long time) {this.listener listener;this.time time;}Overridepublic void run() {try {if (this.time listener.lastEventTime) {listener.doNotify(listener.rawAddresses); // 这里listener.lastExecuteTime System.currentTimeMillis();synchronized (listener) {if (this.time listener.lastEventTime) {listener.rawAddresses null;}}}} catch (Throwable t) {logger.error(Error occurred when notify directory. , t);}}}}
}public class NacosRegistry extends FailbackRegistry {private class RegistryChildListenerImpl implements EventListener {private RegistryNotifier notifier;public RegistryChildListenerImpl(String serviceName, URL consumerUrl, NotifyListener listener) {notifier new RegistryNotifier(getUrl(), NacosRegistry.this.getDelay()) {Overrideprotected void doNotify(Object rawAddresses) {ListInstance instances (ListInstance) rawAddresses;if (isServiceNamesWithCompatibleMode(consumerUrl)) {/*** Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned* in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899*/NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);instances NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);}NacosRegistry.this.notifySubscriber(consumerUrl, listener, instances); // 这里}};}
}然后就调用了上面的
什么时候添加监听器的
public class NacosRegistry extends FailbackRegistry {private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)throws NacosException {EventListener eventListener new RegistryChildListenerImpl(serviceName, url, listener); // 这里namingService.subscribe(serviceName,getUrl().getGroup(Constants.DEFAULT_GROUP),eventListener); // 这里}private void doSubscribe(final URL url, final NotifyListener listener, final SetString serviceNames) {try {if (isServiceNamesWithCompatibleMode(url)) {ListInstance allCorrespondingInstanceList Lists.newArrayList();/*** Get all instances with serviceNames to avoid instance overwrite and but with empty instance mentioned* in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899** namingService.getAllInstances with {link org.apache.dubbo.registry.support.AbstractRegistry#registryUrl}* default {link DEFAULT_GROUP}** in https://github.com/apache/dubbo/issues/5978*/for (String serviceName : serviceNames) {ListInstance instances namingService.getAllInstances(serviceName,getUrl().getGroup(Constants.DEFAULT_GROUP));NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);allCorrespondingInstanceList.addAll(instances);}notifySubscriber(url, listener, allCorrespondingInstanceList); for (String serviceName : serviceNames) {subscribeEventListener(serviceName, url, listener); // 这里}} else {for (String serviceName : serviceNames) {ListInstance instances new LinkedList();instances.addAll(namingService.getAllInstances(serviceName, getUrl().getGroup(Constants.DEFAULT_GROUP)));String serviceInterface serviceName;String[] segments serviceName.split(SERVICE_NAME_SEPARATOR, -1);if (segments.length 4) {serviceInterface segments[SERVICE_INTERFACE_INDEX];}URL subscriberURL url.setPath(serviceInterface).addParameters(INTERFACE_KEY, serviceInterface,CHECK_KEY, String.valueOf(false));notifySubscriber(subscriberURL, listener, 
instances);subscribeEventListener(serviceName, subscriberURL, listener);}}} catch (Throwable cause) {throw new RpcException(Failed to subscribe url to nacos getUrl() , cause: cause.getMessage(), cause);}}
}org.apache.dubbo.rpc.RpcException: Directory already destroyed
public abstract class AbstractDirectoryT implements DirectoryT {Overridepublic ListInvokerT list(Invocation invocation) throws RpcException {if (destroyed) {throw new RpcException(Directory already destroyed .url: getUrl());}return doList(invocation);}Overridepublic void destroy() {destroyed true; // 这里}
}public class ReferenceConfigT extends ReferenceConfigBaseT {private void checkInvokerAvailable() throws IllegalStateException {if (shouldCheck() !invoker.isAvailable()) {invoker.destroy(); // 这里throw new IllegalStateException(Should has at least one way to know which services this interface belongs to, subscription url: invoker.getUrl());}}protected synchronized void init() {// ......checkInvokerAvailable(); // 这里}}public abstract class ReferenceConfigBaseT extends AbstractReferenceConfig {public boolean shouldCheck() {checkDefault();Boolean shouldCheck isCheck(); // 这里if (shouldCheck null getConsumer() ! null) {shouldCheck getConsumer().isCheck(); }if (shouldCheck null) {// default true // 这里shouldCheck true;}return shouldCheck;}
}public class RegistryDirectoryT extends DynamicDirectoryT {Overridepublic boolean isAvailable() {if (isDestroyed() || this.forbidden) { // 这里return false;}MapURL, InvokerT localUrlInvokerMap urlInvokerMap; // 这里return CollectionUtils.isNotEmptyMap(localUrlInvokerMap) localUrlInvokerMap.values().stream().anyMatch(Invoker::isAvailable);}
}如果没有设置check字段那么就会在启动的时 候检查提供方是否可用如果不可用就销毁了。