第一部分 Java基础
第二部分 Java进阶

Java多线程和并发面试题(附答案)第5题

5、ConcurrentHashMap非阻塞Hash集合?

ConcurrentHashMap是Java并发包中提供的一个线程安全且高效的HashMap实现,ConcurrentHashMap在并发编程的场景中使用频率非常之高,本文就来分析下ConcurrentHashMap的实现原理,并对其实现原理进行分析。

● ConcurrentLinkedQuere 类图

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁。

ReentrantLock在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构,一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

● ConcurrentLinkedQuere实现原理

众所周知,哈希表是非常高效的,复杂度为O(1)的数据结构,在Java开发中,我们最常见到最频繁使用的就是HashMap和HashTable,但是在线程竞争激烈的并发场景中使用都不够合理。

● HashMap:先说HashMap,HashMap是线程不安全的,在并发环境下,可能会形成环状链表(扩容时可能造成,具体原因自行百度google或查看源码分析),导致get操作时,cpu 空转,所以,在并发环境中使用HashMap是非常危险的。

● HashTable:HashTable和HashMap的实现原理几乎一样,差别无非是:

1、HashTable不允许key和value为null;

2、HashTable是线程安全的。但是HashTable线程安全的策略实现代价却太大了,简单粗暴,get/put所有相关操作都是synchronized的,这相当于给整个哈希表加了一把大锁,多线程访问时候,只要有一个线程访问或操作该对象,那其他线程只能阻塞,相当于将所有的操作串行化,在竞争激烈的并发场景中性能就会非常差。如下图:

HashTable性能差主要是由于所有操作需要竞争同一把锁,而如果容器中有多把锁,每一把锁锁一段数据,这样在多线程访问时不同段的数据时,就不会存在锁竞争了,这样便可以有效地提高并发效率。这就是ConcurrentHashMap所采用的“分段锁”思想。

● ConcurrentLinkedQuere源码解析

ConcurrentHashMap采用了非常精妙的"分段锁"策略,ConcurrentHashMap的主干是个Segment数组。

final Segment<K,V>[] segments;

 Segment继承了ReentrantLock,所以它就是一种可重入锁(ReentrantLock)。在ConcurrentHashMap中,一个Segment就是一个子哈希表,Segment里维护了一个HashEntry数组,并发环境下,对于不同Segment的数据进行的操作是不用考虑锁竞争的。(就按默认的ConcurrentLeve为16来讲,理论上就允许16个线程并发执行)所以,对于同一个Segment的操作才需考虑线程同步,不同的Segment则无需考虑。Segment类似于HashMap,一个Segment维护着一个HashEntry数组。

transient volatile HashEntry<K,V>[] table;  

HashEntry是目前我们提到的最小的逻辑处理单元了。一个ConcurrentHashMap维护一个Segment数组,一个Segment维护一个HashEntry数组。

static final class HashEntry<K, V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K, V> next;
    //其他省略
}

我们说Segment类似哈希表,那么一些属性就跟我们之前提到的HashMap差不多,比如负载因子loadFactor,比如阈值threshold等等,看下Segment的构造方法。

Segment(float lf, int threshold, HashEntry<K, V>[] tab) {
    this.loadFactor = lf;//负载因子
    this.threshold = threshold;//阈值
    this.table = tab;//主干数组即 HashEntry 数组
}

我们来看下ConcurrentHashMap的构造方法

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException();
    //MAX_SEGMENTS 为 1<<16=65536,也就是最大并发数为 65536
    if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS;
    //2 的 sshif 次方等于 ssize,例:ssize=16,sshift=4;ssize=32,sshif=5
    int sshift = 0;
    //ssize 为 segments 数组长度,根据 concurrentLevel 计算得出
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    //segmentShift 和 segmentMask 这两个变量在定位 segment 时会用到,后面会详细讲
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY;
    //计算 cap 的大小,即 Segment 中 HashEntry 的数组长度,cap 也一定为 2 的 n 次方.
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c) cap <<= 1;
    //创建 segments 数组并初始化第一个 Segment,其余的 Segment 延迟初始化
    Segment<K, V> s0 = new Segment<K, V>(loadFactor, (int) (cap * loadFactor), (HashEntry<K, V>[]) new HashEntry[cap]);
    Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0);
    this.segments = ss;
}

