当前位置: 首页 > 图灵资讯 > 技术篇> 9张图深入剖析ConcurrentHashMap

9张图深入剖析ConcurrentHashMap

来源:图灵教育
时间:2023-10-06 09:42:21

前言

在日常开发中,我们经常使用key-value键值对的Hashmap,用哈希表实现,用空间换时间,提高查询性能

但HashMap在多线程并发场景中并不安全

若要使用线程安全,可使用ConcurrentHashMap、HashTable、Collections.synchronizedmap等

但由于后两者使用synchronized粒度过大,一般不使用,而是使用并发包中的concurenthashmap

使用volatile保证ConcurentHashmap中内存的可见性,使读取场景不需要“锁定”来保证原子性

在写作场景中使用CAS+synchronized,synchronized只锁定哈希表索引位置的第一个节点,相当于加锁细粒度,提高并发性能

本文将从ConcurrentHashMap的使用、阅读、写作和扩展的实现原理、设计理念等方面进行分析

在查看本文之前,您需要了解哈希表,volatile、CAS、synchronized等知识

volatile可以查看这篇文章:5个案例和流程图让你从0到1理解volatile关键词

CAS、synchronized可以查看这篇文章:15000字,6个代码案例,5个原理图,让你彻底了解synchronizeded

使用ConcurentHashmapp

ConcurentHashMap是并发场景下线程安全的Map,可以在并发场景下查询存储K、V键值对

不可变对象是绝对线程安全的,无论外界如何使用,都是线程安全的

ConcurrentHashMap不是绝对的线程安全,只提供线程安全的方法,如果外层使用错误仍会导致线程不安全

