掌握 Java 并发性:CompletableFuture、Fork/Join 及其他
2024-12-15 19:43:45
作为一名拥有多年经验的 java 开发人员,我见证了该语言的巨大发展,尤其是在并发领域。新功能的引入彻底改变了我们处理多线程编程的方式,使构建能够充分利用现代硬件的高性能应用程序变得更加容易。
completablefuture 是 java 并发工具包中最重要的补充之一。它提供了一种强大的异步计算方法,使我们能够轻松链接操作、组合结果并处理错误。我发现它在我需要编排多个独立任务的场景中特别有用。
以下是如何使用 completablefuture 的实际示例:
completablefuture<string> usernamefuture = completablefuture.supplyasync(() -> fetchusername()); completablefuture<string> emailfuture = completablefuture.supplyasync(() -> fetchemail()); completablefuture<user> userfuture = usernamefuture.thencombine(emailfuture, (username, email) -> { return new user(username, email); }); userfuture.thenaccept(user -> system.out.println("user created: " + user));
在此代码中,我们异步获取用户名和电子邮件,然后组合结果来创建 user 对象。 completablefuture 的美妙之处在于它允许我们以清晰、可读的方式表达这种复杂的异步逻辑。
fork/join 框架是我们并发工具库中的另一个强大工具。它专为递归分而治之问题而设计,非常适合对大型数组进行排序或处理树状数据结构等任务。该框架使用工作窃取算法,允许空闲线程从繁忙线程中“窃取”任务,确保系统资源的有效利用。
立即学习“Java免费学习笔记(深入)”;
这是使用 fork/join 框架对大型数组的元素求和的示例:
public class sumtask extends recursivetask<long> { private final long[] array; private final int start; private final int end; private static final int threshold = 10000; public sumtask(long[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } @override protected long compute() { if (end - start <= threshold) { long sum = 0; for (int i = start; i < end; i++) { sum += array[i]; } return sum; } else { int mid = (start + end) / 2; sumtask left = new sumtask(array, start, mid); sumtask right = new sumtask(array, mid, end); left.fork(); long rightresult = right.compute(); long leftresult = left.join(); return leftresult + rightresult; } } } // usage forkjoinpool pool = forkjoinpool.commonpool(); long[] array = new long[1000000]; // ... fill array with values sumtask task = new sumtask(array, 0, array.length); long sum = pool.invoke(task);
这个示例演示了我们如何使用 fork/join 框架来并行化计算密集型任务。该框架负责在可用线程之间分配工作,使我们能够专注于算法的逻辑。
并行流是构建在 fork/join 框架之上的更高级别的抽象。它们提供了一种简单的方法来并行化集合操作,从而可以轻松地利用多核处理器来执行数据处理任务。我发现并行流在处理大型数据集时特别有用,其中可以对每个元素独立执行操作。
这是使用并行流处理大量数字的示例:
list<integer> numbers = arrays.aslist(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); long sum = numbers.parallelstream() .filter(n -> n % 2 == 0) .maptolong(n -> n * n) .sum(); system.out.println("sum of squares of even numbers: " + sum);
在此代码中,我们使用并行流来过滤偶数、对它们进行平方并对结果求和。 parallelstream() 方法自动使用 fork/join 池在多个线程之间分配工作。
stampedlock 是 java 并发实用程序的最新补充。它旨在在读取比写入频繁得多的情况下提供比 reentrantreadwritelock 更好的性能。 stampedlock 允许三种锁定模式:写入、读取和乐观读取。
以下是如何使用 stampedlock 的示例:
public class point { private double x, y; private final stampedlock sl = new stampedlock(); void move(double deltax, double deltay) { long stamp = sl.writelock(); try { x += deltax; y += deltay; } finally { sl.unlockwrite(stamp); } } double distancefromorigin() { long stamp = sl.tryoptimisticread(); double currentx = x, currenty = y; if (!sl.validate(stamp)) { stamp = sl.readlock(); try { currentx = x; currenty = y; } finally { sl.unlockread(stamp); } } return math.sqrt(currentx * currentx + currenty * currenty); } }
在此示例中,move 方法使用写锁来确保更新坐标时的独占访问。 distancefromorigin 方法使用乐观读取,它实际上并不锁定状态。如果乐观读取失败(即,如果在读取期间发生写入),它将回退到常规读取锁定。
java 9 中引入的 flow api 为标准库带来了响应式编程。它提供了用于实现反应流规范的接口,允许具有非阻塞背压的异步流处理。虽然 flow api 本身相当低级,但它可以作为更高级别反应式库的基础。
这是使用 flow api 的简单示例:
public class SimplePublisher implements Flow.Publisher<Integer> { private final List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); @Override public void subscribe(Flow.Subscriber<? super Integer> subscriber) { subscriber.onSubscribe(new Flow.Subscription() { private int index = 0; @Override public void request(long n) { for (int i = 0; i < n && index < data.size(); i++) { subscriber.onNext(data.get(index++)); } if (index >= data.size()) { subscriber.onComplete(); } } @Override public void cancel() { index = data.size(); } }); } } public class SimpleSubscriber implements Flow.Subscriber<Integer> { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(Integer item) { System.out.println("Received: " + item); subscription.request(1); } @Override public void onError(Throwable throwable) { System.err.println("Error: " + throwable.getMessage()); } @Override public void onComplete() { System.out.println("Complete"); } } // Usage SimplePublisher publisher = new SimplePublisher(); SimpleSubscriber subscriber = new SimpleSubscriber(); publisher.subscribe(subscriber);
此示例演示了使用 flow api 的简单发布者-订阅者交互。发布者发出一系列整数,订阅者一次处理一个整数。
这些并发特性显着提高了我编写高效、可扩展的 java 应用程序的能力。 completablefuture 使异步编程变得更加直观,使我能够清楚地表达复杂的工作流程。 fork/join 框架对于并行化递归算法非常有价值,而并行流则简化了批量数据处理。
stampedlock 已被证明在我需要对并发访问进行细粒度控制的场景中非常有用,尤其是在读取密集的情况下。虽然我没有直接使用 flow api,但很高兴看到响应式编程原理成为 java 标准库的一部分。
我在使用这些功能时学到的重要教训之一是为工作选择正确工具的重要性。虽然并行流很方便,但它们并不总是每个并发任务的最佳选择。有时,使用 completablefuture 或 fork/join 框架的更底层方法可以提供更好的性能和控制。
我还发现这些功能通常组合起来效果最佳。例如,我可能使用 completablefuture 来协调多个异步任务,每个异步任务在内部都使用并行流进行数据处理。或者,我可能会使用 stampedlock 来保护使用 flow api 构建的反应式应用程序中的共享状态。
需要注意的是,虽然这些功能使并发编程变得更容易,但它们并不能消除仔细设计和测试的需要。并发错误仍然可能发生,而且它们通常很微妙且难以重现。我始终确保在适当的时候使用 jcstress 等工具彻底测试我的并发代码。
总之,java 的并发增强为开发人员提供了一套强大的工具来构建高性能应用程序。通过利用 completablefuture、fork/join 框架、并行流、stampedlock 和 flow api,我们可以创建响应迅速、高效且可扩展的系统,充分利用现代多核处理器。随着 java 的不断发展,我很高兴看到该语言的未来版本中将引入哪些新的并发功能。
我们的创作一定要看看我们的创作:
投资者中心 | 投资者中央西班牙语 | 投资者中德意志 | 智能生活 | 时代与回响 | 令人费解的谜团 | 印度教 | 精英开发 | js学校
我们在媒体上科技考拉洞察 | 时代与回响世界 | 投资者中央媒体 | 令人费解的谜团 | 科学与时代媒介 | 现代印度教
以上就是掌握 Java 并发性:CompletableFuture、Fork/Join 及其他的详细内容,更多请关注图灵教育其它相关文章!