服务器之家:专注于服务器技术及软件下载分享
分类导航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|数据库技术|

服务器之家 - 数据库 - Redis - 分布式锁,原来这么简单!

分布式锁,原来这么简单!

2023-09-23 05:00未知服务器之家 Redis

作者 | 蔡柱梁 审校 | 重楼 目录 分布式锁介绍 如何实现分布式锁 实现分布式锁 1 分布式锁介绍 现在的服务往往都是多节点,在一些特定的场景下容易产生并发问题 , 比如扣减库存,送完即止活动,中台的批量导入(有唯一校验

作者 | 蔡柱梁

审校 | 重楼

目录

  1. 分布式锁介绍
  2. 如何实现分布式锁
  3. 实现分布式锁

1 分布式锁介绍

现在的服务往往都是多节点,在一些特定的场景下容易产生并发问题比如扣减库存,送完即止活动,中台的批量导入(有唯一校验要求)等等。这时,我们可以通过分布式锁解决这些问题。

2 如何实现分布式锁

实现的方式有很多种,如:

  • 基于 MySQL 等数据库实现
  • 基于 ZooKeeper 实现
  • 基于 Redis 实现不管采用什么技术栈实现,但是逻辑流程都是大体不差的。下面是笔者自己在工作中基于Redis 实践过的流程图

分布式锁,原来这么简单!

3 实现分布式锁

其实可以不用自己手写,现在有一个中间件Redisson 相当好用,十分推荐。这里的实现更多是用于学习。

3.1 Redis 是单节点的情况下实现的分布式锁

需要使用分布式锁的业务代码如下

package com.example.demo.test.utils;

import com.example.demo.utils.RedisLockUtil;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * @author CaiZhuliang
 * @date 2023/8/31
 */
@Slf4j
@SpringBootTest
public class RedisLockUtilTest {
 @Autowired
 private RedisLockUtil redisLockUtil;

 @Test
 public void simpleLockTest() {
 String key = "redis:lock:" + System.currentTimeMillis();
 boolean result = redisLockUtil.lock(key, 8_000L);
 if (result) {
 try {
 // do something
 } catch (Exception e) {
 log.error("simpleLockTest - 系统异常!", e);
 } finally {
 boolean unlock = redisLockUtil.unlock(key);
 if (!unlock) {
 log.error("simpleLockTest - 释放锁失败,key : {}", key);
 }
 }
 }
 }
}

分布式锁工具类代码如下

package com.example.demo.utils;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;

import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author CaiZhuliang
 * @date 2023/8/31
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisLockUtil {
 private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
 new BasicThreadFactory.Builder()
 .namingPattern("redisLockUtil-schedule-pool-%d")
 .daemon(true)
 .build());

 private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();

 private final RedisTemplate<String, String> redisTemplate;

 /**
 * 释放锁
 * <p>必须和RedisLockUtil#simpleLock是同一个线程</p>
 * @param key 需要释放锁的key
 * @return true-成功 false-失败
 */
 public boolean releaseSimpleLock(String key) {
 String token = THREAD_LOCAL.get();
 try {
 String remoteToken = redisTemplate.opsForValue().get(key);
 if (!token.equals(remoteToken)) {
 // 当前线程不再持有锁
 return false;
 }
 // 是自己持有锁才能释放
 return Boolean.TRUE.equals(redisTemplate.delete(key));
 } catch (Exception e) {
 log.error("非cluster模式简单分布式锁 - 释放锁发生异常,key : {}", key, e);
 return false;
 } finally {
 THREAD_LOCAL.remove();
 }
 }

 /**
 * 这个方法不考虑Redis的集群架构,不考虑脑裂问题,当只有一个Redis来考虑。
 * @param key 需要上锁的key
 * @param expireTime 过期时间,单位:毫秒
 * @return true-成功 false-失败
 */
 public boolean simpleLock(String key, Long expireTime) {
 if (StringUtils.isBlank(key)) {
 log.warn("非cluster模式简单分布式锁 - key is blank");
 return false;
 }
 if (null == expireTime || expireTime <= 0) {
 expireTime = 0L;
 }
 String token = UUID.randomUUID().toString();
 // 续约周期,单位纳秒
 long renewPeriod = expireTime / 2 * 1000_000;
 try {
 // 设置锁
 Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
 if (Boolean.FALSE.equals(result)) {
 return false;
 }
 // 上锁成功后将令牌绑定当前线程
 THREAD_LOCAL.set(token);
 if (renewPeriod > 0) {
 // 续约任务
 renewTask(key, token, expireTime, renewPeriod);
 }
 return true;
 } catch (Exception e) {
 log.error("非cluster模式简单分布式锁 - 上锁失败。", e);
 THREAD_LOCAL.remove();
 return false;
 }
 }

 /**
 * 锁续约任务
 * @param key 需要续命的key
 * @param token 成功获锁的线程持有的令牌
 * @param expireTime 过期时间,单位:毫秒
 * @param renewPeriod 续约周期,单位:纳秒
 */
 private void renewTask(String key, String token, long expireTime, long renewPeriod) {
 EXECUTOR_SERVICE.schedule(() -> {
 ValueOperations<String, String> valueOperator = redisTemplate.opsForValue();
 String val = valueOperator.get(key);
 if (token.equals(val)) {
 // 是自己持有锁才能续约
 try {
 Boolean result = valueOperator.setIfPresent(key, token, expireTime, TimeUnit.MILLISECONDS);
 if (Boolean.TRUE.equals(result)) {
 // 续约成功
 log.debug("非cluster模式简单分布式锁 - 锁续约成功,key : {}", key);
 // 开启下一次续约任务
 renewTask(key, token, expireTime, renewPeriod);
 } else {
 log.error("非cluster模式简单分布式锁 - 锁续约失败,key : {}", key);
 }
 } catch (Exception e) {
 // 这里异常是抛不出去的,所以需要 catch 打印
 log.error("非cluster模式简单分布式锁 - 锁续约发生异常,key : {}", key, e);
 }
 } else {
 log.error("非cluster模式简单分布式锁 - 锁续约失败,不再持有token,key : {}", key);
 }
 }, renewPeriod, TimeUnit.NANOSECONDS);
 }
}

