
在上一篇博客中给大家介绍了 Curator框架的LeaderLatch,它是一种Leader选举实现,本篇博客介绍Curator框架的另一种Leader选举实现LeaderSelector。
- ZooKeeper : Curator框架之Leader选举LeaderLatch
这里不再赘述Leader选举的概念。
测试代码CuratorframeworkProperties类(提供Curatorframework需要的一些配置信息):
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorframeworkProperties {
// 连接地址
public static final String CONNECT_ADDRESS = "192.168.31.175:9000";
// 连接超时时间
public static final int CONNECTION_TIMEOUT_MS = 40000;
// Session超时时间
public static final int SESSION_TIMEOUT_MS = 10000;
// 命名空间
public static final String NAMESPACE = "MyNamespace";
// 重试策略
public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
}
LeaderSelectorRunnable类(实现了Runnable接口,模拟分布式服务节点参与Leader选举):
package com.kaven.zookeeper;
import lombok.Setter;
import lombok.SneakyThrows;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.imps.CuratorframeworkState;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class LeaderSelectorRunnable implements Runnable{
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
@SneakyThrows
@Override
public void run() {
// 使用不同的Curatorframework实例,表示不同的分布式服务节点
Curatorframework curator = getCuratorframework();
curator.start();
assert curator.getState().equals(CuratorframeworkState.STARTED);
// 模拟随机加入的分布式服务节点
int randomSleep = new Random().nextInt(1000);
Thread.sleep(randomSleep);
// 创建监听器
LeaderSelectorListenerImpl listener = new LeaderSelectorListenerImpl();
// 创建LeaderSelector实例(用于Leader选举)
// curator是Curatorframework实例,用于与ZooKeeper交互
// "/services/leader"是leaderPath,Leader节点会成功创建该节点(其他节点则会失败)
// EXECUTOR_SERVICE是用于执行业务的Executor实例
// listener是该实例的监听器
LeaderSelector selector = new LeaderSelector(curator, "/services/leader",
EXECUTOR_SERVICE, listener);
// 给自定义的监听器设置LeaderSelector实例
listener.setSelector(selector);
// 将线程名(Thread.currentThread().getName())作为分布式服务节点的id
selector.setId(Thread.currentThread().getName());
System.out.println(selector.getId() + "准备好了!");
// 开始Leader选举
selector.start();
System.out.println(selector.getId() + "开始Leader选举!");
}
private Curatorframework getCuratorframework() {
// 创建Curatorframework实例
return CuratorframeworkFactory.builder()
.connectString(CuratorframeworkProperties.CONNECT_ADDRESS)
.retryPolicy(CuratorframeworkProperties.RETRY_POLICY)
.connectionTimeoutMs(CuratorframeworkProperties.CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(CuratorframeworkProperties.SESSION_TIMEOUT_MS)
.namespace(CuratorframeworkProperties.NAMESPACE)
.build();
}
@Setter
private static class LeaderSelectorListenerImpl implements LeaderSelectorListener {
private LeaderSelector selector;
// 被选举为Leader时调用
// 该方法结束后会释放领导权,即重新进行Leader选举(还有节点的情况下)
@Override
public void takeLeadership(Curatorframework client) throws Exception {
System.out.println(selector.getId() + "被选举为Leader");
selector.getParticipants().forEach(System.out::println);
// 模拟业务处理
Thread.sleep(5000);
}
// 当连接状态发生变化时调用
@Override
public void stateChanged(Curatorframework client, ConnectionState newState) {
System.out.println(selector.getId() + " : " + newState.name());
}
}
}
启动类;
package com.kaven.zookeeper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
for (int i = 0; i < 7; i++) {
EXECUTOR_SERVICE.execute(new LeaderSelectorRunnable());
}
Thread.sleep(10000000);
}
}
模拟7个分布式服务节点进行Leader选举,输出如下所示:
pool-1-thread-2准备好了!
pool-1-thread-3准备好了!
pool-1-thread-3开始Leader选举!
pool-1-thread-2开始Leader选举!
pool-1-thread-4准备好了!
pool-1-thread-4开始Leader选举!
pool-1-thread-7准备好了!
pool-1-thread-7开始Leader选举!
pool-1-thread-6准备好了!
pool-1-thread-6开始Leader选举!
pool-1-thread-5准备好了!
pool-1-thread-5开始Leader选举!
pool-1-thread-1准备好了!
pool-1-thread-1开始Leader选举!
pool-1-thread-6 : ConNECTED
pool-1-thread-1 : ConNECTED
pool-1-thread-5 : ConNECTED
pool-1-thread-2 : ConNECTED
pool-1-thread-7 : ConNECTED
pool-1-thread-3 : ConNECTED
pool-1-thread-4 : ConNECTED
pool-1-thread-1被选举为Leader
Participant{id='pool-1-thread-1', isLeader=true}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
pool-1-thread-6被选举为Leader
Participant{id='pool-1-thread-6', isLeader=true}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
pool-1-thread-7被选举为Leader
Participant{id='pool-1-thread-7', isLeader=true}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
pool-1-thread-5被选举为Leader
Participant{id='pool-1-thread-5', isLeader=true}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
pool-1-thread-3被选举为Leader
Participant{id='pool-1-thread-3', isLeader=true}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
pool-1-thread-4被选举为Leader
Participant{id='pool-1-thread-4', isLeader=true}
Participant{id='pool-1-thread-2', isLeader=false}
pool-1-thread-2被选举为Leader
Participant{id='pool-1-thread-2', isLeader=true}
重新参与Leader选举
被选举为Leader的LeaderSelector实例并没有调用close方法,却重新进行了Leader选举,这其实是监听器的takeLeadership方法结束后会释放领导权,即重新进行Leader选举(还有节点的情况下)。
相关方法:
@VisibleForTesting
void doWork() throws Exception
{
hasLeadership = false;
try
{
// 尝试获取锁,即成为Leader
mutex.acquire();
// 成功获取锁,成为Leader
hasLeadership = true;
try
{
// 调用监听器的takeLeadership方法
listener.takeLeadership(client);
}
finally
{
// 设置isQueued属性为false
// 即不再参入Leader选举
clearIsQueued();
}
}
finally
{
if ( hasLeadership )
{
// 失去领导权
hasLeadership = false;
// 清除任何中断的状态,以便 mutex.release() 立即工作
boolean wasInterrupted = Thread.interrupted();
try
{
// 释放锁
mutex.release();
}
catch ( Exception e )
{}
}
}
}
因此监听器的takeLeadership方法就是用来处理业务逻辑的,处理完就会释放锁,即退出Leader选举,如果处理完业务后还想再次参与Leader选举,可以调用LeaderSelector实例的autoRequeue方法(将autoRequeue属性设置为true,即自动重新排队,再次参与Leader选举):
// 默认情况下,当LeaderSelectorListener.takeLeadership(Curatorframework)返回时,此实例不会重新排队
// 调用此方法会将实例置于一种自动重新排队的模式
public void autoRequeue()
{
autoRequeue.set(true);
}
相关方法:
private synchronized boolean internalRequeue()
{
// 没有在排队,并且状态为STARTED
if ( !isQueued && (state.get() == State.STARTED) )
{
// 设置排队状态为true
isQueued = true;
// 向Executor实例(创建LeaderSelector实例时传入的)提交任务
Future task = executorService.submit(new Callable()
{
@Override
public Void call() throws Exception
{
try
{
// 执行业务,会间接调用上面介绍的doWork方法
doWorkLoop();
}
finally
{
// 设置isQueued属性为false
// 即不再参入Leader选举
clearIsQueued();
// 是否自动重新入队
if ( autoRequeue.get() )
{
// 自动重新排队
// 继续参与Leader选举
internalRequeue();
}
}
return null;
}
});
// 设置任务
ourTask.set(task);
return true;
}
return false;
}
将autoRequeue属性设置为true,每次失去领导权(之前是Leader的情况下),都会自动重新排队,再次参与Leader选举。
修改代码:
// 开始Leader选举
selector.start();
System.out.println(selector.getId() + "开始Leader选举!");
selector.autoRequeue();
输出如下所示:
pool-1-thread-5被选举为Leader
Participant{id='pool-1-thread-5', isLeader=true}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-1', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
pool-1-thread-7被选举为Leader
Participant{id='pool-1-thread-7', isLeader=true}
Participant{id='pool-1-thread-1', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
pool-1-thread-1被选举为Leader
Participant{id='pool-1-thread-1', isLeader=true}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
pool-1-thread-6被选举为Leader
Participant{id='pool-1-thread-6', isLeader=true}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-1', isLeader=false}
pool-1-thread-3被选举为Leader
Participant{id='pool-1-thread-3', isLeader=true}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-1', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
pool-1-thread-4被选举为Leader
Participant{id='pool-1-thread-4', isLeader=true}
Participant{id='pool-1-thread-2', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-1', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
pool-1-thread-2被选举为Leader
Participant{id='pool-1-thread-2', isLeader=true}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-1', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
...
每个参与Leader选举的LeaderSelector实例在失去领导权后都会自动重新排队,即每次参与Leader选举的Participant列表都包含7个LeaderSelector实例。
需要时才重新排队,而不是每次都自动重新排队,可以调用requeue方法(实际上还是调用internalRequeue方法):
// 重新排队争取领导权
// 如果此实例已排队,则不会发生任何事情并返回false
// 如果实例未排队,则重新排队并返回 true
public boolean requeue()
{
Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");
return internalRequeue();
}
Curator框架的Leader选举实现LeaderSelector就介绍到这里,源码以后会进行分析介绍,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)