public class SharedCounterExample implements SharedCountListener{private static final int QTY = 5;private static final String PATH = "/examples/counter";public static void main(String[] args) throws IOException, Exception{final Random rand = new Random();SharedCounterExample example = new SharedCounterExample();CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));client.start();SharedCount baseCount = new SharedCount(client, PATH, 0);baseCount.addListener(example);baseCount.start();List<SharedCount> examples = Lists.newArrayList();ExecutorService service = Executors.newFixedThreadPool(QTY);for (int i = 0; i < QTY; ++i){final SharedCount count = new SharedCount(client, PATH, 0);examples.add(count);Callable<Void> task = new Callable<Void>(){@Overridepublic Void call() throws Exception{count.start();Thread.sleep(rand.nextInt(10000));count.setCount(rand.nextInt(10000));System.out.println("计数器当前值:" + count.getVersionedValue().getValue());System.out.println("计数器当前版本:" + count.getVersionedValue().getVersion());System.out.println("trySetCount:" + count.trySetCount(count.getVersionedValue(), 123));return null;}};service.submit(task);}service.shutdown();service.awaitTermination(10, TimeUnit.MINUTES);for (int i = 0; i < QTY; ++i){examples.get(i).close();}baseCount.close();client.close();System.out.println("OK!");}@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState){System.out.println("连接状态: " + newState.toString());}@Overridepublic void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception{System.out.println("计数器值改变:" + newCount);}}
连接状态: CONNECTED
计数器当前值:1684
计数器当前版本:11
trySetCount:true
计数器值改变:123
计数器当前值:8425
计数器当前版本:13
trySetCount:true
计数器值改变:123
计数器当前值:9369
计数器当前版本:15
trySetCount:true
计数器值改变:123
计数器当前值:4075
计数器当前版本:17
trySetCount:true
计数器值改变:123
计数器当前值:9221
计数器当前版本:19
trySetCount:true
OK!

/*
 * Abridged excerpt of two library classes; each "......" marks code omitted from
 * the excerpt, so this fragment is illustrative and not compilable as shown.
 *
 * DistributedAtomicLong holds a DistributedAtomicValue in its `value` field
 * (presumably delegating to it -- the delegating code is not shown here).
 *
 * DistributedAtomicValue.trySet(makeValue):
 *   1. creates a MutableAtomicValue result initialised with (null, null, false);
 *   2. calls tryOptimistic(result, makeValue) first;
 *   3. only if result.succeeded() is false and `mutex` is non-null, calls
 *      tryWithMutex(result, makeValue) as a second attempt;
 *   4. returns the result, whose succeeded() flag callers can inspect.
 */
public class DistributedAtomicLong implements DistributedAtomicNumber<Long>{private final DistributedAtomicValue value;......}public class DistributedAtomicValue{......AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception{MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);tryOptimistic(result, makeValue);if ( !result.succeeded() && (mutex != null) ){tryWithMutex(result, makeValue);}return result;}......}
/**
 * Example of a distributed long counter stored at a single ZooKeeper path.
 *
 * <p>{@link #QTY} {@code DistributedAtomicLong} instances on the same path are
 * incremented concurrently from a fixed thread pool, and each worker prints
 * whether its increment succeeded together with the value before and after.
 */
public class DistributedAtomicLongExample {
    /** Number of counter instances, and also the size of the worker thread pool. */
    private static final int QTY = 5;

    /** ZooKeeper path shared by every counter instance in this example. */
    private static final String PATH = "/examples/counter";

    /**
     * Runs the example against a ZooKeeper server at {@code 127.0.0.1:2181}.
     *
     * <p>Each worker sleeps between 1 and 11 seconds and then calls
     * {@code increment()} once. Failures inside a worker are printed and do not
     * abort the other workers.
     *
     * @param args ignored
     * @throws Exception if the client or the wait for the workers fails
     */
    public static void main(String[] args) throws Exception {
        final Random rand = new Random();
        CuratorFramework client =
                CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        try {
            client.start();

            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count =
                        new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        try {
                            Thread.sleep(1000 + rand.nextInt(10000));
                            AtomicValue<Long> value = count.increment();
                            System.out.println("修改成功: " + value.succeeded());
                            if (value.succeeded()) {
                                System.out.println("修改之前的值:" + value.preValue() + " | 修改之后的值:" + value.postValue());
                            }
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag so the pool thread sees the request.
                            Thread.currentThread().interrupt();
                            e.printStackTrace();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        return null;
                    }
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            // Release resources on the failure path too: stop any workers still
            // running before closing the client they use.
            service.shutdownNow();
            client.close();
        }
        System.out.println("OK!");
    }
}
修改成功: true
修改之前的值:0 | 修改之后的值:1
修改成功: true
修改之前的值:1 | 修改之后的值:2
修改成功: true
修改之前的值:2 | 修改之后的值:3
修改成功: true
修改之前的值:3 | 修改之后的值:4
修改成功: true
修改之前的值:4 | 修改之后的值:5
OK!

原文:http://www.cnblogs.com/LiZhiW/p/4941771.html