青岛网站排名多少钱,怎么做pc端移动网站,网页设计与制作大赛,衡阳网页定制zookeeper实际应用场景 zookeeper能够实现哪些场景 1#xff09;订阅发布/配置中心 watcher机制 统一配置管理#xff08;disconf#xff09; 实现配置信息的集中式原理和数据的动态更新 实现配置中心有俩种模式#xff1a;push,pull 长轮询 zookeeper采用的是推拉相结合的…zookeeper实际应用场景 zookeeper能够实现哪些场景 1订阅发布/配置中心 watcher机制 统一配置管理disconf 实现配置信息的集中式原理和数据的动态更新 实现配置中心有俩种模式push,pull 长轮询 zookeeper采用的是推拉相结合的方式。客户端向服务器端注册自己需要关注的节点。一旦节点数据发生变化name服务器端会向客户端发送watcher事件通知。客户端收到通知后主动到服务器端获取更新后的数据。 a 数据量比较小 b 数据内容在运行时发生动态变更 c 集群中的各个机器共享变量 2分布式锁 2.1 redis setNX 存在则会返回0 不存在则返回数据 2.2 数据库 创建一个表 通过唯一索引的方式 create tableid,methodname.. methodname增加唯一索引 insert 一条数据 xxx delete 删除数据 mysql 有innodb来设置表锁或者行锁 2.3 zookeeper 有序节点 排他锁 共享锁(读锁) 3负载均衡 请求/数据分摊多个计算单元 4ID生成器 5分布式队列 activeMQ kafka a 先进先出队列 getchildren获取指定根节点下面的子节点 确定自己节点在子节点中的顺序 如果自己不是最小的子节点监听比自己小的上一个子节点否则处于等待 接受watcher通知重复流程 b Barrier模式 阻碍模式 围栏模型 满足条件才会触发执行 6统一命名服务 7master选举 7*24小时可用 99.999%可用 master-slave模式 master出现故障 slave上位作为master 心跳机制去维持状态 脑裂 1分布式锁实现 package com.lulf.DistrubuteLock.JavaApi;import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;public class ZookeeperClient {private final static String CONNECTSTRING 192.168.48.133:2181,192.168.48.134:2181, 192.168.48.135:2181,192.168.48.136:2181;private static int sessionTimeOut 5000;//获取连接public static ZooKeeper getInstance() throws IOException, InterruptedException {final CountDownLatch countDownLatch new CountDownLatch(1);ZooKeeper zooKeeper new ZooKeeper(CONNECTSTRING, sessionTimeOut, new Watcher() {Overridepublic void process(WatchedEvent paramWatchedEvent) {if (paramWatchedEvent.getState() Event.KeeperState.SyncConnected) {countDownLatch.countDown();}}});countDownLatch.await();return zooKeeper;}public static int getSessionTimeOut() {return sessionTimeOut;}
}
复制代码package com.lulf.DistrubuteLock.JavaApi;import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;public class DistributeLock {private static final String ROOT_LOCKS /LOCKS; // 根节点private ZooKeeper zooKeeper;private int sessionTimeOut 5000;// 会话超时时间private String lockID;// 记录锁节点IDprivate CountDownLatch countDownLatch new CountDownLatch(1);private final static byte[] data { 1, 2 };public DistributeLock() throws IOException, InterruptedException {this.zooKeeper ZookeeperClient.getInstance();this.sessionTimeOut ZookeeperClient.getSessionTimeOut();}// 获取锁的方法public boolean lock() {try {lockID zooKeeper.create(ROOT_LOCKS /, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(Thread.currentThread().getName() -成功创建lock节点[ lockID ]开始去竞争锁);ListString childrenNodes zooKeeper.getChildren(ROOT_LOCKS, true);// 排序从小到大SortedSetString sortedSet new TreeSetString();for (String children : childrenNodes) {sortedSet.add(ROOT_LOCKS / children);}String first sortedSet.first();// 拿到最小的节点if (lockID.equals(first)) {// 表示当前就是最小的System.out.println(Thread.currentThread().getName() success get lock,lock节点[ lockID ]);return true;}SortedSetString lessThanLockID sortedSet.headSet(lockID);if (!lessThanLockID.isEmpty()) {String preLockId lessThanLockID.last(); // 拿到比当前lockid这个节点更小的上一个节点zooKeeper.exists(preLockId, new LockWatcher(countDownLatch));countDownLatch.await(sessionTimeOut, TimeUnit.MILLISECONDS);// 上面这段代码意味着如果会话超时或者节点被删除System.out.println(Thread.currentThread().getName() 成功获取锁,lock节点[ lockID ]);}return true;} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}return false;}// 释放锁的方法public boolean unlock() {System.out.println(Thread.currentThread().getName() --开始释放锁:[ lockID ]);try {zooKeeper.delete(lockID, -1);System.out.println(节点[ lockID ]被成功删除);return true;} catch (Exception e) {e.getStackTrace().toString();}return false;}public static void main(String[] args) {CountDownLatch latch new CountDownLatch(10);Random random new Random();for (int i 0; i 10; i) {new Thread(() - {DistributeLock lock null;try {lock new DistributeLock();latch.countDown();latch.await();lock.lock();Thread.sleep(random.nextInt(500));} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}).start();}}
}
复制代码 运行程序 2master选举 实现共享锁 package com.lulf.DistrubuteLock.zkclient;import java.io.Serializable;public class UserCenter implements Serializable{/*** */private static final long serialVersionUID -4060228979536051295L;private int m_id;//机器信息private String mc_name; //机器名称public int getM_id() {return m_id;}public void setM_id(int m_id) {this.m_id m_id;}public String getMc_name() {return mc_name;}public void setMc_name(String mc_name) {this.mc_name mc_name;}
}
复制代码package com.lulf.DistrubuteLock.zkclient;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;/*** 选主服务* * author lulf**/
public class MasterSelector {private ZkClient client;private final static String MASTER_PATH /master;// 需要争抢的节点private IZkDataListener dataListener;// 注册节点内容发生变化private UserCenter server; // 其他服务器private UserCenter master; // master节点private static boolean isrunning false;ScheduledExecutorService scheduledExecutorService Executors.newScheduledThreadPool(1);public MasterSelector(UserCenter server,ZkClient client) {this.server server;this.clientclient;this.dataListener new IZkDataListener() {Overridepublic void handleDataDeleted(String dataPath) throws Exception {// 节点如果被删除,发起一个选主操作chooseMaster();}Overridepublic void handleDataChange(String dataPath, Object data) throws Exception {}};}public void start() {// 开始选举if(!isrunning){isrunningtrue;client.subscribeDataChanges(MASTER_PATH, dataListener);//注冊节点时间chooseMaster();}}public void stop() {// 停止if(isrunning){isrunningfalse;scheduledExecutorService.shutdown();client.unsubscribeDataChanges(MASTER_PATH, dataListener);//取消订阅releaseMaster();}}// 具体选主的服务private void chooseMaster() {if (!isrunning) {System.out.println(当前服务没有启动。。。);return;}try {client.createEphemeral(MASTER_PATH, server);master server;// 把server节点赋值给masterSystem.out.println(master.getMc_name() --我已经是master,开始领导你们);// 定时器// master释放锁,出现故障scheduledExecutorService.schedule(() - {releaseMaster();}, 5, TimeUnit.SECONDS);//每5秒释放一次锁} catch (ZkNodeExistsException e) {e.getStackTrace().toString();//表示master已经存在UserCenter userCenterclient.readData(MASTER_PATH, true);if(userCenternull){chooseMaster();//再次获取master}else{masteruserCenter;}}}private void releaseMaster() {// 释放锁(故障模拟)//判断当前是否是master只有master才需要释放锁if(checkMaster()){client.delete(MASTER_PATH, -1);//删除}}private boolean checkMaster() {// 判断当前的server是否是masterUserCenter userCenterclient.readData(MASTER_PATH);if(userCenter.getMc_name().equals(server.getMc_name())){masteruserCenter;return true;}return false;}
}
复制代码package com.lulf.DistrubuteLock.zkclient;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;public class MasterChooseDemo {private final static String CONNECTSTRING 192.168.48.133:2181,192.168.48.134:2181, 192.168.48.135:2181,192.168.48.136:2181;public static void main(String[] args) {ListMasterSelectorselectorListnew ArrayListMasterSelector();try { for (int i 0; i 10; i) {ZkClient zkClientnew ZkClient(CONNECTSTRING, 5000,5000,new SerializableSerializer());UserCenter userCenternew UserCenter();userCenter.setM_id(i);userCenter.setMc_name(lulf_i);MasterSelector selectornew MasterSelector(userCenter,zkClient);selectorList.add(selector);selector.start();//触发选举操作TimeUnit.SECONDS.sleep(4);} } catch (Exception e) {e.getStackTrace().toString();}finally {for (MasterSelector masterSelector : selectorList) {masterSelector.stop();}}}
}
复制代码curator 提供应用场景封装 curator-reciples 提供了api调用 如:master/leader选举 分布式锁 读锁 写锁 分布式队列 。。。 LeaderLatch 阻塞 写一个master LeaderSelector 自动抢 每个应用都写一个临时有序节点根据最小的节点来获取优先点 package com.lulf.DistrubuteLock.curator;import java.util.concurrent.TimeUnit;import javax.swing.tree.ExpandVetoException;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;public class MasterSelector {private final static String CONNECTSTRING 192.168.48.133:2181,192.168.48.134:2181, 192.168.48.135:2181,192.168.48.136:2181;private final static String MASTER_PATH/curator_master_path;public static void main(String[] args) {CuratorFramework curatorFrameworkCuratorFrameworkFactory.builder().connectString(CONNECTSTRING).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();LeaderSelector leaderSelectornew LeaderSelector(curatorFramework, MASTER_PATH, new LeaderSelectorListenerAdapter() {Overridepublic void takeLeadership(CuratorFramework arg0) throws Exception {System.out.println(获取leader成功);TimeUnit.SECONDS.sleep(2);}});leaderSelector.autoRequeue();leaderSelector.start();//开始选举}
}
复制代码zookeeper的集群角色 leader leader是zookeeper集群的核心。 1 事务请求的唯一调度的矗立着他需要保证事务处理的顺序性 2 集群内部各个服务器的调度者 follower 1 处理客户端非事务请求以及转发事务请求给leader服务器 2 参与事务请求提议的proposal投票 【客户端的一个事务请求需要半数服务投票通过才能通知leader commitleader会发起一个提案要求follower投票】 3 参与leader选举的投票 observer 1 观察zookeeper集群中最新状态的变化并且把这些状态同步到observer服务器上。 2 增加observer不影响集群事务处理能力同时能提升集群的非事务处理能力 zookeeper的集群组成 zookeeper一般是由2n1台服务器组成 leader选举 1leaderElection 2AuthFastLeaderElection 3FastLeaderElection serverID 配置server集群的时候给定服务器的标识id myid zxid服务器它运行时产生的数据IDzxid值越大标识数据越新 Epoch选举轮次 sid server的状态LookingFollowingObserveringLeading 第一次初始化启动的时候是Looking 1所有集群中的server都会推荐自己为leader然后把myidzxidepoch作为广播信息广播给集群中的其他server然后等待其他服务器返回。 2每个服务器都会接受到来自集群中的其他服务器的投票及群众的每个服务器在接受到投票之后都会判断投票的有效性 a判断逻辑时钟epoch 如果epoch大于自己当前的epoch说明自己保存的epoch是过期更新epoch同事clear其他服务器送过来的选举数据判断是否需要更新当前自己的选举情况 b如果Epoch小于目前的epoch说明对方的epoch过期意味着对方服务器的选举轮次是过期的只需要把自己的信息发送给对方 c如果接收到的epoch等于当前的epoch根据规则来判断是否有资格获得leader 接受到来自其他服务器的投票后针对每一个投标都需要将别人的投票和自己的投票进行pk ZXID最大的服务器优先 3统计投票 ZAB协议 拜占庭问题 paxos协议主要就是如何保证在分布式网络环境下各个服务器如何达成一致最终保证数据的一致性问题。 ZAB协议基于paxos协议的一个改进。 ZAB协议为分布式协调服务zookeeper专门设计的一种支持奔溃恢复的原子广播协议。 zookeeper并没有完全采用paxos算法而是采用zab zookeeper stomic broadcast zab协议的原理 1在zookeeper的主备模式下通过zab协议来保证集群中的各个副本数据的一致性 2zookeeper是使用单一的主进程来接受并处理所有的事务请求并采用zab协议把数据的状态变更以事务请求的形式广播到其他节点 3zab协议在主备模型架构中保证了同一时刻只能有一个主进程来广播服务器的状态变更 4所有的事务请求必须由全局唯一的服务器来协调处理这个服务器叫leader其他叫follower leader节点主要是负责把客户端的请求转化为一个事务提议proposal并且分发给集群中的所有follower节点再等待所有follower节点反馈一旦超过半数服务器进行了正确的反馈nameleader就会commit这个消息。 奔溃恢复 原子广播 zab协议的工作原理 1什么情况下zab协议会进入奔溃恢复模式 a 当服务器启动时 b 当leader服务器出现网络中断 奔溃 重启的情况 c 集群中已经不存在过半的服务器与该leader保持正常通行 2zab协议进入奔溃恢复模式会做什么 a 当leader出现问题zab协议进入奔溃恢复模式并且选举出新的leader。当新的leader选举出来以后如果集群中已经有过半机器完成了leader服务器的状态同步数据同步退出崩溃恢复。进入消息广播模式 b 当新的机器加入到集群中的时候如果已经存在leader服务器那么新加入的服务器就会自觉进入数据恢复模式找到leader进行数据同步 问题 假设一个事务在leader服务器被提交了并且已经有过半的follower返回了ack。在leader节点把commit消息发送给follower机器之前leader服务器挂了怎么办 zab协议一定需要保证已经被leader提交的事务也能够被所有follower提交。 zab协议需要协议在奔溃恢复过程中跳过那些已经被丢弃的事务。