java并发:CopyOnWrite机制

java

java.util.concurrent.CopyOnWriteArrayList是一个线程安全的 ArrayList,其修改操作是在底层的一个复制的数组(快照)上进行的,即用了写时复制策略。

其定义如下:

public class CopyOnWriteArrayList<E>

implements List<E>, RandomAccess, Cloneable, java.io.Serializable {

private static final long serialVersionUID = 8673264195747942595L;

/**

* The lock protecting all mutators. (We have a mild preference

* for builtin monitors over ReentrantLock when either will do.)

*/

final transient Object lock = new Object();

/** The array, accessed only via getArray/setArray. */

private transient volatile Object[] array;

/**

* Gets the array. Non-private so as to also be accessible

* from CopyOnWriteArraySet class.

*/

final Object[] getArray() {

return array;

}

其构造函数如下:

    /**

* Creates an empty list.

*/

public CopyOnWriteArrayList() {

setArray(new Object[0]);

}

/**

* Creates a list containing the elements of the specified

* collection, in the order they are returned by the collection's

* iterator.

*

* @param c the collection of initially held elements

* @throws NullPointerException if the specified collection is null

*/

public CopyOnWriteArrayList(Collection<? extends E> c) {

Object[] es;

if (c.getClass() == CopyOnWriteArrayList.class)

es = ((CopyOnWriteArrayList<?>)c).getArray();

else {

es = c.toArray();

if (c.getClass() != java.util.ArrayList.class)

es = Arrays.copyOf(es, es.length, Object[].class);

}

setArray(es);

}

/**

* Creates a list holding a copy of the given array.

*

* @param toCopyIn the array (a copy of this array is used as the

* internal array)

* @throws NullPointerException if the specified array is null

*/

public CopyOnWriteArrayList(E[] toCopyIn) {

setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));

}

解释:

针对无参构造函数,其内部创建了一个大小为 0 的 Object 数组作为array 的初始值。

其添加元素的方法有多个,如下:

add(E e)的定义如下:

    /**

* Appends the specified element to the end of this list.

*

* @param e element to be appended to this list

* @return {@code true} (as specified by {@link Collection#add})

*/

public boolean add(E e) {

synchronized (lock) {

Object[] es = getArray();

int len = es.length;

es = Arrays.copyOf(es, len + 1);

es[len] = e;

setArray(es);

return true;

}

}

弱一致性问题

CopyOnWriteArrayList中获取指定位置的元素的方法如下:

    /**

* {@inheritDoc}

*

* @throws IndexOutOfBoundsException {@inheritDoc}

*/

public E get(int index) {

return elementAt(getArray(), index);

}

    static <E> E elementAt(Object[] a, int index) {

return (E) a[index];

}

解说:

从上述代码可以知道,当线程 x调用 get方法获取指定位置的元素时需要两步:

  • 获取 array数组(步骤A)
  • 通过下标访问指定位置的元素 (步骤B)

这个两步操作在整个过程中并没有加锁。

问题:假设线程 x执行完步骤 A 后,在执行步骤 B 前,线程 y进行了 remove操作,这时会发生什么?

remove方法的代码如下:

    /**

* Removes the element at the specified position in this list.

* Shifts any subsequent elements to the left (subtracts one from their

* indices). Returns the element that was removed from the list.

*

* @throws IndexOutOfBoundsException {@inheritDoc}

*/

public E remove(int index) {

synchronized (lock) {

Object[] es = getArray();

int len = es.length;

E oldValue = elementAt(es, index);

int numMoved = len - index - 1;

Object[] newElements;

if (numMoved == 0)

newElements = Arrays.copyOf(es, len - 1);

else {

newElements = new Object[len - 1];

System.arraycopy(es, 0, newElements, 0, index);

System.arraycopy(es, index + 1, newElements, index,

numMoved);

}

setArray(newElements);

return oldValue;

}

}

remove操作获取独占锁,然后进行写时复制操作,在复制的数组里删除 index 处的元素,随后让 array 指向新复制的数组。  

  

        

上图指出了线程 x 执行步骤B时操作的数组是线程 y 删除元素之前线程 x 复制的数组。

所以,线程 y 虽然己经删除了 index 处的元素,但是线程 x 的步骤 B 还是会返回index 处的元素,这就是写时复制策略的弱一致性问题。

迭代器弱一致性

