集群环境下,变量A会同时存在于三台服务器内存中。
需要保证它们的一致性,也就是说同时只能有一个线程对它进行操作。
在单机情况下锁很好处理,但在集群环境中,多线程、多进程并且分布在不同机器上,这将使原单机情况下的并发控制锁策略失效,单纯的应用并不能提供分布式锁的能力。为了解决这个问题就需要一种跨机器的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
分布式锁应该具备哪些条件:
redis_url = "redis://" conn = redis.StrictRedis().from_url(redis_url) def acquire_lock(conn, lock_name, acquire_timeout=5, lock_timeout=3): """ 基于redis的分布式锁实现 :param conn: redis连接 :param lock_name: 锁名 :param acquire_timeout: 获取锁的超时时间 :param lock_timeout: 锁的超时时间 :return: """ identifier = str(uuid.uuid4()) lockname = "lock:" + lock_name end = time.time() + acquire_timeout while time.time() < end: if conn.set(lockname, identifier, ex=lock_timeout, nx=True): return identifier time.sleep(0.001) return False def release_lock(conn, lock_name, identifier): """ 释放锁 :param conn: redis连接 :param lock_name: 锁名 :param identifier: 锁的标识 :return: """ with conn.pipeline() as pipe: lockname = ‘lock:‘ + lock_name while True: try: pipe.watch(lockname) ident = pipe.get(lockname) if ident and ident.decode(‘utf-8‘) == identifier: # 事务开始 pipe.multi() pipe.delete(lockname) pipe.execute() return True pipe.unwatch() break except redis.exceptions.WatchError: pass return False
import time import uuid import threading import redis redis_url = "" # 类实现 class DistributedLockRedis(): def __init__(self, lock_name=None, conn=None, redis_url=None): # 建立redis连接 if redis_url: self._conn = redis.StrictRedis().from_url(redis_url) elif conn is not None: self._conn = conn else: raise AttributeError("redis数据库连接异常。") if not lock_name: raise ValueError(‘锁名为空。‘) self._lockname = ‘lock:‘ + lock_name self._identifier = str(uuid.uuid4()) def acquire_lock(self, acquire_timeout=1000, lock_timeout=3): """ 基于redis的分布式锁实现 :param conn: redis连接 :param acquire_timeout: 获取锁的超时时间 :param lock_timeout: 锁的超时时间 :return: """ end = time.time() + acquire_timeout while time.time() < end: if self._conn.set(self._lockname, self._identifier, ex=lock_timeout, nx=True): return True time.sleep(0.001) return False def release_lock(self): """ 释放锁 """ with self._conn.pipeline() as pipe: while True: try: pipe.watch(self._lockname) ident = pipe.get(self._lockname) if ident and ident.decode(‘utf-8‘) == self._identifier: # 事务开始 pipe.multi() pipe.delete(self._lockname) pipe.execute() return True pipe.unwatch() break except redis.exceptions.WatchError: pass return False def __enter__(self): if self.acquire_lock(): return self else: raise RuntimeError("获取锁失败。") def __exit__(self, exc_type, exc_val, exc_tb): if self.release_lock(): return else: raise RuntimeError("解锁失败。") # 测试部分 def thread_worker_2(i, lock, *ar): global count with lock: # identifier = lock.acquire_lock(acquire_timeout=1000) print(‘线程{}获取锁。‘.format(i)) time.sleep(1) if count < 1: print("线程{}获取资源失败,资源为空。".format(i)) result[‘not done‘].append(i) return else: for x in range(50000): count -= 1 print("线程{}获取资源成功,剩余资源{}个。".format(i, count)) result[‘done‘].append(i) # lock.release_lock() def test_distributed_lock(): lock = DistributedLockRedis(lock_name=‘mylock‘, redis_url=redis_url) thread_list = [] for _ in range(12): t = threading.Thread(target=thread_worker_2, args=(_, lock)) thread_list.append(t) for _ in thread_list: _.start() for _ in thread_list: _.join() print(result) print(count) # 测试对象 count = 500000 # 测试结果 result = {‘done‘:list(), ‘not done‘:list()} if __name__ == ‘__main__‘: pass #run() test_distributed_lock() pass
上述锁基于Redis单实例,假设这个单实例总是可用,这种方法已经足够安全。但是如果 Redis 主节点挂了就会出现一些问题,比如主节点加锁后没有同步到从节点,从节点升为主节点,就会出现锁的丢失。如果你想要使用更加安全的 Redis 分布式锁实现可以参考一下 Redlock 的实现。
从Redis主从架构上来考虑,依然存在问题。因为Redis集群数据同步到各个节点时是异步的,如果在 Master 节点获取到锁后,在没有同步到其它节点时,Master 节点崩溃了,此时新的 Master 节点依然可以获取锁,所以多个应用服务可以同时获取到锁。
基于以上的考虑,Redis之父Antirez提出了一个RedLock算法。
RedLock算法实现过程分析:
假设Redis部署模式是Redis Cluster,总共有5个master节点,通过以下步骤获取一把锁:
以上过程前文也提到了,进一步分析RedLock算法的实现依然可能存在问题,也是Martain和Antirez两位大佬争论的焦点。
问题1:节点崩溃重启
节点崩溃重启,会出现多个客户端持有锁。
假设一共有5个Redis节点:A、B、 C、 D、 E。设想发生了如下的事件序列:
1)客户端C1成功对Redis集群中A、B、C三个节点加锁成功(但D和E没有锁住)。
2)节点C Duang的一下,崩溃重启了,但客户端C1在节点C加锁未持久化完,丢了。
3)节点C重启后,客户端C2成功对Redis集群中C、D、 E尝试加锁成功了。
这样,悲剧了吧!客户端C1和C2同时获得了同一把分布式锁。
为了应对节点重启引发的锁失效问题,Antirez提出了延迟重启的概念,即一个节点崩溃后,先不立即重启它,而是等待一段时间再重启,等待的时间大于锁的有效时间。
采用这种方式,这个节点在重启前所参与的锁都会过期,它在重启后就不会对现有的锁造成影响。
这其实也是通过人为补偿措施,降低不一致发生的概率。
问题2:时钟跳跃
假设一共有5个Redis节点:A、B、 C、 D、 E。设想发生了如下的事件序列:
1)客户端C1成功对Redis集群中A、B、 C三个节点成功加锁。但因网络问题,与D和E通信失败。
2)节点C上的时钟发生了向前跳跃,导致它上面维护的锁快速过期。
3)客户端C2对Redis集群中节点C、 D、 E成功加了同一把锁。
此时,又悲剧了吧!客户端C1和C2同时都持有着同一把分布式锁。
为了应对时钟跳跃引发的锁失效问题,Antirez提出了应该禁止人为修改系统时间,使用一个不会进行「跳跃式」调整系统时钟的ntpd程序。这也是通过人为补偿措施,降低不一致发生的概率。
但是...,RedLock算法并没有解决,操作共享资源超时,导致锁失效的问题。
存在这么大争议的算法实现,还是不推荐使用的。
原文:https://www.cnblogs.com/wodeboke-y/p/12912840.html