初始化方法有三个参数,如果用户不指定则会使用默认值,initialCapacity为16,loadFactor为0.75(负载因子,扩容时需要参考),concurrentLevel为16。

从上面的代码可以看出来,Segment数组的大小ssize是由concurrentLevel来决定的,但是却不一定等于concurrentLevel,ssize一定是大于或等于concurrentLevel的最小的2的次幂。比如:默认情况下concurrentLevel是16,则ssize为16;若concurrentLevel为14,ssize为16;若concurrentLevel为17,则ssize为32。为什么Segment的数组大小一定是2的次幂?其实主要是便于通过按位与的散列算法来定位Segment的index。其实,put方法对segment也会有所体现。

public V put(K key, V value) {
    Segment<K, V> s;
    //concurrentHashMap 不允许 key/value 为空
    if (value == null)
        throw new NullPointerException();
    //hash 函数对 key 的 hashCode 重新散列,避免差劲的不合理的 hashcode,保证散列均匀
    int hash = hash(key);
    //返回的 hash 值无符号右移 segmentShift 位与段掩码进行位运算,定位 segment
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K, V>) UNSAFE.getObject    // nonvolatile; recheck
            (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

● 从源码看出,put的主要逻辑也就两步:

定位segment并确保定位的Segment已初始化。

调用Segment的put方法。

Ps:关于segmentShift和segmentMask

segmentShift和segmentMask这两个全局变量的主要作用是用来定位Segment。

int j = (hash >>> segmentShift) & segmentMask;

segmentMask:段掩码,假如segments数组长度为16,则段掩码为16-1=15;segments长度为32,段掩码为32-1=31。这样得到的所有bit位都为1,可以更好地保证散列的均匀性。

segmentShift:2的sshift次方等于ssize,segmentShift=32-sshift。若segments长度为16,segmentShift=32-4=28;若segments长度为32,segmentShift=32-5=27。而计算得出的hash值最大为32位,无符号右移segmentShift,则意味着只保留高几位(其余位是没用的),然后与段掩码segmentMask位运算来定位Segment。

ConcurrentLinkedQuere 方法。

● Get 操作:

public V get(Object key) {
    Segment<K, V> s;
    HashEntry<K, V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    //先定位 Segment,再定位 HashEntry
    if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
        for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile(tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value;
        }
    }
    return null;
}

get方法无需加锁,由于其中涉及到的共享变量都使用volatile修饰,volatile可以保证内存可见性,所以不会读取到过期数据。

来看下concurrentHashMap代理到Segment上的put方法,Segment中的put方法是要加锁的。只不过是锁粒度细了而已。

● Put 操作:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    HashEntry<K, V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
    //tryLock 不成功时会遍历定位到的 HashEnry 位置的链表(遍历主要是为了使 CPU 缓存链表),
    // 若找不到,则创建 HashEntry。tryLock 一定次数后(MAX_SCAN_RETRIES 变量决定),则lock。
    // 若遍历过程中,由于其他线程的操作导致链表头结点变化,则需要重新遍历。
    V oldValue;
    try {
        HashEntry<K, V>[] tab = table;
        int index = (tab.length - 1) & hash;//定位 HashEntry,可以看到,这个 hash 值在定位 Segment
        //时和在 Segment 中定位 HashEntry 都会用到,只不过定位 Segment 时只用到高几位。
        HashEntry<K, V> first = entryAt(tab, index);
        for (HashEntry<K, V> e = first; ; ) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            } else {
                if (node != null) node.setNext(first);
                else
                    node = new HashEntry<K, V>(hash, key, value, first);
                int c = count + 1;
                //若 c 超出阈值 threshold,需要扩容并 rehash。扩容后的容量是当前容量的 2 倍。这样可以最大程
                //度避免之前散列好的 entry 重新散列,具体在另一篇文章中有详细分析,不赘述。扩容并 rehash 的这个过程是比较消耗资源的。
                if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

ConcurrentLinkedQuere 总结

ConcurrentHashMap作为一种线程安全且高效的哈希表的解决方案,尤其是其中的“分段锁”的方案,相比HashTable的全表锁在性能上的提升非常之大。本文对ConcurrentHashMap的实现原理进行了详细分析,并解读了部分源码,希望能帮助到有需要的童鞋。