只做记录,直接上代码
父类:
package com.ylcloud.common.lock;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

/**
 * ZooKeeper-backed distributed lock.
 *
 * <p>Each call to {@link #lock()} creates an ephemeral sequential node under
 * {@link #LOCK_PATH}. The request whose node sorts first owns the lock; every
 * other request waits for the node immediately before its own to be deleted
 * and then re-checks its position.
 *
 * <p>The lock is not reentrant: calling {@link #lock()} again before
 * {@link #unlock()} queues a second request behind the first one.
 *
 * @author cjh
 * @date: 2018/9/27 11:36
 */
public class ZkLock {

    private static final Logger logger = LogManager.getLogger(ZkLock.class);

    public String ZOOKEEPER_IP_PORT = "127.0.0.1:2181";

    public Integer sessionTimeout = 30000;

    public Integer connectTimeout = 50000;

    // Parent node under which the lock requests are queued; set by subclasses.
    public String LOCK_PATH;

    // Node directly ahead of the pending request (the one being watched), or null.
    public String beforeNode;

    // Node of the request that currently holds the lock through this instance
    // (the node unlock() deletes), or null.
    public String currentNode;

    // Node created by the most recent lock() request.
    public String threadTag = null;

    public ZkClient client;

    // Serializes lock() calls made through this instance.
    private final Object acquireMonitor = new Object();

    // Guards currentNode so that a release and the next claim cannot interleave.
    private final Object stateMonitor = new Object();

    public ZkLock() {
        client = new ZkClient(ZOOKEEPER_IP_PORT, sessionTimeout, connectTimeout, new SerializableSerializer());
    }

    /**
     * Blocks until the distributed lock is held.
     *
     * <p>Interrupts do not abort the wait; the interrupt flag is restored before
     * this method returns. If ZooKeeper access fails, the pending request node
     * is removed (best effort) and the exception is rethrown.
     */
    public void lock() {
        synchronized (acquireMonitor) {
            if (!client.exists(LOCK_PATH)) {
                // createParents=true tolerates another client creating the node first.
                client.createPersistent(LOCK_PATH, true);
            }
            try {
                if (!tryLock()) {
                    waitForLock();
                }
            } catch (RuntimeException e) {
                abandonRequest();
                throw e;
            }
        }
    }

    /**
     * Releases the lock by deleting the holder's node. Does nothing when no
     * lock is recorded as held.
     */
    public void unlock() {
        synchronized (stateMonitor) {
            if (currentNode != null) {
                client.delete(currentNode);
                currentNode = null;
            }
        }
    }

    /**
     * Enqueues a new request node and reports whether it is already first in line.
     *
     * @return {@code true} when the lock was acquired immediately
     */
    private boolean tryLock() {
        // Reset first so that a failed create never leaves a stale node name
        // behind for abandonRequest() to delete.
        threadTag = null;
        threadTag = client.createEphemeralSequential(LOCK_PATH + "/", "");
        beforeNode = findPredecessor();
        if (beforeNode == null) {
            claim();
            return true;
        }
        return false;
    }

    /**
     * Waits until the pending request node is the first child of the lock path,
     * then records it as the holder.
     */
    private void waitForLock() {
        boolean interrupted = false;
        try {
            while (beforeNode != null) {
                interrupted |= awaitDeletion(beforeNode);
                // The predecessor may have vanished without ever holding the
                // lock, or the wait may have timed out: always re-check.
                beforeNode = findPredecessor();
            }
            claim();
        } finally {
            beforeNode = null;
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the full path of the node queued directly ahead of the pending
     * request, or {@code null} when the pending request is first.
     *
     * @throws IllegalStateException if the pending request node is no longer
     *         among the children of the lock path
     */
    private String findPredecessor() {
        List<String> children = client.getChildren(LOCK_PATH);
        Collections.sort(children);
        String ownName = threadTag.substring(LOCK_PATH.length() + 1);
        int position = Collections.binarySearch(children, ownName);
        if (position < 0) {
            throw new IllegalStateException(
                    "Lock node " + threadTag + " no longer exists under " + LOCK_PATH);
        }
        return position == 0 ? null : LOCK_PATH + "/" + children.get(position - 1);
    }

    /**
     * Waits for {@code path} to be deleted, for at most {@link #sessionTimeout}
     * milliseconds.
     *
     * @return {@code true} if the calling thread was interrupted while waiting
     */
    private boolean awaitDeletion(String path) {
        final CountDownLatch latch = new CountDownLatch(1);
        final IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                latch.countDown();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };

        boolean interrupted = false;
        // Subscribe before checking existence so that a deletion happening in
        // between is not missed.
        client.subscribeDataChanges(path, listener);
        try {
            if (client.exists(path)) {
                try {
                    latch.await(sessionTimeout, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    interrupted = true;
                    logger.info(e.getMessage());
                }
            }
        } finally {
            client.unsubscribeDataChanges(path, listener);
        }
        return interrupted;
    }

    /** Records the pending request node as the lock holder. */
    private void claim() {
        synchronized (stateMonitor) {
            currentNode = threadTag;
        }
    }

    /** Best-effort removal of the pending request node after a failed acquisition. */
    private void abandonRequest() {
        String pending = threadTag;
        if (pending == null) {
            return;
        }
        try {
            client.delete(pending);
        } catch (RuntimeException cleanupFailure) {
            logger.warn("Failed to remove lock node " + pending, cleanupFailure);
        }
    }
}
子类:
package com.ylcloud.common.lock.ext;

import com.ylcloud.common.lock.ZkLock;

/**
 * {@link ZkLock} bound to the {@code /USER_CODE} lock path.
 */
public class ZkLockUserCode extends ZkLock {

    public ZkLockUserCode() {
        // LOCK_PATH is inherited from ZkLock; the implicit super() call has
        // already run by the time it is assigned here.
        this.LOCK_PATH = "/USER_CODE";
    }
}
使用示例:
private ZkLock zkLock = new ZkLockUserCode(); @Override public void addUser() { zkLock.lock(); //业务实现 zkLock.unlock(); }
转载请注明博客出处:http://www.cnblogs.com/cjh-notes/
原文:https://www.cnblogs.com/cjh-notes/p/9744183.html