java源码学习ConcurrentHashMap
还是从put()开始
public V put(K key, V value) {return putVal(key, value, false);
}
put()会再调一个putValue(),因为要传入 onlyIfAbsent 参数同意更新元素,与HashMap不一样的是 key的散列算法不是在put()完成,而是在putValue()完成
/** * key:储存到HashMap的key * value:储存到HashMap的key对应的value * onlyIfAbsent:如果包含了该key,则不更新对应的值,众所周知,put()除了新增之外,还有更新的功能,是因为put()调用putVal()时候传的都是false */final V putVal(K key, V value, boolean onlyIfAbsent) {//ConcurrentHashMap的键值都不允许为null
if (key == null || value == null) throw new NullPointerException();
//计算key的hash值
int hash = spread(key.hashCode());
// 记录当前储存的位置是否需要转行成红黑树结构
int binCount = 0;
// 循环ConcurrentHashMap的Node数组,找出对于存储的位置
for (ConcurrentHashMap.Node<K,V>[] tab = table;;) {
// 定义了几个变量:
// f:存放在数组相同下标位置的第一个元素
// n:当前ConcurrentHashMap的Node数组的长度
// i:当前存储元素需要存放的数组下标
// fh : 存放在数组相同下标位置的第一个元素的hash值
ConcurrentHashMap.Node<K,V> f; int n, i, fh;
// 判断ConcurrentHashMap的Node数组长度,如果等于null或者长度等于0的情况,则进行扩容操作
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 判断当前元素需要存储的数组位置是否为null,如果是null的情况下则尝试存放数据
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 这里使用了CAS操作(乐观锁),尝试存放元素,如果元素存放成功则直接结束当前put(),否则继续找位置
if (casTabAt(tab, i, null,
new ConcurrentHashMap.Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 这里判断当前ConcurrentHashMap是否正在进行扩容操作
// 如果是在进行扩容操作,则当前线程也帮忙扩容,并且存放元素
else if ((fh = f.hash) == MOVED)
// 帮忙进行扩容操作,helpTransfer() 这个函数是帮助正在扩容的线程
// 一起进行扩容操作并且存放元素
tab = helpTransfer(tab, f);
else {
// 定义了一个指向旧值的变量,当本次put()操作是更新操作时,则使用该变量指向旧值
// 并且更新完成之后返回旧值
V oldVal = null;
// 上面也说过 f 是每个相同元素下标位置的首个元素,
// 这里对 f 进行了加锁,这样能锁住所有即将存在 i 位置元素的put()操作
// 并且不影响其他下标位置的元素插入,相比HashTable的方法锁大大提高了性能
synchronized (f) {
// 如果 f 没又改变,则继续操作,否则重复第一个循环
if (tabAt(tab, i) == f) {
// 判断当前节点是否链表节点
if (fh >= 0) {
// 记录当前储存的位置是否需要转行成红黑树结构
binCount = 1;
// 遍历链表找出存放的位置
for (ConcurrentHashMap.Node<K,V> e = f;; ++binCount) {
K ek;
// 如果遍历到的节点 key一致,则进行更新操作 并且oldVal变量存储旧值,用于后续返回
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 如果一致没有相同的key,则在链表最后添加新节点
ConcurrentHashMap.Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new ConcurrentHashMap.Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果当前数组下标节点等于 TreeBin 的时候,则进行按红黑树规则进行添加或修改操作
else if (f instanceof ConcurrentHashMap.TreeBin) {
ConcurrentHashMap.Node<K,V> p;
binCount = 2;
if ((p = ((ConcurrentHashMap.TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 判断当前链表的长度是否需要转换成红黑树结构
// static final int TREEIFY_THRESHOLD = 8;
// 当链表长度大于等于 8 的时候,则进行转换成红黑树操作
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 记录当前ConcurrentHashMap长度并且判断是否需要进行扩容操作
addCount(1L, binCount);
return null;
}
// 表示当前table正在进行扩容操作static final int MOVED = -1; // hash for forwarding nodes// 用于管理TreeNode,标记元素数据结构为红黑树时static final int TREEBIN = -2; // hash for roots of trees
put()操作使用了synchronized 与CAS乐观锁,当存储在数组相同位置的元素为null时候,会使用CAS操作进行尝试存储,如果存储成功则直接走最后的记录长度和判断是否需要扩容流程。数组相同位置的元素不为null,则先使用 synchronized 锁住该对象,这样就锁住了该数组下标的整个数据结构(链表或者红黑树的管理者TreeBin)再进行与HashMap大致相同的链表或者红黑树存储规则
下面随便画了一个 此处 synchronized 锁的图
接下来看看初始化容量函数 initTable()
private final ConcurrentHashMap.Node<K,V>[] initTable() {ConcurrentHashMap.Node<K,V>[] tab; int sc;
// 判断当前ConcurrentHashMap数组长度是否需要扩容
while ((tab = table) == null || tab.length == 0) {
// 判断是否有其他线程在进行初始化容量操作
// 如果 sizeCtl < 0 则说明有其他线程在进行初始化容量操作
if ((sc = sizeCtl) < 0)
// 如果有其他线程在进行初始化容量,则降低当前线程的优先级
Thread.yield(); // lost initialization race; just spin
// 如果没有其他线程进行初始化容量则使用CAS操作尝试获取到初始化容量的资格
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 再判断一次是否需要初始化容量
// 当首个线程先取得初始化容量资格时候,很大可能已经完成了
// 其他减低了优先级的线程即使再获取到容量初始化资格都会被该判断阻挡
if ((tab = table) == null || tab.length == 0) {
// 获取默认初始化容量
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 初始化ConcurrentHashMap 数组
@SuppressWarnings("unchecked")
ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
// 返回初始化的数组
return tab;
}
initTable()主要也还是使用到了CAS操作,使只有一个线程经常初始化,并且table使用了volatile修饰线程共享资源,初始化一旦完成,其他线程立马知道了该table已经初始化,在高并发多线程情况下,有些被降低了优先级的线程后面也会通过CAS获取到初始化资格,但是下面还有一个判断table的长度,就无需再进行扩容操作了,所以ConcurrentHashMap的容量初始化是由双重校验+CAS+volatile保证了其线程安全。HashMap里面扩容与初始化容量都是同一个函数,而ConcurrentHashMap则是分开了2个函数,初始化容量一个,扩容一个(transfer())
transient volatile Node<K,V>[] table;
下面看看扩容函数 transfer()与协助扩容函数 helpTransfer()
/** * tab:当前的table * nextTab:扩容后的table */private final void transfer(ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V>[] nextTab) {int n = tab.length, stride;
// 获取CPU每个核的处理量,默认16
// private static final int MIN_TRANSFER_STRIDE = 16;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 判断扩容后的 table 是否为null
// 如果为null则在此处进行创建新 table
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// 记录新的table
nextTable = nextTab;
// 记录旧的table长度
transferIndex = n;
}
// 记录新 table的长度
int nextn = nextTab.length;
// 创建一个 ForwardingNode 并且说明当前的table正在进行扩容操作
ConcurrentHashMap.ForwardingNode<K,V> fwd = new ConcurrentHashMap.ForwardingNode<K,V>(nextTab);
// 记录当前 f 当前数组下标位置的某个元素数据是否需要计算新table下的新下标
boolean advance = true;
// 记录是否完成数据扫描
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
ConcurrentHashMap.Node<K,V> f; int fh;
// 遍历旧table中的节点,计算新table中的下标
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 当该旧table全部节点转移到新的table上时,结束扩容
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
// talbe 指向新table
nextTable = null;
table = nextTab;
//扩容阈值设置为原来容量的1.5倍
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 使用CAS操作更新这个扩容阈值
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 判断存放节点的数组下标位置是否为null
// 如果为null则尝试存放
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 如果不为null则判断改位置是否已经转移完成
// 如果是则不再转移当前下标位置的元素
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 锁住数组的下标的第一个元素
synchronized (f) {
// 判断 f 的值有没有被修改
// 如果 f 没又改变,则继续操作,否则重复第一个循环
if (tabAt(tab, i) == f) {
ConcurrentHashMap.Node<K,V> ln, hn;
// 判断是否链表结构
// 如果是,则根据链表结构的存储规则存储元素
if (fh >= 0) {
int runBit = fh & n;
ConcurrentHashMap.Node<K,V> lastRun = f;
for (ConcurrentHashMap.Node<K,V> p = f.next; p != null; p = p.next) {
// 把节点存放新table的原位或者移动 n 位存放
// 其实就是计算该节点是存放在当前下标位置还是 i+n的下标位置
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (ConcurrentHashMap.Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, ln);
else
hn = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, hn);
}
// 通过上面的计算把ln链表放在原本的下标位置
// 把hn链表放在 原本下标+n的位置
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 在旧table存放 ForwardingNode 代表该数组下标位置的数据已经完成转移
setTabAt(tab, i, fwd);
advance = true;
}
// 判断是否红黑树结构
// 如果是,则根据红黑树结构的存储规则存储元素
else if (f instanceof ConcurrentHashMap.TreeBin) {
ConcurrentHashMap.TreeBin<K,V> t = (ConcurrentHashMap.TreeBin<K,V>)f;
ConcurrentHashMap.TreeNode<K,V> lo = null, loTail = null;
ConcurrentHashMap.TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (ConcurrentHashMap.Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
ConcurrentHashMap.TreeNode<K,V> p = new ConcurrentHashMap.TreeNode<K,V>
(h, e.key, e.val, null, null);
// 与上面链表的操作大致相同
// 计算当前元素存放的位置
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果通过计算分开重组后的红黑树长度小于等于6则转换成链表
// static final int UNTREEIFY_THRESHOLD = 6;
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(hi) : t;
// 把计算后存放在原来位置的 ln继续存放在原来的位置
// hn存放在 i+n 的位置
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 在旧table存放 ForwardingNode 代表该数组下标位置的数据已经完成转移
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
final ConcurrentHashMap.Node<K,V>[] helpTransfer(ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V> f) {ConcurrentHashMap.Node<K,V>[] nextTab; int sc;
// 判断当前 table是否在进行扩容操作
if (tab != null && (f instanceof ConcurrentHashMap.ForwardingNode) &&
(nextTab = ((ConcurrentHashMap.ForwardingNode<K,V>)f).nextTable) != null) {
// 获取扩容后的table大小
int rs = resizeStamp(tab.length);
// 再次判断扩容操作是否还在进行,并且是同一次扩容操作
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 通过CAS扩容操作尝试获取扩容操作资格
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
每次扩容是容量原来的2倍,并且旧table原元素只会存放在新table的 i 或者 i+n 的位置而不是重新计算hash新的 i ,并且put()操作的时候有可能会帮忙一起扩容,分担了单线程操作的压力,当旧的 i 位置的节点全部转移完成使用 ForwardingNode 标记当前位置所以节点已经转移完成,下面是一个简单的扩容后可能存放的位置图
会引起扩容操作的2个函数addCount()与treeifyBin()
private final void addCount(long x, int check) {CounterCell[] as; long b, s;
// 使用CAS操作更新baseCount
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// 多线程修改baseCount时,没修改成功的线程会执行fullAddCount(),把x的值添加到counterCell类中
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// 当check >= 0 的时候证明需要扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0) // 其他线程在初始化,break;
break;
// 其他线程正在扩容,协助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 只有前线程在扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n, sc;
if (tab != null) {
//如果table的长度<64 就扩大一倍
// 就是说当前 table的长度>64 且链表节点数量 >= 8时候才会转换成红黑树
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 锁住当前数组位置,进行红黑树转换
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
//在原来下标的位置 用TreeBin替换掉原来的Node对象
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
addCount()的主要作用是增加当前ConcurrentHashMap的元素数量并且判断是否需要扩容,当容量已经达到了扩容阈值则进行扩容操作
treeifyBin()则是用于进行树结构转换和判断是否需要扩容,当容量 < 64 时则进行扩容而非转换成树结构,只有容量>64且链表长度>=8时才会进行树结构转换
与HashMap不同的是多了2个内部类:ForwardingNode 与 TreeBin
ForwardingNode:只有扩容操作的时候才会使用到,标记数组的 i 位置是否已经完成
TreeBin:HashMap是直接使用TreeNode,而ConcurrentHashMap则是通过TreeBin来管理TreeNode来使用
ConcurrentHashMap通过CAS和巧妙地运用synchronized 提供了一个安全和高效而且非常精彩解决方案
ConcurrentHashMap的一些简单学习记录到此结束,世界真的很大
以上是 java源码学习ConcurrentHashMap 的全部内容, 来源链接: utcz.com/z/514078.html