谷粒商城分布式高级篇2
大约 13 分钟约 4035 字
谷粒商城分布式高级篇2
检索服务
导入页面到search目录下面,编辑host文件加上最后一行
nginx修改:
server_name *.gulimall.com;
配置网关:
- id: gulimall_host_route # gulimall.com
uri: lb://gulimall-product
predicates:
- Host=gulimall.com
- id: gulimall_search_route # search.gulimall.com
uri: lb://gulimall-search
predicates:
- Host=search.gulimall.com
修改mapping映射
//PUT gulimall_product
{
"mappings": {
"properties": {
"skuId": {
"type": "long"
},
"spuId": {
"type": "long"
},
"skuTitle": {
"type": "text",
"analyzer": "ik_smart"
},
"skuPrice": {
"type": "keyword"
},
"skuImg": {
"type": "keyword"
},
"saleCount": {
"type": "long"
},
"hosStock": {
"type": "boolean"
},
"hotScore": {
"type": "long"
},
"brandId": {
"type": "long"
},
"catelogId": {
"type": "long"
},
"brandName": {
"type": "keyword"
},
"brandImg": {
"type": "keyword"
},
"catalogName": {
"type": "keyword"
},
"attrs": {
"type": "nested",
"properties": {
"attrId": {
"type": "long"
},
"attrName": {
"type": "keyword"
},
"attrValue": {
"type": "keyword"
}
}
}
}
}
}
构建检索dsl语句:
//GET gulimall_product/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"skuTitle": "华为"
}
}
],
"filter": [
{
"term": {
"catalogId": "225"
}
},
{
"terms": {
"brandId": [
"1",
"2",
"9"
]
}
},
{
"nested": {
"path": "attrs",
"query": {
"bool": {
"must": [
{
"term": {
"attrs.attrId": {
"valu e": "9"
}
}
},
{
"terms": {
"attrs.attrValue": [
"高通",
"海思"
]
}
}
]
}
}
}
},
{
"term": {
"hasStock": {
"value": "true"
}
}
},
{
"range": {
"skuPrice": {
"gte": 0,
"lte": 6500
}
}
}
]
}
},
"sort": [
{
"skuPrice": {
"order": "desc"
}
}
],
"from": 0,
"size": 5,
"highlight": {
"fields": {
"skuTitle": {}
},
"pre_tags": "<b style='color:red'>",
"post_tags": "</b>"
},
"aggs": {
"brand_agg": {
"terms": {
"field": "brandId",
"size": 10
},
"aggs": {
"brand_name_agg": {
"terms": {
"field": "brandName",
"size": 10
}
},
"brand_img_agg": {
"terms": {
"field": "brandImg",
"size": 10
}
}
}
},
"catalog_agg": {
"terms": {
"field": "catalogId",
"size": 10
},
"aggs": {
"catalog_name_agg": {
"terms": {
"field": "catalogName",
"size": 10
}
}
}
},
"attr_agg": {
"nested": {
"path": "attrs"
},
"aggs": {
"attr_id_agg": {
"terms": {
"field": "attrs.attrId",
"size": 100
},
"aggs": {
"attr_name_agg": {
"terms": {
"field": "attrs.attrName",
"size": 10
}
},
"attr_value_agg": {
"terms": {
"field": "attrs.attrValue",
"size": 10
}
}
}
}
}
}
}
}
- 查询条件:
- 使用
bool
查询,包含must
(必须匹配)和filter
(过滤)子句。 match
子句匹配包含关键字"华为"的skuTitle
字段。term
子句过滤catalogId
为"225"的商品。terms
子句过滤brandId
为"1"、"2"或"9"的商品。nested
子句对attrs
字段进行嵌套查询,要求attrs.attrId
为"9",并且attrs.attrValue
为"高通"或"海思"。term
子句过滤hasStock
字段为"true"的商品。range
子句过滤skuPrice
在0到6500之间的商品。
- 使用
- 排序:
- 结果按照
skuPrice
字段降序排列。
- 结果按照
- 分页:
- 从搜索结果的第0条记录开始,获取5条记录。
- 高亮显示:
- 对匹配的
skuTitle
字段进行高亮显示,用红色标签。
- 对匹配的
- 聚合(Aggregations):
brand_agg
聚合按照brandId
字段进行分组,同时计算每个分组内的品牌名称和品牌图片。catalog_agg
聚合按照catalogId
字段进行分组,同时计算每个分组内的目录名称。attr_agg
聚合对attrs
字段进行嵌套分组,计算每个属性(attrId
)下的属性名称(attrName
)和属性值(attrValue
)。
转化为Java 代码:
@Service
@Slf4j
public class MallSearchServiceImpl implements MallSearchService {
@Autowired
private RestHighLevelClient restHighLevelClient;
@Autowired
private ProductFeignService productFeignService;
@Override
public SearchResult search(SearchParam param) {
SearchRequest searchRequest = buildSearchRequest(param);
SearchResult searchResult = null;
try {
SearchResponse response = restHighLevelClient.search(searchRequest, GulimallElasticSearchConfig.COMMON_OPTIONS);
searchResult = buildSearchResult(response, param);
} catch (IOException e) {
throw new RuntimeException(e);
}
return searchResult;
}
private SearchResult buildSearchResult(SearchResponse response, SearchParam param) {
SearchResult searchResult = new SearchResult();
//1.返回的所有查询到的商品
SearchHits hits = response.getHits();
List<SkuEsModel> esModels = new ArrayList<>();
if (hits.getHits() != null && hits.getHits().length > 0) {
for (SearchHit hit : hits.getHits()) {
String sourceAsString = hit.getSourceAsString();
SkuEsModel esModel = JSON.parseObject(sourceAsString, SkuEsModel.class);
//判断是否按照关键字搜索,如果是,高亮显示
if (!StringUtils.isEmpty(param.getKeyword())) {
String skuTitle = hit.getHighlightFields().get("skuTitle").getFragments()[0].string();
esModel.setSkuTitle(skuTitle);
}
esModels.add(esModel);
}
}
searchResult.setProducts(esModels);
//2.当前所有商品涉及到的所有属性信息
List<SearchResult.AttrVo> attrVos = new ArrayList<>();
ParsedNested attrAgg = response.getAggregations().get("attr_agg");
ParsedLongTerms attrIdAgg = attrAgg.getAggregations().get("attrId_agg");
for (Terms.Bucket bucket : attrIdAgg.getBuckets()) {
SearchResult.AttrVo attrVo = new SearchResult.AttrVo();
//属性id
long attrId = bucket.getKeyAsNumber().longValue();
attrVo.setAttrId(attrId);
//属性名
ParsedStringTerms attrNameAgg = bucket.getAggregations().get("attrName_agg");
String attrName = attrNameAgg.getBuckets().get(0).getKeyAsString();
attrVo.setAttrName(attrName);
//属性值
ParsedStringTerms attrValueAgg = bucket.getAggregations().get("attrValue_agg");
List<String> attrValues = attrValueAgg.getBuckets().stream()
.map(item -> item.getKeyAsString()).collect(Collectors.toList());
attrVo.setAttrValue(attrValues);
attrVos.add(attrVo);
}
searchResult.setAttrs(attrVos);
//3.当前所有商品涉及到的所有品牌信息
List<SearchResult.BrandVo> brandVos = new ArrayList<>();
//获取到品牌的聚合
ParsedLongTerms brandAgg = response.getAggregations().get("brand_agg");
for (Terms.Bucket bucket : brandAgg.getBuckets()) {
SearchResult.BrandVo brandVo = new SearchResult.BrandVo();
//1、得到品牌的id
long brandId = bucket.getKeyAsNumber().longValue();
brandVo.setBrandId(brandId);
//2、得到品牌的名字
ParsedStringTerms brandNameAgg = bucket.getAggregations().get("brandName_agg");
String brandName = brandNameAgg.getBuckets().get(0).getKeyAsString();
brandVo.setBrandName(brandName);
//3、得到品牌的图片
ParsedStringTerms brandImgAgg = bucket.getAggregations().get("brandImg_agg");
String brandImg = brandImgAgg.getBuckets().get(0).getKeyAsString();
brandVo.setBrandImg(brandImg);
brandVos.add(brandVo);
}
searchResult.setBrands(brandVos);
//4.当前所有商品涉及到的所有分类信息
List<SearchResult.CatalogVo> catalogVos = new ArrayList<>();
ParsedLongTerms catalogAgg = response.getAggregations().get("catalog_agg");
for (Terms.Bucket bucket : catalogAgg.getBuckets()) {
SearchResult.CatalogVo catalogVo = new SearchResult.CatalogVo();
//1、得到分类的id
long catalogId = bucket.getKeyAsNumber().longValue();
catalogVo.setCatalogId(catalogId);
//2、得到分类的名字
ParsedStringTerms catalogNameAgg = bucket.getAggregations().get("catalogName_agg");
String catalogName = catalogNameAgg.getBuckets().get(0).getKeyAsString();
catalogVo.setCatalogName(catalogName);
catalogVos.add(catalogVo);
}
searchResult.setCatalogs(catalogVos);
//5.分页信息
searchResult.setPageNum(param.getPageNum());
//5.1 当前页码
long value = hits.getTotalHits().value;
searchResult.setTotal(value);
//5.2 总记录数
int totalPage = (int) (value % EsConstant.PRODUCT_PAGESIZE == 0 ?
(int) value / EsConstant.PRODUCT_PAGESIZE : (int) value / EsConstant.PRODUCT_PAGESIZE + 1);
searchResult.setTotalPages(totalPage);
//5.3 总页码
List<Integer> pageNavs = new ArrayList<>();
for (int i = 1; i <= totalPage; i++) {
pageNavs.add(i);
}
searchResult.setPageNavs(pageNavs);
//6、构建面包屑导航
if (param.getAttrs() != null && param.getAttrs().size() > 0) {
List<SearchResult.NavVo> collect = param.getAttrs().stream().map(attr -> {
//1、分析每一个attrs传过来的参数值
SearchResult.NavVo navVo = new SearchResult.NavVo();
String[] s = attr.split("_");
navVo.setNavValue(s[1]);
R r = productFeignService.attrInfo(Long.parseLong(s[0]));
if (r.getCode() == 0) {
AttrResponseVo data = r.getData("attr", new TypeReference<AttrResponseVo>() {
});
navVo.setNavName(data.getAttrName());
} else {
navVo.setNavName(s[0]);
}
//2、取消了这个面包屑以后,我们要跳转到哪个地方,将请求的地址url里面的当前置空
//拿到所有的查询条件,去掉当前
String encode = null;
try {
encode = URLEncoder.encode(attr, "UTF-8");
encode.replace("+", "%20"); //浏览器对空格的编码和Java不一样,差异化处理
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
String replace = param.get_queryString().replace("&attrs=" + attr, "");
navVo.setLink("http://search.gulimall.com/list.html?" + replace);
return navVo;
}).collect(Collectors.toList());
searchResult.setNavs(collect);
}
return searchResult;
}
private SearchRequest buildSearchRequest(SearchParam param) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
/**
* 模糊匹配,过滤(按照属性,分类,品牌,价格区间,库存)
*/
//1、构建检索请求
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
//1.1、must-模糊匹配
if (!StringUtils.isEmpty(param.getKeyword())) {
boolQueryBuilder.must(QueryBuilders.matchQuery("skuTitle", param.getKeyword()));
}
//1.2 bool - filter - term
if (param.getCatalog3Id() != null) {
boolQueryBuilder.filter(QueryBuilders.termQuery("catalogId", param.getCatalog3Id()));
}
//1.2.2 brandId
if (param.getBrandId() != null) {
boolQueryBuilder.filter(QueryBuilders.termQuery("brandId", param.getBrandId()));
}
//1.2.3 attrs
if (param.getAttrs() != null && !param.getAttrs().isEmpty()) {
param.getAttrs().forEach(item -> {
//attrs=1_5寸:8寸 & attrs=2_16G:8G
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
String[] split = item.split("_");
String attrId = split[0];
String[] attrValues = split[1].split(":");
boolQuery.must(QueryBuilders.termQuery("attrs.attrId", attrId));
boolQuery.must(QueryBuilders.termsQuery("attrs.attrValue", attrValues));
NestedQueryBuilder nestedQuery = QueryBuilders.nestedQuery("attrs", boolQuery, ScoreMode.None);
boolQuery.filter(nestedQuery);
});
}
//1.2.4 hasStock
if (param.getHasStock() != null) {
boolQueryBuilder.filter(QueryBuilders.termQuery("hsStock", param.getHasStock() == 1));
}
//1.2.5 skuPrice
if (!StringUtils.isEmpty(param.getSkuPrice())) {
//skuPrice=1_500 skuPrice=500_ skuPrice=_500
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("skuPrice");
String[] split = param.getSkuPrice().split("_");
if (split.length == 2) {
rangeQuery.gte(split[0]).lte(split[1]);
} else if (split.length == 1) {
if (param.getSkuPrice().startsWith("_")) {
rangeQuery.lte(split[0]);
} else {
rangeQuery.gte(split[0]);
}
}
boolQueryBuilder.filter(rangeQuery);
}
//封装所有的查询条件
searchSourceBuilder.query(boolQueryBuilder);
/**
* 排序,分页,高亮
*/
//排序
//形式为sort=hotScore_asc/desc
if (!StringUtils.isEmpty(param.getSort())) {
String[] sort = param.getSort().split("_");
searchSourceBuilder.sort(sort[0], "asc".equals(sort[1]) ? SortOrder.ASC : SortOrder.DESC);
}
//分页
searchSourceBuilder.from((param.getPageNum() - 1) * EsConstant.PRODUCT_PAGESIZE);
searchSourceBuilder.size(EsConstant.PRODUCT_PAGESIZE);
//高亮
if (!StringUtils.isEmpty(param.getKeyword())) {
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.field("skuTitle");
highlightBuilder.preTags("<b style='color:red'>");
highlightBuilder.postTags("</b>");
searchSourceBuilder.highlighter(highlightBuilder);
}
/**
* 聚合分析
*/
//品牌聚合
TermsAggregationBuilder brandAgg = AggregationBuilders.terms("brand_agg");
brandAgg.field("brandId").size(50);
brandAgg.subAggregation(AggregationBuilders.terms("brandName_agg").field("brandName").size(1));
brandAgg.subAggregation(AggregationBuilders.terms("brandImg_agg").field("brandImg").size(1));
searchSourceBuilder.aggregation(brandAgg);
//分类聚合
TermsAggregationBuilder catalogAgg = AggregationBuilders.terms("catalog_agg");
catalogAgg.field("catalogId").size(50);
catalogAgg.subAggregation(AggregationBuilders.terms("catalogName_agg").field("catalogName").size(1));
searchSourceBuilder.aggregation(catalogAgg);
//属性聚合
NestedAggregationBuilder nested = AggregationBuilders.nested("attr_agg", "attrs");
//按照属性id聚合
TermsAggregationBuilder attrIdAgg = AggregationBuilders.terms("attrId_agg").field("attrs.attrId");
nested.subAggregation(attrIdAgg);
//在每个attrId下按照attrValue聚合
attrIdAgg.subAggregation(AggregationBuilders.terms("attrValue_agg").field("attrs.attrValue").size(50));
//在每个attrId下再聚合attrName
attrIdAgg.subAggregation(AggregationBuilders.terms("attrName_agg").field("attrs.attrName").size(1));
searchSourceBuilder.aggregation(nested);
log.info("构建的DSL语句:{}", searchSourceBuilder.toString());
SearchRequest searchRequest = new SearchRequest(new String[]{EsConstant.PRODUCT_INDEX}, searchSourceBuilder);
return searchRequest;
}
}
控制器:
@GetMapping("/list.html")
private String listPage(SearchParam param, Model model, HttpServletRequest request) {
param.set_queryString(request.getQueryString());
SearchResult result = mallSearchService.search(param);
model.addAttribute("result", result);
return "list";
}
异步
创建线程
第一种方式:
public class ThreadDemo {
public static void main(String[] args) {
System.out.println("main .... start ....");
MyThread myThread = new MyThread();
myThread.start();
System.out.println("main .... end ....");
}
public static class MyThread extends Thread {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("i = " + i);
}
}
}
第二种方式:
public static void main(String[] args) {
System.out.println("main .... start ....");
Runnable01 runnable01 = new Runnable01();
Thread thread = new Thread(runnable01);
thread.start();
System.out.println("main .... end ....");
}
public static class Runnable01 implements Runnable {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("i = " + i);
}
}
第三种方式:
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main .... start ....");
FutureTask<Integer> task = new FutureTask<>(new Callable01());
new Thread(task, "A").start();
Integer i = task.get();
//get会阻塞,直到线程执行完毕
System.out.println("i = " + i);
System.out.println("main .... end ....");
}
public static class Callable01 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("i = " + i);
return i;
}
}
第四种方式:
public static ExecutorService service = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main .... start ....");
service.execute(new Runnable01());
System.out.println("main .... end ....");
}
线程池
线程池构造器:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize
(核心线程数):
- 描述:线程池中始终保持存活的线程数,即使它们处于空闲状态。
maximumPoolSize
(最大线程数):
- 描述:线程池中允许存在的最大线程数。
keepAliveTime
(线程空闲时间):
- 描述:当线程池中的线程数超过核心线程数时,多余的空闲线程在被终止之前等待新任务的最长时间。
unit
(时间单位):
- 描述:用于指定 keepAliveTime 的时间单位,可以是秒、毫秒等。
workQueue
(工作队列):
- 描述:用于保存等待执行的任务的阻塞队列。类型:BlockingQueue
<Runnable>
。threadFactory
(线程工厂):
- 描述:用于创建新线程的工厂。类型:ThreadFactory 接口的实现。
handler
(拒绝策略):
- 描述:当工作队列已满,并且无法再接受新任务时,用于处理新任务的策略。类型:RejectedExecutionHandler 接口的实现。
面试题:一个线程池 core 7; max 20 ,queue:50,100 并发进来怎么分配的;
答案:先有 7 个能直接得到执行,接下来 50 个进入队列排队,在多开 13 个继续执行。现在70 个被安排上了。剩下 30 个默认拒绝策略。
常见线程池:
- FixedThreadPool (固定大小线程池):
FixedThreadPool
是一个具有固定线程数量的线程池。- 在执行任务时,如果线程池中的线程都在执行任务,新任务会被放入队列中等待。
- 适用于并发任务数量可控的场景。
- CachedThreadPool (缓存线程池):
CachedThreadPool
是一个可根据需要创建新线程的线程池,线程池的大小可动态调整。- 在执行任务时,如果线程池中的线程都在执行任务,会创建新的线程来处理新任务。
- 适用于短生命周期的异步任务。
- SingleThreadExecutor (单线程线程池):
SingleThreadExecutor
是一个仅包含一个线程的线程池。- 所有提交的任务都按顺序执行,保证不会有并发执行的情况。
- 适用于需要保证任务按照顺序执行的场景。
- ScheduledThreadPool (定时任务线程池):
ScheduledThreadPool
是一个支持定时以及周期性执行任务的线程池。- 可以用于执行定时任务,例如定时执行任务、周期性执行任务等。
- 适用于需要按照一定规律执行任务的场景。
这些线程池实现都是通过 Executors
工厂类创建的,提供了方便的线程池创建和管理方式。
CompletableFuture
创建异步对象
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
runAsync没有返回值,supply有返回值
runAsync
public static ExecutorService service = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start ...");
CompletableFuture<Void> future =CompletableFuture.runAsync(()->{
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("i = " + i);
},service);
System.out.println("end ...");
}
supplyAsync
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start ...");
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("i = " + i);
return i;
}, service);
Integer i = integerCompletableFuture.get();
System.out.println("i = " + i);
System.out.println("end2 ...");
}
完成时回调
whenComplete回调
public static ExecutorService service = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start ...");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("current thread: " + Thread.currentThread().getName());
return 10 / 2;
}, service).whenComplete((result, e) -> {
System.out.println("current thread: " + Thread.currentThread().getName());
if (e == null) {
System.out.println("result: " + result);
} else {
System.out.println("exception: " + e);
}
}).exceptionally(e -> {
System.out.println("exception: " + e);
return 0;
});
Integer i = future.get();
System.out.println("result2: " + i);
System.out.println("end ...");
}
后续处理handle:
public static ExecutorService service = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start ...");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("current thread: " + Thread.currentThread().getName());
return 10 / 2;
}, service).handle((res, thr) -> {
System.out.println("current thread: " + Thread.currentThread().getName());
return res * 2;
});
Integer i = future.get();
System.out.println("result2: " + i);
System.out.println("end ...");
}
总结:
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(defaultExecutor(), action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(null, fn);
}
whenComplete 处理正常和异常的结果,exceptionally 处理异常情况。
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行
handle:和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。
线程串行化方法
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(defaultExecutor(), fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
thenApply
: 这个方法表示当当前的CompletableFuture完成时,将执行提供的函数,并返回一个新的CompletableFuture,其结果是应用该函数的结果。thenApplyAsync
: 这是异步版本的thenApply
,它使用默认的Executor执行器执行提供的函数。thenApplyAsync
(带有Executor参数): 这是具有自定义Executor执行器的异步版本,允许你指定一个特定的执行器来执行提供的函数。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(defaultExecutor(), action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
thenAccept
: 当当前的CompletableFuture完成时,将执行提供的Consumer函数,但不返回新的结果。相反,返回一个CompletableFuture<Void>
,表示这个阶段的操作不产生结果。thenAcceptAsync
: 这是异步版本的thenAccept
,使用默认的Executor执行器执行提供的Consumer函数。thenAcceptAsync
(带有Executor参数): 这是具有自定义Executor执行器的异步版本,允许你指定一个特定的执行器来执行提供的Consumer函数。
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(defaultExecutor(), action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
thenRun
: 当前CompletableFuture
完成后,将执行提供的Runnable
操作,但不返回新的结果。相反,返回一个CompletableFuture<Void>
,表示这个阶段的操作不产生结果。thenRunAsync
: 这是thenRun
的异步版本,使用默认的Executor
执行器执行提供的Runnable
操作。thenRunAsync
(带有Executor
参数): 这是具有自定义Executor
执行器的异步版本,允许你指定一个特定的执行器来执行提供的Runnable
操作。
两任务组合
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(defaultExecutor(), other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(defaultExecutor(), other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), other, action);
}
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action) {
return biRunStage(null, other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action) {
return biRunStage(defaultExecutor(), other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return biRunStage(screenExecutor(executor), other, action);
}
- thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值
- thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。
- runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个future 处理完任务后,处理该任务。
两任务组合完成一个
把上面的both换成either,当两个任务中,任意一个 future 任务完成的时候,执行任务。
多任务组合
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf:等待所有任务完成
anyOf:只要有一个任务完成
商品详情
配置网关
- id: gulimall_host_route # gulimall.com
uri: lb://gulimall-product
predicates:
- Host=gulimall.com,item.gulimall.com
配置线程池:
@ConfigurationProperties(prefix = "gulimall.thread")
// @Component
@Data
public class ThreadPoolConfigProperties {
private Integer coreSize;
private Integer maxSize;
private Integer keepAliveTime;
}
在配置文件中输入这些值:
#config thread pool
gulimall.thread.coreSize=20
gulimall.thread.maxSize=200
gulimall.thread.keepAliveTime=10
配置线程池容器:
@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
return new ThreadPoolExecutor(
pool.getCoreSize(),
pool.getMaxSize(),
pool.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
}
}
使用:
@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
SkuItemVo skuItemVo = new SkuItemVo();
CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
//1、sku基本信息的获取 pms_sku_info
SkuInfoEntity info = this.getById(skuId);
skuItemVo.setInfo(info);
return info;
}, executor);
CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {
//3、获取spu的销售属性组合
List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());
skuItemVo.setSaleAttr(saleAttrVos);
}, executor);
CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
//4、获取spu的介绍 pms_spu_info_desc
SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
skuItemVo.setDesc(spuInfoDescEntity);
}, executor);
CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {
//5、获取spu的规格参数信息
List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());
skuItemVo.setGroupAttrs(attrGroupVos);
}, executor);
// Long spuId = info.getSpuId();
// Long catalogId = info.getCatalogId();
//2、sku的图片信息 pms_sku_images
CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId);
skuItemVo.setImages(imagesEntities);
}, executor);
//等到所有任务都完成
CompletableFuture.allOf(saleAttrFuture,descFuture,baseAttrFuture,imageFuture).get();
return skuItemVo;
}