package test.thread;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import junit.framework.Assert;
import test.thread.ConcurrentLinkedQueue.Node;
/**
 * A minimal, append-only linked queue whose {@link #add(Node)} is implemented
 * with compare-and-set on the tail node's {@code next} reference, plus a small
 * multi-threaded driver ({@link #main(String[])}) that fills the queue and
 * validates the result.
 *
 * <p>The list always starts with a sentinel node that carries no data; real
 * elements hang off the sentinel's {@code next} reference. There is no removal
 * operation in this class.
 *
 * <p>NOTE(review): the class is declared {@link Serializable}, but {@link Node}
 * is not, so default serialization of a queue instance would fail with
 * {@code NotSerializableException}. Confirm whether serialization is needed.
 */
public class ConcurrentLinkedQueue implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Sentinel node; {@code head} and {@code tail} both start here. */
    private final Node emptyNode = new Node(null, null);
    private final AtomicReference<Node> head = new AtomicReference<Node>(emptyNode);
    private final AtomicReference<Node> tail = new AtomicReference<Node>(emptyNode);

    /** Total number of elements the driver inserts and {@link #print()} expects. */
    public static int num = 10000000;

    /** Number of loop iterations in {@link #add(Node)} that did not complete the insert. */
    private static final AtomicInteger failCount = new AtomicInteger(0);

    /** Guards {@link #addByLock(Node)} only; {@link #add(Node)} never takes it. */
    private final Lock lock = new ReentrantLock();

    /** Number of worker threads started by {@link #main(String[])}. */
    public static int numThreads = 4;

    /** Number of elements each worker inserts (integer division of {@code num} by {@code numThreads}). */
    public static int numsToAdd = ConcurrentLinkedQueue.num / ConcurrentLinkedQueue.numThreads;

    /**
     * Appends {@code node} to the end of the queue without locking.
     *
     * <p>The loop reads the current tail, and either helps advance a lagging
     * tail pointer (when the tail already has a successor) or tries to link
     * {@code node} behind the tail with a CAS. It retries until the link
     * succeeds.
     *
     * @param node the node to append; must not be {@code null}
     * @return always {@code true} once the node has been linked
     * @throws NullPointerException if {@code node} is {@code null}
     */
    public boolean add(Node node) {
        // A null node would be "linked" successfully and then installed as the
        // tail, making every later call dereference null.
        if (node == null) {
            throw new NullPointerException("node must not be null");
        }
        while (true) {
            Node curTail = tail.get();
            // Only act on a snapshot that is still the current tail.
            if (curTail == tail.get()) {
                Node next = curTail.getNext().get();
                if (next != null) {
                    // Another thread linked a node but has not moved tail yet: help it along.
                    tail.compareAndSet(curTail, next);
                } else if (curTail.getNext().compareAndSet(null, node)) {
                    // Linked. Moving tail is best-effort; a failed CAS means someone helped.
                    tail.compareAndSet(curTail, node);
                    return true;
                }
            }
            // This iteration did not insert the node; count it and retry.
            failCount.incrementAndGet();
        }
    }

    /**
     * Appends {@code node} to the end of the queue while holding the internal lock.
     *
     * <p>This path is only serialized against other {@code addByLock} callers;
     * {@link #add(Node)} does not acquire the lock.
     *
     * @param node the node to append; must not be {@code null}
     * @throws NullPointerException if {@code node} is {@code null}
     */
    public void addByLock(Node node) {
        if (node == null) {
            throw new NullPointerException("node must not be null");
        }
        // Acquire outside the try so that unlock() only runs when the lock is actually held.
        lock.lock();
        try {
            tail.get().getNext().set(node);
            tail.set(node);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Walks the queue from the first real element and verifies its contents.
     *
     * <p>Asserts (via JUnit's {@code Assert}) that the queue holds exactly
     * {@link #num} nodes and that every integer in {@code [0, num)} is present.
     * Node data is cast to {@code Integer}, so a node holding anything else
     * causes a {@code ClassCastException}. Despite its name, this method
     * prints nothing.
     */
    public void print() {
        Node node = head.get().getNext().get();
        int count = 0;
        Set<Integer> sets = new HashSet<Integer>();
        while (node != null) {
            count++;
            sets.add((Integer) node.getData());
            node = node.getNext().get();
        }
        Assert.assertTrue(count == num);
        // Verify that the collected values are exactly 0 .. num-1.
        for (int i = 0; i < num; i++) {
            Assert.assertTrue(sets.contains(i));
        }
    }

    /**
     * Driver: starts {@link #numThreads} workers that each insert a disjoint
     * range of integers, waits for all of them, validates the queue, and
     * prints the elapsed insertion time in milliseconds.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        CountDownLatch latch = new CountDownLatch(numThreads);
        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
        long startMillis = System.currentTimeMillis();
        long cost;
        try {
            for (int i = 0; i < numThreads; i++) {
                Worker worker = new Worker();
                worker.setLatch(latch);
                worker.setStartIndex(i);
                worker.setQueue(concurrentLinkedQueue);
                executorService.execute(worker);
            }
            latch.await();
            cost = System.currentTimeMillis() - startMillis;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and skip validation: the queue may be only partly filled.
            Thread.currentThread().interrupt();
            System.err.println("interrupted while waiting for workers; skipping verification");
            return;
        } finally {
            // Pool threads are non-daemon; without this the JVM never exits.
            executorService.shutdown();
        }
        concurrentLinkedQueue.print();
        System.out.println("cost:" + cost);
    }

    /**
     * A singly linked queue node: an arbitrary payload plus an atomically
     * updatable reference to the following node.
     */
    static class Node {
        private Object data;
        private AtomicReference<Node> next;

        /**
         * @param data payload stored in this node
         * @param node initial successor, or {@code null} for none
         */
        public Node(Object data, Node node) {
            this.data = data;
            this.next = new AtomicReference<Node>(node);
        }

        /**
         * Creates a node with no successor.
         *
         * @param data payload stored in this node
         */
        public Node(Object data) {
            this(data, null);
        }

        /** @return the payload stored in this node */
        public Object getData() {
            return data;
        }

        /** @param data the new payload */
        public void setData(Object data) {
            this.data = data;
        }

        /** @return the atomic holder of this node's successor */
        public AtomicReference<Node> getNext() {
            return next;
        }

        /** @param next replacement holder for this node's successor */
        public void setNext(AtomicReference<Node> next) {
            this.next = next;
        }
    }
}
/**
 * Task that inserts one contiguous slice of integers into a
 * {@link ConcurrentLinkedQueue} and then counts down a latch.
 *
 * <p>The slice is {@code ConcurrentLinkedQueue.numsToAdd} values long and
 * begins at {@code startIndex * ConcurrentLinkedQueue.numsToAdd}, so
 * {@code startIndex} is the worker's ordinal, not the first value inserted.
 * The latch, start index and queue must be set before the task is run.
 */
class Worker implements Runnable {
    private CountDownLatch latch;
    private int startIndex;
    private ConcurrentLinkedQueue queue;

    /**
     * Adds this worker's slice of integers to the queue, one node per value,
     * and signals the latch when finished.
     */
    @Override
    public void run() {
        try {
            // Work on locals so the configured startIndex is not overwritten by a run.
            int count = ConcurrentLinkedQueue.numsToAdd;
            int first = startIndex * count;
            int end = first + count;
            for (int i = first; i < end; i++) {
                queue.add(new Node(i));
            }
        } finally {
            // Always release the waiter, even if an insert throws.
            latch.countDown();
        }
    }

    /** @return the latch counted down when {@link #run()} ends */
    public CountDownLatch getLatch() {
        return latch;
    }

    /** @param latch the latch to count down when {@link #run()} ends */
    public void setLatch(CountDownLatch latch) {
        this.latch = latch;
    }

    /** @return this worker's ordinal (slice number) */
    public int getStartIndex() {
        return startIndex;
    }

    /** @param startIndex this worker's ordinal (slice number) */
    public void setStartIndex(int startIndex) {
        this.startIndex = startIndex;
    }

    /** @return the queue this worker inserts into */
    public ConcurrentLinkedQueue getQueue() {
        return queue;
    }

    /** @param queue the queue this worker inserts into */
    public void setQueue(ConcurrentLinkedQueue queue) {
        this.queue = queue;
    }
}
// A simple LinkedQueue implemented with CAS (republished via bubuko.com).
// Original article: http://blog.csdn.net/zhaozhenzuo/article/details/20775877