首页 > 其他 > 详细

分布式锁

时间:2020-05-18 21:38:22      阅读:73      评论:0      收藏:0      [点我收藏+]

分布式锁


1. 分布式锁

集群环境下,变量A会同时存在于三台服务器内存中。
需要保证它们的一致性,也就是说同时只能有一个线程对它进行操作。

在单机情况下锁很好处理,但在集群环境中,多线程、多进程并且分布在不同机器上,这将使原单机情况下的并发控制锁策略失效,单纯的应用并不能提供分布式锁的能力。为了解决这个问题就需要一种跨机器的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

分布式锁应该具备哪些条件:

  1. 互斥性:在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
  2. 高可用的获取锁与释放锁;
  3. 高性能的获取锁与释放锁;
  4. 重入特性:同一线程可以重复多次加锁。
  5. 锁超时:具备锁失效机制,防止死锁;
  6. 非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
  7. 支持公平锁和非公平锁:公平锁是指按照请求加锁的顺序获得锁,非公平锁真好相反请求加锁是无序的。

1.1. 基于redis实现分布式锁

1.1.1. 实现思想:

  1. 获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。
  2. 获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
  3. 释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。

1.2. 简单分布式锁的实现

  

redis_url = "redis://"  
conn = redis.StrictRedis().from_url(redis_url)  

def acquire_lock(conn, lock_name, acquire_timeout=5, lock_timeout=3):  
    """  
    基于redis的分布式锁实现  
    :param conn: redis连接  
    :param lock_name: 锁名  
    :param acquire_timeout: 获取锁的超时时间  
    :param lock_timeout: 锁的超时时间  
    :return:  
    """  

    identifier = str(uuid.uuid4())  
    lockname = "lock:" + lock_name  
    end = time.time() + acquire_timeout  

    while time.time() < end:  
        if conn.set(lockname, identifier, ex=lock_timeout, nx=True):  
            return identifier  
        time.sleep(0.001)  
    return False  

def release_lock(conn, lock_name, identifier):  
    """  
    释放锁  
    :param conn: redis连接  
    :param lock_name: 锁名  
    :param identifier: 锁的标识  
    :return:  
    """  
    with conn.pipeline() as pipe:  
        lockname = ‘lock:‘ + lock_name  

        while True:  
            try:  
                pipe.watch(lockname)  
                ident = pipe.get(lockname)  
                if ident and ident.decode(‘utf-8‘) == identifier:  
                    # 事务开始  
                    pipe.multi()  
                    pipe.delete(lockname)  
                    pipe.execute()  
                    return True  

                pipe.unwatch()  
                break  
            except redis.exceptions.WatchError:  
                pass  
        return False  

1.3. 重构一下,使用with,便于使用

  

import time  
import uuid  
import threading  
import redis  

redis_url = ""  

# 类实现  
class DistributedLockRedis():  
    def __init__(self, lock_name=None, conn=None, redis_url=None):  
        # 建立redis连接  
        if redis_url:  
            self._conn = redis.StrictRedis().from_url(redis_url)  
        elif conn is not None:  
            self._conn = conn  
        else:  
            raise AttributeError("redis数据库连接异常。")  

        if not lock_name:  
            raise ValueError(‘锁名为空。‘)  
        self._lockname = ‘lock:‘ + lock_name  

        self._identifier = str(uuid.uuid4())  

    def acquire_lock(self, acquire_timeout=1000, lock_timeout=3):  
        """  
        基于redis的分布式锁实现  
        :param conn: redis连接  
        :param acquire_timeout: 获取锁的超时时间  
        :param lock_timeout: 锁的超时时间  
        :return:  
        """  
        end = time.time() + acquire_timeout  
        while time.time() < end:  
            if self._conn.set(self._lockname, self._identifier, ex=lock_timeout, nx=True):  
                return True  
            time.sleep(0.001)  
        return False  

    def release_lock(self):  
        """  
        释放锁  
        """  
        with self._conn.pipeline() as pipe:  
            while True:  
                try:  
                    pipe.watch(self._lockname)  
                    ident = pipe.get(self._lockname)  
                    if ident and ident.decode(‘utf-8‘) == self._identifier:  
                        # 事务开始  
                        pipe.multi()  
                        pipe.delete(self._lockname)  
                        pipe.execute()  
                        return True  

                    pipe.unwatch()  
                    break  
                except redis.exceptions.WatchError:  
                    pass  
            return False  

    def __enter__(self):  
        if self.acquire_lock():  
            return self  
        else:  
            raise RuntimeError("获取锁失败。")  

    def __exit__(self, exc_type, exc_val, exc_tb):  
        if self.release_lock():  
            return  
        else:  
            raise RuntimeError("解锁失败。")  


# 测试部分  

