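Leader Election for Distributed Scheduled Tasks with Redisson
In a Spring Cloud microservice architecture, leader election and heartbeat monitoring for distributed scheduled tasks can be built on Redisson as described below. The approach combines Redisson's distributed lock with its publish/subscribe support, so that only one instance runs the scheduled work at a time and the other instances notice promptly when that instance fails.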
I. Overall Architecture
[Figure: leader election for distributed scheduled tasks with Redisson]
II. Core Implementation Steps
1. Add the dependency

    <!-- Redisson Starter -->
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson-spring-boot-starter</artifactId>
        <version>3.23.2</version>
    </dependency>

2. Leader election service

    import org.redisson.api.RLock;
    import org.redisson.api.RedissonClient;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    // On Spring Boot 3.x these two annotations live in jakarta.annotation instead
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import java.util.concurrent.TimeUnit;

    @Service
    public class LeaderElectionService {
        private static final String LEADER_LOCK_KEY = "scheduler:leader:lock";
        private static final String HEARTBEAT_CHANNEL = "scheduler:heartbeat";

        @Autowired
        private RedissonClient redisson;

        private volatile boolean isLeader = false;
        private volatile boolean running = true;
        private RLock leaderLock;
        private Thread electionThread;

        @PostConstruct
        public void init() {
            leaderLock = redisson.getLock(LEADER_LOCK_KEY);
            startHeartbeatListener();
            startElection();
        }

        private void startElection() {
            // An RLock belongs to the thread that acquired it, so this one thread
            // acquires the lock, sends heartbeats while holding it, and releases it.
            electionThread = new Thread(() -> {
                try {
                    while (running) {
                        // Non-blocking attempt. No leaseTime is passed, so the Redisson watchdog
                        // keeps extending the lock (30-second default timeout) while this node is alive.
                        if (leaderLock.tryLock(0, TimeUnit.SECONDS)) {
                            isLeader = true;
                            System.out.println("This node has been elected leader");
                            sendHeartbeatsWhileLeader();
                            isLeader = false; // lock lost: rejoin the election
                        } else {
                            Thread.sleep(5000); // retry every 5 seconds
                        }
                    }
                } catch (InterruptedException e) {
                    // Shutdown requested; fall through to the cleanup below
                } finally {
                    releaseLeadership();
                }
            }, "leader-election");
            electionThread.start();
        }

        private void sendHeartbeatsWhileLeader() throws InterruptedException {
            while (running && leaderLock.isHeldByCurrentThread()) {
                redisson.getTopic(HEARTBEAT_CHANNEL).publish(System.currentTimeMillis());
                Thread.sleep(10000); // send a heartbeat every 10 seconds
            }
        }

        private void releaseLeadership() {
            Thread.interrupted(); // clear the interrupt flag so the Redis calls below can complete
            isLeader = false;
            if (leaderLock.isHeldByCurrentThread()) {
                leaderLock.unlock();
            }
        }

        private void startHeartbeatListener() {
            // Listen for leader heartbeats
            redisson.getTopic(HEARTBEAT_CHANNEL)
                    .addListener(Long.class, (channel, heartbeatTime) -> {
                        System.out.println("Received leader heartbeat: " + heartbeatTime);
                        // The time of the last heartbeat can be recorded here
                    });
        }

        @PreDestroy
        public void shutdown() {
            running = false;
            if (electionThread != null) {
                electionThread.interrupt(); // the election thread releases the lock itself
            }
        }

        public boolean isLeader() {
            return isLeader;
        }
    }
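
The election service relies on lease semantics: the lock carries an expiry, and the holder has to keep extending it or another node can take over. The sketch below models only that behaviour in memory, with the current time passed in explicitly so it runs without Redis. `LeaseLockSketch` and its methods are hypothetical names chosen for illustration; they are not Redisson APIs and do not reflect how Redisson implements its lock.

```java
import java.util.Objects;

/** In-memory model of a lease-based lock; time is passed in so the behaviour is deterministic. */
final class LeaseLockSketch {
    private String owner;          // current holder, or null if the lock is free
    private long expiresAtMillis;  // lease expiry of the current holder

    /** Takes the lock if it is free, already ours, or its lease has expired. */
    synchronized boolean tryAcquire(String node, long nowMillis, long leaseMillis) {
        Objects.requireNonNull(node, "node");
        if (owner == null || nowMillis >= expiresAtMillis || owner.equals(node)) {
            owner = node;
            expiresAtMillis = nowMillis + leaseMillis;
            return true;
        }
        return false;
    }

    /** Extends the lease of the current holder; this is the job a watchdog does for the leader. */
    synchronized boolean renew(String node, long nowMillis, long leaseMillis) {
        if (node.equals(owner) && nowMillis < expiresAtMillis) {
            expiresAtMillis = nowMillis + leaseMillis;
            return true;
        }
        return false;
    }

    /** True only while the given node holds an unexpired lease. */
    synchronized boolean isHeldBy(String node, long nowMillis) {
        return node.equals(owner) && nowMillis < expiresAtMillis;
    }
}
```

With a 30-second lease, a second node cannot acquire the lock while the first keeps renewing, and it can acquire it once renewals stop and the lease runs out. That is the failover path the election loop depends on.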
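
3. Health check enhancement

    import org.redisson.api.RedissonClient;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    // On Spring Boot 3.x these two annotations live in jakarta.annotation instead
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    @Service
    public class HealthCheckService {
        // Same channel the leader publishes its heartbeats to
        private static final String HEARTBEAT_CHANNEL = "scheduler:heartbeat";

        @Autowired
        private RedissonClient redisson;

        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        private volatile long lastHeartbeatTime;

        @PostConstruct
        public void init() {
            // Start the timer now so a freshly started node does not report a failure immediately
            lastHeartbeatTime = System.currentTimeMillis();
            // Record the local arrival time of every leader heartbeat
            redisson.getTopic(HEARTBEAT_CHANNEL)
                    .addListener(Long.class, (channel, sentAt) -> lastHeartbeatTime = System.currentTimeMillis());
            // Check the leader status periodically
            scheduler.scheduleAtFixedRate(this::checkLeaderStatus, 5, 5, TimeUnit.SECONDS);
        }

        private void checkLeaderStatus() {
            // Treat the leader as failed after 30 seconds without a heartbeat
            if (System.currentTimeMillis() - lastHeartbeatTime > 30000) {
                System.out.println("Leader may be down; triggering re-election");
                // An active attempt to take the lock can be triggered here
            }
        }

        @PreDestroy
        public void shutdown() {
            scheduler.shutdownNow();
        }
    }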
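
The timeout rule in the health check (no heartbeat for 30 seconds means the leader is suspected down) is easier to reason about and test when the clock is injectable. The following is a minimal sketch under that assumption. `HeartbeatMonitorSketch` is a hypothetical helper, not part of the code above or of Redisson.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Tracks when the last leader heartbeat arrived and reports when heartbeats have stopped. */
final class HeartbeatMonitorSketch {
    private final LongSupplier clock;         // source of "now" in milliseconds
    private final long timeoutMillis;         // silence longer than this marks the leader as suspect
    private final AtomicLong lastSeenMillis;  // local arrival time of the latest heartbeat

    HeartbeatMonitorSketch(LongSupplier clock, long timeoutMillis) {
        this.clock = clock;
        this.timeoutMillis = timeoutMillis;
        // Start the timer at construction so a freshly started node gets a grace period.
        this.lastSeenMillis = new AtomicLong(clock.getAsLong());
    }

    /** Call from the heartbeat listener; uses the local clock rather than the sender's timestamp. */
    void onHeartbeat() {
        lastSeenMillis.set(clock.getAsLong());
    }

    /** True once more than timeoutMillis have passed since the last heartbeat. */
    boolean isLeaderSuspectedDown() {
        return clock.getAsLong() - lastSeenMillis.get() > timeoutMillis;
    }
}
```

In a service, the clock would typically be `System::currentTimeMillis`. Using the local arrival time instead of the timestamp carried in the message keeps the check independent of clock skew between nodes.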
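
III. Key Optimizations
1. Multi-level failure detection

| Detection method | Trigger | Recovery action |
| --- | --- | --- |
| Redisson watchdog timeout | Lock renewal fails (30 seconds by default) | The lock expires automatically and other nodes can compete for it |
| Application heartbeat timeout | Custom threshold (for example 30 seconds) | Force-release the lock and re-elect |
| Redis connection lost | Disconnect event (for example `ConnectionListener.onDisconnect`) | Pause the election until the connection is restored |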
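
The three detection levels can be read as one decision function. The sketch below is one possible encoding. The precedence (connection first, then lock expiry, then heartbeat silence) is an assumption made for illustration, and `FailureDetectionSketch` and its `Action` values are hypothetical names.

```java
/** Maps the three failure signals to the recovery action listed for each. */
final class FailureDetectionSketch {
    enum Action { NONE, PAUSE_ELECTION, COMPETE_FOR_LOCK, FORCE_RELEASE_AND_REELECT }

    private FailureDetectionSketch() { }

    static Action decide(boolean redisConnected,
                         boolean lockExists,
                         long millisSinceHeartbeat,
                         long heartbeatTimeoutMillis) {
        if (!redisConnected) {
            return Action.PAUSE_ELECTION;             // nothing can be decided without Redis
        }
        if (!lockExists) {
            return Action.COMPETE_FOR_LOCK;           // lease expired because renewals stopped
        }
        if (millisSinceHeartbeat > heartbeatTimeoutMillis) {
            return Action.FORCE_RELEASE_AND_REELECT;  // lock still held, but the leader is silent
        }
        return Action.NONE;                           // leader looks healthy
    }
}
```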
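
2. Election performance tuning

    # application.yml
    # redisson-spring-boot-starter reads native Redisson settings from spring.redis.redisson.config
    spring:
      redis:
        redisson:
          config: |
            singleServerConfig:
              address: "redis://127.0.0.1:6379"  # placeholder address
            lockWatchdogTimeout: 30000  # watchdog timeout (ms)
            threads: 16                 # event-handling threads
            nettyThreads: 32            # Netty worker threads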
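
3. Split-brain protection

    // Use Redisson's MultiLock to take the leader lock on several independent Redis deployments.
    // redissonClient1 and redissonClient2 are separate RedissonClient instances, one per deployment.
    RLock lock1 = redissonClient1.getLock(LEADER_LOCK_KEY);
    RLock lock2 = redissonClient2.getLock(LEADER_LOCK_KEY);
    RLock multiLock = redisson.getMultiLock(lock1, lock2);
    // Succeeds only if every underlying lock is acquired. No leaseTime is passed,
    // so the watchdog keeps renewing the locks while this node is alive.
    boolean acquired = multiLock.tryLock(0, TimeUnit.SECONDS);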
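
What matters for split-brain protection is that a node counts as leader only if it holds the lock on every deployment, and that a partial acquisition is rolled back. The sketch below models that all-or-nothing rule with in-memory stand-ins. `InMemoryLockSketch` and `AllOrNothingLockSketch` are hypothetical classes for illustration and are not how Redisson implements MultiLock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Stand-in for the leader lock on one independent Redis deployment. */
final class InMemoryLockSketch {
    private final AtomicBoolean held = new AtomicBoolean();

    boolean tryLock() { return held.compareAndSet(false, true); }
    void unlock() { held.set(false); }
    boolean isLocked() { return held.get(); }
}

/** Acquires every lock or none: a partial acquisition is rolled back. */
final class AllOrNothingLockSketch {
    private final List<InMemoryLockSketch> locks;

    AllOrNothingLockSketch(List<InMemoryLockSketch> locks) {
        this.locks = List.copyOf(locks);
    }

    boolean tryLockAll() {
        List<InMemoryLockSketch> acquired = new ArrayList<>();
        for (InMemoryLockSketch lock : locks) {
            if (lock.tryLock()) {
                acquired.add(lock);
            } else {
                // One deployment refused: release what we already took and give up.
                for (InMemoryLockSketch taken : acquired) {
                    taken.unlock();
                }
                return false;
            }
        }
        return true;
    }

    void unlockAll() {
        for (InMemoryLockSketch lock : locks) {
            lock.unlock();
        }
    }
}
```

The trade-off of requiring every lock is availability: if any one deployment is unreachable, no node can become leader until it returns.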
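
IV. Production Deployment Recommendations
1. Choosing a Redis architecture

| Deployment mode | Suitable for | Recommended setup |
| --- | --- | --- |
| Sentinel | Strict high-availability requirements | 3 sentinels + 3 Redis instances |
| Cluster | Large data volumes and high throughput | At least 6 nodes (3 masters, 3 replicas) |
| Single node | Development and testing only | Not recommended for production |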
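
2. Monitoring metrics

    // Expose Redisson metrics (together with Spring Boot Actuator).
    // RedissonMetricsBinder does not appear to ship with open-source Redisson; it is assumed
    // here to be an application-defined Micrometer MeterBinder.
    @Bean
    public RedissonMetricsBinder redissonMetrics(RedissonClient redisson) {
        return new RedissonMetricsBinder(redisson);
    }

Key metrics to monitor (the exact names depend on how the binder registers them):
- redisson.executor.active_threads: number of active threads
- redisson.pubsub.subscriptions: number of subscriptions
- redisson.connections.active: number of active connections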

3. Disaster recovery
- Active-active data centers: configure RedissonClient with endpoints in more than one region

    Config config = new Config();
    config.useClusterServers()
          .addNodeAddress("redis://dc1-node1:6379")
          .addNodeAddress("redis://dc2-node1:6379");

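- Degradation strategy: cache the last known state locally

    // LeaderService, FallbackLeaderService, redisLeaderService and localCache are
    // application-defined and not shown here.
    @Bean
    @Primary
    public LeaderService fallbackLeaderService() {
        return new FallbackLeaderService(redisLeaderService, localCache);
    }
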
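
A degradation strategy of this kind can be sketched as a wrapper that asks the primary check first and answers from the last successful result when that check throws. This is a minimal sketch with hypothetical names (`FallbackLeaderCheckSketch`); it assumes the primary check signals an unreachable Redis by throwing a runtime exception.

```java
import java.util.Objects;
import java.util.function.BooleanSupplier;

/** Answers "am I the leader?" from the primary check, falling back to the last known answer on failure. */
final class FallbackLeaderCheckSketch {
    private final BooleanSupplier primary;  // for example, the Redis-backed leader check
    private volatile boolean lastKnown;     // local cache of the last successful answer

    FallbackLeaderCheckSketch(BooleanSupplier primary) {
        this.primary = Objects.requireNonNull(primary, "primary");
    }

    boolean isLeader() {
        try {
            lastKnown = primary.getAsBoolean();
        } catch (RuntimeException e) {
            // Primary unavailable: keep serving the cached value.
        }
        return lastKnown;
    }
}
```

Serving a cached "leader" answer while Redis is unreachable can leave two nodes acting as leader if the lock expired in the meantime. Where duplicate execution is worse than a skipped run, returning `false` on failure is the safer variant.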
V. Integration with Spring Cloud
1. Controlling scheduled tasks

    @Scheduled(fixedRate = 5000)
    public void scheduledTask() {
        if (leaderElectionService.isLeader()) {
            // Logic that only the leader runs
            processBatchData();
        }
    }
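
The same gating can be factored into a small wrapper so that every scheduled job applies the leader check the same way. The sketch below uses only the JDK. `LeaderGatedTaskSketch` is a hypothetical name, and the leader check is passed in as a `BooleanSupplier` (for example a method reference to `isLeader`).

```java
import java.util.Objects;
import java.util.function.BooleanSupplier;

/** Runs the wrapped task only when the supplied check reports that this node is the leader. */
final class LeaderGatedTaskSketch implements Runnable {
    private final BooleanSupplier leaderCheck;
    private final Runnable task;

    LeaderGatedTaskSketch(BooleanSupplier leaderCheck, Runnable task) {
        this.leaderCheck = Objects.requireNonNull(leaderCheck, "leaderCheck");
        this.task = Objects.requireNonNull(task, "task");
    }

    @Override
    public void run() {
        // The check is evaluated on every invocation, so leadership changes take effect on the next run.
        if (leaderCheck.getAsBoolean()) {
            task.run();
        }
    }
}
```

Because the check happens only at the start of each run, a task that is already executing when leadership is lost will finish on the old leader. Long-running jobs may need to re-check inside their loop.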

2. Dynamic configuration updates

    @RefreshScope
    @RestController
    @RequestMapping("/leader")
    public class LeaderController {

        @Value("${election.timeout:30000}")
        private long electionTimeout;

        @Autowired
        private LeaderElectionService electionService;

        @PostMapping("/timeout")
        public void updateTimeout(@RequestParam long timeout) {
            // Adjust the election timeout at runtime.
            // setElectionTimeout is not defined in the LeaderElectionService shown earlier and has to be added.
            electionService.setElectionTimeout(timeout);
        }
    }
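
The controller calls `setElectionTimeout`, which the election service shown earlier does not define. A minimal sketch of what such a setter could look like follows. `ElectionSettingsSketch` and the 1-second lower bound are assumptions for illustration, not part of the original design.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Holds the election timeout so it can be changed at runtime and read safely from other threads. */
final class ElectionSettingsSketch {
    private static final long MIN_TIMEOUT_MILLIS = 1_000L;  // assumed lower bound

    private final AtomicLong electionTimeoutMillis;

    ElectionSettingsSketch(long initialTimeoutMillis) {
        validate(initialTimeoutMillis);
        this.electionTimeoutMillis = new AtomicLong(initialTimeoutMillis);
    }

    /** Rejects values below the lower bound and leaves the current value unchanged in that case. */
    void setElectionTimeout(long timeoutMillis) {
        validate(timeoutMillis);
        electionTimeoutMillis.set(timeoutMillis);
    }

    long getElectionTimeout() {
        return electionTimeoutMillis.get();
    }

    private static void validate(long timeoutMillis) {
        if (timeoutMillis < MIN_TIMEOUT_MILLIS) {
            throw new IllegalArgumentException("election timeout must be at least " + MIN_TIMEOUT_MILLIS + " ms");
        }
    }
}
```

Validating the value matters here because the endpoint takes it straight from a request parameter. A zero or negative timeout would otherwise make every node treat the leader as failed.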
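
VI. Summary of Advantages
- Near-real-time notification: heartbeats reach all nodes through Redis Pub/Sub as soon as they are published; failure detection itself is bounded by the heartbeat interval and timeout (10 and 30 seconds in the code above)
- Automatic failover: when the leader dies, the Redisson watchdog stops renewing the lock, so the lock expires and another node can take over
- Elastic scaling: microservice instances can be added or removed dynamically
- Minimal dependencies: only a Redis deployment is required, with no additional components
- Fits the Spring ecosystem: works directly with components such as @Scheduled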