当前位置: 首页 > news >正文

网站策划包括什么怎么做盗版电影网站

网站策划包括什么,怎么做盗版电影网站,美观网站建设价格,青岛优化网站技术1.前言 QuorumPeer是一个线程对象#xff0c;里面比较核心的方法是run方法#xff0c;但QuorumPeer的run方法比较复杂#xff0c;里面包含着针对QuorumPeer的各种状态的判断#xff0c;里面的代码比较长#xff0c;zk节点的looking状态下的操作#xff0c;下面这块代码是…1.前言 QuorumPeer是一个线程对象里面比较核心的方法是run方法但QuorumPeer的run方法比较复杂里面包含着针对QuorumPeer的各种状态的判断里面的代码比较长zk节点的looking状态下的操作下面这块代码是针对QuorumPeer是Looking状态下的话进行执行的代码逻辑会有两个分支根据判断节点是否配置readonlymode.enabled参数,然后有两个分支逻辑这两个分支逻辑都会走同一个代码逻辑。readonlymode.enabled参数为true的时候会进行开启一个异步线程执行ReadOnlyZooKeeperServer的startup方法。 2.LOOKING状态下QuorumPeer的执行逻辑 下面这个是LOOKING状态下的代码逻辑 LOG.info(LOOKING); ServerMetrics.getMetrics().LOOKING_COUNT.add(1); //判断节点是否是一个只读节点的配置if (Boolean.getBoolean(readonlymode.enabled)) {LOG.info(Attempting to start ReadOnlyZooKeeperServer);// Create read-only server but dont start it immediatelyfinal ReadOnlyZooKeeperServer roZk new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);// Instead of starting roZk immediately, wait some grace// period before we decide were partitioned.//// Thread is used here because otherwise it would require// changes in each of election strategy classes which is// unnecessary code coupling.Thread roZkMgr new Thread() {public void run() {try {// lower-bound grace period to 2 secssleep(Math.max(2000, tickTime));if (ServerState.LOOKING.equals(getPeerState())) {roZk.startup();}} catch (InterruptedException e) {LOG.info(Interrupted while attempting to start ReadOnlyZooKeeperServer, not started);} catch (Exception e) {LOG.error(FAILED to start ReadOnlyZooKeeperServer, e);}}};try {roZkMgr.start();reconfigFlagClear();if (shuttingDownLE) {shuttingDownLE false;startLeaderElection();}setCurrentVote(makeLEStrategy().lookForLeader());checkSuspended();} catch (Exception e) {LOG.warn(Unexpected exception, e);setPeerState(ServerState.LOOKING);} finally {// If the thread is in the the grace period, interrupt// to come out of waiting.roZkMgr.interrupt();roZk.shutdown();}} else {try {reconfigFlagClear();if (shuttingDownLE) {shuttingDownLE false;startLeaderElection();}setCurrentVote(makeLEStrategy().lookForLeader());} catch (Exception e) {LOG.warn(Unexpected exception, e);setPeerState(ServerState.LOOKING);}} 走同一块的代码逻辑 //将reconfigFlag这个字段的值设置为false //不是很清楚这个 reconfigFlag字段的作用 reconfigFlagClear();if (shuttingDownLE) {shuttingDownLE false;//QuorumPeer#start方法中已经进行了startLeaderElection方法的调用//这块看了下shuttingDownLE这个属性默认值为false 感觉一般情况下不会调用这个方法//开启选举算法 这块开始选择startLeaderElection();}//设置当前的选票 //makeLEStrategy().lookForLeader() 这个逻辑看上去是进行选举leader节点操作setCurrentVote(makeLEStrategy().lookForLeader());checkSuspended(); 3.创建选举算法 虽然startLeaderElection这方法在QuorumPeer的start方法中已经被进行调用了此处在looking状态下很有可能是不会被调用的我们可以简单的看下startLeaderElection这个方法我们这边看下的是zookeeper-3.9.1版本的代码 public synchronized void startLeaderElection() {try {//判断当前QuorumPeer的状态如果是LOOKING的状态 会进行构建一个选票信息if (getPeerState() ServerState.LOOKING) {currentVote new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());}} catch (IOException e) {RuntimeException re new RuntimeException(e.getMessage());re.setStackTrace(e.getStackTrace());throw re;}//根据选举的类型进行创建一个选举的算法逻辑this.electionAlg createElectionAlgorithm(electionType);} /*** * 以前版本还支持 多个选举类型 会有不同的选举算法来进行对应 * 现在支持1种选举算法 FastLeaderElection **/ protected Election createElectionAlgorithm(int electionAlgorithm) {Election le null;switch (electionAlgorithm) {case 1:throw new UnsupportedOperationException(Election Algorithm 1 is not supported.);case 2:throw new UnsupportedOperationException(Election Algorithm 2 is not supported.);case 3://进行构建一个网络通信的managerQuorumCnxManager qcm createCnxnManager();QuorumCnxManager oldQcm qcmRef.getAndSet(qcm);if (oldQcm ! null) {LOG.warn(Clobbering already-set QuorumCnxManager (restarting leader election?));oldQcm.halt();}//获取网络通信的组件的一个ListenerQuorumCnxManager.Listener listener qcm.listener;if (listener ! null) {//启动Listener 这个Listener主要是用来接收别的节点的信息listener.start();//构建FastLeaderElection对象FastLeaderElection fle new FastLeaderElection(this, qcm);//启动选举算法 这个选举算法应该也是一个线程fle.start();le fle;} else {LOG.error(Null listener when initializing cnx manager);}break;default:assert false;}return le;} FastLeaderElection方法的构造方法在FastLeaderElection的构造方法中主要进行发送队列和接收队列的初始化并对QuorumPeer的网络通信组件进行封装用于后期进行网络通信。 //构造方法 public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {this.stop false;this.manager manager;starter(self, manager);}private void starter(QuorumPeer self, QuorumCnxManager manager) {this.self self;proposedLeader -1;proposedZxid -1;//初始化一个发送队列sendqueue new LinkedBlockingQueue();//初始化的一个接收队列recvqueue new LinkedBlockingQueue();this.messenger new Messenger(manager);} //FastLeaderElection的start方法public void start() {this.messenger.start();} //Messenger的start方法 此时会进行启动两个线程 void start() {//发送线程的启动 this.wsThread.start();//接收线程的启动this.wrThread.start(); } 4.lookForLeader 从前端的代码逻辑中分析得出org.apache.zookeeper.server.quorum.FastLeaderElection#lookForLeader这个方法开启leader节点的选举操作当QuorumPeer的状态为LOOKING状态的时候会进行调用此方法。 public Vote lookForLeader() throws InterruptedException {//.....省略了部分代码try {//当前选票存放的集合MapLong, Vote recvset new HashMap();MapLong, Vote outofelection new HashMap();int notTimeout minNotificationInterval;synchronized (this) {logicalclock.incrementAndGet();updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());}LOG.info(New election. My id {}, proposed zxid0x{},self.getMyId(),Long.toHexString(proposedZxid));//发送选票信息sendNotifications();SyncedLearnerTracker voteSet null;//循环交换选票信息 直达选出leader节点while ((self.getPeerState() ServerState.LOOKING) (!stop)) {// 从接收队列中进行获取选票信息Notification n recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);//如果选票信息为null 当zkServer节点第一次启动的时候肯定是nullif (n null) {//manager.haveDelivered() 这个方法主要进行判断是否有已经连接的机器信息if (manager.haveDelivered()) {//如果已经有连接的机器信息的话 就进行给所有的节点发送选票信息sendNotifications();} else {//zkServer节点第一次启动的时候 连接机器的信息列表肯定为空//所以第一次的时候肯定会进行连接其他机器信息manager.connectAll();}/** Exponential backoff*/notTimeout Math.min(notTimeout 1, maxNotificationInterval);/** When a leader failure happens on a master, the backup will be supposed to receive the honour from* Oracle and become a leader, but the honour is likely to be delay. We do a re-check once timeout happens** The leader election algorithm does not provide the ability of electing a leader from a single instance* which is in a configuration of 2 instances.* */if (self.getQuorumVerifier() instanceof QuorumOracleMaj self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout ! minNotificationInterval)) {setPeerState(proposedLeader, voteSet);Vote endVote new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);leaveInstance(endVote);return endVote;}LOG.info(Notification time out: {} ms, notTimeout);} else if (validVoter(n.sid) validVoter(n.leader)) {//....省略很多代码} else {//....省略很多代码}}return null;} finally {try {if (self.jmxLeaderElectionBean ! null) {MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);}} catch (Exception e) {LOG.warn(Failed to unregister with JMX, e);}self.jmxLeaderElectionBean null;LOG.debug(Number of connection processing threads: {}, manager.getConnectionThreadCount());}} 5.集群中机器互联 zk节点最开始启动的时候会进行leader节点的选举在选举的过程中要进行选票的统计但要进行选票的统计的时候需要接收zk集群中所有节点的数据。 //连接集群中的其他机器 public void connectAll() {long sid;//循环遍历进行拦截机器信息for (EnumerationLong en queueSendMap.keys(); en.hasMoreElements(); ) {sid en.nextElement();connectOne(sid);}}通过代码的一步一步的进去查看我们找到了最终进行连接的操作 这块zookeeper是进行启动一个异步线程进行连接操作 public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid) {if (!inprogressConnections.add(sid)) {LOG.debug(Connection request to server id: {} is already in progress, so skipping this request, sid);return true;}try {//线程池启动一个异步线程进行连接别的zk节点信息connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));connectionThreadCnt.incrementAndGet();} catch (Throwable e) {inprogressConnections.remove(sid);LOG.error(Exception while submitting quorum connection request, e);return false;}return true;} //异步连接请求线程private class QuorumConnectionReqThread extends ZooKeeperThread {final MultipleAddresses electionAddr;final Long sid;QuorumConnectionReqThread(final MultipleAddresses electionAddr, final Long sid) {super(QuorumConnectionReqThread- sid);this.electionAddr electionAddr;this.sid sid;}Overridepublic void run() {try {//连接请求initiateConnection(electionAddr, sid);} finally {inprogressConnections.remove(sid);}}} initiateConnection方法是初始化连接请求的数据 //初始化连接请求 public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) {Socket sock null;try {LOG.debug(Opening channel to server {}, sid);//根据是否是SSL的类型进行创建不同的socketif (self.isSslQuorum()) {sock self.getX509Util().createSSLSocket();} else {sock SOCKET_FACTORY.get();}//设置socket的一些属性//tcpNoDelay soTimeout keepAlive等参数信息setSockOpts(sock);//开始正儿八经的连接操作sock.connect(electionAddr.getReachableOrOne(), cnxTO);if (sock instanceof SSLSocket) {SSLSocket sslSock (SSLSocket) sock;sslSock.startHandshake();LOG.info(SSL handshake complete with {} - {} - {},sslSock.getRemoteSocketAddress(),sslSock.getSession().getProtocol(),sslSock.getSession().getCipherSuite());}LOG.debug(Connected to server {} using election address: {}:{},sid, sock.getInetAddress(), sock.getPort());} catch (X509Exception e) {LOG.warn(Cannot open secure channel to {} at election address {}, sid, electionAddr, e);closeSocket(sock);return;} catch (UnresolvedAddressException | IOException e) {LOG.warn(Cannot open channel to {} at election address {}, sid, electionAddr, e);closeSocket(sock);return;}try {//连接完成之后的一些处理//包含设置一些输入输出流读写线程的启动等startConnection(sock, sid);} catch (IOException e) {LOG.error(Exception while connecting, id: {}, addr: {}, closing learner connection,sid,sock.getRemoteSocketAddress(),e);closeSocket(sock);}} 机器连接完成的一些操作当机器连接完成之后会进行调用startConnection方法 private boolean startConnection(Socket sock, Long sid) throws IOException {//data数据的输出流 从socket中进行获取并进行封装DataOutputStream dout null;//data数据的输入流从socket中获取并进行封装DataInputStream din null;LOG.debug(startConnection (myId:{} -- sid:{}), self.getMyId(), sid);try {BufferedOutputStream buf new BufferedOutputStream(sock.getOutputStream());dout new DataOutputStream(buf);//连接完成之后 向连接的zk节点输出自己的节点idlong protocolVersion self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;dout.writeLong(protocolVersion);dout.writeLong(self.getMyId());CollectionInetSocketAddress addressesToSend protocolVersion PROTOCOL_VERSION_V2? self.getElectionAddress().getAllAddresses(): Arrays.asList(self.getElectionAddress().getOne());String addr addressesToSend.stream().map(NetUtils::formatInetAddr).collect(Collectors.joining(|));byte[] addr_bytes addr.getBytes();dout.writeInt(addr_bytes.length);dout.write(addr_bytes);dout.flush();din new DataInputStream(new BufferedInputStream(sock.getInputStream()));} catch (IOException e) {LOG.warn(Ignoring exception reading or writing challenge: , e);closeSocket(sock);return false;}QuorumPeer.QuorumServer qps self.getVotingView().get(sid);if (qps ! null) {authLearner.authenticate(sock, qps.hostname);}//这块有一个逻辑 我觉得可以变更一下 这块是判断sid 如果小于自身的sid的时候//会进行关闭连接 那这块我觉得是不是可以在连接的时候 只连接比自己大的sid就可以了//而且这块的sid也是自己的sid 根本就不会大于 这个if里的操作就不会执行if (sid self.getMyId()) {LOG.info(Have smaller server identifier, so dropping the connection: (myId:{} -- sid:{}), self.getMyId(), sid);closeSocket(sock);} else {LOG.debug(Have larger server identifier, so keeping the connection: (myId:{} -- sid:{}), self.getMyId(), sid);// 开始两个线程个 一个读的线程 一个写的线程 并进行启动 读写线程SendWorker sw new SendWorker(sock, sid);RecvWorker rw new RecvWorker(sock, din, sid, sw);sw.setRecv(rw);SendWorker vsw senderWorkerMap.get(sid);if (vsw ! null) {vsw.finish();}senderWorkerMap.put(sid, sw);queueSendMap.putIfAbsent(sid, new CircularBlockingQueue(SEND_CAPACITY));//启动读写线程sw.start();rw.start();return true;}return false;} 有连接就会有被连接的比方说sid1的节点进行发起连接的时候别的zk节点是如何进行接收他的连接请求呢。这块代码逻辑是在哪里还记得我们在创建选举算法的时候会进行创建网络连接器在网络连接器中有一个QuorumCnxManager.Listener这个Listener会根据集群的数量启动一定的数量的ListenerHandler来进行监听连接。 public void run() {if (!shutdown) {LOG.debug(Listener thread started, myId: {}, self.getMyId());//获取所有的连接地址的大小SetInetSocketAddress addresses;if (self.getQuorumListenOnAllIPs()) {addresses self.getElectionAddress().getWildcardAddresses();} else {addresses self.getElectionAddress().getAllAddresses();}//启动一个CountDownLatch 大小为连接地址集合的大小CountDownLatch latch new CountDownLatch(addresses.size());listenerHandlers addresses.stream().map(address -new ListenerHandler(address, self.shouldUsePortUnification(), self.isSslQuorum(), latch)).collect(Collectors.toList());// 异步线程提交 listenerHandlersfinal ExecutorService executor Executors.newFixedThreadPool(addresses.size());try {listenerHandlers.forEach(executor::submit);} finally {executor.shutdown();}try {//在此进行等待等待所有的节点都连接成功latch.await();} catch (InterruptedException ie) {LOG.error(Interrupted while sleeping. Ignoring exception, ie);} finally {//解释所有的ListenerHandler监听for (ListenerHandler handler : listenerHandlers) {try {handler.close();} catch (IOException ie) {LOG.debug(Error closing server socket, ie);}}}}LOG.info(Leaving listener);if (!shutdown) {LOG.error(As Im leaving the listener thread, I wont be able to participate in leader election any longer: {},self.getElectionAddress().getAllAddresses().stream().map(NetUtils::formatInetAddr).collect(Collectors.joining(|)));if (socketException.get()) {// After leaving listener thread, the host cannot join the quorum anymore,// this is a severe error that we cannot recover from, so we need to exitsocketBindErrorHandler.run();}}} ListenerHandler的主要逻辑 进行创建ServerSocket这个东西然后调用serverSocket.accept()进行接受别的scoket的连接接收到别的连接之后也会进行封装输入流和输出流然后启动读写线程用来进行接收后续的消息但是这块有一个不同的地方。在接收完连接之后会有一个handleConnection方法在这个方法中会进行读取连接请求发送过来的sid当sid小于当前sid的时候会尽心关闭连接然后自己在主动发起一个连接请求。 public void run() {try {Thread.currentThread().setName(ListenerHandler- address);//接收请求参数信息acceptConnections();try {close();} catch (IOException e) {LOG.warn(Exception when shutting down listener: , e);}} catch (Exception e) {// Output of unexpected exception, should never happenLOG.error(Unexpected error , e);} finally {//lietener中传入的countDownLatch 进行减一的操作latch.countDown();}}//acceptConnections方法private void acceptConnections() {int numRetries 0;Socket client null;//如果机器没有宕机并且重试次数还没达到最大次数的时候 会在这里进行循环等待连接while ((!shutdown) (portBindMaxRetry 0 || numRetries portBindMaxRetry)) {try {serverSocket createNewServerSocket();LOG.info({} is accepting connections now, my election bind port: {}, QuorumCnxManager.this.mySid, address.toString());while (!shutdown) {try {client serverSocket.accept();setSockOpts(client);LOG.info(Received connection request from {}, client.getRemoteSocketAddress());if (quorumSaslAuthEnabled) {//异步接收连接请求receiveConnectionAsync(client);} else {//同步接收连接请求receiveConnection(client);}numRetries 0;} catch (SocketTimeoutException e) {LOG.warn(The socket is listening for the election accepted and it timed out unexpectedly, but will retry. see ZOOKEEPER-2836);}}} catch (IOException e) {if (shutdown) {break;}LOG.error(Exception while listening to address {}, address, e);if (e instanceof SocketException) {socketException.set(true);}numRetries;try {close();Thread.sleep(1000);} catch (IOException ie) {LOG.error(Error closing server socket, ie);} catch (InterruptedException ie) {LOG.error(Interrupted while sleeping. Ignoring exception, ie);}closeSocket(client);}}if (!shutdown) {LOG.error( Leaving listener thread for address {} after {} errors. Use {} property to increase retry count.,formatInetAddr(address),numRetries,ELECTION_PORT_BIND_RETRY);} }
http://www.zqtcl.cn/news/589942/