大家都知道可以使用法代器遍历List的元素,当调用 CopyOnWriteArrayList 的 iterator()方法获取法代器时,实际上会返回一个 COWiterator对象。

    /**

* Returns an iterator over the elements in this list in proper sequence.

*

* <p>The returned iterator provides a snapshot of the state of the list

* when the iterator was constructed. No synchronization is needed while

* traversing the iterator. The iterator does <em>NOT</em> support the

* {@code remove} method.

*

* @return an iterator over the elements in this list in proper sequence

*/

public Iterator<E> iterator() {

return new COWIterator<E>(getArray(), 0);

}

    static final class COWIterator<E> implements ListIterator<E> {

/** Snapshot of the array */

private final Object[] snapshot;

/** Index of element to be returned by subsequent call to next. */

private int cursor;

COWIterator(Object[] es, int initialCursor) {

cursor = initialCursor;

snapshot = es;

}

public boolean hasNext() {

return cursor < snapshot.length;

}

public boolean hasPrevious() {

return cursor > 0;

}

@SuppressWarnings("unchecked")

public E next() {

if (! hasNext())

throw new NoSuchElementException();

return (E) snapshot[cursor++];

}

@SuppressWarnings("unchecked")

public E previous() {

if (! hasPrevious())

throw new NoSuchElementException();

return (E) snapshot[--cursor];

}

public int nextIndex() {

return cursor;

}

public int previousIndex() {

return cursor - 1;

}

/**

* Not supported. Always throws UnsupportedOperationException.

* @throws UnsupportedOperationException always; {@code remove}

* is not supported by this iterator.

*/

public void remove() {

throw new UnsupportedOperationException();

}

/**

* Not supported. Always throws UnsupportedOperationException.

* @throws UnsupportedOperationException always; {@code set}

* is not supported by this iterator.

*/

public void set(E e) {

throw new UnsupportedOperationException();

}

/**

* Not supported. Always throws UnsupportedOperationException.

* @throws UnsupportedOperationException always; {@code add}

* is not supported by this iterator.

*/

public void add(E e) {

throw new UnsupportedOperationException();

}

@Override

public void forEachRemaining(Consumer<? super E> action) {

Objects.requireNonNull(action);

final int size = snapshot.length;

int i = cursor;

cursor = size;

for (; i < size; i++)

action.accept(elementAt(snapshot, i));

}

}

解说:

COWiterator对象的 snapshot变量保存了当前 array的内容,cursor是遍历数据的下标。

在线程使用返回的迭代器遍历元素的过程中,如果有其他线程对 list 进行了增删改,那么 snapshot就是快照了。

因为增删改后指向 array的变量(即本文最开始着色了的那行代码,private transient volatile Object[] array;)指向了新数组。 

所以在获取迭代器遍历元素时,其他线程对 list 进行的增删改对外不可见,因为它们操作的是两个不同的数组,这就是弱一致性。 

CopyOnWriteArraySet

java.util.concurrent.CopyOnWriteArraySet是在CopyOnWriteArrayList的基础上使用了装饰模式,其很多方法都是调用CopyOnWriteArrayList的API来实现的。

其添加元素的代码如下:

    /**

* Adds the specified element to this set if it is not already present.

* More formally, adds the specified element {@code e} to this set if

* the set contains no element {@code e2} such that

* {@code Objects.equals(e, e2)}.

* If this set already contains the element, the call leaves the set

* unchanged and returns {@code false}.

*

* @param e element to be added to this set

* @return {@code true} if this set did not already contain the specified

* element

*/

public boolean add(E e) {

return al.addIfAbsent(e);

}

解说:

从CopyOnWriteArraySet的定义以及构造函数可以看到其底层数据结构就是CopyOnWriteArrayList,代码如下:

public class CopyOnWriteArraySet<E> extends AbstractSet<E>

implements java.io.Serializable {

private static final long serialVersionUID = 5457747651344034263L;

private final CopyOnWriteArrayList<E> al;

/**

* Creates an empty set.

*/

public CopyOnWriteArraySet() {

al = new CopyOnWriteArrayList<E>();

}

/**

* Creates a set containing all of the elements of the specified

* collection.

*

* @param c the collection of elements to initially contain

* @throws NullPointerException if the specified collection is null

*/

public CopyOnWriteArraySet(Collection<? extends E> c) {

if (c.getClass() == CopyOnWriteArraySet.class) {

@SuppressWarnings("unchecked") CopyOnWriteArraySet<E> cc =

(CopyOnWriteArraySet<E>)c;

al = new CopyOnWriteArrayList<E>(cc.al);

}

else {

al = new CopyOnWriteArrayList<E>();

al.addAllAbsent(c);

}

}

以上是 java并发:CopyOnWrite机制 的全部内容, 来源链接: utcz.com/z/394690.html

回到顶部