def thread_worker_2(i, lock, *ar):  
    global count  
    with lock:  
        # identifier = lock.acquire_lock(acquire_timeout=1000)  
        print(‘线程{}获取锁。‘.format(i))  
        time.sleep(1)  
        if count < 1:  
            print("线程{}获取资源失败,资源为空。".format(i))  
            result[‘not done‘].append(i)  
            return  
        else:  
            for x in range(50000):  
                count -= 1  
            print("线程{}获取资源成功,剩余资源{}个。".format(i, count))  
            result[‘done‘].append(i)  
        # lock.release_lock()  

def test_distributed_lock():  
    lock = DistributedLockRedis(lock_name=‘mylock‘, redis_url=redis_url)  
    thread_list = []  
    for _ in range(12):  
        t = threading.Thread(target=thread_worker_2, args=(_, lock))  
        thread_list.append(t)  

    for _ in thread_list:  
        _.start()  

    for _ in thread_list:  
        _.join()  
    print(result)  
    print(count)  

# 测试对象  
count = 500000  
# 测试结果  
result = {‘done‘:list(), ‘not done‘:list()}  

if __name__ == ‘__main__‘:  
    pass  
    #run()  
    test_distributed_lock()  
    pass  

1.4. 总结

上述锁基于Redis单实例,假设这个单实例总是可用,这种方法已经足够安全。但是如果 Redis 主节点挂了就会出现一些问题,比如主节点加锁后没有同步到从节点,从节点升为主节点,就会出现锁的丢失。如果你想要使用更加安全的 Redis 分布式锁实现可以参考一下 Redlock 的实现。

2. redlock

从Redis主从架构上来考虑,依然存在问题。因为Redis集群数据同步到各个节点时是异步的,如果在 Master 节点获取到锁后,在没有同步到其它节点时,Master 节点崩溃了,此时新的 Master 节点依然可以获取锁,所以多个应用服务可以同时获取到锁。
基于以上的考虑,Redis之父Antirez提出了一个RedLock算法。

RedLock算法实现过程分析:
假设Redis部署模式是Redis Cluster,总共有5个master节点,通过以下步骤获取一把锁:

  1. 获取当前时间戳,单位是毫秒
  2. 轮流尝试在每个master节点上创建锁,过期时间设置较短,一般就几十毫秒
  3. 尝试在大多数节点上建立一个锁,比如5个节点就要求是3个节点(n / 2 +1)
  4. 客户端计算建立好锁的时间,如果建立锁的时间小于超时时间,就算建立成功了
  5. 要是锁建立失败了,那么就依次删除这个锁
  6. 只要有客户端创建成功了分布式锁,其他客户端就得不断轮询去尝试获取锁

以上过程前文也提到了,进一步分析RedLock算法的实现依然可能存在问题,也是Martain和Antirez两位大佬争论的焦点。

问题1:节点崩溃重启
节点崩溃重启,会出现多个客户端持有锁。

假设一共有5个Redis节点:A、B、 C、 D、 E。设想发生了如下的事件序列:
1)客户端C1成功对Redis集群中A、B、C三个节点加锁成功(但D和E没有锁住)。
2)节点C Duang的一下,崩溃重启了,但客户端C1在节点C加锁未持久化完,丢了。
3)节点C重启后,客户端C2成功对Redis集群中C、D、 E尝试加锁成功了。

这样,悲剧了吧!客户端C1和C2同时获得了同一把分布式锁。
为了应对节点重启引发的锁失效问题,Antirez提出了延迟重启的概念,即一个节点崩溃后,先不立即重启它,而是等待一段时间再重启,等待的时间大于锁的有效时间。
采用这种方式,这个节点在重启前所参与的锁都会过期,它在重启后就不会对现有的锁造成影响。
这其实也是通过人为补偿措施,降低不一致发生的概率。

问题2:时钟跳跃
假设一共有5个Redis节点:A、B、 C、 D、 E。设想发生了如下的事件序列:
1)客户端C1成功对Redis集群中A、B、 C三个节点成功加锁。但因网络问题,与D和E通信失败。
2)节点C上的时钟发生了向前跳跃,导致它上面维护的锁快速过期。
3)客户端C2对Redis集群中节点C、 D、 E成功加了同一把锁。

此时,又悲剧了吧!客户端C1和C2同时都持有着同一把分布式锁。
为了应对时钟跳跃引发的锁失效问题,Antirez提出了应该禁止人为修改系统时间,使用一个不会进行「跳跃式」调整系统时钟的ntpd程序。这也是通过人为补偿措施,降低不一致发生的概率。
但是...,RedLock算法并没有解决,操作共享资源超时,导致锁失效的问题。
存在这么大争议的算法实现,还是不推荐使用的。

分布式锁

原文:https://www.cnblogs.com/wodeboke-y/p/12912840.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!