Paper: Implementing Lock-Free Queues
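The gist of the paper: lock-based programs perform poorly, and "a process inside the critical section can delay all operations indefinitely". To address these drawbacks, the paper proposes non-blocking algorithms built on Compare&Swap (CSW) and Fetch&Add (FAA); Compare&Swap is what is usually called CAS. CAS has one case that is especially hard to handle, the ABA problem, and the paper's solution is to check references. The paper gives algorithms for a linked-list queue and for an array queue, and closes with a performance analysis.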
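To make the ABA problem mentioned above concrete, here is a minimal Java sketch. It is not taken from the paper: the class `AbaDemo` and its method names are made up for illustration, the interleaving of two threads is simulated sequentially, and `AtomicStampedReference` (a version stamp) stands in for the paper's reference-checking approach rather than reproducing it.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

// Hypothetical demo class (not from the paper or the post).
class AbaDemo {
    // Plain CAS: the value goes A -> B -> A between the read and the CAS,
    // and the final CAS still succeeds because it only compares references.
    static boolean plainCasAfterAba() {
        String a = "A", b = "B";
        AtomicReference<String> ref = new AtomicReference<>(a);
        String seen = ref.get();              // "thread 1" reads A
        ref.compareAndSet(a, b);              // "thread 2" changes A -> B
        ref.compareAndSet(b, a);              // "thread 2" changes B -> A
        return ref.compareAndSet(seen, "C");  // "thread 1" cannot tell anything happened
    }

    // Stamped CAS: every update also bumps a version stamp,
    // so the same sequence is detected and the final CAS fails.
    static boolean stampedCasAfterAba() {
        String a = "A", b = "B";
        AtomicStampedReference<String> ref = new AtomicStampedReference<>(a, 0);
        int[] stampHolder = new int[1];
        String seen = ref.get(stampHolder);   // reads A together with stamp 0
        int seenStamp = stampHolder[0];
        ref.compareAndSet(a, b, 0, 1);        // A -> B, stamp 0 -> 1
        ref.compareAndSet(b, a, 1, 2);        // B -> A, stamp 1 -> 2
        return ref.compareAndSet(seen, "C", seenStamp, seenStamp + 1);
    }
}
```

In this sketch `plainCasAfterAba()` returns true and `stampedCasAfterAba()` returns false, because the stamp has moved from 0 to 2 even though the reference is A again.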
This note mainly records some implementation approaches and ideas for Lock-Free queues.
Let's look at the linked-list case first:

The queue is a linked list with both a head and a tail pointer: Enqueue moves tail, and Dequeue moves head. The pseudocode is recorded below.
Enqueue(x)
    q ← new record
    q^.value ← x
    q^.next ← NULL
    repeat
        p ← tail
        succ ← Compare&Swap(p^.next, NULL, q)
        if succ ≠ TRUE
            Compare&Swap(tail, p, p^.next)
    until succ = TRUE
    Compare&Swap(tail, p, q)
end

Dequeue()
    repeat
        p ← head
        if p^.next = NULL
            error queue empty
    until Compare&Swap(head, p, p^.next)
    return p^.next^.value
end
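The idea of Enqueue is to set the next pointer of the tail node first and, if that succeeds, move the tail pointer. The idea of Dequeue is: if the next of p (a copy of head) is not null, move head forward and, once that succeeds, return the value held by p's former next. Java has no pointer operations, so I used the Unsafe class to do the CAS on fields directly (the java.util.concurrent.atomic classes are another option). The array implementation turned out to be much more stable than the linked-list one. When running the threads, add would occasionally stay in the RUNNABLE state, spinning, and I do not yet know the full cause. The first cause I found was that I had put the variable assignment in the for(;;) header, so the value was never re-read and only the loop body ran. Moving the assignment into the loop body helped, but infinite loops still showed up occasionally.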
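The for(;;) pitfall described above can be shown in isolation. The following is a minimal sketch, not code from the original post: `CasRetryDemo` and its methods are hypothetical names, and a plain `AtomicInteger` counter stands in for the queue's tail pointer.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class (not from the original post).
class CasRetryDemo {
    // Correct shape: the shared value is re-read at the top of every iteration.
    static int increment(AtomicInteger counter) {
        for (;;) {
            int current = counter.get();
            if (counter.compareAndSet(current, current + 1)) {
                return current + 1;
            }
            // CAS failed: another thread changed the counter, so loop and re-read.
        }
    }

    // Broken shape, shown for contrast only:
    //     for (int current = counter.get(); ; ) { ...same body without the read... }
    // reads the counter once. After another thread changes it, the stale value
    // never matches again and the loop spins forever.

    // Runs `threads` threads that each call increment() `perThread` times.
    static int runThreads(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    increment(counter);
                }
            });
            ts[i].start();
        }
        try {
            for (Thread t : ts) {
                t.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return counter.get();
    }
}
```

Calling `CasRetryDemo.runThreads(4, 10000)` should return 40000, since every failed CAS is retried with a freshly read value and no increment is lost.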
import sun.misc.Unsafe;
import java.lang.reflect.Field;

/**
 * Created by MacBook on 2019/4/14.
 */
public class MyLockFreeLinkQueue<E> implements MyQueue<E> {
    // volatile so that plain reads of head/tail always see the latest CAS result
    volatile Node<E> head;
    volatile Node<E> tail;
    static Unsafe unsafe;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long nextOffset;

    static {
        try {
            // Unsafe.getUnsafe() rejects application code, so read the singleton via reflection
            Field singleoneInstanceField = Unsafe.class.getDeclaredField("theUnsafe");
            singleoneInstanceField.setAccessible(true);
            unsafe = (Unsafe) singleoneInstanceField.get(null);
            headOffset = unsafe.objectFieldOffset(MyLockFreeLinkQueue.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset(MyLockFreeLinkQueue.class.getDeclaredField("tail"));
            nextOffset = unsafe.objectFieldOffset(MyLockFreeLinkQueue.Node.class.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    static class Node<E> {
        E data;
        volatile Node<E> next;

        public Node(E data, Node<E> next) {
            this.data = data;
            this.next = next;
        }
        public E getData() { return data; }
        public void setData(E data) { this.data = data; }
        public Node<E> getNext() { return next; }
        public void setNext(Node<E> next) { this.next = next; }
    }

    public MyLockFreeLinkQueue() {
        // head and tail start at the same dummy node
        head = tail = new Node<>(null, null);
    }

    /**
     * Follows the Enqueue(x) pseudocode: link q after tail, then swing tail.
     * @param e element to append
     * @return always true
     */
    @Override
    public boolean add(E e) {
        Node<E> q = new Node<>(e, null);
        for (;;) {
            Node<E> p = tail; // re-read tail on every iteration
            if (unsafe.compareAndSwapObject(p, nextOffset, null, q)) {
                // q is linked; try to swing tail to q (fine if another thread already moved it)
                unsafe.compareAndSwapObject(this, tailOffset, p, q);
                return true;
            }
            // p already has a successor: help move tail forward, then retry
            unsafe.compareAndSwapObject(this, tailOffset, p, p.getNext());
        }
    }

    /**
     * Follows the Dequeue() pseudocode, returning null instead of an error when empty.
     * @return the oldest element, or null if the queue is empty
     */
    @Override
    public E take() {
        for (;;) {
            Node<E> p = head, next = p.getNext();
            if (next == null) {
                return null; // queue is empty
            } else if (unsafe.compareAndSwapObject(this, headOffset, p, next)) {
                // p.next is left intact: a producer that still sees p as tail must find
                // a non-null next there, otherwise it could link a node onto the removed p
                return next.getData();
            }
        }
    }
}
Here I ran 2 producer threads and 2 consumer threads, and the data was consumed in order.
...
pool-1-thread-3 send [62] to queue; total 193
pool-1-thread-3 send [97] to queue; total 194
pool-1-thread-3 send [25] to queue; total 195
pool-1-thread-3 send [17] to queue; total 196
pool-1-thread-3 send [50] to queue; total 197
pool-1-thread-3 send [72] to queue; total 198
pool-1-thread-3 send [46] to queue; total 199
pool-1-thread-3 send [83] to queue; total 200
pool-1-thread-4 consumer [62],count 2n+1 result :125; total 193
pool-1-thread-4 consumer [97],count 2n+1 result :195; total 194
pool-1-thread-4 consumer [25],count 2n+1 result :51; total 195
pool-1-thread-4 consumer [17],count 2n+1 result :35; total 196
pool-1-thread-4 consumer [50],count 2n+1 result :101; total 197
pool-1-thread-4 consumer [72],count 2n+1 result :145; total 198
pool-1-thread-4 consumer [46],count 2n+1 result :93; total 199
pool-1-thread-4 consumer [83],count 2n+1 result :167; total 200
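For comparison with the Unsafe-based linked queue above, the same Enqueue/Dequeue pseudocode can also be sketched with `AtomicReference`, which avoids field offsets altogether. This is only an illustrative sketch, not the author's implementation: `CasLinkedQueue` is a hypothetical name, it does not implement the post's `MyQueue` interface, and it relies on the garbage collector instead of the paper's reference checking to keep removed nodes from being reused.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class: the Enqueue/Dequeue pseudocode expressed with AtomicReference.
class CasLinkedQueue<E> {
    private static final class Node<E> {
        final E value;
        final AtomicReference<Node<E>> next = new AtomicReference<>(null);
        Node(E value) { this.value = value; }
    }

    private final AtomicReference<Node<E>> head;
    private final AtomicReference<Node<E>> tail;

    CasLinkedQueue() {
        Node<E> dummy = new Node<>(null); // head and tail start at a dummy node
        head = new AtomicReference<>(dummy);
        tail = new AtomicReference<>(dummy);
    }

    boolean add(E e) {
        Node<E> q = new Node<>(e);
        for (;;) {
            Node<E> p = tail.get();               // p <- tail
            if (p.next.compareAndSet(null, q)) {  // succ <- CAS(p^.next, NULL, q)
                tail.compareAndSet(p, q);         // swing tail; may fail if already helped
                return true;
            }
            // tail is lagging: help move it to p's successor, then retry
            tail.compareAndSet(p, p.next.get());
        }
    }

    // Returns null when the queue is empty (the pseudocode raises an error instead).
    E take() {
        for (;;) {
            Node<E> p = head.get();               // p <- head
            Node<E> next = p.next.get();
            if (next == null) {
                return null;
            }
            if (head.compareAndSet(p, next)) {    // next becomes the new dummy node
                return next.value;
            }
        }
    }
}
```

A quick single-threaded check: after `add(1)`, `add(2)`, `add(3)`, three calls to `take()` return 1, 2, 3 in that order and a fourth returns null. The next links are never reset to null, so a producer holding a stale tail always fails its CAS and moves on.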
Next, I implemented the circular-array version. Building on the BlockingQueue I implemented earlier, this Lock-Free Queue turns out to be fairly simple.

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by MacBook on 2019/4/13.
 */
public class MyLockFreeQueue<E> implements MyQueue<E> {
    private Object[] data;
    private AtomicInteger takeIndex; // next slot to dequeue from
    private AtomicInteger putIndex;  // next slot to enqueue into
    private AtomicInteger size;      // number of stored elements
    private static final int DEFAULT_CAPACITY = 10;

    public MyLockFreeQueue() {
        this(DEFAULT_CAPACITY);
    }

    public MyLockFreeQueue(int initCapacity) {
        if (initCapacity < 0) {
            throw new IllegalArgumentException("initCapacity must not be negative");
        }
        data = new Object[initCapacity];
        takeIndex = new AtomicInteger(0);
        putIndex = new AtomicInteger(0);
        size = new AtomicInteger(0);
    }

    public boolean add(E e) {
        if (e == null) {
            throw new NullPointerException("the element you put can't be null");
        }
        for (;;) {
            if (size.get() == data.length) {
                return false; // full
            }
            int index = putIndex.get(); // re-read on every retry, otherwise a failed CAS spins forever
            int expect = (index == data.length - 1) ? 0 : (index + 1); // wrap around
            if (putIndex.compareAndSet(index, expect)) {
                // The slot is claimed. size and the indexes are updated in separate steps,
                // so with several producers a consumer can see size > 0 while the slot
                // at takeIndex has not been written yet.
                data[index] = e;
                size.incrementAndGet();
                return true;
            }
        }
    }

    @SuppressWarnings("unchecked")
    public E take() {
        for (;;) {
            if (size.get() == 0) {
                return null; // empty
            }
            int index = takeIndex.get(); // re-read on every retry
            int expect = (index == data.length - 1) ? 0 : (index + 1); // wrap around
            E e = (E) data[index];
            if (takeIndex.compareAndSet(index, expect)) {
                size.decrementAndGet();
                return e;
            }
        }
    }
}
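The idea is to use two AtomicInteger objects that mark the enqueue and dequeue positions. A thread writes to a slot only after it has successfully claimed that slot with CAS, and size is used to detect EMPTY and FULL. There is still a flaw here: the index and size are not updated together, so they can be out of sync. Still, I ran it with 2+2 threads and the data was again consumed in order.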
...
pool-1-thread-3 send [81] to queue; total 188
pool-1-thread-2 consumer [81],count 2n+1 result :163; total 188
pool-1-thread-3 send [1] to queue; total 189
pool-1-thread-2 consumer [1],count 2n+1 result :3; total 189
pool-1-thread-2 consumer [19],count 2n+1 result :39; total 190
pool-1-thread-3 send [19] to queue; total 190
pool-1-thread-3 send [61] to queue; total 191
pool-1-thread-2 consumer [61],count 2n+1 result :123; total 191
pool-1-thread-3 send [16] to queue; total 192
pool-1-thread-2 consumer [16],count 2n+1 result :33; total 192
pool-1-thread-3 send [74] to queue; total 193
pool-1-thread-2 consumer [74],count 2n+1 result :149; total 193
pool-1-thread-3 send [38] to queue; total 194
pool-1-thread-2 consumer [38],count 2n+1 result :77; total 194
pool-1-thread-3 send [32] to queue; total 195
pool-1-thread-2 consumer [32],count 2n+1 result :65; total 195
pool-1-thread-3 send [9] to queue; total 196
pool-1-thread-2 consumer [9],count 2n+1 result :19; total 196
pool-1-thread-3 send [77] to queue; total 197
pool-1-thread-2 consumer [77],count 2n+1 result :155; total 197
pool-1-thread-3 send [69] to queue; total 198
pool-1-thread-2 consumer [69],count 2n+1 result :139; total 198
pool-1-thread-3 send [52] to queue; total 199
pool-1-thread-2 consumer [52],count 2n+1 result :105; total 199
pool-1-thread-3 send [81] to queue; total 200
pool-1-thread-2 consumer [81],count 2n+1 result :163; total 200
// Test driver; Worker, Provider and Consumer are the author's classes and are not shown here.
ExecutorService executor = Executors.newFixedThreadPool(6);
MyLockFreeQueue<Integer> queue = new MyLockFreeQueue<>();
Worker<Integer> pro = new Provider(queue);
Worker<Integer> con = new Consumer(queue);
executor.submit(pro);
executor.submit(con);
executor.submit(pro);
executor.submit(con);
executor.submit(pro);
executor.submit(con);
executor.shutdown();
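One possible way around the index/size gap noted above is to drop the separate size counter and give every slot its own sequence number, an idea known from Dmitry Vyukov's bounded MPMC queue. The sketch below is a swapped-in technique, not the author's implementation: `SeqRingQueue` and its field names are hypothetical, and it assumes a capacity of at least 2. It is also not strictly lock-free, because a producer stalled between claiming and publishing a slot delays consumers at that slot.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical class: a bounded ring buffer where each slot carries a sequence number.
class SeqRingQueue<E> {
    private final Object[] data;
    private final AtomicLongArray seq; // seq[i] tells which ticket may use slot i next
    private final AtomicLong putTicket = new AtomicLong(0);
    private final AtomicLong takeTicket = new AtomicLong(0);
    private final int capacity;

    SeqRingQueue(int capacity) {
        if (capacity < 2) {
            throw new IllegalArgumentException("capacity must be at least 2");
        }
        this.capacity = capacity;
        data = new Object[capacity];
        seq = new AtomicLongArray(capacity);
        for (int i = 0; i < capacity; i++) {
            seq.set(i, i); // slot i is free for put ticket i
        }
    }

    boolean add(E e) {
        if (e == null) {
            throw new NullPointerException("null elements are not allowed");
        }
        for (;;) {
            long ticket = putTicket.get();
            int slot = (int) (ticket % capacity);
            long s = seq.get(slot);
            if (s == ticket) { // slot is free for this ticket
                if (putTicket.compareAndSet(ticket, ticket + 1)) {
                    data[slot] = e;
                    seq.set(slot, ticket + 1); // publish the element to consumers
                    return true;
                }
            } else if (s < ticket) {
                return false; // slot still holds an element from the previous lap: full
            }
            // otherwise another producer already took this ticket: retry
        }
    }

    @SuppressWarnings("unchecked")
    E take() {
        for (;;) {
            long ticket = takeTicket.get();
            int slot = (int) (ticket % capacity);
            long s = seq.get(slot);
            if (s == ticket + 1) { // element for this ticket has been published
                if (takeTicket.compareAndSet(ticket, ticket + 1)) {
                    E e = (E) data[slot];
                    data[slot] = null;
                    seq.set(slot, ticket + capacity); // free the slot for the next lap
                    return e;
                }
            } else if (s < ticket + 1) {
                return null; // nothing published at this slot yet: treat as empty
            }
            // otherwise another consumer already took this ticket: retry
        }
    }
}
```

With a capacity of 2, two calls to `add` succeed and a third returns false until a `take` frees a slot. Because a consumer only reads a slot after its sequence number has been published, it cannot observe a claimed but unwritten slot.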
Original post: https://www.cnblogs.com/chentingk/p/10705141.html