相关文章:

  • 网站页面上的悬浮窗怎么做简单好看的版面设计图
  • 我要在58上面做网站硬件开发和嵌入式的区别
  • 西安网站推广慧创新手怎么开网店
  • 做羞羞事视频网站网站策划书基本项目
  • 对网站建设的维护优秀设计网站推荐
  • 口红机网站怎么做wordpress 搭建个人网站
  • 黄金网站房地产网站建设意义
  • 百度网站联盟公司做网站计入那个科目
  • 越秀电子商务网站建设国外的ui设计思想网站
  • 网站关键词优化公司网站建设完成确认书
  • 企业微信网站怎么建设山东有哪些网络公司
  • 做任务领佣金的网站源码页面设计参评
  • 淘宝联盟个人网站怎么做企业年度报告公示系统
  • 长沙企业网页设计哪家专业网站优化seo
  • 网站设计 西安漂亮企业网站源码
  • 云南省科技网站网页设计师有前途吗
  • 漳州网站开发找出博大科技慈溪市建设局网站表格下载
  • 网站到期怎么续费公司网站asp源码
  • 多个域名 一个网站域名解析官网
  • 中国建设银行企业网站旅游网页代码模板
  • 湛江免费企业建站wordpress archives
  • 建个企业网站多少钱图书管理系统网站开发教程
  • 淘宝客网站建设详细教程wordpress转dz
  • 银川网站推广i深建官方网站
  • 有什么较好的网站开发框架娱乐网站模板
  • 宿迁网站建设托管wordpress 萝莉
  • 定制网站开发冬天里的白玫瑰制作复杂的企业网站首页
  • 网站开发及设计演讲海报免费做网站app下载
  • 做动态图片的网站吗自考网站建设与实践
  • 建外贸网站需要多少钱胖咯科技 网站建设