当前位置: 首页 > 图灵资讯 > 技术篇> KeyAffinityExecutor 线程池

KeyAffinityExecutor 线程池

来源:图灵教育
时间:2023-05-31 09:14:01

微信微信官方账号:运维发展故事,作者:老郑

根据固定数据,有一批数据 key 分组并发,但要保证组内并行处理。 例如,在购物中心,不同的用户可以并发订单,但一个用户只能按顺序订单。在整体并发场景下,确保局部有序,确保最小事务单元的原子操作。

KeyAffinityExecutor 线程池_Java

对于上述场景,我们可以使用KeyAfinityExecutor(KeyAffinityExecutor 它是一种执行器,可以按照指定的Key亲和顺序消费) 为了解决这个问题,让我们来了解一下KeyAfinityExecutor。

基本使用
  1. 导入依赖

<dependency>  <groupId>com.github.phantomthief</groupId>  <artifactId>more-lambdas</artifactId>  <version>0.1.55</version></dependency>

  1. 创建线程池

public class KeyAffinityExecutorTest {    @Test    public void submitTaskKeyAffinityExecutor() {        //线程池        KeyAffinityExecutor keyAffinityExecutor = KeyAffinityExecutor                .newSerializingExecutor(2, 200, "测试-%d");        ///需要下单的信息        List<Order> orders = new ArrayList<>();        orders.add(new Order(1, "iPhone 16 Max"));        orders.add(new Order(1, "Thinking In Java"));        orders.add(new Order(1, "MengNiu Milk"));        orders.add(new Order(2, "Thinking In Java"));        orders.add(new Order(3, "HUAWEI 100P"));        orders.add(new Order(4, "XIAOMI 20"));        orders.add(new Order(5, "OPPO 98"));        orders.add(new Order(6, "HP EC80"));        orders.add(new Order(7, "BBK 100P"));        orders.add(new Order(8, "TCL 1380"));        orders.add(new Order(9, "CHANGHONG 32"));        orders.forEach(order -> keyAffinityExecutor.submit(order.getAccountId(), () -> {            System.out.println(Thread.currentThread() + " accountId:" + order.getAccountId() +                    ", skuNo:" + order.getSkuNo() + " checkout success!");            return null;        }));        try {            Thread.sleep(1000L);        } catch (InterruptedException e) {            throw new RuntimeException(e);        }        Assert.assertTrue(true);    }    @Data    @AllArgsConstructor    public static class Order {        long accountId;        String skuNo;    }}

输出结果如下:

Thread[测试-0,5,main] accountId:1, skuNo:iPhone 16 Max checkout success!Thread[测试-1,5,main] accountId:2, skuNo:Thinking In Java checkout success!Thread[测试-1,5,main] accountId:3, skuNo:HUAWEI 100P checkout success!Thread[测试-1,5,main] accountId:4, skuNo:XIAOMI 20 checkout success!Thread[测试-0,5,main] accountId:1, skuNo:Thinking In Java checkout success!Thread[测试-1,5,main] accountId:6, skuNo:HP EC80 checkout success!Thread[测试-0,5,main] accountId:1, skuNo:MengNiu Milk checkout success!Thread[测试-1,5,main] accountId:8, skuNo:TCL 1380 checkout success!Thread[测试-0,5,main] accountId:5, skuNo:OPPO 98 checkout success!Thread[测试-0,5,main] accountId:7, skuNo:BBK 100P checkout success!Thread[测试-0,5,main] accountId:9, skuNo:CHANGHONG 32 checkout success!

结论:对于 acccountId = 1 在同一线程下执行三个数据,线程ID:测试-0 因此,可以保证局部有序。

实现原理
  1. 选择执行线程池, 如果现在的话,我们可以在这里看到 key 存在线程池将直接返回。如果没有,则创建或选择任务较少的线程池,以确保任务分配的均匀性。

//通过 key 选择执行线程@Nonnullpublic V select(K key) {    int thisCount = count.getAsInt();    tryCheckCount(thisCount);    KeyRef keyRef = mapping.compute(key, (k, v) -> {        // 如果不存在,创建一个        if (v == null) {            if (usingRandom.test(thisCount)) {                do {                    try {                        v = new KeyRef(all.get(ThreadLocalRandom.current().nextInt(all.size())));                    } catch (IndexOutOfBoundsException e) {                        // ignore                    }                } while (v == null);            } else {                v = all.stream()                        .min(comparingInt(ValueRef::concurrency))                        .map(KeyRef::new)                        .orElseThrow(IllegalStateException::new);            }        }        v.incrConcurrency();        return v;    });    return keyRef.ref();}

  1. 执行线程池的初始化, 这里的本质是创建一个只有一个线程的线程池。这样,任务就可以通过路由到同一个 key 以下是顺序执行的保证。

static Supplier<ExecutorService> executor(String threadName, int queueBufferSize) {        return new Supplier<ExecutorService>() {            // ThreadFactory            private final ThreadFactory threadFactory = new ThreadFactoryBuilder()                    .setNameFormat(threadName)                    .build();            @Override            public ExecutorService get() {                LinkedBlockingQueue<Runnable> queue;                if (queueBufferSize > 0) {                    // blockingQueue                    queue = new LinkedBlockingQueue<Runnable>(queueBufferSize) {                        @Override                        public boolean offer(Runnable e) {                            try {                                //让 offer 方法阻塞,                                ///为什么可以这样做? ThreadPoolExecutor 1347 行                                put(e);                                return true;                            } catch (InterruptedException ie) {                                Thread.currentThread().interrupt();                            }                            return false;                        }                    };                } else {                    queue = new LinkedBlockingQueue<>();                }                ///创建线程的线程池                return new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, queue, threadFactory);            }        };    }

  1. 最后,执行任务并回收线程。

///每个key执行后回收处理key的线程池.public void finishCall(K key) (//如果执行完成后返回 nullmapping.computeIfPresent(key, (k, v) -> {    if (v.decrConcurrency()) {        return null;    } else {        return v;    }});}

综上所述,事实上,我们也可以通过只有一个线程的线程数组来实现唯一的key hash 路由。

参考地址

https://github.com/PhantomThief/more-lambdas-java

最后,请注意。如果您想阅读更多高质量的原创文章,请关注我们的官方帐户「运维开发故事」。


我是 “运维发展故事”微信官方账号团队成员老郑,某网高级 Java 工程师,10 余年 Java 互联网项目开发经验,擅长高并发、低延迟分布式互联网核心系统的研发。这里不仅有核心技术干货,还有我们对技术的思考和理解,欢迎关注我们的公共账户,期待与您一起成长!