让我们来看看下面的案例。使用value存储自增调用次数,打开10个线程,每次执行100次,最终结果应为1000次,但错误使用导致不到1万次

    public void test() {//        Map<String, Integer> map = new HashMap(16);        Map<String, Integer> map = new ConcurrentHashMap(16);        String key = "key";        CountDownLatch countDownLatch = new CountDownLatch(10);        for (int i = 0; i < 10; i++) {            new Thread(() -> {                for (int j = 0; j < 100; j++) {                    incr(map, key);//                    incrCompute(map, key);                }                countDownLatch.countDown();            }).start();        }        try {            //阻塞到线程完成            countDownLatch.await();        } catch (InterruptedException e) {            e.printStackTrace();        }        //1000不到        System.out.println(map.get(key));    }private void incr(Map<String, Integer> map, String key) {        map.put(key, map.getOrDefault(key, 0) + 1);    }

在自增方法incr中,先读操作,再计算,最后写操作。这种复合操作不能保证原子性,导致所有最终结果不能累积1万

正确的使用方法是使用JDK8提供的默认方法compute

concurenthashMap实现compute在put中使用同步手段后,其原理是计算

private void incrCompute(Map<String, Integer> map, String key) {        map.compute(key, (k, v) -> Objects.isNull(v) ? 1 : v + 1);    }
数据结构

类似于Hashmap,使用哈希表+链表/红黑树实现

哈希表

哈希表的实现由数组组成。当哈希冲突(哈希算法得到相同的索引)发生时,链地址法构建链表

image.png

当链表上的节点太长,遍历搜索成本大,超过阈值(链表节点超过8个,哈希表长度超过64个),树变成红黑树,减少遍历搜索成本,时间复杂性从O开始(n)优化为(log n)

image.png

由Node数组成的ConcurrentHashMap在扩容过程中会有两个新旧哈希表:table、nextTable

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>    implements ConcurrentMap<K,V>, Serializable {//哈希表 node数组transienttinttintent volatile Node<K,V>[] table;        //扩容时,为了兼容读写,会有两个哈希表,这是新的哈希表    private transient volatile Node<K,V>[] nextTable;        // 默认为 0    // 当初始化时, 为 -1    // 当扩容时, 为 -(1 + 扩展线程数)    // 初始化或扩容完成后,为 下一次扩容的阈值大小    private transient volatile int sizeCtl;        //扩容时 用于指定迁移区间的下标    private transient volatile int transferIndex;        ///统计每个哈希槽中的元素数量    private transient volatile CounterCell[] counterCells;}
节点

当哈希表数组的节点与哈希冲突发生时,用Node构建链表的节点

///实现哈希表的节点,使用staticc进行数组和链表 class Node<K,V> implements Map.Entry<K,V> {    //节点哈希值final int hash;final K key;volatile V val;    ///作为链表 后续指针 volatile Node<K,V> next;    }// 扩容时,如果是一个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin staticc的头结点 final class ForwardingNode<K,V> extends Node<K,V> {}// 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后,替换为普通 Nodestatic final class ReservationNode<K,V> extends Node<K,V> {}// 作为 treebin 的头节点, 存储 root 和 firststatic final class TreeBin<K,V> extends Node<K,V> {}// 作为 treebin 的节点, 存储 parent, left, rightstatic final class TreeNode<K,V> extends Node<K,V> {}

节点哈希值

//转发节点static final int MOVED     = -1;//数组中红黑树的节点staticc final int TREEBIN   = -2;///占位节点staticcctic final int RESERVED  = -3;

转发节点:继承Node,用于扩展旧哈希表索引的第一个节点,在新的哈希表中寻找转发节点

static final class ForwardingNode<K,V> extends Node<K,V> {    //新哈希表        final Node<K,V>[] nextTable;            ForwardingNode(Node<K,V>[] tab) {            ///哈希值设置-1            super(MOVED, null, null, null);            this.nextTable = tab;        }}

数组中红黑树的节点 TreeBin:继承Node,first指向红黑树的第一个节点

static final class TreeBin<K,V> extends Node<K,V> {        TreeNode<K,V> root;    ///红黑树首节点        volatile TreeNode<K,V> first;}    

image.png

TreeNode红黑树节点

static final class TreeNode<K,V> extends Node<K,V> {        TreeNode<K,V> parent;          TreeNode<K,V> left;        TreeNode<K,V> right;        TreeNode<K,V> prev;     boolean red;}

占位节点:继承Node需要计算(使用)computer方法)先用占位节点占位,然后构建节点代替占位节点

static final class ReservationNode<K,V> extends Node<K,V> {        ReservationNode() {            super(RESERVED, null, null, null);        }        Node<K,V> find(int h, Object k) {            return null;        }    }
实现原理结构

检查结构中的参数,然后根据需要存储的数据容量和负载因子计算哈希表容量,最后将哈希表容量调整为两次

构造不会初始化,而是等到使用后再创建(懒加载)

public ConcurrentHashMap(int initialCapacity,                             float loadFactor, int concurrencyLevel) {        ///检查负载因子,初始容量        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)            throw new IllegalArgumentException();                //concurrencyLevel:1        if (initialCapacity < concurrencyLevel)   // Use at least as many bins            initialCapacity = concurrencyLevel;   // as estimated threads        //计算大小 = 容量/负载因子 向上取整        long size = (long)(1.0 + (long)initialCapacity / loadFactor);        //如果超过最大值,则使用最大值         //tableSizeFor 将尺寸调整为两次幂        int cap = (size >= (long)MAXIMUM_CAPACITY) ?            MAXIMUM_CAPACITY : tableSizeFor((int)size);                //设置容量        this.sizeCtl = cap;    }
读-get

使用volatile阅读场景以确保可见性,即使是其他线程修改也是可见的,不需要使用其他手段来确保同步

读取操作需要在哈希表中找到元素,扰动算法打乱哈希值,然后通过哈希算法使用哈希值获得索引,根据索引上的第一个节点分为多种情况

  1. 扰动算法完全扰乱了哈希值(避免过度的哈希冲突)和符号位&保证结果正数

    int h = spread(key.hashCode())

    扰动算法:哈希值16位异或运算

    扰动算法后,&HASH_BITS = ffffffffffx7 (011111...),符号位为0,保证结果为正数

    负哈希值表示转发节点、树首节点、占位节点等特殊功能

    static final int spread(int h) {        return (h ^ (h >>> 16)) & HASH_BITS;    }
  2. 通过哈希算法获得数组中的索引(下标),使用中断的哈希值

    n 哈希表长度:(n = tab.length)

    (e = tabAt(tab, (n - 1) & h)

    h是计算后的哈希值,哈希值%(哈希表长度-1) 索引位置可以找到

    为提高性能,规定哈希表长度为2n次幂,哈希表长度二进制必须为1000..,而(n-1)二进制必须是0111...

    因此(n - 1) & h计算索引,进行和计算的结果必须是0~n-1之间 使用位运算来提高性能

  3. 在获得数组上的节点后,需要进行比较

    找到哈希表上的第一个节点后,比较key 检查是否是当前节点

    比较规则:首先比较哈希值。如果对象哈希值相同,则可能是相同的对象,并且需要比较key(==和equals)。如果哈希值不同,它肯定不是相同的对象

    首先比较哈希值的好处是提高搜索性能。如果直接使用equals, 时间复杂度可能会上升(如Stringequals)

  4. 使用链地址法来解决哈希冲突,所以在找到节点后,它可能会遍历链表或树木;由于哈希表的扩展,也可能需要在新节点上找到

    4.1 第一节点比较成功,直接返回

    4.2 第一节点的哈希值为负,说明节点是特殊情况:转发节点,树的第一节点 、预订占位节点的计算

    • 如果是转发节点,如果是扩容,去新数组找
    • 如果是Trebin,去红黑树寻找
    • 若为占位节点 直接返回空

    4.3 依次对链表进行比较

get代码

public V get(Object key) {    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;    //1.spread:扰动算法 + 让key的哈希值不能为负,因为负数哈希值代表红黑树或Forwardingnodeng    int h = spread(key.hashCode());    //2.(n - 1) & h:下标、索引 事实上,数组长度模哈希值 位运算效率更高    //e:在哈希表对应索引位置上的节点    if ((tab = table) != null && (n = tab.length) > 0 &&        (e = tabAt(tab, (n - 1) & h)) != null) {        //3.如果哈希值相等,说明可以找到,然后比较key        if ((eh = e.hash) == h) {            //4.1 找到key相等的说明 返回            if ((ek = e.key) == key || (ek != null && key.equals(ek)))                return e.val;        }        //4.2 第一节点哈希值为负,表明该节点为转发节点,目前正在扩容,去新数组找        else if (eh < 0)            return (p = e.find(h, key)) != null ?= null ? p.val : null;                //4.3 通过链表,可以找到返回值,不能返回nulll        while ((e = e.next) != null) {            if (e.hash == h &&                ((ek = e.key) == key || (ek != null && key.equals(ek))))                return e.val;        }    }    return null;}
写-put

添加元素时,使用CAS+synchronized(只锁定哈希表中的第一个节点)保证原子性的同步

  1. 获得哈希值:扰动算法+确保哈希值为正数
  2. 哈希表是空的,CAS保证了线程的初始化
private final Node<K,V>[] initTable() {        Node<K,V>[] tab; int sc;        while ((tab = table) == null || tab.length == 0) {            //小于0 说明其他线程是初始化的 让CPU时间片 退出后续初始化完成            if ((sc = sizeCtl) < 0)                Thread.yield();             else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {                //CAS将SIZECTL设置为-1 成功后(表示有线程初始化) 初始化                try {                    if ((tab = table) == null || tab.length == 0) {                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;                        @SuppressWarnings("unchecked")                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];                        table = tab = nt;                        sc = n - (n >>> 2);                    }                } finally {                    sizeCtl = sc;                }                break;            }        }        return tab;    }
  1. 通过哈希算法获得索引上的节点 f = tabAt(tab, i = (n - 1) & hash)

  2. 根据不同的情况进行处理

    • 4.1 当第一个节点为空时,直接CAS将节点添加到索引位置 casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null))

      image.png

    • 4.2 第一节点hash是MOVED -1时,表示该节点为转发节点,表示该节点正在扩展,有助于扩展

    • 4.3 首节点加锁

      • 4.3.1 搜索并添加遍历链表/覆盖

        image.png

      • 4.3.2 寻找并添加遍历树/覆盖

  3. addCount对每个节点上的数据进行统计,并检查扩容

put代码

///onlyIfabsent是true时,如果有k,v这次不会覆盖final V putVal(K key, V value, boolean onlyIfAbsent) {    if (key == null || value == null) throw new NullPointerException();    //1.获得哈希值:扰动算法+确保哈希值为正数    int hash = spread(key.hashCode());    int binCount = 0;    //乐观锁思想 CSA+失败重试    for (Node<K,V>[] tab = table;;) {        Node<K,V> f; int n, i, fh;        //2.哈希表为空 CAS保证只有一个线程初始化        if (tab == null || (n = tab.length) == 0)            tab = initTable();        //3. 找到索引上的第一个节点的哈希算法        //4.1 当节点为空时,直接CAS构建节点        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {            if (casTabAt(tab, i, null,                         new Node<K,V>(hash, key, value, null)))                break;                   // no lock when adding to empty bin        }        //4.2 hash是索引首节点 MOVED 说明节点是转发节点,目前正在扩容,去帮助扩容        else if ((fh = f.hash) == MOVED)            tab = helpTransfer(tab, f);        else {            V oldVal = null;            //4.3 首节点 加锁            synchronized (f) {                ///第一节点没变                if (tabAt(tab, i) == f) {                    ///首节哈希值大于或等于0 说明节点是链表上的节点                      //4.3.1 搜索遍历链表,然后添加/覆盖                    if (fh >= 0) {                        //记录链表上的几个节点                        binCount = 1;                        ///如果找到遍历链表,可以替换,如果遍历后没有找到,添加(尾插)                        for (Node<K,V> e = f;; ++binCount) {                            K ek;                            //替换                            if (e.hash == hash &&                                ((ek = e.key) == key ||                                 (ek != null && key.equals(ek)))) {                                oldVal = e.val;                                ///onlyifabsent允许false覆盖(使用xxifabsent方法时,有价值就不覆盖)                                if (!onlyIfAbsent)                                    e.val = value;                                break;                            }                            Node<K,V> pred = e;                            //添加                            if ((e = e.next) == null) {                                pred.next = new Node<K,V>(hash, key,                                                          value, null);                                break;                            }                        }                    }                    //如果是红黑树首节点,找到相应的节点并覆盖它                    //4.3.2 寻找遍历树,然后添加/覆盖                    else if (f instanceof TreeBin) {                        Node<K,V> p;                        binCount = 2;                        //如果添加返回null,返回不是null,则出来添加                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,                                                       value)) != null) {                            oldVal = p.val;                            //覆盖                            if (!onlyIfAbsent)                                p.val = value;                        }                    }                }            }                        if (binCount != 0) {                if (binCount >= TREEIFY_THRESHOLD)                    ///链表上的节点超过TREEIFY_THRESHOLD 8个(不是第一节点) 并且 树化前数组长度超过64,否则扩容                    treeifyBin(tab, i);                if (oldVal != null)                    return oldVal;                break;            }        }    }    //5.为统计元素(添加节点)添加计数    addCount(1L, binCount);    return null;}
扩容

当哈希表中的元素数量为了避免频繁的哈希冲突时 / 哈希表长度 当超过负载因子时,扩容(增加哈希表的长度)

一般来说,扩容是哈希表长度的两倍,如32到64,保证长度为2倍;如果扩容长度达到整形上限,则使用整形的最大值

扩容时,需要将数组中每个槽中的链表或树迁移到新数组中

如果处理器是多核的,那么迁移的操作不是单独完成的,而是允许其他线程帮助迁移

在迁移过程中,让每个线程从右到左每次迁移多个槽,然后判断是否全部迁移。如果没有迁移,则继续循环迁移

扩容操作主要在于transfer扩容主要有三种场景:

  1. addCount:添加节点后,增加计数检查和扩展
  2. helpTransfer:发现线程put正在迁移,以帮助扩展
  3. tryPresize:试着调整容量(批量添加)putAll,树化数组长度不超过64小时treeifyBin调用)

分为以下三个步骤

  1. 每次迁移多少个槽根据CPU核数和哈希表总长度计算,最小16个槽

  2. 新的哈希表是空的,表明它是初始化的

  3. 循环迁移

    • 3.1 负责迁移的区间分配 [bround,i](多线程同时迁移可能存在)

      image.png

    • 3.2 迁移:分为链表迁移移、树迁移

      链表迁移

      1. 将链表上的节点完全分散到新哈希表的index、index+旧哈希表长度的两个下标中(类似于Hashmap)

      2. 将index位置链表中的节点 (hash & 哈希表长度),结果0放在新数组的index位置,结果1放在新数组的index+旧哈希表长度位置

        image.png

        例如,旧哈希表的长度是16,在索引3的位置,16的二进制是10000,hash&16 => hash& 10000 ,也就是说,节点哈希值的第五位是0,新哈希表的第三位是1,新哈希表的3+16标记是1

      3. 使用头插法将计算结果为0构建成ln链表,为1构建hn链表,为方便构建链表,将首先找到lastrun节点:lastrun节点和后续节点都是同一链表上的节点,便于迁移

        在构建链表之前,先构建lastRun,例如图中的lastRun。 e->f ,首先将lastrun放在ln链表上,遍历原始链表 :a->e->f,遍历到b:b->a->e->f

      image.png

      1. 每次转移索引位置后,将转发节点设置在原哈希表的对应位置。当其他线程读取get时,根据转发节点找到新的哈希表并编写put操作,以帮助扩展容量(其他范围迁移)

        image.png

扩容代码

//tab 旧哈希表///nextTab 新哈希表privatete final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {        //1.计算每次迁移多少个槽        //n:哈希表的长度(多少槽)        int n = tab.length, stride;        //stride:每次迁移多少个槽?        //NCPU: CPU核数        //如果是多核,每次迁移槽数 = 总槽数无符号右移3位(n/8)CPU核数再除          ///每次最小迁移槽的最小数量 = MIN_TRANSFER_STRIDE = 16        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)            stride = MIN_TRANSFER_STRIDE; // subpide range            //2.如果新哈希表是空的,说明是初始化的        if (nextTab == null) {            // initiating            try {                @SuppressWarnings("unchecked")                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];                nextTab = nt;            } catch (Throwable ex) {      // try to cope with OOME                sizeCtl = Integer.MAX_VALUE;                return;            }            nextTable = nextTab;            //transferindex用于记录 每次负责迁移的槽右区间下标,从右到左分配,从最右开始            transferIndex = n;        }        ///新哈希表长度        int nextn = nextTab.length;        //创建转发节点,转发节点一般设置在旧哈希表头节点,新的哈希表可以通过转发节点找到        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);        //advance:是否继续循环迁移        boolean advance = true;        //         boolean finishing = false; // to ensure sweep before committing nextTab        //3.循环迁移        for (int i = 0, bound = 0;;) {            Node<K,V> f; int fh;            //3.1 负责迁移的区间分配            ///bound是左间间隔 i为右区间            while (advance) {                int nextIndex, nextBound;                ///处理一个槽 右区间 自减                if (--i >= bound || finishing)                    advance = false;                //transferIndex<=0说明 要迁移的范围已完全分配                else if ((nextIndex = transferIndex) <= 0) {                    i = -1;                    advance = false;                }                ///CAS设置本次迁移的范围,防止多线程分到相同的区间                else if (U.compareAndSwapInt                         (this, TRANSFERINDEX, nextIndex,                          nextBound = (nextIndex > stride ?                                       nextIndex - stride : 0))) {                    bound = nextBound;                    i = nextIndex - 1;                    advance = false;                }            }                        //3.2 迁移                        //3.2.1 如果右区间i不再在范围内,说明迁移完            if (i < 0 || i >= n || i + n >= nextn) {                int sc;                //如果迁移完成,设置哈希表和数量                if (finishing) {                    nextTable = null;                    table = nextTab;                    sizeCtl = (n << 1) - (n >>> 1);                    return;                }                //CAS sizeCtl的数量-1 表示 完成线程迁移                 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {                    //如果不是最后一条线直接返回                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)                        return;                    //最后一条线程设置finishing为true  后面再循环 设置哈希表、数量等操作                    finishing = advance = true;                    i = n; // recheck before commit                }            }            //3.2.2 如果旧哈希表i位置节点为空,则CAS设置为转发节点            else if ((f = tabAt(tab, i)) == null)                advance = casTabAt(tab, i, null, fwd);            //3.2.3 假如旧哈希表这个位置的第一节点是转发节点,说明其它线程已经处理好了,重新循环            else if ((fh = f.hash) == MOVED)                advance = true; // already processed            else {                //3.2.4 锁定第一节点 迁移                synchronized (f) {                    if (tabAt(tab, i) == f) {                        Node<K,V> ln, hn;                        //3.2.4.1 链表迁移                        ///首节哈希值大于或等于0 说明 是链表节点                        if (fh >= 0) {                            int runBit = fh & n;                            Node<K,V> lastRun = f;                            ///寻找lastrun节点                             for (Node<K,V> p = f.next; p != null; p = p.next) {                                int b = p.hash & n;                                if (b != runBit) {                                    runBit = b;                                    lastRun = p;                                }                            }                            //如果最后一个计算值是0                            //lastrun节点和后续节点的计算值为0构成ln链表 否则 都是1构成hn链表                            if (runBit == 0) {                                ln = lastRun;                                hn = null;                            }                            else {                                hn = lastRun;                                ln = null;                            }                                                        //构建ln的遍历、hn链表 (头插)                            for (Node<K,V> p = f; p != lastRun; p = p.next) {                                int ph = p.hash; K pk = p.key; V pv = p.val;                                //头部插入:Node结构的第四个参数是后续节点                                if ((ph & n) == 0)                                    ln = new Node<K,V>(ph, pk, pv, ln);                                else                                    hn = new Node<K,V>(ph, pk, pv, hn);                            }                            ///将ln链表设置到i位置                            setTabAt(nextTab, i, ln);                            ///将hn链表设置到i+n位置                            setTabAt(nextTab, i + n, hn);                            //设置转发节点                            setTabAt(tab, i, fwd);                            advance = true;                        }                        //3.2.4.2 树迁移                        else if (f instanceof TreeBin) {                            TreeBin<K,V> t = (TreeBin<K,V>)f;                            TreeNode<K,V> lo = null, loTail = null;                            TreeNode<K,V> hi = null, hiTail = null;                            int lc = 0, hc = 0;                            for (Node<K,V> e = t.first; e != null; e = e.next) {                                int h = e.hash;                                TreeNode<K,V> p = new TreeNode<K,V>                                    (h, e.key, e.val, null, null);                                if ((h & n) == 0) {                                    if ((p.prev = loTail) == null)                                        lo = p;                                    else                                        loTail.next = p;                                    loTail = p;                                    ++lc;                                }                                else {                                    if ((p.prev = hiTail) == null)                                        hi = p;                                    else                                        hiTail.next = p;                                    hiTail = p;                                    ++hc;                                }                            }                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :                                (hc ! untreeify(lo) :                                (hc != 0) ? new TreeBin<K,V>(lo) : t;                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :                                (lc != 0) ? new TreeBin<K,V>(hi) : t;                            setTabAt(nextTab, i, ln);                            setTabAt(nextTab, i + n, hn);                            setTabAt(tab, i, fwd);                            advance = true;                        }                    }                }            }        }    }

实现这一原则并没有对红树和黑树进行太多的描述。一方面,红树和黑树的概念太多了。另一方面,我几乎忘记了(我老了,不能像大学那样手写红树和黑树)

另一方面,我认为只知道使用红黑树的好处就足够了,在工作中也不常用。即使红黑树如何变色、左右旋转以满足红黑树的条件,也没有意义。感兴趣的学生应该学习

迭代器

ConcurrentHashmap中的迭代器一致性较弱,在获取时使用记录的哈希表重建新对象

Entry迭代器:

public Iterator<Map.Entry<K,V>> iterator() {    ConcurrentHashMap<K,V> m = map;    Node<K,V>[] t;    int f = (t = m.table) == null ? 0 : t.length;    return new EntryIterator<K,V>(t, f, 0, f, m);}

key迭代器

public Enumeration<K> keys() {    Node<K,V>[] t;    int f = (t = table) == null ? 0 : t.length;    return new KeyIterator<K,V>(t, f, 0, f, this);}

value迭代器

public Enumeration<V> elements() {    Node<K,V>[] t;    int f = (t = table) == null ? 0 : t.length;    return new ValueIterator<K,V>(t, f, 0, f, this);}
总结

concurenthashmap使用哈希表的数据结构。当哈希发生冲突时,链地址法解决了问题,并将哈希构建成链表到同一索引的节点。当数据量达到一定阈值时,链表将转换为红黑树

concurenthashmap使用volatile修改存储数据,使其他线程在阅读场景中的修改可见,不需要使用同步机制,使用cas和synchronzied来确保原子性写作场景

在查询数据时,首先通过扰动算法(16位异或),确保结果为正(与上符号位0),然后与上哈希表长度-1找到索引值,然后根据不同情况找到索引(比较先判断哈希值,等于判断key)

当put添加/覆盖数据时,索引位置也首先通过扰动算法和哈希找到,并根据不同的情况进行搜索。如果找到它,它将被覆盖,如果找不到它,它将被替换

当需要扩展容量时,线程将安排需要迁移的槽间隔。当其他线程通过put时,它也将帮助迁移。每次线程迁移后,将将转发节点设置到原始哈希表中,以便通过转发节点在新的哈希表中找到有线程查询。当转移到所有槽时,留下一个线程来设置哈希表、数量等

迭代器使用弱一致性,在获得迭代器时通过哈希表构建新的对象

ConcurrentHashMap 只保证相对线程的安全,不保证绝对线程的安全。如果需要一系列操作,应正确使用