这就是一个最简单的实现方式。不过这里存在着许多问题:

  • 续约任务

这里判断是否持有令牌和续约这两个动作不在同一个事务里,可能发生覆盖现象。假设A线程判断自己持有令牌,但是一直没有请求 Redis 导致锁过期。B线程成功获锁,这时A线程往下执行 Redis 请求,结果A线程抢了B线程的锁。

  • 释放锁

这里判断是否持有令牌和删除key这两个动作不在同一个事务里,可能出现误删现象。假设A线程现在要释放锁,通过了令牌判断,准备删除 key 但是还没执行。这时 key 过期了,B线程成功获锁。接着A线程执行删除 key 导致了 B 线程的锁被删除。

因此,判断持有令牌与续约/删除key这两个动作是需要原子性的,我们可以通过 lua 来实现。

扩展,了解管道与 lua 的区别

  • pipeline(多用于命令简单高效,无关联的场景)

优点:使用简单,有效减少网络IO

缺点:本质还是发送命令请求Redis 服务,如果效率过低,就会阻塞 Redis,导致 Redis 无法处理其他请求

  • lua(多用于命令复杂,命令间有关联的场景)

优点:

  1. Redis 支持 lua 脚本,Redis 服务执行 lua 的同时是可以处理别的请求的,不会产生阻塞
  2. 命令都在脚本中,有效减少网络IO
  3. 具有原子性

缺点:

有一定的学习成本

3.1.1 使用 lua 进行优化

RedisLockUtil 代码如下:

package com.example.demo.utils;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author CaiZhuliang
 * @date 2023/8/31
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisLockUtil {
 private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
 new BasicThreadFactory.Builder()
 .namingPattern("redisLockUtil-schedule-pool-%d")
 .daemon(true)
 .build());

 private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();
 private static final String SUCCESS = "1";
 /**
 * 允许当前token续约
 */
 private static final Integer CAN_RENEW = 0;
 /**
 * 记录token的状态,0-可以续约,其他情况均不能续约
 */
 private static final Map<String, Integer> TOKEN_STATUS = Maps.newConcurrentMap();

 private final RedisTemplate<String, String> redisTemplate;

 /**
 * 释放锁,这个方法与 com.example.demo.utils.RedisLockUtil#simpleLock(java.lang.String, java.lang.Long) 配对。
 * <p>必须和RedisLockUtil#simpleLock是同一个线程</p>
 * @param key 需要释放锁的key
 * @return true-成功 false-失败
 */
 public boolean releaseSimpleLock(String key) {
 String token = THREAD_LOCAL.get();
 if (null != token) {
 TOKEN_STATUS.put(token, 1);
 }
 try {
 String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
 "then redis.call('expire', KEYS[1], 0) return '1' end " +
 "return '0'";
 DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
 String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token);
 log.info("非cluster模式简单分布式锁 - 释放key: {}, result : {}, token : {}", key, result, token);
 return SUCCESS.equals(result);
 } catch (Exception e) {
 log.error("非cluster模式简单分布式锁 - 释放锁发生异常,key : {}", key, e);
 return false;
 } finally {
 THREAD_LOCAL.remove();
 if (null != token) {
 TOKEN_STATUS.remove(token);
 }
 }
 }

 /**
 * 简单分布式锁实现,续约周期是 expireTime 的一半。举个例子, expireTime = 8000,那么锁续约将会是每 4000 毫秒续约一次
 * <p>这个方法不考虑Redis的集群架构,不考虑脑裂问题,当只有一个 Redis来考虑。</p>
 * <p>这个方法使用 com.example.demo.utils.RedisLockUtil#releaseSimpleLock(java.lang.String) 来释放锁</p>
 * @param key 需要上锁的key
 * @param expireTime 过期时间,单位:毫秒
 * @return true-成功 false-失败
 */
 public boolean simpleLock(String key, Long expireTime) {
 if (StringUtils.isBlank(key)) {
 log.warn("非cluster模式简单分布式锁 - key is blank");
 return false;
 }
 if (null == expireTime || expireTime <= 0) {
 expireTime = 0L;
 }
 // 续约周期,单位纳秒
 long renewPeriod = expireTime / 2 * 1000_000;
 try {
 String token = System.currentTimeMillis() + ":" + UUID.randomUUID();
 // 设置锁
 Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
 if (Boolean.FALSE.equals(result)) {
 return false;
 }
 log.info("非cluster模式简单分布式锁 - 上锁成功,key : {}, token : {}", key, token);
 // 上锁成功后将令牌绑定当前线程
 THREAD_LOCAL.set(token);
 TOKEN_STATUS.put(token, 0);
 if (renewPeriod > 0) {
 // 续约任务
 renewTask(key, token, expireTime, renewPeriod);
 }
 return true;
 } catch (Exception e) {
 log.error("非cluster模式简单分布式锁 - 上锁发生异常,key : {}", key, e);
 String token = THREAD_LOCAL.get();
 if (StringUtils.isNotBlank(token)) {
 if (!releaseSimpleLock(key)) {
 log.warn("非cluster模式简单分布式锁 - 释放锁发生失败,key : {}, token : {}", key, token);
 }
 }
 return false;
 }
 }

 /**
 * 锁续约任务
 * @param key 需要续命的key
 * @param token 成功获锁的线程持有的令牌
 * @param expireTime 过期时间,单位:毫秒
 * @param renewPeriod 续约周期,单位:纳秒
 */
 private void renewTask(String key, String token, long expireTime, long renewPeriod) {
 if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
 EXECUTOR_SERVICE.schedule(() -> {
 if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
 try {
 String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
 "then " +
 " if (redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])) " +
 " then return '1' else return redis.call('get', KEYS[1]) end " +
 "end " +
 "return redis.call('get', KEYS[1])";
 DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
 String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token, String.valueOf(expireTime));
 if (SUCCESS.equals(result)) {
 // 续约成功
 log.debug("非cluster模式简单分布式锁 - 锁续约成功,key : {}", key);
 // 开启下一次续约任务
 renewTask(key, token, expireTime, renewPeriod);
 } else {
 // 打印下 result,看下是否因为不再持有令牌导致的续约失败
 log.warn("非cluster模式简单分布式锁 - 锁续约失败,key : {}, token : {}, result : {}", key, token, result);
 }
 } catch (Exception e) {
 // 这里异常是抛不出去的,所以需要 catch 打印
 log.error("非cluster模式简单分布式锁 - 锁续约发生异常,key : {}", key, e);
 }
 }
 }, renewPeriod, TimeUnit.NANOSECONDS);
 }
 }
}

