// 构造方法
public InterProcessMutex(CuratorFramework client, String path)
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
// 通过acquire获得锁,并提供超时机制:
public void acquire() throws Exception
public boolean acquire(long time, TimeUnit unit) throws Exception
// 撤销锁
public void makeRevocable(RevocationListener<InterProcessMutex> listener)
public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)
/**
 * Simulates a resource that must only be used by one caller at a time.
 *
 * <p>The class does no locking of its own. It merely detects overlapping use:
 * if a second caller enters {@link #use()} while another is still inside, an
 * {@link IllegalStateException} is thrown. It is meant to be guarded by an
 * external lock, which it thereby helps to verify.
 */
public class FakeLimitedResource
{
    /** {@code true} while some caller is inside {@link #use()}. */
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    /**
     * Pretends to work on the resource for a very short, random amount of time.
     *
     * @throws IllegalStateException if another caller is currently inside this method
     * @throws InterruptedException  if the calling thread is interrupted while sleeping
     */
    public void use() throws InterruptedException
    {
        if (!inUse.compareAndSet(false, true))
        {
            // Cannot happen as long as callers serialize access with a lock correctly.
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try
        {
            // Truncation to long yields a pause of 0, 1 or 2 milliseconds.
            Thread.sleep((long) (3 * Math.random()));
        }
        finally
        {
            // Always clear the marker, even when the sleep is interrupted.
            inUse.set(false);
        }
    }
}
public class ExampleClientThatLocks{private final InterProcessMutex lock;private final FakeLimitedResource resource;private final String clientName;public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName){this.resource = resource;this.clientName = clientName;lock = new InterProcessMutex(client, lockPath);}public void doWork(long time, TimeUnit unit) throws Exception{if (!lock.acquire(time, unit)){throw new IllegalStateException(clientName + " 不能得到互斥锁");}try{System.out.println(clientName + " 已获取到互斥锁");resource.use(); // 使用资源Thread.sleep(1000 * 1);}finally{System.out.println(clientName + " 释放互斥锁");lock.release(); // 总是在finally中释放}}}
/**
 * Demo driver: starts {@code QTY} Curator clients, each of which repeatedly
 * competes for the same mutex path through an {@link ExampleClientThatLocks}.
 */
public class InterProcessMutexExample
{
    /** Number of clients, and of worker threads. */
    private static final int QTY = 5;
    /** Number of lock/unlock rounds each client performs. */
    private static final int REPETITIONS = QTY * 10;
    /** Lock path shared by all clients. */
    private static final String PATH = "/examples/locks";

