
- 一、什么是zookeeper
- 1、zookeeper概述
- 2、zookeeper特点
- 3、zookeeper数据结构
- 4、zookeeper应用场景
- 二、Zookeeper本地安装
- 1、本地模式安装
- 2、配置参数解读
- 三、Zookeeper集群 *** 作
- 1、集群 *** 作
- 1.1集群安装
- 1.2 选举机制(面试重点)
- 1.3 ZK集群启动停止脚本
- 2、客户端命令行 *** 作
- 2.1 命令行基本语法
- 2.2 znode节点数据信息
- 2.3 节点类型
- 2.4 监听器原理
- 2.5 节点删除与查看
- 3、客户端API *** 作
- 3.1 创建zookeeper客户端
- 3.2 创建子节点
- 3.3 获取子节点并监听节点变化
- 3.4 判断Znode是否存在
- 4、客户端向服务端写数据流程
- 四、服务器动态上下线监听案例
- 1、需求
- 2、需求分析
- 3、具体实现
- 五、zookeeper分布式锁案例
- 1、JAVA原生API *** 作
- 2、Curator框架实现分布式锁
- 六、ZAB协议
- 1. Zab协议内容
- 2. 消息广播
- 3. 崩溃恢复
- 七、CAP
- 1.CAP理论
- 2.zookeeper的CP理论
- Zookeeper是一个开源的分布式的,为分布式框架提供协调服务的Apache项目。
- 什么是分布式?分布式就可以理解为多台服务器共同完成一个复杂的任务。
没错我就是zookeeper~~
你可以形象的觉得它就是一个铲屎官,为其他组件提供统一服务。
- Zookeeper的工作机制
zookeeper从设计模式上来说的话:是一个基于观察者模式设计的分布式服务管理框架。
它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据发生了变化,zookeeper就负责通知已经在zookeeper上注册的那些观察者做出相应的反应。
- 那什么是观察者模式?
当对象间存在一对多关系时,则使用观察者模式(Observer Pattern)。比如,当一个对象被修改时,则会自动通知依赖它的对象。
上图就是zookeeper的大概工作机制了。
形象的讲一下,图中包括,服务器,zookeeper集群,客户端。这里我们分别形象的比喻成洗脚城的技师,洗脚城的预约APP,以及预约洗脚的客户。
首先服务端启动时会去zookeeper集群注册信息并创建临时节点,什么意思呢?技师要上班肯定要去APP注册上自己的信息,这样客人才能预约她,但是临时节点的原因就是,她可能有一天突然有事走了。
这是zookeeper集群里面就存着服务器端注册的信息。
然后客户端从zookeeper集群获取到当前在线的服务器列表,并且注册监听。什么意思呢?就是客户要从这个APP上能看到现在是哪个技师在岗,然后要在APP上注册信息,你只要注册你才能去预约或者APP给你发推送嘛。
此时这三者之前就形成了一条龙的关系。
有一天,一个服务器宕机了,那这个zookeeper集群收到宕机后会通知客户端,服务器节点下线了,然后客户端会process()重新再获取服务器的列表,重新注册监听。什么意思呢?就是有一天,技师小李家中有事,回家了,这时APP上没有了小李的信息,然后同时通知这个客户,说,我这里小李不在了,那么这个客户会收到这个通知,并获取到新的一个技师在线列表,并对这些技师的在岗信息进行一个预约。
ok,over!! - 综上,我们就说**zookeeper=文件系统+通知机制**
文件系统就是可以存储服务器的节点信息;通知机制就是如果节点信息丢失可以及时对客户端进行通知。
上图中每一个Server代表一个安装Zookeeper服务的服务器。组成 ZooKeeper 服务的服务器都会在内存中维护当前的服务器状态,并且每台服务器之间都互相保持着通信。集群间通过 Zab 协议(Zookeeper Atomic Broadcast)来保持数据的一致性。(这个协议后边说)
- zookeeper:一个领导者(Leader),多个跟随着(Follower)组成的集群。
- 集群中只要有半数以上的节点存活,zookeeper集群就可以正常服务。
- 全局数据一致:每个server保存一份相同的数据副本,Client无论连接到哪个客server,数据都是一致的。
- 更新请求顺序执行,来自同一个client的更新请求按其发送顺序依次执行。对于来自客户端的每个更新请求,ZooKeeper 都会分配一个全局唯一的递增编号,这个编号反应了所有事务 *** 作的先后顺序, 这个编号也叫做时间戳——zxid
- 数据更新原子性,一次数据要么全部更新成功,要么失败。所有事务请求的处理结果在整个集群中所有机器上的应用情况是一致的,也就是说,要么整个集群中所有的机器都成功应用了某一个事务,要么都没有应用。
- 实时性,在一定时间内,client能读到最新数据。
4、zookeeper应用场景zookeeper数据模型的结构,整体上可以看作是一棵树,每个节点称作一个ZNode,每一个ZNode默认能够存储1MB的数据,每个ZNode都可以通过其路径唯一标识。
提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、负载均衡等
- 统一命名服务
因为我们在分布式的环境下,通过使用命名服务,对应用或者服务统一命名,客户端可以根据指定名字来获取资源或服务的地址,提供者等信息,这样便于识别。
例如:IP不容易记住,而域名容易记住。
- 统一配置管理
1)分布式环境下,配置文件同步很常见
(a).一般要求一个集群中,所有节点的配置信息是一致的,比如Kafka集群。
(b).对配置文件修改后,希望能快速同步到各个节点上
2)配置管理可交由zookeeper实现
(a).可将配置信息写入zookeeper上的一个ZNode。
(b).每个客户端服务器监听这个ZNode,一旦ZNode中的数据被修改,zookeeper将通知每个客户端服务器。
其实就是我们所说的发布与订阅模型:
应用中用到的一些配置信息放到ZK上进行集中管理。这类场景通常是这样:应用在启动的时候会主动来获取一次配置,同时,在节点上注册一个Watcher,这样一来,以后每次配置有更新的时候,都会实时通知到订阅的客户端,从来达到获取最新配置信息的目的。
- 统一集群管理
分布式环境中,实时掌握每个节点的状态是必要的,可以根据节点状态做出实时的调整。
ZooKeeper可以实现实时监控节点状态变化
(1)可将节点信息写入ZooKeeper上的一个ZNode。
(2)监听这个ZNode可获取它的实时状态变化。
- 服务器动态上下线
客户端能实时的感知服务器上下线的变化
- 软负载均衡
在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求
那么假如来了新的客户端访问,会优先访问那些访问数少的服务器,达到一个负载均衡。
- 安装前准备
1)安装JDK
2)拷贝 apache-zookeeper-3.5.7-bin.tar.gz 安装包到 Linux 系统下
3)解压到指定目录,我这里到software下(自己创建的)
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
4)修改一下名字,因为太长感觉不方便
mv apache-zookeeper-3.5.7 -bin/ zookeeper-3.5.7
- 配置修改
1)先改下配置文件的名字,因为感觉太长,这个文件在conf路径下
mv zoo_sample.cfg zoo.cfg
2)打开这个配置文件,修改dataDir路径,为啥要修改,因为/tmp/一般都存储临时的文件,到一定时间会自动删除,所以我们把它修改为自己创建的文件里
vim zoo.cfg #修改 dataDir=/opt/module/zookeeper-3.5.7/zkData
- *** 作zookeeper
1)启动zookeeper(bin目录下的zkServer.sh)
[atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh start
2)查看进程是否成功
[atguigu@hadoop102 zookeeper-3.5.7]$ jps 4020 Jps 4001 QuorumPeerMain
3)查看状态(status)
[atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Mode: standalone
4)启动客户端(bin下的zkCli.sh,启动客户端不需要start)
[atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkCli.sh
5)退出客户端
[zk: localhost:2181(CONNECTED) 0] quit
6)停止zookeeper
bin/zkServer.sh stop2、配置参数解读
Zookeeper中的配置文件zoo.cfg中参数含义解读如下:
-
tickTime=2000:
通知心跳时间,zookeeper服务器与客户端心跳时间。毫秒。
-
initLimit=10:LF初始通信时限
Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)===>也就是10*2000ms,如果大于这个,就认为没连上。 -
syncLimit=5:LF同步通信时限
Leader和Follower之间通信时间如果超过syncLimit * tickTime(也就死5*2000ms),Leader认为Follwer死掉,从服务器列表中删除Follwer。 -
dataDir:保存zookeeper中的数据
注意:默认的tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录。 -
clientPort = 2181:客户端连接端口,通常不做修改。
- 1)集群规划,安多少台合适?
生产集群安装多少 zk 合适?
安装奇数台
生产经验:
⚫ 10 台服务器:3 台 zk;
⚫ 20 台服务器:5 台 zk;
⚫ 100 台服务器:11 台 zk;
⚫ 200 台服务器:11 台 zk
服务器台数多:好处,提高可靠性;坏处:提高通信延时
- 2) 解压安装
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/ #顺便改个名字,方便点 mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7
- 3)配置服务器编号
在/opt/module/zookeeper-3.5.7/这个目录下创建 zkData
mkdir zkData
并在这个文件下创建一个myid文件,在里面配置集群的编号,并分发
vi myid 2 xsync zookeeper-3.5.7
在其它集群配置各自的编号。
- 4)配置zoo.cfg文件
先改个名字,方便点。重命名/opt/module/zookeeper-3.5.7/conf 这个目录下的 zoo_sample.cfg 为 zoo.cfg
mv zoo_sample.cfg zoo.cfg
打开修改它的数据存储路径,并增加配置
vim zoo.cfg dataDir=/opt/module/zookeeper-3.5.7/zkData #######################cluster########################## server.2=hadoop102:2888:3888 server.3=hadoop103:2888:3888 server.4=hadoop104:2888:3888
对上面的参数解读一下
server.A=B:C:D
这里的A是一个数字,表示第几号服务器。刚才我们创建并配置了myid文件,也就是配置了我们的服务器编号,这个编号也就是A的值。
为什么要配置这个A?Zookeeper 启动时读取此文件,拿到里面的数据(就是那个编号)与 zoo.cfg 里面的配置信息比较从而判断到底是哪个 server。
这里的B数字是这个服务器的地址。
这里的C是这个服务器Follower与集群中的leader服务器交换信息的窗口。也就是它们用来通讯的端口号。
这里的D是万一集群中的leader服务器挂了,需要一个端口来重新进行选举,选出一个新的leader,而这个端口就使用来执行选举时服务器相互通信的端口。
接下来同步分发配置文件
xsync zoo.cfg
- 5)集群 *** 作
(1)分别启动zookeeper(bin/zkServer.sh start)
[atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh start [atguigu@hadoop103 zookeeper-3.5.7]$ bin/zkServer.sh start [atguigu@hadoop104 zookeeper-3.5.7]$ bin/zkServer.sh start
(2)查看状态( bin/zkServer.sh status)
[atguigu@hadoop102 zookeeper-3.5.7]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Mode: follower [atguigu@hadoop103 zookeeper-3.5.7]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Mode: leader [atguigu@hadoop104 zookeeper-3.4.5]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Mode: follower1.2 选举机制(面试重点)
- zookeeper选举机制——第一次启动时
(1)服务器1启动,发起一次选举,服务器1投自己一票,此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING;
(2)服务器2启动,再发起一次选举,服务器1和服务器2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1) 大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING;
(3)服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服
务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING;
(4)服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为 1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING;
(5)服务器5启动,同4一样当小弟。
了解非第一次启动前,先介绍几个代名词:
SID:服务器ID。用来唯一标识一台ZooKeeper集群中的机器,每台机器不能重复,和myid一致。
ZXID:事务ID。ZXID是一个事务ID,用来标识一次服务器状态的变更。在某一时刻,集群中的每台机器的ZXID值不一定完全一致,这和ZooKeeper服务器对于客户端“更新请求”的处理逻辑有关。
Epoch:每个Leader任期的代号。没有Leader时投票选举新leader。没有Leader时同一轮投票过程中的逻辑时钟值是相同的每投完一次票这个数据就会增加
- zookeeper选举机制——非第一次启动
(1)当zookeeper集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:
- 服务器初始化启动(这个上面以及讨论过了)
- 服务器运行期间无法和leader保持连接
(2)而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:
- 集群中本来就已经存在一个Leader。
对于这种已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。
上面的通俗来说就是,这个服务器和leader连不上了,他自以为leader坏了,于是试图去选举leader,而和别的服务器商量选举时,结果别的服务器告诉他现在有leader,那他就重新连leader,同步一下状态就可以了。 - 集群中确实不存在Leader(重点)
假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader。某一时刻,3和5服务器出现故障,因此开始进行Leader选举。最后选了服务器2。
选举Leader规则: ①EPOCH大的直接胜出 ②EPOCH相同,事务id大的胜出 ③事务id相同,服务器id大的胜出
为啥要写,因为懒,因为图方便
在 一个服务器下的/home/atguigu/bin 目录下创建脚本zk.sh(名字随你),并在脚本编写群起启动语句。
[atguigu@hadoop102 bin]$ vim zk.sh
#!/bin/bash
case in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo ---------- zookeeper $i 启动 ------------
ssh $i "/opt/module/zookeeper-.5.7/bin/zkServer.sh start"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo ---------- zookeeper $i 停止 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
done
};;
"status"){
for i in hadoop102 hadoop103 hadoop104
do
echo ---------- zookeeper $i 状态 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
done
};;
esac
给脚本增加个权限,这样就变成可以运行状态了
chmod 777 zk.sh
测试一下启动和停止
zk.sh start zk.sh stop2、客户端命令行 *** 作 2.1 命令行基本语法
(1)启动客户端
我这里指定启动102的客户端,而不是localhost的
bin/zkCli.sh -server hadoop102:2181
(2)显示所有的 *** 作命令
[zk: hadoop102:2181(CONNECTED) 1] help2.2 znode节点数据信息
1)查看当前znode中所包含的内容
[zk: hadoop102:2181(CONNECTED) 0] ls / [zookeeper]
2)查看当前节点详细数据(-s)
[zk: hadoop102:2181(CONNECTED) 5] ls -s / [zookeeper]cZxid = 0x0 ctime = Thu Jan 01 08:00:00 CST 1970 mZxid = 0x0 mtime = Thu Jan 01 08:00:00 CST 1970 pZxid = 0x0 cversion = -1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 1
下面对上面的节点数据信息进行解析:
2.3 节点类型(1)czxid:创建节点的事务 zxid(create zxid)
每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。
(2)ctime:znode 被创建的毫秒数(从 1970 年开始)
(3)mzxid:znode 最后更新的事务 zxid
(4)mtime:znode 最后修改的毫秒数(从 1970 年开始)
(5)pZxid:znode 最后更新的子节点 zxid
(6)cversion:znode 子节点变化号,znode 子节点修改次数
(7)dataversion:znode 数据变化号
(8)aclVersion:znode 访问控制列表的变化号
(9)ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。
(10)dataLength:znode 的数据长度
(11)numChildren:znode 子节点数量
- 节点类型——持久(Persistent)
客户端和服务器端断开连接后,创建的节点不删除
(1)持久化目录节点
客户端与zookeeper断开连接后,该节点依旧存在,下图zonde1
(2)持久化顺序编号目录节点
客户端与zookeeper断开连接后,该节点依旧存在,只是zookeeper给该节点名称进行顺序编号,下图znode2
- 节点类型——短暂(Ephemeral)
客户端和服务器端断开连接后,创建的节点自己删除
(1)临时目录节点
客户端与zookeeper断开连接后,该节点被删除,下图znode3
(2)临时顺序编号目录节点
客户端与zookeeper断开连接后,该节点被删除,只是zookeeper给该节点名称进行顺序编号,下图znode4
注:顺序号在分布式系统中,用于全局排序,客户端也可以通过顺序号推断事件的顺序。
- 分别创建两个普通节点(永久节点+不带序号)
[zk: localhost:2181(CONNECTED) 3] create /sanguo "diaochan" Created /sanguo [zk: localhost:2181(CONNECTED) 4] create /sanguo/shuguo "liubei" Created /sanguo/shuguo
注意:创建节点时,要赋值
- 获得节点的值
[zk: localhost:2181(CONNECTED) 5] get -s /sanguo diaochan cZxid = 0x100000003 ctime = Wed Aug 29 00:03:23 CST 2018 mZxid = 0x100000003 mtime = Wed Aug 29 00:03:23 CST 2018 pZxid = 0x100000004 cversion = 1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 7 numChildren = 1 [zk: localhost:2181(CONNECTED) 6] get -s /sanguo/shuguo liubei cZxid = 0x100000004 ctime = Wed Aug 29 00:04:35 CST 2018 mZxid = 0x100000004 mtime = Wed Aug 29 00:04:35 CST 2018 pZxid = 0x100000004 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 6 numChildren = 0
- 创建带序号的节点(永久节点 + 带序号)
先创建个普通根节点
[zk: localhost:2181(CONNECTED) 1] create /sanguo/weiguo "caocao" Created /sanguo/weiguo
然后创建带序号的节点
[zk: localhost:2181(CONNECTED) 2] create -s /sanguo/weiguo/zhangliao "zhangliao" Created /sanguo/weiguo/zhangliao0000000000 [zk: localhost:2181(CONNECTED) 3] create -s /sanguo/weiguo/zhangliao "zhangliao" Created /sanguo/weiguo/zhangliao0000000001 [zk: localhost:2181(CONNECTED) 4] create -s /sanguo/weiguo/xuchu "xuchu" Created /sanguo/weiguo/xuchu0000000002
注:
如果原来没有序号节点,序号从0开始依次递增。如果源节点下已有两个节点,则排序时从2开始,以此类推。
如果不带序号的节点已经创建了,在创建却会报错,但是带序号的不会报错,而是再创建一个,序号不一样而已。
- 创建短暂节点(短暂节点 + 不带序号 or 带序号)
创建短暂的不带序号的节点
[zk: localhost:2181(CONNECTED) 7] create -e /sanguo/wuguo "zhouyu" Created /sanguo/wuguo
创建短暂的带序号的节点
[zk: localhost:2181(CONNECTED) 2] create -e -s /sanguo/wuguo "zhouyu" Created /sanguo/wuguo0000000001
在当前客户端是能查看到的
[zk: localhost:2181(CONNECTED) 3] ls /sanguo [wuguo, wuguo0000000001, shuguo]
退出当前客户端然后再重启客户端
[zk: localhost:2181(CONNECTED) 12] quit [atguigu@hadoop104 zookeeper-3.5.7]$ bin/zkCli.sh
再次查看根目录下短暂节点已经删除
[zk: localhost:2181(CONNECTED) 0] ls /sanguo [shuguo]
- 修改节点数据值
[zk: localhost:2181(CONNECTED) 6] set /sanguo/weiguo "simayi"2.4 监听器原理
客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目录节点增加删除)时,ZooKeeper 会通知客户端。监听机制保证 ZooKeeper 保存的任何的数据的任何改变都能快速的响应到监听了该节点的应用程序。
- 监听器的原理
(1)在Zookeeper的API *** 作中,创建main()主方法即主线程;
(2)在main线程中创建Zookeeper客户端(zkClient),这时会创建两个线程:
线程connet负责网络通信连接,连接服务器; 线程Listener负责监听;
(3)客户端通过connet线程连接服务器;
图中getChildren("/" , true) ," / "表示监听的是根目录,true表示监听,不监听用false
(4)在Zookeeper的注册监听列表中将注册的监听事件添加到列表中,表示这个服务器中的/path,即根目录这个路径被客户端监听了;
(5)一旦被监听的服务器根目录下,数据或路径发生改变,Zookeeper就会将这个消息发送给Listener线程;
(6)Listener线程内部调用process方法,采取相应的措施,例如更新服务器列表等。
- 节点的值变化监听
(1)在 hadoop104 主机上注册监听/sanguo 节点数据变化
[zk: localhost:2181(CONNECTED) 26] get -w /sanguo
(2)在 hadoop103 主机上修改/sanguo 节点的数据
[zk: localhost:2181(CONNECTED) 1] set /sanguo "xisi"
(3)观察 hadoop104 主机收到数据变化的监听
WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo
注意:注意:在hadoop103再多次修改/sanguo的值,hadoop104上不会再收到监听。因为注册一次,只能监听一次。想再次监听,需要再次注册。
- 节点的子节点变化监听(路径变化)
(1)在 hadoop104 主机上注册监听/sanguo 节点的子节点变化
[zk: localhost:2181(CONNECTED) 1] ls -w /sanguo [shuguo, weiguo]
(2)在 hadoop103 主机/sanguo 节点上创建子节点
[zk: localhost:2181(CONNECTED) 2] create /sanguo/jin "simayi" Created /sanguo/jin
(3)观察 hadoop104 主机收到子节点变化的监听
WATCHER:: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo
注意:节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。
2.5 节点删除与查看- 节点的删除
[zk: localhost:2181(CONNECTED) 4] delete /sanguo/jin
- 递归删除节点
[zk: localhost:2181(CONNECTED) 15] deleteall /sanguo/shuguo
- 查看节点状态
[zk: localhost:2181(CONNECTED) 17] stat /sanguo cZxid = 0x100000003 ctime = Wed Aug 29 00:03:23 CST 2018 mZxid = 0x100000011 mtime = Wed Aug 29 00:21:23 CST 2018 pZxid = 0x100000014 cversion = 9 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 4 numChildren = 13、客户端API *** 作
前提:你的zookeeper集群要启动起来
3.1 创建zookeeper客户端 //要连哪个集群 注意,如果多个集群,逗号之间千万别加空格
private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private int sessionTimeout = 2000; //连接超时
private ZooKeeper zkClient;
//初始化
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
// 收到事件通知后的回调函数(用户的业务逻辑)
// 再次启动监听
System.out.println("==========");
List children = null;
try {
children = zkClient.getChildren("/", true);//此时监听器使用的客户端上面的哪个匿名内部类watcher
for (String child : children) {
System.out.println(child);
}
System.out.println("**********");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
3.2 创建子节点
@Test
public void create() throws KeeperException, InterruptedException {
// 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;
//参数 4:节点的类型(持久 临时)
String nodeCreated = zkClient.create("/atguigu", "ss.avi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
3.3 获取子节点并监听节点变化
//监听
@Test
public void getChildren() throws KeeperException, InterruptedException {
//参数说明: 1.监听路径 2.监听器 或 监听器为true(走的自己的初始化的watcher)
//获取路径下的所有节点集合
//注册监听因为用一次就会失效 ,所以把下面的代码再拿一份放到上面的new watcher里再注册一次
List children = zkClient.getChildren("/", true);//此时监听器使用的客户端上面的哪个匿名内部类watcher
for (String child : children) {
System.out.println(child);
}
//为了得到监听,不让程序结束,加个延时
Thread.sleep(Long.MAX_VALUE);
}
3.4 判断Znode是否存在
//判断某个节点是否存在
@Test
public void exist() throws KeeperException, InterruptedException {
//参数,1.哪个路径 2.是否开启监听
Stat exists = zkClient.exists("/atguigu", true);
System.out.println(exists == null ? "not" : "yes");
}
4、客户端向服务端写数据流程
- 写流程之写入请求直接发送给Leader节点
客户端 向服务器数据写流程
1.当clinet向zookeeper的leader上写数据,发送一个写请求
2.这个Leader会把写请求广播给各个server,当各个server写成功后就会通知Leader.
3.当Leader收到大多数Server写成功了,那么说明数据是写成功了,如果此时集群的节点是3个的话,只要有两个集群写成功后,就认为是成功了,写成功后,Leader通知Client写成功, 这时就认为写是成功了。
- 写流程之写入请求发送给follower节点
客户端 向服务器数据写流程
1.当clinet向zookeeper的某个server上写数据,发送一个写请求
2.如果接收到请求的不是Leader,那么server会把请求转给Leader,因为zookeeper的集群中只有一个是Leader,这个Leader会把写请求广播给各个server,当各个server写成功后就会通知Leader.
3.当Leader收到大多数Server写成功了,那么说明数据是写成功了,如果此时集群的节点是3个的话,只要有两个集群写成功后,就认为是成功了,写成功后,Leader会告知向他提交申请的server,
4.Server会进一步将通知Client写成功, 这时就认为写是成功了。
四、服务器动态上下线监听案例 1、需求某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知
到主节点服务器的上下线。
洗脚城的例子
(1)先在集群上创建/server节点
[zk: localhost:2181(CONNECTED) 10] create /servers "servers" Created /servers
(2)服务端向zookeeper注册代码
技师向APP注册信息
//服务端和zookeeper集群创建连接
public class DistributeServer {
private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private int sessionTimeout = 2000;
private ZooKeeper zk;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeServer server = new DistributeServer();
//1.连接上zookeeper集群 获取zk连接====创建zk
server.getConnect();
//2.服务器向zk注册==== 注册服务器到zk集群
//创建节点
server.regist(args[0]);
//3.(服务器有自己的的业务逻辑代码) 启动业务逻辑(睡觉,否则这个进程一下子就结束了)
server.business(); //这里是服务器自己的业务逻辑,我们这里就让他的业务逻辑是睡觉
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
//注册服务器 创建节点
private void regist(String hostname) throws KeeperException, InterruptedException {
//创建节点的内容是主机名称
String s = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname+"id online");
}
//zookeeper连接
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
}
});
}
}
(3)客户端代码
//客户端
//监听集群节点的动态变化
public class DistributeClient {
private ZooKeeper zk;
//connectString时指连接的客户端地址名称以及端口号
// 注意:逗号前后不能有空格
private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private int sessionTimeout = 2000;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeClient client = new DistributeClient();
//1.获取zk连接
client.getConnect();
//2.监听集群节点路径上的变化(监听/servers 下面子节点的增加和删除
client.getServerList(); //获取servers上的所有节点的上线和下线
//3.业务逻辑(睡觉)
client.business();
}
//业务功能
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void getServerList() throws KeeperException, InterruptedException {
//获取servers下的所有节点信息
List children = zk.getChildren("/servers", true);//对父节点监听
ArrayList servers = new ArrayList(); //集合用来存所有的服务器节点
//遍历所有节点 获取节点中的主机名称信息
for (String child : children) {
byte[] data = zk.getData("/servers/" + child, false, null);
servers.add(new String(data));
}
//大于服务器列表信息
System.out.println(servers);
}
//创建zookeeper客户端
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
//收到事件通知后的回调函数(用户的业务逻辑)
try {
//再次启动监听
getServerList();//在这里加这个就是避免只监听一次
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
(4)测试
- 1)在 Linux 命令行上 *** 作增加减少服务器
启动 DistributeClient 客户端
在 hadoop102 上 zk 的客户端/servers 目录上创建临时带序号节点
[zk: localhost:2181(CONNECTED) 1] create -e -s /servers/hadoop102 "hadoop102" [zk: localhost:2181(CONNECTED) 2] create -e -s /servers/hadoop103 "hadoop103"
观察 Idea 控制台变化
[hadoop102, hadoop103]
执行删除 *** 作
[zk: localhost:2181(CONNECTED) 8] delete /servers/hadoop1020000000000
观察 Idea 控制台变化
[hadoop103]
- 2)在 Idea 上 *** 作增加减少服务器
启动 DistributeClient 客户端(如果已经启动过,不需要重启)
启动 DistributeServer 服务
①点击 Edit Configurations…
②在d出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102
③回到 DistributeServer 的 main 方 法 , 右 键 , 在 d 出 的 窗 口 中 点 击 Run
④观察 DistributeServer 控制台,提示 hadoop102 is working
⑤观察 DistributeClient 控制台,提示 hadoop102 已经上线
在单体的应用开发场景中,涉及并发同步的时候,大家往往采用synchronized或者Lock的方式来解决多线程间的同步问题。但在分布式集群工作的开发场景中,那么就需要一种更加高级的锁机制,来处理种跨JVM进程之间的数据同步问题,这就是分布式锁。
1、JAVA原生API *** 作什么是分布式锁呢?
比如说进程1在使用该资源的时候,会先去获得锁,进程1获得锁后会对该资源保持独占,这样其他进程无法访问该资源,进程1用完该资源后就把锁释放掉,然后其他进程就可以取获得这个锁了。
通过锁的机制,我们可以保证分布式系统中多个进程能够有序的访问该临界资源,我们把这个分布式环境下的锁叫分布式锁。
大家详细可以看这个博文,个人感觉写的很好(但后面的实现加入了可重入锁判断):https://blog.csdn.net/crazymakercircle/article/details/85956246
- 这里先介绍原生JavaAPI的实现步骤
//分布式锁
public class DistributeLock {
private String connectString = "hadoop102:2181,hadoop103:2181,hadoop1042181";
private int sessionTimeout = 2000;
private ZooKeeper zk;
private String waitPath; //当前等待的子节点
private CountDownLatch connectLatch = new CountDownLatch(1);//增加代码健壮性
private CountDownLatch waitLatch = new CountDownLatch(1);
private String currentNode;
public DistributeLock() throws IOException, InterruptedException, KeeperException {
//1.获取连接 建立服务端与客户端连接
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
// 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
if (watchedEvent.getState()==Event.KeeperState.SyncConnected){
connectLatch.countDown();
}
// 发生了 waitPath 的删除事件
if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
waitLatch.countDown();
}
}
});
//等待zookeeper真正连接上,代码才往下继续执行
connectLatch.await();
//2.判断根节点locks是否存在
Stat stat = zk.exists("/locks", false);
if (stat == null){ //不存在则创建根节点
//根节点得是永久
zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
//对zk加锁
public void zkLock() throws KeeperException, InterruptedException {
//创建对应的临时带序号节点 返回值为创建的节点路径
currentNode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//判断当前节点是不是整个根目录的最小节点,
List children = zk.getChildren("/locks", false);
//如果children只有一个值,那就直接获取锁,如果多个节点,那就要判断最小的是谁
// 直接获取锁,不是则监听它序号前一个节点
if (children.size() == 1){
return;
}else {
Collections.sort(children);
//获取对应的节点名称 seq-00000000
String thisNode = currentNode.substring("/locks/".length());
获取当前节点的位置 通过thisNode来获取它在集合第几位
int index = children.indexOf(thisNode);
if (index == -1){
System.out.println("数据异常");
}else if (index == 0){ //只有一个节点index == 0, 说明 thisNode 在列表中最小, 当前client 获得锁
return; //获取锁
}else {
// 获得排名比 currentNode 前 1 位的节点
this.waitPath = "/locks/"+children.get(index-1);
//需要监听前一个节点的变化
//在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper 会回调监听器的 process 方法
zk.getData(waitPath,true,new Stat());
进入等待锁状态
waitLatch.await();
return;
}
}
}
//对zk解锁
public void unZkLock() throws KeeperException, InterruptedException {
//删除节点
zk.delete(this.currentNode,-1);
}
}
测试
public class DistributedLockTest {
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
//创建分布式锁
final DistributeLock lock1 = new DistributeLock();
final DistributeLock lock2 = new DistributeLock();
new Thread(()->{
try {
lock1.zkLock();
System.out.println("1获得锁");
Thread.sleep(5000);
lock1.unZkLock();
System.out.println("1释放锁");
}catch (Exception e){
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
lock2.zkLock();
System.out.println("2获得锁");
Thread.sleep(5000);
lock2.unZkLock();
System.out.println("2释放锁");
}catch (Exception e){
e.printStackTrace();
}
}).start();
}
}
观察控制台变化:
线程 1 获取锁
线程 1 释放锁
线程 2 获取锁
线程 2 释放锁
- Curator框架实现分布式锁
刚才我们用的Java原生API *** 作发现了问题:
每次会话的连接我们都要自己去处理, CountDownLatch。
还有就是watch要重复注册,不然监听不到,不能生效。
不支持多节点删除和创建。需要自己去递归
Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题/
public class CuratorLockTest {
private String rootNode = "locks/";
//zookeeper server列表
private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
//connection超时时间
private int connectionTimeout = 2000;
//session超时时间
private int sessionTimeout = 2000;
public static void main(String[] args) {
new CuratorLockTest().test();
}
//测试
private void test() {
//创建分布式锁1
final InterProcessLock lock1 = new InterProcessMutex(getCuratorframework(), rootNode);
//创建分布式锁2
final InterProcessLock lock2 = new InterProcessMutex(getCuratorframework(),rootNode);
new Thread(new Runnable() {
@Override
public void run() {
//获得锁对象
try {
lock1.acquire();
System.out.println("线程1获得锁");
//测试锁重入
lock1.acquire();
System.out.println("线程1再次获得锁");
Thread.sleep(5000);
lock1.release();
System.out.println("线程1释放锁");
lock1.release();
System.out.println("线程1再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
//获得锁对象
try {
lock2.acquire();
System.out.println("线程2获得锁");
//测试锁重入
lock2.acquire();
System.out.println("线程2再次获得锁");
Thread.sleep(5000);
lock2.release();
System.out.println("线程2释放锁");
lock2.release();
System.out.println("线程2再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
//分布式锁初始化
private Curatorframework getCuratorframework() {
//重试策略 初始时间3秒,重试3次
ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
//通过工厂创建Curator
Curatorframework client = CuratorframeworkFactory.builder()
.connectString(connectString)
.connectionTimeoutMs(connectionTimeout)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(policy).build();
//开启连接
client.start();
System.out.println("初始化完成");
return client;
}
}
观察控制台变化:
线程 1 获取锁
线程 1 再次获取锁
线程 1 释放锁
线程 1 再次释放锁
线程 2 获取锁
线程 2 再次获取锁
线程 2 释放锁
线程 2 再次释放锁
Zookeeper 是如何保证数据一致性的?
1. Zab协议内容这个就是为zookeeper设计的一个支持崩溃恢复的协议。Zookeeper 设计为只有一台客户端(Leader)负责处理外部的写事务请求,然后
Leader 客户端将数据同步到其他 Follower 节点。即 Zookeeper 只有一个 Leader 可以发起提案。
2. 消息广播ZAB协议包括两种基本的模式——消息广播、崩溃恢复
1)首先客户端发起一个写 *** 作请求
2)leader服务器将客户端的请求变成proposal提案,同时为每个proposal分配一个全局的ID,即zxid,事务id
3)leader服务器为每一个follower服务器分配一个单独的队列,然后将需要广播的提案依次放到队列中,并且根据FIFO策略进行消息发送
4)follower收到proposal后,会首先将其以事务日志的方式写入磁盘中,写入成功向leader发送一个ack响应消息
5)leader接受到半数以上的follower的响应消息后,就认为消息发送成功,可以发送客户端的写 *** 作,commit消息
6)leader向所有的follower广播commit消息,同时自身也要完成事务的提交,follower接收到来自leader的commit消息后,会将上一条事务提交。
7)Zookeeper采用Zab协议的核心,就是只要有一台服务器提交了Proposal,就要确保所有的服务器最终都能正确提交Proposal。
-
上面的过程总结一句话——zab协议针对事务请求的处理过程类似于两个阶段的提交过程:
(1)广播事务阶段 proposal
(2)广播提交阶段 commit -
但是,如果出现问题?leader宕机了
(1)Leader 发起一个事 务Proposal1 后就宕机 ,Follower 都没有Proposal1
(2)Leader收到半数ACK宕机,没来得及向Follower发送Commit
咋办嘞?ZAB这时候有个崩溃恢复模式。
一旦Leader服务器出现崩溃或者由于网络原因导致Leader服务器失去了与过半 Follower的联系,那么就会进入崩溃恢复模式。
-
两种服务器异常情况
1)假设一个事务proposal在Leader提出之后,Leader挂了。
2)一个事务在Leader上提交了,并且过半的Follower都响应Ack了,但是Leader在Commit消息发出之前挂了。 -
这时候ZAB协议崩溃恢复上场,但它有两个前提条件:
1)确保已经被Leader提交的提案Proposal,必须最终被所有的Follower服务器提交。 (已经产生的提案,Follower必须执行,也就是我一旦向你发了提案,你就要响应)
2)确保丢弃已经被Leader提出的,但是没有被提交的Proposal。(丢弃胎死腹中的提案,就是我提出了,但我还没给follower发,那就kill掉) -
下面我们准备工作完成,开始正式进入崩溃恢复模块
崩溃恢复主要包括两部分:Leader选举和数据恢复。
(1)Leader选举
Leader选举,根据上述要求,Zab协议需要保证选举出来的Leader需要满足以下条件:
1)新选举出来的Leader不能包含未提交的Proposal。即新Leader必须都是已经提交了Proposal(回复响应ack)的Follower服务器节点。
2)新选举的Leader节点中含有最大的zxid。这样做的好处是可以避免Leader服务器检查Proposal的提交和丢弃工作。(最大说明你的数据最新昂)
(2)数据恢复
ZAB如何数据同步:
1)完成Leader选举后,在正式开始工作之前(接收事务请求,然后提出新的Proposal),Leader服务器会首先确认事务日志中的所有的Proposal 是否已经被集群中过半的服务器Commit。意思就是之前的所有请求你要先处理干净,完成掉,这样你才能后续再去接受新的事务请求,所谓打扫干净屋子再请客。
2)Leader服务器需要确保所有的Follower服务器能够接收到每一条事务的Proposal,并且能将所有已经提交的事务Proposal应用到内存数据中。等到Follower将所有尚未同步的事务Proposal都从Leader服务器上同步过,并且应用到内存数据中以后,Leader才会把该Follower加入到真正可用的Follower列表中。就好比我要确认你是否可用,要是可用,我把你加入我的部门,如果都不可以正确提交,那就是不可用,我就不认为你是我的部门下的。
1.CAP理论什么是cap呢?
⚫ 一致性(C:Consistency)
⚫ 可用性(A:Available)
⚫ 分区容错性(P:Partition Tolerance)
CAP理论告诉我们,一个分布式系统不可能同时满足三种基本要求,最多只能同时满足其中的两项,因为P是必须的,因此往往选择就在CP或者AP中。
1)一致性(C:Consistency)
在分布式环境中,一致性是指数据在多个副本之间是否能够保持数据一致的特性。在一致性的需求下,当一个系统在数据一致的状态下执行更新 *** 作后,应该保证系统的数据仍然处于一致的状态。
2)可用性(A:Available)
可用性是指系统提供的服务必须一直处于可用的状态,对于用户的每一个 *** 作请求总是能够在有限的时间内返回结果。
3)分区容错性(P:Partition Tolerance)
分布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务,除非是整个网络环境都发生了故障。
ZooKeeper保证的是CP
(1)ZooKeeper不能保证每次服务请求的可用性。(注:在极端环境下,ZooKeeper可能会丢弃一些请求,消费者程序需要重新请求才能获得结果,比如说有个服务器挂了,那你还没数据同步呢,就挂了,等到服务器恢复,你这个请求也无效啊,所以重新请求)。所以说,ZooKeeper不能保证服务可用性。
(2)进行Leader选举时集群都是不可用。在选举的过程中,不能对外提供服务。但在选举的过程中,首先选zxid(zk的事务ID)最大的为leader,zxid最大,表示数据是最新的,然后广播给follower,这样避免数据丢失。
- 对这三个简单说明一下:
小明写了一个分布式架构
数据副本在不同的机器上做冗余,中间有数据的复制,这样保持了一致性。
这样看着感觉是个不错的分布式环境吧。
但是,有一天,用户A访问了机器A,往里头写了他最爱的一些小数据,结果突然机器A挂了,网断了,这时产生了问题:
1)负载均衡没找到机器A,然后把用户A的下一次访问转到B上去了
2)数据复制同步也找不到A了,那没法同步,他也就say拜拜了,所以你这个B上没有用户A的小数据,因为还没复制呢你机器A就挂了,这机器B上也当然没有用户A的小数据,全是老数据。
咋办?????
小黄说,机器B 还活着,还可以提供服务,不就是数据复制不到用户A的小数据嘛,也就是漏几条数据的问题,无伤大雅,等机器A的网好了,数据复制正常,一切也正常了。
小黄这时啥心态,他选择了系统的可用性(A),系统能提供服务就行了,数据不一致我可以接受。
但是,小明说了,老板说这次开发数据很重要,关乎上市,所以小明的心态就是先把机器B上这些数据相关的功能先停掉,等机器A好了,数据同步以后再开工。很明显,小明把一致性(C)放在了首位。
所以问题就很明显了, 在网络节点之间无法通信的情况下, 和数据复制相关的功能, 要么选择可用性A, 要么选择一致性C, 不能同时选择两者。
小明发现,其实这两种选择的背后其实隐藏着另外一个事实, 那就是网络节点之间无法通信的情况下, 节点被隔离,产生了网络分区, 整个系统仍然是可以工作的, 大胖给它起了个名: 分区容错性(Partition tolerance, 简称P)。
如果选择了可用性A + 分区容错性P , 就要放弃一致性C。
如果选择了一致性C+ 分区容错性P, 就得放弃可用性A,这种情况下,虽然系统的有些功能是不能使用的, 因为需要等待数据的同步, 但是那些和数据同步无关的功能还是可以访问的 , 相当于系统做了功能的降级。
(这个很详细,建议看看:http://blog.itpub.net/69947338/viewspace-2656369/)
晕了…完结~
下期源码趴…
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)