微信微信官方账号:运维发展故事,作者:老郑
根据固定数据,有一批数据 key 分组并发,但要保证组内并行处理。 例如,在购物中心,不同的用户可以并发订单,但一个用户只能按顺序订单。在整体并发场景下,确保局部有序,确保最小事务单元的原子操作。
对于上述场景,我们可以使用KeyAfinityExecutor(KeyAffinityExecutor 它是一种执行器,可以按照指定的Key亲和顺序消费) 为了解决这个问题,让我们来了解一下KeyAfinityExecutor。
基本使用- 导入依赖
<dependency> <groupId>com.github.phantomthief</groupId> <artifactId>more-lambdas</artifactId> <version>0.1.55</version></dependency>
- 创建线程池
public class KeyAffinityExecutorTest { @Test public void submitTaskKeyAffinityExecutor() { //线程池 KeyAffinityExecutor keyAffinityExecutor = KeyAffinityExecutor .newSerializingExecutor(2, 200, "测试-%d"); ///需要下单的信息 List<Order> orders = new ArrayList<>(); orders.add(new Order(1, "iPhone 16 Max")); orders.add(new Order(1, "Thinking In Java")); orders.add(new Order(1, "MengNiu Milk")); orders.add(new Order(2, "Thinking In Java")); orders.add(new Order(3, "HUAWEI 100P")); orders.add(new Order(4, "XIAOMI 20")); orders.add(new Order(5, "OPPO 98")); orders.add(new Order(6, "HP EC80")); orders.add(new Order(7, "BBK 100P")); orders.add(new Order(8, "TCL 1380")); orders.add(new Order(9, "CHANGHONG 32")); orders.forEach(order -> keyAffinityExecutor.submit(order.getAccountId(), () -> { System.out.println(Thread.currentThread() + " accountId:" + order.getAccountId() + ", skuNo:" + order.getSkuNo() + " checkout success!"); return null; })); try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new RuntimeException(e); } Assert.assertTrue(true); } @Data @AllArgsConstructor public static class Order { long accountId; String skuNo; }}
输出结果如下:
Thread[测试-0,5,main] accountId:1, skuNo:iPhone 16 Max checkout success!Thread[测试-1,5,main] accountId:2, skuNo:Thinking In Java checkout success!Thread[测试-1,5,main] accountId:3, skuNo:HUAWEI 100P checkout success!Thread[测试-1,5,main] accountId:4, skuNo:XIAOMI 20 checkout success!Thread[测试-0,5,main] accountId:1, skuNo:Thinking In Java checkout success!Thread[测试-1,5,main] accountId:6, skuNo:HP EC80 checkout success!Thread[测试-0,5,main] accountId:1, skuNo:MengNiu Milk checkout success!Thread[测试-1,5,main] accountId:8, skuNo:TCL 1380 checkout success!Thread[测试-0,5,main] accountId:5, skuNo:OPPO 98 checkout success!Thread[测试-0,5,main] accountId:7, skuNo:BBK 100P checkout success!Thread[测试-0,5,main] accountId:9, skuNo:CHANGHONG 32 checkout success!
结论:对于 acccountId = 1 在同一线程下执行三个数据,线程ID:测试-0 因此,可以保证局部有序。
实现原理- 选择执行线程池, 如果现在的话,我们可以在这里看到 key 存在线程池将直接返回。如果没有,则创建或选择任务较少的线程池,以确保任务分配的均匀性。
//通过 key 选择执行线程@Nonnullpublic V select(K key) { int thisCount = count.getAsInt(); tryCheckCount(thisCount); KeyRef keyRef = mapping.compute(key, (k, v) -> { // 如果不存在,创建一个 if (v == null) { if (usingRandom.test(thisCount)) { do { try { v = new KeyRef(all.get(ThreadLocalRandom.current().nextInt(all.size()))); } catch (IndexOutOfBoundsException e) { // ignore } } while (v == null); } else { v = all.stream() .min(comparingInt(ValueRef::concurrency)) .map(KeyRef::new) .orElseThrow(IllegalStateException::new); } } v.incrConcurrency(); return v; }); return keyRef.ref();}
- 执行线程池的初始化, 这里的本质是创建一个只有一个线程的线程池。这样,任务就可以通过路由到同一个 key 以下是顺序执行的保证。
static Supplier<ExecutorService> executor(String threadName, int queueBufferSize) { return new Supplier<ExecutorService>() { // ThreadFactory private final ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat(threadName) .build(); @Override public ExecutorService get() { LinkedBlockingQueue<Runnable> queue; if (queueBufferSize > 0) { // blockingQueue queue = new LinkedBlockingQueue<Runnable>(queueBufferSize) { @Override public boolean offer(Runnable e) { try { //让 offer 方法阻塞, ///为什么可以这样做? ThreadPoolExecutor 1347 行 put(e); return true; } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } return false; } }; } else { queue = new LinkedBlockingQueue<>(); } ///创建线程的线程池 return new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, queue, threadFactory); } }; }
- 最后,执行任务并回收线程。
///每个key执行后回收处理key的线程池.public void finishCall(K key) (//如果执行完成后返回 nullmapping.computeIfPresent(key, (k, v) -> { if (v.decrConcurrency()) { return null; } else { return v; }});}
综上所述,事实上,我们也可以通过只有一个线程的线程数组来实现唯一的key hash 路由。
参考地址https://github.com/PhantomThief/more-lambdas-java
最后,请注意。如果您想阅读更多高质量的原创文章,请关注我们的官方帐户「运维开发故事」。
我是 “运维发展故事”微信官方账号团队成员老郑,某网高级 Java 工程师,10 余年 Java 互联网项目开发经验,擅长高并发、低延迟分布式互联网核心系统的研发。这里不仅有核心技术干货,还有我们对技术的思考和理解,欢迎关注我们的公共账户,期待与您一起成长!