这里还有一个问题:如果redis.call('get', KEYS[1]) == ARGV[1] 成立,但是执行redis.call('expire', KEYS[1], 0) 失败,怎么办?我这里已经执行了THREAD_LOCAL.remove(),想重复释放是不可能的了,但是我这里不能不 remove 或者仅当 Redis 释放锁成功才 remove,这样存在内存泄漏的风险。要怎么处理呢?

这是优化后的代码

package com.example.demo.utils;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author CaiZhuliang
 * @date 2023/8/31
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisSimpleLockUtil {
 private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
 new BasicThreadFactory.Builder()
 .namingPattern("redisSimpleLockUtil-schedule-pool-%d")
 .daemon(true)
 .build());

 private static final ThreadLocal<String> THREAD_LOCAL_TOKEN = new ThreadLocal<>();
 private static final String SUCCESS = "1";
 /**
 * 允许当前token续约
 */
 private static final Integer CAN_RENEW = 0;
 /**
 * 记录token的状态,0-可以续约,其他情况均不能续约
 */
 private static final Map<String, Integer> TOKEN_STATUS = Maps.newConcurrentMap();

 private final RedisTemplate<String, String> redisTemplate;

 /**
 * 释放锁
 * <p>必须和 RedisSimpleLockUtil#lock 是同一个线程</p>
 * @param key key 需要释放锁的key
 * @param token 持有的令牌
 * @return true-成功 false-失败
 */
 public boolean releaseLock(String key, String token) {
 if (StringUtils.isBlank(token)) {
 return false;
 }
 TOKEN_STATUS.put(token, 1);
 try {
 String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
 "then redis.call('expire', KEYS[1], 0) return '1' end " +
 "return '0'";
 DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
 String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token);
 log.info("非cluster模式简单分布式锁 - 释放key: {}, result : {}, token : {}", key, result, token);
 if (SUCCESS.equals(result)) {
 return true;
 }
 String remoteToken = redisTemplate.opsForValue().get(key);
 if (token.equals(remoteToken)) {
 log.warn("非cluster模式简单分布式锁 - 释放锁失败,key : {}, token : {}", key, token);
 return false;
 }
 return true;
 } catch (Exception e) {
 log.error("非cluster模式简单分布式锁 - 释放锁发生异常,key : {}, token : {}", key, token, e);
 return false;
 } finally {
 THREAD_LOCAL_TOKEN.remove();
 TOKEN_STATUS.remove(token);
 }
 }

 /**
 * 简单分布式锁实现,续约周期是 expireTime 的一半。举个例子, expireTime = 8000,那么锁续约将会是每 4000 毫秒续约一次
 * <p>这个方法不考虑Redis的集群架构,不考虑脑裂问题,当只有一个Redis来考虑。</p>
 * @param key 需要上锁的key
 * @param expireTime 过期时间,单位:毫秒
 * @return 上锁成功返回令牌,失败则返回空串
 */
 public String lock(String key, Long expireTime) {
 if (StringUtils.isBlank(key)) {
 log.warn("非cluster模式简单分布式锁 - key is blank");
 return StringUtils.EMPTY;
 }
 if (null == expireTime || expireTime <= 0) {
 expireTime = 0L;
 }
 // 续约周期,单位纳秒
 long renewPeriod = expireTime * 500_000;
 try {
 String token = System.currentTimeMillis() + ":" + UUID.randomUUID();
 // 设置锁
 Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
 if (Boolean.FALSE.equals(result)) {
 return StringUtils.EMPTY;
 }
 log.info("非cluster模式简单分布式锁 - 上锁成功,key : {}, token : {}", key, token);
 // 上锁成功后将令牌绑定当前线程
 THREAD_LOCAL_TOKEN.set(token);
 TOKEN_STATUS.put(token, 0);
 if (renewPeriod > 0) {
 // 续约任务
 log.info("非cluster模式简单分布式锁 - 添加续约任务,key : {}, token : {}, renewPeriod : {}纳秒", key, token, renewPeriod);
 renewTask(key, token, expireTime, renewPeriod);
 }
 return token;
 } catch (Exception e) {
 String token = THREAD_LOCAL_TOKEN.get();
 log.error("非cluster模式简单分布式锁 - 上锁发生异常,key : {}, token : {}", key, token, e);
 return StringUtils.isBlank(token) ? StringUtils.EMPTY : token;
 }
 }

 /**
 * 锁续约任务
 * @param key 需要续命的key
 * @param token 成功获锁的线程持有的令牌
 * @param expireTime 过期时间,单位:毫秒
 * @param renewPeriod 续约周期,单位:纳秒
 */
 private void renewTask(String key, String token, long expireTime, long renewPeriod) {
 try {
 EXECUTOR_SERVICE.schedule(() -> {
 try {
 String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
 "then " +
 " if (redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])) " +
 " then return '1' else return redis.call('get', KEYS[1]) end " +
 "end " +
 "return redis.call('get', KEYS[1])";
 DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
 String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token, String.valueOf(expireTime));
 if (SUCCESS.equals(result)) {
 // 续约成功
 log.debug("非cluster模式简单分布式锁 - 锁续约成功,key : {}, token : {}", key, token);
 // 这里加判断是为了减少定时任务
 if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
 // 开启下一次续约任务
 renewTask(key, token, expireTime, renewPeriod);
 }
 } else {
 // 这里加判断是为了防止误打印warn日志
 if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
 log.warn("非cluster模式简单分布式锁 - 锁续约失败,key : {}, token : {}, result : {}", key, token, result);
 }
 }
 } catch (Exception e) {
 // 这里异常是抛不出去的,所以需要 catch 打印
 log.error("非cluster模式简单分布式锁 - 锁续约发生异常,key : {}, token : {}", key, token, e);
 }
 }, renewPeriod, TimeUnit.NANOSECONDS);
 } catch (Exception e) {
 log.error("非cluster模式简单分布式锁 - 添加锁续约任务发生异常,key : {}, token : {}", key, token, e);
 }
 }
}
package com.example.demo.utils;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author CaiZhuliang
 * @date 2023/8/31
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisSimpleLockUtil {
 private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
 new BasicThreadFactory.Builder()
 .namingPattern("redisSimpleLockUtil-schedule-pool-%d")
 .daemon(true)
 .build());

 private static final ThreadLocal<String> THREAD_LOCAL_TOKEN = new ThreadLocal<>();
 private static final String SUCCESS = "1";
 /**
 * 允许当前token续约
 */
 private static final Integer CAN_RENEW = 0;
 /**
 * 记录token的状态,0-可以续约,其他情况均不能续约
 */
 private static final Map<String, Integer> TOKEN_STATUS = Maps.newConcurrentMap();

 private final RedisTemplate<String, String> redisTemplate;

 /**
 * 释放锁
 * <p>必须和 RedisSimpleLockUtil#lock 是同一个线程</p>
 * @param key key 需要释放锁的key
 * @param token 持有的令牌
 * @return true-成功 false-失败
 */
 public boolean releaseLock(String key, String token) {
 if (StringUtils.isBlank(token)) {
 return false;
 }
 TOKEN_STATUS.put(token, 1);
 try {
 String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
 "then redis.call('expire', KEYS[1], 0) return '1' end " +
 "return '0'";
 DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
 String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token);
 log.info("非cluster模式简单分布式锁 - 释放key: {}, result : {}, token : {}", key, result, token);
 if (SUCCESS.equals(result)) {
 return true;
 }
 String remoteToken = redisTemplate.opsForValue().get(key);
 if (token.equals(remoteToken)) {
 log.warn("非cluster模式简单分布式锁 - 释放锁失败,key : {}, token : {}", key, token);
 return false;
 }
 return true;
 } catch (Exception e) {
 log.error("非cluster模式简单分布式锁 - 释放锁发生异常,key : {}, token : {}", key, token, e);
 return false;
 } finally {
 THREAD_LOCAL_TOKEN.remove();
 TOKEN_STATUS.remove(token);
 }
 }

 /**
 * 简单分布式锁实现,续约周期是 expireTime 的一半。举个例子, expireTime = 8000,那么锁续约将会是每 4000 毫秒续约一次
 * <p>这个方法不考虑Redis的集群架构,不考虑脑裂问题,当只有一个Redis来考虑。</p>
 * @param key 需要上锁的key
 * @param expireTime 过期时间,单位:毫秒
 * @return 上锁成功返回令牌,失败则返回空串
 */
 public String lock(String key, Long expireTime) {
 if (StringUtils.isBlank(key)) {
 log.warn("非cluster模式简单分布式锁 - key is blank");
 return StringUtils.EMPTY;
 }
 if (null == expireTime || expireTime <= 0) {
 expireTime = 0L;
 }
 // 续约周期,单位纳秒
 long renewPeriod = expireTime * 500_000;
 try {
 String token = System.currentTimeMillis() + ":" + UUID.randomUUID();
 // 设置锁
 Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
 if (Boolean.FALSE.equals(result)) {
 return StringUtils.EMPTY;
 }
 log.info("非cluster模式简单分布式锁 - 上锁成功,key : {}, token : {}", key, token);
 // 上锁成功后将令牌绑定当前线程
 THREAD_LOCAL_TOKEN.set(token);
 TOKEN_STATUS.put(token, 0);
 if (renewPeriod > 0) {
 // 续约任务
 log.info("非cluster模式简单分布式锁 - 添加续约任务,key : {}, token : {}, renewPeriod : {}纳秒", key, token, renewPeriod);
 renewTask(key, token, expireTime, renewPeriod);
 }
 return token;
 } catch (Exception e) {
 String token = THREAD_LOCAL_TOKEN.get();
 log.error("非cluster模式简单分布式锁 - 上锁发生异常,key : {}, token : {}", key, token, e);
 return StringUtils.isBlank(token) ? StringUtils.EMPTY : token;
 }
 }

 /**
 * 锁续约任务
 * @param key 需要续命的key
 * @param token 成功获锁的线程持有的令牌
 * @param expireTime 过期时间,单位:毫秒
 * @param renewPeriod 续约周期,单位:纳秒
 */
 private void renewTask(String key, String token, long expireTime, long renewPeriod) {
 try {
 EXECUTOR_SERVICE.schedule(() -> {
 try {
 String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
 "then " +
 " if (redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])) " +
 " then return '1' else return redis.call('get', KEYS[1]) end " +
 "end " +
 "return redis.call('get', KEYS[1])";
 DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
 String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token, String.valueOf(expireTime));
 if (SUCCESS.equals(result)) {
 // 续约成功
 log.debug("非cluster模式简单分布式锁 - 锁续约成功,key : {}, token : {}", key, token);
 // 这里加判断是为了减少定时任务
 if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
 // 开启下一次续约任务
 renewTask(key, token, expireTime, renewPeriod);
 }
 } else {
 // 这里加判断是为了防止误打印warn日志
 if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
 log.warn("非cluster模式简单分布式锁 - 锁续约失败,key : {}, token : {}, result : {}", key, token, result);
 }
 }
 } catch (Exception e) {
 // 这里异常是抛不出去的,所以需要 catch 打印
 log.error("非cluster模式简单分布式锁 - 锁续约发生异常,key : {}, token : {}", key, token, e);
 }
 }, renewPeriod, TimeUnit.NANOSECONDS);
 } catch (Exception e) {
 log.error("非cluster模式简单分布式锁 - 添加锁续约任务发生异常,key : {}, token : {}", key, token, e);
 }
 }
}