    /**
     * Runs the demo against the connect string {@code 127.0.0.1:2181}.
     *
     * @param args unused
     * @throws IllegalStateException if the workers do not finish within ten minutes
     * @throws Exception             on client start-up failure or interruption
     */
    public static void main(String[] args) throws Exception
    {
        final FakeLimitedResource resource = new FakeLimitedResource();
        final List<CuratorFramework> clientList = new ArrayList<CuratorFramework>();
        for (int i = 0; i < QTY; i++)
        {
            CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();
            clientList.add(client);
        }
        System.out.println("连接初始化完成!");

        ExecutorService service = Executors.newFixedThreadPool(QTY);
        for (int i = 0; i < QTY; ++i)
        {
            final int index = i;
            Callable<Void> task = new Callable<Void>()
            {
                @Override
                public Void call() throws Exception
                {
                    try
                    {
                        final ExampleClientThatLocks example = new ExampleClientThatLocks(clientList.get(index), PATH, resource, "Client " + index);
                        for (int j = 0; j < REPETITIONS; ++j)
                        {
                            example.doWork(10, TimeUnit.SECONDS);
                        }
                    }
                    catch (Throwable e)
                    {
                        // Demo code: report the failure and let this worker end.
                        e.printStackTrace();
                    }
                    finally
                    {
                        // Each worker closes its own client once it is done.
                        CloseableUtils.closeQuietly(clientList.get(index));
                    }
                    return null;
                }
            };
            service.submit(task);
        }

        service.shutdown();
        // Do not report success when the workers are still running after the timeout.
        if (!service.awaitTermination(10, TimeUnit.MINUTES))
        {
            service.shutdownNow();
            throw new IllegalStateException("任务未在 10 分钟内完成");
        }
        System.out.println("OK!");
    }
}
连接初始化完成!
Client 4 已获取到互斥锁
Client 4 释放互斥锁
Client 3 已获取到互斥锁
Client 3 释放互斥锁
......
Client 2 已获取到互斥锁
Client 2 释放互斥锁
OK!

public void doWork(long time, TimeUnit unit) throws Exception{if (!lock.acquire(time, unit)){throw new IllegalStateException(clientName + " 不能得到互斥锁");}System.out.println(clientName + " 已获取到互斥锁");if (!lock.acquire(time, unit)){throw new IllegalStateException(clientName + " 不能得到互斥锁");}System.out.println(clientName + " 再次获取到互斥锁");try{resource.use(); // 使用资源Thread.sleep(1000 * 1);}finally{System.out.println(clientName + " 释放互斥锁");lock.release(); // 总是在finally中释放lock.release(); // 获取锁几次 释放锁也要几次}}

public class ExampleClientReadWriteLocks{private final InterProcessReadWriteLock lock;private final InterProcessMutex readLock;private final InterProcessMutex writeLock;private final FakeLimitedResource resource;private final String clientName;public ExampleClientReadWriteLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName){this.resource = resource;this.clientName = clientName;lock = new InterProcessReadWriteLock(client, lockPath);readLock = lock.readLock();writeLock = lock.writeLock();}public void doWork(long time, TimeUnit unit) throws Exception{// 注意只能先得到写锁再得到读锁,不能反过来!!!if (!writeLock.acquire(time, unit)){throw new IllegalStateException(clientName + " 不能得到写锁");}System.out.println(clientName + " 已得到写锁");if (!readLock.acquire(time, unit)){throw new IllegalStateException(clientName + " 不能得到读锁");}System.out.println(clientName + " 已得到读锁");try{resource.use(); // 使用资源Thread.sleep(1000 * 1);}finally{System.out.println(clientName + " 释放读写锁");readLock.release();writeLock.release();}}}
连接初始化完成!
Client 1 已得到写锁
Client 1 已得到读锁
Client 1 释放读写锁
......
Client 3 已得到写锁
Client 3 已得到读锁
Client 3 释放读写锁
OK!

public void returnLease(Lease lease)
public void returnAll(Collection<Lease> leases)
public Lease acquire() throws Exception
public Collection<Lease> acquire(int qty) throws Exception
public Lease acquire(long time, TimeUnit unit) throws Exception
public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception
/**
 * Demo of {@link InterProcessSemaphoreV2}: takes five leases, then one more,
 * then tries to take five further leases with a timeout, and finally returns
 * everything it holds.
 */
public class InterProcessSemaphoreExample
{
    /** Maximum number of leases handed to the semaphore constructor. */
    private static final int MAX_LEASE = 10;
    /** Path used by the semaphore. */
    private static final String PATH = "/examples/locks";

    /**
     * Runs the demo against the connect string {@code 127.0.0.1:2181}.
     *
     * <p>Leases and the client are released in {@code finally} blocks so that
     * an exception part-way through does not leave them open.
     *
     * @param args unused
     * @throws Exception anything thrown by Curator or by the resource
     */
    public static void main(String[] args) throws Exception
    {
        FakeLimitedResource resource = new FakeLimitedResource();
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        try
        {
            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            try
            {
                System.out.println("获取租约数量:" + leases.size());
                Lease lease = semaphore.acquire();
                try
                {
                    System.out.println("获取单个租约");
                    resource.use();
                    // 6 of the 10 leases are already held here, so asking for 5
                    // more is expected to time out (printed as null).
                    Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
                    System.out.println("获取租约,如果为空则超时: " + leases2);
                    if (leases2 != null)
                    {
                        // Should the request succeed after all, hand the leases back.
                        semaphore.returnAll(leases2);
                    }
                }
                finally
                {
                    System.out.println("释放租约");
                    semaphore.returnLease(lease);
                }
            }
            finally
            {
                System.out.println("释放集合中的所有租约");
                semaphore.returnAll(leases);
            }
        }
        finally
        {
            client.close();
        }
        System.out.println("OK!");
    }
}
获取租约数量:5
获取单个租约
获取租约,如果为空则超时: null
释放租约
释放集合中的所有租约
OK!

public InterProcessMultiLock(CuratorFramework client, List<String> paths)
public InterProcessMultiLock(List<InterProcessLock> locks)
public class InterProcessMultiLockExample{private static final String PATH1 = "/examples/locks1";private static final String PATH2 = "/examples/locks2";public static void main(String[] args) throws Exception{FakeLimitedResource resource = new FakeLimitedResource();CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));client.start();InterProcessLock lock1 = new InterProcessMutex(client, PATH1); // 可重入锁InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2); // 不可重入锁InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));if (!lock.acquire(10, TimeUnit.SECONDS)){throw new IllegalStateException("不能获取多锁");}System.out.println("已获取多锁");System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess());System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess());try{resource.use(); // 资源操作}finally{System.out.println("释放多个锁");lock.release(); // 释放多锁}System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess());System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess());client.close();System.out.println("OK!");}}
已获取多锁
是否有第一个锁: true
是否有第二个锁: true
释放多个锁
是否有第一个锁: false
是否有第二个锁: false
OK!

原文:http://www.cnblogs.com/LiZhiW/p/4931577.html