首页 > 其他 > 详细

redis实现分布式锁

时间:2019-09-10 14:13:22      阅读:86      评论:0      收藏:0      [点我收藏+]

  jedis原生方式

  相关依赖

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.5.0</version>
        </dependency>

  代码如下

package jedisLock;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * @author asd on 2019-07-16
 * @description jedisUtil
 */
public class JedisUtil {

    private static JedisPool jedisPool;
    static {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(10000);
        jedisPoolConfig.setMaxWaitMillis(10000);
        jedisPoolConfig.setMaxIdle(1000);
        jedisPool = new JedisPool(jedisPoolConfig,"localhost",6379,100000);

    }

    public static Jedis getRerouse(){
        return  jedisPool.getResource();
    }


}
package jedisLock;

import redis.clients.jedis.Jedis;

import java.util.Collections;

/**
 * @author hehang on 2019-07-16
 * @description 使用jedis
 */
public class JedisDistributeLock {

    private static final String SUCCESS ="OK";
    private static final String SET_IF_NOT_EXIST ="NX";
    private static final String SET_WITH_EXPIRE_TIME ="EX";

    //释放锁成功标示
    private static final Long RELEASE_SUCCESS = 1L;

    //获取锁时的睡眠等待时间片,单位毫秒
    private static final long SLEEP_PER = 5;

    //默认过期时间
    public static final int DEFAULT_EXPIRE_1000_Milliseconds = 1000;



    private static Boolean tryGetLock(Jedis jedis,String key,String requestId,int expireTime){

        String result = jedis.set(key,requestId,SET_IF_NOT_EXIST,SET_WITH_EXPIRE_TIME,expireTime);
        if(SUCCESS.equals(result)){
            return true;
        }
        return false;
    }


    private static Boolean releaseLock(Jedis jedis,String key,String requestId){
        String script = "if redis.call(‘get‘, KEYS[1]) == ARGV[1] then return redis.call(‘del‘, KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(key),Collections.singletonList(requestId));
        if(RELEASE_SUCCESS.equals(result)){
            return true;
        }
        return false;
    }