下面是并发单元测试代码

@Test
 public void concurrencyTest() {
 String[] nums = {"1", "2", "3", "4", "5"};
 List<CompletableFuture<Void>> list = Lists.newArrayListWithExpectedSize(100);
 for (int i = 0; i < 50; i++) {
 CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
 for (int count = 0; count < 10; count++) {
 int random = new Random().nextInt(100) % 5;
 String key = "test_" + nums[random];
 while (true) {
 String token = redisSimpleLockUtil.lock(key, 3_000L);
 if (StringUtils.isNotBlank(token)) {
 log.info("concurrencyTest - key : {}", key);
 try {
 Thread.sleep(new Random().nextInt(1500));
 } catch (Exception e) {
 log.error("concurrencyTest - 发生异常, key : {}", key, e);
 } finally {
 boolean unlock = redisSimpleLockUtil.releaseLock(key, token);
 if (!unlock) {
 log.error("concurrencyTest - 释放锁失败,key : {}", key);
 }
 }
 break;
 }
 }
 }
 });
 list.add(future);
 }
 CompletableFuture<?>[] futures = new CompletableFuture[list.size()];
 list.toArray(futures);
 CompletableFuture.allOf(futures).join();
 }

3.2 红锁

一般公司使用Redis 时都不可能是单节点的,要么主从+哨兵架构,要么就是 cluster 架构。面对集群,我们不得不思考如何应对脑裂这个问题。而 Redlock 是Redis官方网站给出解决方案

