需求点:业务上常常有这样一个需求:一个服务常常会从多个数据源取得数据,然后并成一个结果。
这个操作,假设有3个数据源,同步处理通常的做法是:需要queryData1,queryData2,queryData3。执行时间会是3个时间之和。
一般的异步设计方案为:起一个业务的线程池,并发执行业务,然后由一个守护的线程等各个业务结束(时间为业务执行最长的时间),获取所有数据。这样明显执行时间会小于3个业务时间之和(例如下面的getAllInfoByProductId)。因为:消耗时间=执行最长的业务时间+守护线程的时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| int size = 5; CountDownLatch latch = new CountDownLatch(size); for (int i = 0; i < size; i++) { Executors.newFixedThreadPool(size).submit(() -> { try { System.out.println(Thread.currentThread().getName() + " " + latch.getCount()); } finally { latch.countDown(); } }); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName());
|
例如用 jdk的Future和线程池实现类似功能,自己造了轮子各种调试。
现在java8提供了一个很好的CompletableFuture工具,非常爽。Talk is cheap, show you the code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
| import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException;
public class ParallelTest {
public String getProductBaseInfo(String productId) { return productId + "商品基础信息"; }
public String getProductDetailInfo(String productId) { return productId + "商品详情信息"; }
public String getProductSkuInfo(String productId) { return productId + "商品sku信息"; }
public String getAllInfoByProductId(String productId) { CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> getProductBaseInfo(productId)); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> getProductDetailInfo(productId)); CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> getProductSkuInfo(productId)); CompletableFuture.allOf(f1, f2, f3).join(); try { String baseInfo = f1.get(); String detailInfo = f2.get(); String skuInfo = f3.get(); return baseInfo + "" + detailInfo + skuInfo; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return null; }
@Test public void testGetAllInfoParalleByProductId() throws ExecutionException, InterruptedException { ParallelTest test = new ParallelTest(); String info = test.getAllInfoByProductId("1111"); Assertions.assertNotNull(info); }
@Test public void testGetAllInfoDirectly() throws ExecutionException, InterruptedException { ParallelTest test = new ParallelTest(); String baseInfo = getProductBaseInfo("1111"); String detailInfo = getProductDetailInfo("1111"); String skuInfo = getProductSkuInfo("1111"); String info=baseInfo + "" + detailInfo + skuInfo; Assertions.assertNotNull(info); } }
|
allOf是等待所有任务完成,接触阻塞,获取各个数据源的数据。
改进
对于上面的例子,使用了默认的线程池,线程数为cpu核数-1。这个并不能很好地利用资源。下面为线程数计算的公式:
1
| 服务器端最佳线程数量=((线程等待时间+线程cpu时间)/线程cpu时间) * cpu数量
|
下面例子中也将executor线程池暴露出来,方便配置线程数和做一些其他处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class ParallelTest { ExecutorService executor = Executors.newFixedThreadPool(100);
public String getAllInfoByProductId(String productId) { CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> getProductBaseInfo(productId),executor); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> getProductDetailInfo(productId),executor); CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> getProductSkuInfo(productId),executor); try { String baseInfo = f1.get(); String detailInfo = f2.get(); String skuInfo = f3.get(); return baseInfo + "" + detailInfo + skuInfo; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return null; } }
|
当然还有其他的类似处理:
以前在处理类似问题的时候,用了twitter的一个util工具(scala语言实现)
1 2 3 4 5
| <dependency> <groupId>com.twitter</groupId> <artifactId>util-core_2.11</artifactId> <version>${util-core_2.11.version}</version> </dependency>
|
用这个工具包的Future实现了类似的功能,但是多加了一种语言依赖
参考:
https://blog.csdn.net/u011726984/article/details/79320004
http://iamzhongyong.iteye.com/blog/1924745