Appearance
详解put源码,探究CAS和synchronized何时使用
点击展开源码,源码已添加注释
java
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 1.要求key和value都不能为空
if (key == null || value == null) throw new NullPointerException();
// 2.混乱Hash,减少Hash碰撞
int hash = spread(key.hashCode());
int binCount = 0; // 记录添加元素的链表长度
// 无限循环遍历哈希表中的桶
for (ConcurrentHashMap.Node<K, V>[] tab = table; ; ) {
ConcurrentHashMap.Node<K, V> f; // 当前桶中的第一个节点
int n, i, fh; // 第一个节点f,n是哈希表的长度,当前桶的索引i
// 3.如果哈希表为空,则进行初始化
if (tab == null || (n = tab.length) == 0) {
tab = initTable();
} else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 4.如果当前桶为空,则尝试使用CAS操作将新节点添加到当前桶中
if (casTabAt(tab, i, null, new ConcurrentHashMap.Node<K, V>(hash, key, value, null))) {
break; // 添加成功,跳出循环,不需要加锁
}
} else if ((fh = f.hash) == MOVED) {
// 如果当前桶的第一个节点哈希值为MOVED,表示正在进行扩容操作,则帮助进行扩容操作
tab = helpTransfer(tab, f);
} else {
V oldVal = null;
// 对当前桶的第一个节点加锁
synchronized (f) {
// 再次检查当前桶的第一个节点是否发生变化
if (tabAt(tab, i) == f) {
// 如果当前桶的第一个节点哈希值大于等于0,表示是普通节点
if (fh >= 0) {
binCount = 1; // 初始化链表长度为1
for (ConcurrentHashMap.Node<K, V> e = f; ; ++binCount) { // 遍历链表
K ek;
// 如果找到了键相同的节点
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val; // 保存旧的值
if (!onlyIfAbsent) {
e.val = value; // 如果不是插入新值,则更新值
}
break;
}
ConcurrentHashMap.Node<K, V> pred = e;
if ((e = e.next) == null) { // 遍历到链表尾部,将新节点添加到链表尾部
pred.next = new ConcurrentHashMap.Node<K, V>(hash, key, value, null);
break;
}
}
} else if (f instanceof ConcurrentHashMap.TreeBin) {
// 如果当前桶的第一个节点是红黑树节点
ConcurrentHashMap.Node<K, V> p;
binCount = 2; // 初始化链表长度为2
if ((p = ((ConcurrentHashMap.TreeBin<K, V>) f).putTreeVal(hash, key,
value)) != null) { // 在红黑树中添加或更新节点,并返回旧的节点
oldVal = p.val; // 保存旧的值
if (!onlyIfAbsent)
p.val = value; // 如果不是插入新值,则更新值
}
}
}
}
// 如果链表长度不为0,表示成功添加或更新节点
if (binCount != 0) {
// 如果链表长度大于等于树化阈值,则将链表转化为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal; // 返回旧的值
break;
}
}
}
addCount(1L, binCount); // 增加元素数量
return null;
}
执行步骤如下,观察流程图辅助理解:
- 如果传递null,抛出空指针异常
- 混乱Hash,减少Hash碰撞
- 无限循环,不断获取最新桶中数据
- 如果当前桶是null,则初始化
- 如果桶不是null但没有数据,尝试使用CAS插入数据,如果插入成功,则直接结束循环
- 如果桶内有数据且正在进行扩容则辅助进行扩容操作,完成后结束当前循环,获取最新桶内数据进行下次循环
- 这里就相当于进行和HashMap相似的put操作
- 对桶内第一个节点加锁,如果没有获取到进入阻塞状态,等待其他线程处理完毕
- 保险操作,再次检查第一个节点是否发生变化
- 如果是普通节点,则遍历直接插入到链表尾部
- 如果是红黑树则使用红黑树的插入方法
- 最后判断如果链表长度>8,则转换为红黑树
- 更新当前Map元素数量
总结
在桶内无元素时尝试使用CAS添加第一个节点,每次对桶进行put时都锁住第一个节点。
ConcurrentHashMap在哪些地方做了并发控制?
ConcurrentHashMap是通过synchronized和CAS自旋保证的线程安全,要想知道ConcurrentHashMap是如何加锁的,就要知道HashMap在哪些地方会导致线程安全问题,如初始化桶数组阶段和设置桶,插入链表,树化等阶段,都会有并发问题。
初始化桶阶段
如果在此阶段不做并发控制,那么极有可能出现多个线程都去初始化桶的问题,导致内存浪费,Map在此处采用自旋操作和CAS操作:如果此时没有线程初始化,则去初始化,否则当前线程让出CPU时间片,等待下一次唤醒
java
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 如果sizeCtl < 0,表示有其他线程正在初始化,当前线程让出处理器
Thread.yield();
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
// 再次检查table的值
if ((tab = table) == null || tab.length == 0) {
// 进行哈希表的初始化操作……
}
} finally {
// 恢复sizeCtl的值,以便其他线程也能参与初始化竞争
sizeCtl = sc;
}
break; // 跳出循环,初始化成功
}
}
put阶段
- 如果hash后发现桶中没有值,则会直接采用CAS插入并且返回
- 如果发现桶中有值,则对流程按照当前的桶节点为维度进行加锁,将值插入链表或者红黑树中,源码如下:
java
// 省略....
// 如果当前桶节点为null,直接CAS(casTabAt方法)插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break;
}
// 省略....
// 如果桶节点不为空,则对当前桶进行加锁,(只需要对第一个节点加锁即可)
else {
V oldVal = null;
synchronized (f) {
}
}
扩容阶段
ConcurrentHashMap并没有一味的通过CAS或者锁去限制多线程,在扩容阶段,ConcurrentHashMap就通过多线程来加加速扩容。
- 通过CPU核数为每个线程计算划分任务,每个线程最少的任务是迁移16个桶
- 将当前桶扩容的索引transferIndex赋值给当前线程,如果索引小于0,则说明扩容完毕,结束流程
- 否则再将当前线程扩容后的索引赋值给transferIndex,譬如,如果transferIndex原来是32,那么赋值之后transferIndex应该变为16,这样下一个线程就可以从16开始扩容了。这里有一个小问题,如果两个线程同时拿到同一段范围之后,该怎么处理?答案是ConcurrentHashMap会通过CAS对transferIndex进行设置,只可能有一个成功,所以就不会存在上面的问题
- 之后就可以对真正的扩容流程进行加锁操作了