下面看下针对这两种集群架构的处理方式:

  1. 主从+哨兵

通过访问哨兵获取当前 master 节点,统计票数,超过半数的 master 节点就是真的 master。我们可以对比我们成功上锁的节点是否是真的 master node,从而避免脑裂问题。

  1. cluster
  2. 上锁需要在集群中半数以上的 master 操作成功了才算成功

3.2.1 红锁的问题

锁通过过半原则来规避脑裂,但是这就让我们不得不考虑访问节点的等待超时时间应该要多长。而且,也会降低Redis 分布式锁的吞吐量。如果有半数节点不可用,那么分布式锁也将变得不可用。因此,实际使用中我们还要结合自己实际的业务场景来权衡要不要用红锁或者修改实现方案。

作者介绍

蔡柱梁,51CTO社区编辑,从事Java后端开发8年,做过传统项目广电BOSS系统,后投身互联网电商,负责过订单,TMS,中间件等。


延伸 · 阅读

精彩推荐
  • RedisDocker Compose搭建Redis7.0.4高可用一主二从三哨兵集群并整合SpringBoot

    Docker Compose搭建Redis7.0.4高可用一主二从三哨兵集群并整合Spring

    一、前言 redis在我们企业级开发中是很常见的,但是单个redis不能保证我们的稳定使用,所以我们要建立一个集群。 redis有两种高可用的方案: High availab...

    未知1982023-05-07
  • RedisRedis集群方案

    Redis集群方案

    前段时间搞了搞Redis集群,想用做推荐系统的线上存储,说来挺有趣,这边基础架构不太完善,因此需要我们做推荐系统的自己来搭这个存储环境,就自己...

    EE_NovRain5552020-07-21
  • Redisredis3.2配置文件redis.conf详细说明

    redis3.2配置文件redis.conf详细说明

    redis3.2配置详解,Redis启动的时候,可以指定配置文件,详细说明请看本文说明 ...

    wdc4802019-11-11
  • RedisRedis总结笔记(一):安装和常用命令

    Redis总结笔记(一):安装和常用命令

    这篇文章主要介绍了Redis总结笔记(一):安装和常用命令,本文着重总结了常用命令,如对value操作的命令、对String操作的命令、对List操作的命令、对Set操作...

    junjie5092019-10-22
  • Redis巧用Redis实现分布式锁详细介绍

    巧用Redis实现分布式锁详细介绍

    大家好,本篇文章主要讲的是巧用Redis实现分布式锁详细介绍,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下,方便下次浏览...

    Monster_起飞10912022-01-25
  • Redisredis快照模式_动力节点Java学院整理

    redis快照模式_动力节点Java学院整理

    这篇文章主要为大家详细介绍了redis快照模式的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下 ...

    huangxincheng3822019-11-07
  • RedisRedis源码环境构建过程详解

    Redis源码环境构建过程详解

    这篇文章主要介绍了Redis源码环境构建过程,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...

    辉度10922021-08-19
  • RedisRedis分布式锁python-redis-lock使用方法

    Redis分布式锁python-redis-lock使用方法

    这篇文章主要介绍了Redis分布式锁python-redis-lock使用方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可...

    -零9162021-01-03