    public static void lock(String key,String value,int expireTime){
        try (Jedis jedis = JedisUtil.getRerouse()){
            while(!tryGetLock(jedis,key,value,expireTime)){
                //缺点,睡眠期间其它线程释放锁不能及时收到
                try {
                    Thread.sleep(SLEEP_PER);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static boolean tryLock(String key,String value,int expireTime){
        try (Jedis jedis = JedisUtil.getRerouse()){
            return  tryGetLock(jedis,key,value,expireTime);
        }
    }

    public static void unlock(String key,String value){
        try (Jedis jedis = JedisUtil.getRerouse()){
            releaseLock(jedis,key,value);
        }
    }
}
package jedisLock;

import java.util.UUID;

/**
 * @author hehang on 2019-07-17
 * @description redisService
 */
public class JedisLockService {

    private static String lockkey = "lock";

    private ThreadLocal<String> threadLocal = new ThreadLocal<>();

    private void setThreadLocal(String uuid){
        threadLocal.set(uuid);
    }

    private String getThreadLocal(){
        return threadLocal.get();
    }

    public void lock(){
        String value = UUID.randomUUID().toString();
        JedisDistributeLock.lock(lockkey,value,5000);
        setThreadLocal(value);
    }


    public void unlock(){
        String value = getThreadLocal();
        JedisDistributeLock.unlock(lockkey,value);
    }
}

  测试相关类,为了方便在SellTicketTask定义一个成员变量作为共享变量,同时三个线程取改变其值,来模拟分布式场景

package jedisLock;

/**
 * @author hehang on 2019-07-17
 * @description 卖票
 */
public class SellTicketTask implements Runnable {

    private int tickets = 100;

    JedisLockService jedisLockService = new JedisLockService();
    @Override
    public void run() {
        while (tickets>0){
            try {
                jedisLockService.lock();
                if(tickets>0){
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "正在出售第 " + tickets-- + " 张票");

                }
            }finally {
                jedisLockService.unlock();
            }
        }
        System.out.println(Thread.currentThread().getName()+"线程结束");

    }
}
package jedisLock;

/**
 * @author hehang on 2019-07-17
 * @description 测试类
 */
public class JedisLockTest {

    public static void main(String[] args) {
        SellTicketTask sellTicketTask = new SellTicketTask();
        for (int i = 0; i <3 ; i++) {
            new Thread(sellTicketTask,"窗口"+i).start();
        }
    }
}

  redission方式

  相关依赖

     <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.9.1</version>
        </dependency>

  redission方式使用相对简单,其底层也是采用lua脚本

package redission;

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

/**
 * @author hehang on 2019-07-17
 * @description Redisson lock
 * 推荐使用成熟的开源项目,实现分布式锁
 */
public class SellTicketTask implements Runnable{

    private int tickets = 100;
    private RLock lock = getRlock();

    private RLock getRlock(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient client = Redisson.create(config);
        RLock rLock = client.getLock("zxczxc");
        return rLock;
    }


    @Override
    public void run() {
        while (tickets>0){
            try {
                lock.lock();
                if(tickets>0){
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "正在出售第 " + tickets-- + " 张票");

                }
            }finally {
                lock.unlock();
            }
        }
        System.out.println(Thread.currentThread().getName()+"线程结束");

    }


}
package redission;

/**
 * @author hehang on 2019-07-17
 * @description asd
 */
public class TestRedissonLock {

    public static void main(String[] args) {

        SellTicketTask sellTicketTask = new SellTicketTask();

        for (int i = 0; i < 3; i++) {
            new Thread(sellTicketTask,"窗口" +i).start();
        }


    }
}

  springboot2.0.x配置lettuce

  相关依赖

     <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <!-- 2.0以上版本默认连接池是lettuce -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.5.0</version>
        </dependency>

  相关代码,关键点是通过lua脚本来获取和释放锁、redis发布订阅机制来通知其它锁释放、本地lock来避免同一个服务内竞争锁减少redis压力。

package com.jlwj.redislock.Config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * @author hehang on 2019-07-17
 * @description asd
 */
@Configuration
@EnableCaching
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        //设置序列化
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置redisTemplate
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        RedisSerializer stringSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringSerializer); // key序列化
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); // value序列化
        redisTemplate.setHashKeySerializer(stringSerializer); // Hash key序列化
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); // Hash value序列化
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

}
package com.jlwj.redislock.springRedisLock;

import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;

import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author hehang on 2019-07-17
 * @description asd
 *
 */
public class SpringRedisLock {

    private static final RedisScript<Long> lua_lock = new DefaultRedisScript<>("if redis.call(\"setnx\", KEYS[1], KEYS[2]) == 1 then return redis.call(\"pexpire\", KEYS[1], KEYS[3]) else return 0 end", Long.class);

    private static final RedisScript<Long> lua_unlock = new DefaultRedisScript<>("if redis.call(\"get\",KEYS[1]) == KEYS[2] then return redis.call(\"del\",KEYS[1]) else return -1 end", Long.class);

    RedisTemplate redisTemplate;
    String resourceName;
    Integer timeOut =1;
    ThreadLocal<String> threadLocal = new ThreadLocal<>();
    // 多台机器的情况下,会出现大量的等待,加重redis的压力。 在lock方法上,加入同步关键字。单机同步,多机用redis
    Lock lock = new ReentrantLock();

    public SpringRedisLock(String resourceName,RedisTemplate redisTemplate,int timeOut){
        this.resourceName ="lock_" + resourceName;
        this.redisTemplate = redisTemplate;
        this.timeOut = timeOut;
    }



