基于Zookeeper实现调度任务选主及心跳检测
在微服务架构中使用ZooKeeper实现分布式任务调度选主,并确保Follower节点能实时监控Master状态并及时触发重新选举,可以通过以下方案实现: 一、核心设计原理1. ZooKeeper特性利用ZK功能在选主中的应用
临时节点(EPHEMERAL)Master创建临时节点,会话断开时节点自动删除(相当于心跳检测)
Watcher机制Follower监听Master节点变化
顺序节点(SEQUENTIAL)实现公平的选举排序
2. 状态监控流程
基于Zookeeper实现调度任务选主及心跳检测
二、完整实现方案1. 添加依赖
- <!-- Curator客户端(推荐) -->
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>5.5.0</version>
- </dependency>
复制代码 2. 选主服务实现
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.recipes.leader.LeaderSelector;
- import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
- import org.apache.curator.framework.state.ConnectionState;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
- import javax.annotation.PreDestroy;
- @Component
- public class ZkLeaderElection {
- private final CuratorFramework zkClient;
- private LeaderSelector leaderSelector;
- private volatile boolean isLeader = false;
- public ZkLeaderElection(CuratorFramework zkClient) {
- this.zkClient = zkClient;
- }
- @PostConstruct
- public void init() throws Exception {
- leaderSelector = new LeaderSelector(zkClient, "/scheduler/leader",
- new LeaderSelectorListener() {
- @Override
- public void takeLeadership(CuratorFramework client) throws Exception {
- // 成为Leader后的逻辑
- isLeader = true;
- System.out.println("当前节点当选为Leader");
- try {
- while (true) {
- Thread.sleep(1000); // 模拟持续工作
- }
- } finally {
- isLeader = false;
- }
- }
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState) {
- // 连接状态变化处理
- if (newState == ConnectionState.LOST) {
- isLeader = false;
- }
- }
- });
- leaderSelector.autoRequeue(); // 自动重新参与选举
- leaderSelector.start();
- }
- @PreDestroy
- public void shutdown() {
- if (leaderSelector != null) {
- leaderSelector.close();
- }
- }
- public boolean isLeader() {
- return isLeader;
- }
- }
复制代码 3. 增强型状态监控(生产级)
- // 在init()方法中添加以下逻辑
- public void init() throws Exception {
- // ...原有代码...
-
- // 添加额外的心跳检测
- zkClient.getConnectionStateListenable().addListener((client, newState) -> {
- if (newState == ConnectionState.RECONNECTED) {
- // 重连后强制检查Leader状态
- checkLeaderStatus();
- }
- });
-
- // 启动定时检查任务
- Executors.newSingleThreadScheduledExecutor()
- .scheduleAtFixedRate(this::checkLeaderStatus, 0, 5, TimeUnit.SECONDS);
- }
- private void checkLeaderStatus() {
- try {
- if (zkClient.checkExists().forPath("/scheduler/leader") == null) {
- System.out.println("Leader节点不存在,触发重新选举");
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
复制代码 三、关键优化点1. 双Watch机制
- // 除了LeaderSelector内置监听,额外添加数据Watch
- zkClient.getData().usingWatcher((Watcher) event -> {
- if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
- System.out.println("Leader节点被删除,立即触发选举");
- }
- }).forPath("/scheduler/leader");
复制代码 2. 选举性能优化参数推荐值说明
sessionTimeoutMs10000-15000ms根据网络状况调整
leaderSelector.autoRequeue()必须启用保证节点退出后重新参与选举
retryPolicy.baseSleepTimeMs1000ms首次重试延迟
3. 故障转移时间控制- // 在ZK配置中优化
- @Bean
- public CuratorFramework zkClient() {
- return CuratorFrameworkFactory.builder()
- .connectString("zk1:2181,zk2:2181,zk3:2181")
- .sessionTimeoutMs(15000) // 会话超时
- .connectionTimeoutMs(5000) // 连接超时
- .retryPolicy(new ExponentialBackoffRetry(1000, 3)) // 重试策略
- .build();
- }
复制代码故障转移时间 = 会话超时时间 + 选举时间(通常可控制在15秒内) 四、生产环境建议1. 监控指标指标名称采集方式告警阈值
ZK选举次数ZK的leader_election计数器1小时内>5次
Master存活时间节点数据中的时间戳连续3次<30秒
节点连接状态Curator事件监听RECONNECTED状态持续>1分钟
2. 部署架构- [微服务实例1] [微服务实例2] [微服务实例3]
- | | |
- +------------+------------+
- |
- [ZooKeeper Ensemble]
- |
- [监控系统(Prometheus + Grafana)]
复制代码 3. 异常场景处理- 脑裂防护:启用ZK的quorum机制(至少3节点)
- 网络分区:配合Sidecar代理检测真实网络状态
- 持久化问题:定期备份/scheduler节点数据
五、与Spring Cloud集成1. 健康检查端点- @RestController
- @RequestMapping("/leader")
- public class LeaderController {
-
- @Autowired
- private ZkLeaderElection election;
- @GetMapping("/status")
- public ResponseEntity<String> status() {
- return election.isLeader()
- ? ResponseEntity.ok("MASTER")
- : ResponseEntity.ok("FOLLOWER");
- }
- }
复制代码 2. 调度任务示例
- @Scheduled(fixedRate = 5000)
- public void scheduledTask() {
- if (zkLeaderElection.isLeader()) {
- System.out.println("只有Master执行的任务...");
- }
- }
复制代码 六、对比Redisson方案维度ZooKeeper方案Redisson方案
实时性秒级(依赖ZK会话超时)秒级(依赖Redis TTL)
可靠性高(CP系统)中(依赖Redis持久化)
运维复杂度较高(需维护ZK集群)较低(复用Redis)
适用场景强一致性要求的系统允许短暂脑裂的场景
通过以上方案,你的微服务可以实现: - 秒级故障检测:基于ZK临时节点和Watcher机制
- 自动快速选主:利用Curator的选举算法
- 生产级可靠性:多重监控和防护机制
- 无缝集成Spring生态:与@Scheduled等组件协同工作
|