驻马店网站建设价格,phpcms安装教程,微信怎么做网站推广,广东省深圳市目录 1. Zookeeper集群操作
1.1 客户端操作zk集群
1.2 模拟集群异常操作
1.3 curate客户端连接zookeeper集群
2. Zookeeper实战案例
2.1 创建项目引入依赖
2.2 获取zk客户端对象
2.3 常用API
2.4 客户端向服务端写入数据流程
2.5 服务器动态上下线、客户端动态监听
2…目录 1. Zookeeper集群操作
1.1 客户端操作zk集群
1.2 模拟集群异常操作
1.3 curate客户端连接zookeeper集群
2. Zookeeper实战案例
2.1 创建项目引入依赖
2.2 获取zk客户端对象
2.3 常用API
2.4 客户端向服务端写入数据流程
2.5 服务器动态上下线、客户端动态监听
2.6 测试
3.Zookeeper分布式锁
3.1 什么是分布式锁
3.2 Zookeeper分布式锁分析
3.3 分布式锁实现 1. Zookeeper集群操作
1.1 客户端操作zk集群
1) 第一步启动集群,启动后查看Zookeeper进程。 jps命令 作用是显示当前所有java 进程的pid 的命令QuorumPeerMain是zookeeper集群的启动入口类 2) 客户端连接 连接集群所有客户端
[rootlocalhost zookeeper-1]# ./bin/zkCli.sh -server 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183 连接集群单个客户端
# 连接2181
[rootlocalhost zookeeper-1]# ./bin/zkCli.sh -server 192.168.58.200:2181
# 连接2182
[rootlocalhost zookeeper-1]# ./bin/zkCli.sh -server 192.168.58.200:2182
# 在2181中创建节点
[zk: 192.168.58.200:2181(CONNECTED) 0] create /test2
# 在2182中查询发现数据已同步
[zk: 192.168.58.200:2182(CONNECTED) 0] ls /
[test1, test2, zookeeper]
以上两种方式的区别在于 如果只连接单个客户端如果当前连接的服务器挂掉当前客户端连接也会挂掉连接失败。 如果是连接所有客户端的形式则允许集群中半数以下的服务挂掉当半数以上服务挂掉才会停止服务可用性更高一点
3集群节点信息查看
集群中的节点信息被存放在每一个节点/zookeeper/config/目录下 1.2 模拟集群异常操作
Leader选举 Serverid : 服务器ID 三台服务 编号分别是 1 2 3 编号越大在选择算法中权重越大 Zxid: 数据ID 服务器中存放的最大的数据ID 值越大数据越新 在Leader选举的过程中 如果某台Zookeeper获得了超过半数的选票就可以当选Leader
1首先我们先测试如果是从服务器挂掉会怎么样
把3号服务器停掉观察1号和2号发现状态并没有变化
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh stop
/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status 由此得出结论3个节点的集群从服务器挂掉集群正常
2我们再把1号服务器从服务器也停掉查看2号主服务器的状态发现已经停止运行了。
/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh stop
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status 由此得出结论3个节点的集群2个从服务器都挂掉主服务器也无法运行。因为可运行的机器没有超过集群总数量的半数。
3我们再次把1号服务器启动起来发现2号服务器又开始正常工作了。而且依然是领导者。 4我们把3号服务器也启动起来把2号服务器停掉,停掉后观察1号和3号的状态。
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh stop
/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status 发现新的leader产生了~
由此我们得出结论当集群中的主服务器挂了集群中的其他服务器会自动进行选举状态然后产生新得leader 。
5我们再次测试当我们把2号服务器重新启动起来启动后会发生什么2号服务器会再次成为新的领导吗我们看结果
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status
我们会发现2号服务器启动后依然是跟随者从服务器3号服务器依然是领导者主服务器没有撼动3号服务器的领导地位。
由此我们得出结论当领导者产生后再次有新服务器加入集群不会影响到现任领导者。 1.3 curate客户端连接zookeeper集群
public class CuratorCluster {
//zookeeper连接private final static String CLUSTER_CONNECT 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183;
//session超时时间private static final int sessionTimeoutMs 60 * 1000;
//连接超时时间private static final int connectionTimeoutMs 5000;
private static CuratorFramework client;
public static String getClusterConnect() {return CLUSTER_CONNECT;}
Beforepublic void init(){
// 重试策略RetryPolicy retryPolicy new ExponentialBackoffRetry(3000,10);
// zookeeper连接client CuratorFrameworkFactory.builder().connectString(getClusterConnect()).sessionTimeoutMs(60*1000).connectionTimeoutMs(15*1000).retryPolicy(retryPolicy).namespace(mashibing) //当前程序创建目录的根目录.build();
// 添加监听器client.getConnectionStateListenable().addListener(new ConnectionStateListener() {Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {System.out.println(连接成功);}});
client.start();}
//创建节点public void createIfNeed(String path) throws Exception {Stat stat client.checkExists().forPath(path);if(stat null){String s client.create().forPath(path);System.out.println(创建节点 s);}}
//从集群中获取数据Testpublic void testCluster() throws Exception {createIfNeed(/test);
//每隔一段时间 获取一次数据while(true){byte[] data client.getData().forPath(/test);System.out.println(new String(data));
TimeUnit.SECONDS.sleep(5);}}
}
在集群中的任意服务器节点为test设置数据
[zk: 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183(CONNECTED) 2] set /mashibing/test 12345 2. Zookeeper实战案例
2.1 创建项目引入依赖
dependencygroupIdcom.101tec/groupIdartifactIdzkclient/artifactIdversion0.10/version
/dependency
2.2 获取zk客户端对象
public class ZkClientTest {private String connectString 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183;private int sessionTimeout 2000;private ZkClient zkClient;/*** 获取zk客户端连接*/Beforepublic void Before(){/*** 参数1服务器的IP和端口* 参数2会话的超时时间* 参数3会话的连接时间* 参数4序列化方式*/zkClient new ZkClient(connectString, sessionTimeout, 1000 * 15, new SerializableSerializer());}Afterpublic void after(){zkClient.close();}
}
2.3 常用API 创建节点 /*** 创建节点*/Testpublic void testCreateNode(){//创建方式 返回创建节点名称String nodeName zkClient.create(/node1, lisi, CreateMode.PERSISTENT);System.out.println(路径名称为 nodeName);zkClient.create(/node2,wangwu,CreateMode.PERSISTENT_SEQUENTIAL);zkClient.create(/node3,hehe,CreateMode.EPHEMERAL);zkClient.create(/node4,haha,CreateMode.EPHEMERAL_SEQUENTIAL);while(true){}} 删除节点 /*** 删除节点*/Testpublic void testDeleteNode(){// 删除没有子节点的节点
// boolean b1 zkClient.delete(/node2);
// System.out.println(删除成功 b1);// 递归删除节点信息boolean b2 zkClient.deleteRecursive(/node2);System.out.println(删除成功 b2);} 查看节点的子节点 /*** 查询节点的子节点*/Testpublic void testFindNodes(){//返回指定路径的节点信息ListString ch zkClient.getChildren(/);for (String c1 : ch) {System.out.println(c1);}} 查看当前节点的数据 注意:如果出现:org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 61616161 异常的原因是: 在shell中的数据序列化方式 和 java代码中使用的序列化方式不一致导致 因此要解决这个问题只需要保证序列化一致即可 都使用相同端操作即可 /*** 获取节点数据*/Testpublic void testFindNodeData(){String nodeName zkClient.create(/node3, taotao, CreateMode.PERSISTENT);Object data zkClient.readData(/node3);System.out.println(data);} 查看当前节点的数据并获取状态信息 /*** 获取数据以及当前节点状态信息*/Testpublic void testFindNodeDataAndStat(){Stat stat new Stat();Object data zkClient.readData(/node20000000004, stat);System.out.println(data);System.out.println(stat);} 修改节点数据 /*** 修改节点*/Testpublic void testUpdateNodeData(){zkClient.writeData(/node3,123456);} 监听节点数据的变化 /*** 监听节点数据*/Testpublic void testNodeChange(){zkClient.subscribeDataChanges(/node3, new IZkDataListener() {// 当节点的值在修改时会自动调用这个方法Overridepublic void handleDataChange(String nodeName, Object result) throws Exception {System.out.println(节点名称 nodeName);System.out.println(节点数据 result);}// 当节点被删除时会调用该方法Overridepublic void handleDataDeleted(String nodeName) throws Exception {System.out.println(节点名称 nodeName);}});while(true){}} 监听节点目录的变化 /*** 监听节点目录的变化*/Testpublic void testNodesChange(){zkClient.subscribeChildChanges(/node3, new IZkChildListener() {Overridepublic void handleChildChange(String nodeName, ListString list) throws Exception {System.out.println(父节点名称 nodeName);System.out.println(发生变更后所有子节点名称 );for (String name : list) {System.out.println(name);}}});while(true){}} 判断某一个节点是否存在 //判断节点是否存在Testpublic void exist(){boolean exists zkClient.exists(/node3);System.out.println(exists true ? 节点存在 : 节点不存在);}
2.4 客户端向服务端写入数据流程 写流程之写入请求直接发送给Leader 写流程之写入请求发送给follower节点 2.5 服务器动态上下线、客户端动态监听 某分布式系统中主节点可以有多台可以动态上下线任意一台客户端都能实时感知到主节点服务器的上下线。 1根节点下创建servers节点
[zk: 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183(CONNECTED) 0]
create /servers servers
Created /servers
2服务端代码
完成服务端向zookeeper注册、动态上下线的代码。
/*** 服务端*/
public class DistributeServer {private ZooKeeper client;// 连接信息private String connectString 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183;// 超时时间private int sessionTimeOut 30000;public static void main(String[] args) throws Exception {DistributeServer server new DistributeServer();//1.获取zk连接server.getConnect();//2.将服务器注册到zk集群args参数通过启动 main方法时传入即可server.register(args[0]);//3.启动业务逻辑线程睡眠Thread.sleep(Long.MAX_VALUE);}/*** 注册操作* param hostName 将服务器注册到zk集群时所需的服务名称*/private void register(String hostName) throws Exception {/*** ZooDefs.Ids.OPEN_ACL_UNSAFE: 此权限表示允许所有人访问该节点服务器* CreateMode.EPHEMERAL_SEQUENTIAL: 由于服务器是动态上下线的上线后存在下线后不存在所以是临时节点* 而服务器一般都是有序号的所以是临时、有序的节点.*/String node client.create(/servers/ hostName,hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(已成功创建 node 节点);System.out.println(hostName 已经上线);}/*** 获取连接*/private void getConnect() throws IOException {client new ZooKeeper(connectString, sessionTimeOut, new Watcher() {Overridepublic void process(WatchedEvent watchedEvent) {}});}
}
2客户端代码
服务端代码写好之后再来完成客户端动态监听zk服务端各个节点的代码。
/*** 客户端*/
public class DistributeClient {private ZooKeeper zk;// 连接信息private String connectString 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183;// 超时时间private int sessionTimeOut 30000;public static void main(String[] args) throws Exception {DistributeClient client new DistributeClient();//1.获取zk连接client.getConnection();//2.监听 /servers下面所有的子节点变化client.getServerList();//3.业务逻辑Thread.sleep(Long.MAX_VALUE);}/*** 获取连接*/private void getConnection() throws Exception {zk new ZooKeeper(connectString, sessionTimeOut, new Watcher() {Overridepublic void process(WatchedEvent watchedEvent) {//监听服务器地址的上下线try {getServerList();} catch (Exception e) {e.printStackTrace();}}});}/*** 监听 /servers路径下的所有子节点变化true表示启动监听器*/private void getServerList() throws Exception {ListString zkChildren zk.getChildren(/servers, true);ListString servers new ArrayList();zkChildren.forEach(node - {//拼接服务完整信息try {byte[] data zk.getData(/servers/ node, false, null);servers.add(new String(data));} catch (Exception e) {e.printStackTrace();}});System.out.println(servers);System.out.println();}
}
2.6 测试
1Zookeeper命令行完成测试
[zk: localhost:2181(CONNECTED) 1] create -e -s /servers/zk01 192.168.58.200:2181
Created /servers/zk010000000000 [zk: localhost:2181(CONNECTED) 2] create -e -s /servers/zk02 192.168.58.200:2182
Created /servers/zk020000000001 [zk: localhost:2181(CONNECTED) 3] create -e -s /servers/zk03 192.168.58.200:2183
Created /servers/zk030000000002 上面的执行结果可以看到在servers下面依次创建子结点客户端代码都可以成功监听到。
下面我们删除节点看看客户端能不能做到动态监听功能也即删除的节点不会再被监听到。
[zk: localhost:2181(CONNECTED) 5] delete /servers/zk010000000000[zk: localhost:2181(CONNECTED) 7] delete /servers/zk020000000001[zk: localhost:2181(CONNECTED) 8] delete /servers/zk030000000002 使用Java代码来测试 先启动客户端代码 再启动服务端代码 只是在服务端代码中我们的 register 方法中传参用到了 args 所以启动之前要传入这个参数。 传入 192.168.58.200 之后可以看到服务端代码已经能够实现动态上线了。 这里我们的 192.168.58.200 动态上线之后可以看到客户端也正常的监听到它了。 转到zk命令行中也可以看到这台服务器的节点信息。 由于我们之前的服务端代码还启动着此时我们再传入新的参数 192.168.58.200:2182那么之前的2181服务器肯定会被挤掉这里模拟就是main方法同一个类肯定只能同时启动一次main方法了那么我们看看客户端能不能动态监听到2181下线、2182上线。 服务端自然可以正常实现2182这台服务器的动态上线。在客户端代码中可以看到List集合中已经没有2181了即2181已经下线了而2182正常上线 3.Zookeeper分布式锁
3.1 什么是分布式锁
传统单体应用单机部署的情况下可以使用并发处理相关的功能进行互斥控制但是原单体单机部署的系统被演化成分布式集群系统后由于分布式系统多线程、多进程并且分布在不同机器上这将使原单机部署情况下的并发控制锁策略失效。提出分布式锁的概念是为了解决跨机器的互斥机制来控制共享资源的访问。 3.2 Zookeeper分布式锁分析
客户端对zookeeper集群而言向zookeeper集群进行上线注册,并在一个永久节点下创建有序的临时子节点后根据编号顺序最小顺序的子节点获取到锁其他子节点由小到大监听前一个节点。 当拿到锁的节点处理完事务后释放锁后一个节点监听到前一个节点释放锁后立刻申请获得锁以此类推 3.3 分布式锁实现
1创建 Distributedlock类, 获取与zookeeper的连接 构造方法中获取连接 添加 CountDownLatch CountDownLatch是具有synchronized机制的一个工具目的是让一个或者多个线程等待直到其他线程的一系列操作完成。 CountDownLatch初始化的时候需要提供一个整形数字数字代表着线程需要调用countDown()方法的次数当计数为0时线程才会继续执行await()方法后的其他内容。
/*** 分布式锁*/
public class DistributedLock {private ZooKeeper client;// 连接信息private String connectString 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183;// 超时时间private int sessionTimeOut 30000;private CountDownLatch countDownLatch new CountDownLatch(1);//1. 在构造方法中获取连接public DistributedLock() throws Exception {client new ZooKeeper(connectString, sessionTimeOut, new Watcher() {Overridepublic void process(WatchedEvent watchedEvent) {}});//等待Zookeeper连接成功连接完成继续往下走countDownLatch.await();//2. 判断节点是否存在}//3.对ZK加锁public void zkLock(){//创建 临时带序号节点//判断 创建的节点是否是最小序号节点如果是 就获取到锁如果不是就监听前一个节点}//4.解锁public void unZkLock(){//删除节点}
}
2对zk加锁
/*** 分布式锁*/
public class DistributedLock {private ZooKeeper client;// 连接信息private String connectString 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183;// 超时时间private int sessionTimeOut 30000;// 等待zk连接成功private CountDownLatch countDownLatch new CountDownLatch(1);// 等待节点变化private CountDownLatch waitLatch new CountDownLatch(1);//当前节点private String currentNode;//前一个节点路径private String waitPath;//1. 在构造方法中获取连接public DistributedLock() throws Exception {client new ZooKeeper(connectString, sessionTimeOut, new Watcher() {Overridepublic void process(WatchedEvent watchedEvent) {//countDownLatch 连上ZK可以释放if(watchedEvent.getState() Event.KeeperState.SyncConnected){countDownLatch.countDown();}//waitLatch 需要释放 (节点被删除并且删除的是前一个节点)if(watchedEvent.getType() Event.EventType.NodeDeleted watchedEvent.getPath().equals(waitPath)){waitLatch.countDown();}}});//等待Zookeeper连接成功连接完成继续往下走countDownLatch.await();//2. 判断节点是否存在Stat stat client.exists(/locks, false);if(stat null){//创建一下根节点client.create(/locks,locks.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}//3.对ZK加锁public void zkLock(){//创建 临时带序号节点try {currentNode client.create(/locks/ seq-, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//判断 创建的节点是否是最小序号节点如果是 就获取到锁如果不是就监听前一个节点ListString children client.getChildren(/locks, false);//如果创建的节点只有一个值就直接获取到锁如果不是监听它前一个节点if(children.size() 1){return;}else{//先排序Collections.sort(children);//获取节点名称String nodeName currentNode.substring(/locks/.length());//通过名称获取该节点在集合的位置int index children.indexOf(nodeName);//判断if(index -1){System.out.println(数据异常);}else if(index 0){//就一个节点可以获取锁return;}else{//需要监听前一个节点变化waitPath /locks/ children.get(index-1);client.getData(waitPath,true,null);//等待监听执行waitLatch.await();return;}}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}
}
3zk删除锁 //4.解锁public void unZkLock() throws KeeperException, InterruptedException {//删除节点client.delete(currentNode,-1);}
4测试
public class DistributedLockTest {
public static void main(String[] args) throws Exception {
final DistributedLock lock1 new DistributedLock();final DistributedLock lock2 new DistributedLock();
new Thread(new Runnable() {Overridepublic void run() {
try {lock1.zkLock();System.out.println(线程1 启动 获取到锁);
Thread.sleep(5 * 1000);lock1.unZkLock();System.out.println(线程1 释放锁);} catch (InterruptedException | KeeperException e) {e.printStackTrace();}}}).start();
new Thread(new Runnable() {Overridepublic void run() {
try {lock2.zkLock();System.out.println(线程2 启动 获取到锁);
Thread.sleep(5 * 1000);lock2.unZkLock();System.out.println(线程2 释放锁);} catch (InterruptedException | KeeperException e) {e.printStackTrace();}}}).start();}
}