    public void lock() {
        try {
            lock.lock();
            while(!tryLock()){

                redisTemplate.execute(new RedisCallback<Boolean>() {
                    @Override
                    public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                        CountDownLatch waiter = new CountDownLatch(1);
                        // 等待通知结果,使用jedis在此处会阻塞
                        connection.subscribe((message, pattern) -> {
                            // 收到通知,不管结果,立刻再次抢锁
                            waiter.countDown();
                        }, (resourceName + "_unlock_channel").getBytes());
                        try {
                            // 等待一段时间,超过这个时间都没收到消息,肯定有问题
                            waiter.await(timeOut, TimeUnit.SECONDS);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return true; //随便返回一个值都没问题
                    }
                });
                System.out.println("继续下一次循环");
            }


        }  finally {
            lock.unlock();
        }



    }


    public boolean tryLock() {
        String value =UUID.randomUUID().toString();
        List<String> keys = Arrays.asList(resourceName, value, String.valueOf(1000));
        Long result = (Long) redisTemplate.execute(lua_lock,keys);
        if(result==1){
            System.out.println(Thread.currentThread().getName() + "获取到锁");
            threadLocal.set(value);
            return true;
        }else{
            return false;
        }
    }


    public void unlock() {
        //1、 要比对内部的值,同一个线程,才能够去释放锁。 2、 同时发出通知
        String value = threadLocal.get();
        try {
            List<String> keys = Arrays.asList(resourceName, value);
            Long result = (Long) redisTemplate.execute(lua_unlock,keys);
            if(result !=-1){
                System.out.println(Thread.currentThread().getName() + "释放锁");
                redisTemplate.execute(new RedisCallback() {
                    @Override
                    public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
                        redisConnection.publish((resourceName + "_unlock_channel").getBytes(),"".getBytes());
                        return null;
                    }
                });
            }

        }finally {
            threadLocal.remove();
        }
    }

}
package com.jlwj.redislock.springRedisLock;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

/**
 * @author hehang on 2019-07-17
 *
 */

@Service
public class SellTicketSpringService{

    @Autowired
    RedisTemplate redisTemplate;

    private SpringRedisLock springRedisLock;

    private Long tickets = 100L;

    @PostConstruct
    public void init() {
        springRedisLock = new SpringRedisLock("test_lock",redisTemplate, 1);
    }
    public boolean buyTicket(String userId) {
        Boolean status = false;
        try {
            springRedisLock.lock();
            if(tickets>0){
                System.out.println(userId + "正在买第 " + tickets-- + " 张票");
                status = true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            springRedisLock.unlock();
        }
        return status;
    }
}

  测试类

package com.jlwj.redislock;

import com.jlwj.redislock.springRedisLock.SellTicketSpringService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.CountDownLatch;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisLockApplicationTests {


    long timed = 0L;

    @Before
    public void start() {
        System.out.println("开始测试");
        timed  =System.currentTimeMillis();
    }

    @After
    public void end() {
        System.out.println("结束测试,执行时长:" + (System.currentTimeMillis() - timed));
    }

    @Autowired
    private SellTicketSpringService sellTicketSpringService;




    /**
     * 注意切换redis客户端为lettuce
     */
    @Test
    public void buy() throws InterruptedException {
        // 模拟的请求数量
        final int threadNum = 100;
        // 倒计数器,用于模拟高并发(信号枪机制)
        CountDownLatch cdl = new CountDownLatch(threadNum);
        // 循环创建N个线程
        Thread[] threads = new Thread[threadNum];
        for (int i = 0; i < threadNum; i++) {
            String userId = "Tony" +i;
            Thread thread = new Thread(() -> {
                try {
                    // 等待cdl值为0,也就是其他线程就绪后,再运行后续的代码
                    cdl.await();
                    // http请求实际上就是多线程调用这个方法
                    sellTicketSpringService.buyTicket(userId);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            threads[i] = thread;
            thread.start();
            // 倒计时器 减一
            cdl.countDown();

        }

        // 等待上面所有线程执行完毕之后,结束测试
        for (Thread thread : threads) {
            thread.join();
        }
    }

}

 

redis实现分布式锁

原文:https://www.cnblogs.com/hhhshct/p/11496109.html

(1)
(1)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!