跳至主要內容

谷粒商城分布式高级篇2

项目实战谷粒商城项目实战谷粒商城大约 13 分钟约 4035 字全民制作人ikun

谷粒商城分布式高级篇2

检索服务

导入页面到search目录下面,编辑host文件加上最后一行

image-20240131193252288
image-20240131193252288

nginx修改:

    server_name  *.gulimall.com;

配置网关:

        - id: gulimall_host_route # gulimall.com
          uri: lb://gulimall-product
          predicates:
            - Host=gulimall.com
        - id: gulimall_search_route # search.gulimall.com
          uri: lb://gulimall-search
          predicates:
            - Host=search.gulimall.com

修改mapping映射

//PUT gulimall_product
{
  "mappings": {
    "properties": {
      "skuId": {
        "type": "long"
      },
      "spuId": {
        "type": "long"
      },
      "skuTitle": {
        "type": "text",
        "analyzer": "ik_smart"
      },
      "skuPrice": {
        "type": "keyword"
      },
      "skuImg": {
        "type": "keyword"
      },
      "saleCount": {
        "type": "long"
      },
      "hosStock": {
        "type": "boolean"
      },
      "hotScore": {
        "type": "long"
      },
      "brandId": {
        "type": "long"
      },
      "catelogId": {
        "type": "long"
      },
      "brandName": {
        "type": "keyword"
      },
      "brandImg": {
        "type": "keyword"
      },
      "catalogName": {
        "type": "keyword"
      },
      "attrs": {
        "type": "nested",
        "properties": {
          "attrId": {
            "type": "long"
          },
          "attrName": {
            "type": "keyword"
          },
          "attrValue": {
            "type": "keyword"
          }
        }
      }
    }
  }
}

构建检索dsl语句:

//GET gulimall_product/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "skuTitle": "华为"
          }
        }
      ],
      "filter": [
        {
          "term": {
            "catalogId": "225"
          }
        },
        {
          "terms": {
            "brandId": [
              "1",
              "2",
              "9"
            ]
          }
        },
        {
          "nested": {
            "path": "attrs",
            "query": {
              "bool": {
                "must": [
                  {
                    "term": {
                      "attrs.attrId": {
                        "valu e": "9"
                      }
                    }
                  },
                  {
                    "terms": {
                      "attrs.attrValue": [
                        "高通",
                        "海思"
                      ]
                    }
                  }
                ]
              }
            }
          }
        },
        {
          "term": {
            "hasStock": {
              "value": "true"
            }
          }
        },
        {
          "range": {
            "skuPrice": {
              "gte": 0,
              "lte": 6500
            }
          }
        }
      ]
    }
  },
  "sort": [
    {
      "skuPrice": {
        "order": "desc"
      }
    }
  ],
  "from": 0,
  "size": 5,
  "highlight": {
    "fields": {
      "skuTitle": {}
    },
    "pre_tags": "<b style='color:red'>",
    "post_tags": "</b>"
  },
  "aggs": {
    "brand_agg": {
      "terms": {
        "field": "brandId",
        "size": 10
      },
      "aggs": {
        "brand_name_agg": {
          "terms": {
            "field": "brandName",
            "size": 10
          }
        },
        "brand_img_agg": {
          "terms": {
            "field": "brandImg",
            "size": 10
          }
        }
      }
    },
    "catalog_agg": {
      "terms": {
        "field": "catalogId",
        "size": 10
      },
      "aggs": {
        "catalog_name_agg": {
          "terms": {
            "field": "catalogName",
            "size": 10
          }
        }
      }
    },
    "attr_agg": {
      "nested": {
        "path": "attrs"
      },
      "aggs": {
        "attr_id_agg": {
          "terms": {
            "field": "attrs.attrId",
            "size": 100
          },
          "aggs": {
            "attr_name_agg": {
              "terms": {
                "field": "attrs.attrName",
                "size": 10
              }
            },
            "attr_value_agg": {
              "terms": {
                "field": "attrs.attrValue",
                "size": 10
              }
            }
          }
        }
      }
    }
  }
}
  1. 查询条件
    • 使用bool查询,包含must(必须匹配)和filter(过滤)子句。
    • match子句匹配包含关键字"华为"的skuTitle字段。
    • term子句过滤catalogId为"225"的商品。
    • terms子句过滤brandId为"1"、"2"或"9"的商品。
    • nested子句对attrs字段进行嵌套查询,要求attrs.attrId为"9",并且attrs.attrValue为"高通"或"海思"。
    • term子句过滤hasStock字段为"true"的商品。
    • range子句过滤skuPrice在0到6500之间的商品。
  2. 排序
    • 结果按照skuPrice字段降序排列。
  3. 分页
    • 从搜索结果的第0条记录开始,获取5条记录。
  4. 高亮显示
    • 对匹配的skuTitle字段进行高亮显示,用红色标签。
  5. 聚合(Aggregations)
    • brand_agg聚合按照brandId字段进行分组,同时计算每个分组内的品牌名称和品牌图片。
    • catalog_agg聚合按照catalogId字段进行分组,同时计算每个分组内的目录名称。
    • attr_agg聚合对attrs字段进行嵌套分组,计算每个属性(attrId)下的属性名称(attrName)和属性值(attrValue)。

转化为Java 代码:

@Service
@Slf4j
public class MallSearchServiceImpl implements MallSearchService {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @Autowired
    private ProductFeignService productFeignService;

    @Override
    public SearchResult search(SearchParam param) {
        SearchRequest searchRequest = buildSearchRequest(param);
        SearchResult searchResult = null;
        try {
            SearchResponse response = restHighLevelClient.search(searchRequest, GulimallElasticSearchConfig.COMMON_OPTIONS);
            searchResult = buildSearchResult(response, param);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return searchResult;
    }

    private SearchResult buildSearchResult(SearchResponse response, SearchParam param) {
        SearchResult searchResult = new SearchResult();
        //1.返回的所有查询到的商品
        SearchHits hits = response.getHits();
        List<SkuEsModel> esModels = new ArrayList<>();
        if (hits.getHits() != null && hits.getHits().length > 0) {
            for (SearchHit hit : hits.getHits()) {
                String sourceAsString = hit.getSourceAsString();
                SkuEsModel esModel = JSON.parseObject(sourceAsString, SkuEsModel.class);
                //判断是否按照关键字搜索,如果是,高亮显示
                if (!StringUtils.isEmpty(param.getKeyword())) {
                    String skuTitle = hit.getHighlightFields().get("skuTitle").getFragments()[0].string();
                    esModel.setSkuTitle(skuTitle);
                }
                esModels.add(esModel);
            }
        }
        searchResult.setProducts(esModels);

        //2.当前所有商品涉及到的所有属性信息
        List<SearchResult.AttrVo> attrVos = new ArrayList<>();
        ParsedNested attrAgg = response.getAggregations().get("attr_agg");
        ParsedLongTerms attrIdAgg = attrAgg.getAggregations().get("attrId_agg");
        for (Terms.Bucket bucket : attrIdAgg.getBuckets()) {
            SearchResult.AttrVo attrVo = new SearchResult.AttrVo();
            //属性id
            long attrId = bucket.getKeyAsNumber().longValue();
            attrVo.setAttrId(attrId);

            //属性名
            ParsedStringTerms attrNameAgg = bucket.getAggregations().get("attrName_agg");
            String attrName = attrNameAgg.getBuckets().get(0).getKeyAsString();
            attrVo.setAttrName(attrName);

            //属性值
            ParsedStringTerms attrValueAgg = bucket.getAggregations().get("attrValue_agg");
            List<String> attrValues = attrValueAgg.getBuckets().stream()
                    .map(item -> item.getKeyAsString()).collect(Collectors.toList());
            attrVo.setAttrValue(attrValues);
            attrVos.add(attrVo);
        }
        searchResult.setAttrs(attrVos);

        //3.当前所有商品涉及到的所有品牌信息
        List<SearchResult.BrandVo> brandVos = new ArrayList<>();
        //获取到品牌的聚合
        ParsedLongTerms brandAgg = response.getAggregations().get("brand_agg");
        for (Terms.Bucket bucket : brandAgg.getBuckets()) {
            SearchResult.BrandVo brandVo = new SearchResult.BrandVo();

            //1、得到品牌的id
            long brandId = bucket.getKeyAsNumber().longValue();
            brandVo.setBrandId(brandId);

            //2、得到品牌的名字
            ParsedStringTerms brandNameAgg = bucket.getAggregations().get("brandName_agg");
            String brandName = brandNameAgg.getBuckets().get(0).getKeyAsString();
            brandVo.setBrandName(brandName);

            //3、得到品牌的图片
            ParsedStringTerms brandImgAgg = bucket.getAggregations().get("brandImg_agg");
            String brandImg = brandImgAgg.getBuckets().get(0).getKeyAsString();
            brandVo.setBrandImg(brandImg);

            brandVos.add(brandVo);
        }
        searchResult.setBrands(brandVos);

        //4.当前所有商品涉及到的所有分类信息
        List<SearchResult.CatalogVo> catalogVos = new ArrayList<>();
        ParsedLongTerms catalogAgg = response.getAggregations().get("catalog_agg");
        for (Terms.Bucket bucket : catalogAgg.getBuckets()) {
            SearchResult.CatalogVo catalogVo = new SearchResult.CatalogVo();
            //1、得到分类的id
            long catalogId = bucket.getKeyAsNumber().longValue();
            catalogVo.setCatalogId(catalogId);

            //2、得到分类的名字
            ParsedStringTerms catalogNameAgg = bucket.getAggregations().get("catalogName_agg");
            String catalogName = catalogNameAgg.getBuckets().get(0).getKeyAsString();
            catalogVo.setCatalogName(catalogName);

            catalogVos.add(catalogVo);
        }
        searchResult.setCatalogs(catalogVos);

        //5.分页信息
        searchResult.setPageNum(param.getPageNum());
        //5.1 当前页码
        long value = hits.getTotalHits().value;
        searchResult.setTotal(value);
        //5.2 总记录数
        int totalPage = (int) (value % EsConstant.PRODUCT_PAGESIZE == 0 ?
                (int) value / EsConstant.PRODUCT_PAGESIZE : (int) value / EsConstant.PRODUCT_PAGESIZE + 1);
        searchResult.setTotalPages(totalPage);


        //5.3 总页码
        List<Integer> pageNavs = new ArrayList<>();
        for (int i = 1; i <= totalPage; i++) {
            pageNavs.add(i);
        }
        searchResult.setPageNavs(pageNavs);

        //6、构建面包屑导航
        if (param.getAttrs() != null && param.getAttrs().size() > 0) {
            List<SearchResult.NavVo> collect = param.getAttrs().stream().map(attr -> {
                //1、分析每一个attrs传过来的参数值
                SearchResult.NavVo navVo = new SearchResult.NavVo();
                String[] s = attr.split("_");
                navVo.setNavValue(s[1]);
                R r = productFeignService.attrInfo(Long.parseLong(s[0]));
                if (r.getCode() == 0) {
                    AttrResponseVo data = r.getData("attr", new TypeReference<AttrResponseVo>() {
                    });
                    navVo.setNavName(data.getAttrName());
                } else {
                    navVo.setNavName(s[0]);
                }

                //2、取消了这个面包屑以后,我们要跳转到哪个地方,将请求的地址url里面的当前置空
                //拿到所有的查询条件,去掉当前
                String encode = null;
                try {
                    encode = URLEncoder.encode(attr, "UTF-8");
                    encode.replace("+", "%20");  //浏览器对空格的编码和Java不一样,差异化处理
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                String replace = param.get_queryString().replace("&attrs=" + attr, "");
                navVo.setLink("http://search.gulimall.com/list.html?" + replace);

                return navVo;
            }).collect(Collectors.toList());

            searchResult.setNavs(collect);
        }


        return searchResult;
    }

    private SearchRequest buildSearchRequest(SearchParam param) {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        /**
         * 模糊匹配,过滤(按照属性,分类,品牌,价格区间,库存)
         */
        //1、构建检索请求
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();

        //1.1、must-模糊匹配
        if (!StringUtils.isEmpty(param.getKeyword())) {
            boolQueryBuilder.must(QueryBuilders.matchQuery("skuTitle", param.getKeyword()));
        }
        //1.2 bool - filter - term
        if (param.getCatalog3Id() != null) {
            boolQueryBuilder.filter(QueryBuilders.termQuery("catalogId", param.getCatalog3Id()));
        }

        //1.2.2 brandId
        if (param.getBrandId() != null) {
            boolQueryBuilder.filter(QueryBuilders.termQuery("brandId", param.getBrandId()));
        }

        //1.2.3 attrs
        if (param.getAttrs() != null && !param.getAttrs().isEmpty()) {
            param.getAttrs().forEach(item -> {
                //attrs=1_5寸:8寸   &   attrs=2_16G:8G
                BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
                String[] split = item.split("_");
                String attrId = split[0];
                String[] attrValues = split[1].split(":");
                boolQuery.must(QueryBuilders.termQuery("attrs.attrId", attrId));
                boolQuery.must(QueryBuilders.termsQuery("attrs.attrValue", attrValues));
                NestedQueryBuilder nestedQuery = QueryBuilders.nestedQuery("attrs", boolQuery, ScoreMode.None);
                boolQuery.filter(nestedQuery);
            });
        }

        //1.2.4 hasStock
        if (param.getHasStock() != null) {
            boolQueryBuilder.filter(QueryBuilders.termQuery("hsStock", param.getHasStock() == 1));
        }

        //1.2.5 skuPrice
        if (!StringUtils.isEmpty(param.getSkuPrice())) {
            //skuPrice=1_500  skuPrice=500_ skuPrice=_500
            RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("skuPrice");
            String[] split = param.getSkuPrice().split("_");
            if (split.length == 2) {
                rangeQuery.gte(split[0]).lte(split[1]);
            } else if (split.length == 1) {
                if (param.getSkuPrice().startsWith("_")) {
                    rangeQuery.lte(split[0]);
                } else {
                    rangeQuery.gte(split[0]);
                }
            }
            boolQueryBuilder.filter(rangeQuery);
        }
        //封装所有的查询条件
        searchSourceBuilder.query(boolQueryBuilder);

        /**
         * 排序,分页,高亮
         */
        //排序
        //形式为sort=hotScore_asc/desc
        if (!StringUtils.isEmpty(param.getSort())) {
            String[] sort = param.getSort().split("_");
            searchSourceBuilder.sort(sort[0], "asc".equals(sort[1]) ? SortOrder.ASC : SortOrder.DESC);
        }
        //分页
        searchSourceBuilder.from((param.getPageNum() - 1) * EsConstant.PRODUCT_PAGESIZE);
        searchSourceBuilder.size(EsConstant.PRODUCT_PAGESIZE);

        //高亮
        if (!StringUtils.isEmpty(param.getKeyword())) {
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            highlightBuilder.field("skuTitle");
            highlightBuilder.preTags("<b style='color:red'>");
            highlightBuilder.postTags("</b>");
            searchSourceBuilder.highlighter(highlightBuilder);
        }

        /**
         * 聚合分析
         */
        //品牌聚合
        TermsAggregationBuilder brandAgg = AggregationBuilders.terms("brand_agg");
        brandAgg.field("brandId").size(50);

        brandAgg.subAggregation(AggregationBuilders.terms("brandName_agg").field("brandName").size(1));
        brandAgg.subAggregation(AggregationBuilders.terms("brandImg_agg").field("brandImg").size(1));
        searchSourceBuilder.aggregation(brandAgg);

        //分类聚合
        TermsAggregationBuilder catalogAgg = AggregationBuilders.terms("catalog_agg");
        catalogAgg.field("catalogId").size(50);
        catalogAgg.subAggregation(AggregationBuilders.terms("catalogName_agg").field("catalogName").size(1));
        searchSourceBuilder.aggregation(catalogAgg);

        //属性聚合
        NestedAggregationBuilder nested = AggregationBuilders.nested("attr_agg", "attrs");
        //按照属性id聚合
        TermsAggregationBuilder attrIdAgg = AggregationBuilders.terms("attrId_agg").field("attrs.attrId");
        nested.subAggregation(attrIdAgg);
        //在每个attrId下按照attrValue聚合
        attrIdAgg.subAggregation(AggregationBuilders.terms("attrValue_agg").field("attrs.attrValue").size(50));
        //在每个attrId下再聚合attrName
        attrIdAgg.subAggregation(AggregationBuilders.terms("attrName_agg").field("attrs.attrName").size(1));
        searchSourceBuilder.aggregation(nested);

        log.info("构建的DSL语句:{}", searchSourceBuilder.toString());
        SearchRequest searchRequest = new SearchRequest(new String[]{EsConstant.PRODUCT_INDEX}, searchSourceBuilder);
        return searchRequest;
    }


}

控制器:

@GetMapping("/list.html")
private String listPage(SearchParam param, Model model, HttpServletRequest request) {
    param.set_queryString(request.getQueryString());
    SearchResult result = mallSearchService.search(param);
    model.addAttribute("result", result);
    return "list";
}

异步

创建线程

第一种方式:

public class ThreadDemo {
    public static void main(String[] args) {
        System.out.println("main .... start ....");
        MyThread myThread = new MyThread();
        myThread.start();
        System.out.println("main .... end ....");
    }

    public static class MyThread extends Thread {
        @Override
        public void run() {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("i = " + i);
        }
    }
}

第二种方式:

public static void main(String[] args) {
    System.out.println("main .... start ....");
    Runnable01 runnable01 = new Runnable01();
    Thread thread = new Thread(runnable01);
    thread.start();
    System.out.println("main .... end ....");
}

public static class Runnable01 implements Runnable {

    @Override
    public void run() {
        System.out.println("当前线程:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("i = " + i);
    }
}

第三种方式:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main .... start ....");
    FutureTask<Integer> task = new FutureTask<>(new Callable01());
    new Thread(task, "A").start();
    Integer i = task.get();
    //get会阻塞,直到线程执行完毕
    System.out.println("i = " + i);
    System.out.println("main .... end ....");
}

public static class Callable01 implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("当前线程:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("i = " + i);
        return i;
    }
}

第四种方式:

public static ExecutorService service = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main .... start ....");
    service.execute(new Runnable01());
    System.out.println("main .... end ....");
}

线程池

线程池构造器:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  1. corePoolSize(核心线程数):
    • 描述:线程池中始终保持存活的线程数,即使它们处于空闲状态。
  2. maximumPoolSize(最大线程数):
    • 描述:线程池中允许存在的最大线程数。
  3. keepAliveTime(线程空闲时间):
    • 描述:当线程池中的线程数超过核心线程数时,多余的空闲线程在被终止之前等待新任务的最长时间。
  4. unit(时间单位):
    • 描述:用于指定 keepAliveTime 的时间单位,可以是秒、毫秒等。
  5. workQueue(工作队列):
    • 描述:用于保存等待执行的任务的阻塞队列。类型:BlockingQueue<Runnable>
  6. threadFactory(线程工厂):
    • 描述:用于创建新线程的工厂。类型:ThreadFactory 接口的实现。
  7. handler(拒绝策略):
    • 描述:当工作队列已满,并且无法再接受新任务时,用于处理新任务的策略。类型:RejectedExecutionHandler 接口的实现。

面试题:一个线程池 core 7; max 20 ,queue:50,100 并发进来怎么分配的;

答案:先有 7 个能直接得到执行,接下来 50 个进入队列排队,在多开 13 个继续执行。现在70 个被安排上了。剩下 30 个默认拒绝策略。

常见线程池:

  1. FixedThreadPool (固定大小线程池):
    • FixedThreadPool 是一个具有固定线程数量的线程池。
    • 在执行任务时,如果线程池中的线程都在执行任务,新任务会被放入队列中等待。
    • 适用于并发任务数量可控的场景。
  2. CachedThreadPool (缓存线程池):
    • CachedThreadPool 是一个可根据需要创建新线程的线程池,线程池的大小可动态调整。
    • 在执行任务时,如果线程池中的线程都在执行任务,会创建新的线程来处理新任务。
    • 适用于短生命周期的异步任务。
  3. SingleThreadExecutor (单线程线程池):
    • SingleThreadExecutor 是一个仅包含一个线程的线程池。
    • 所有提交的任务都按顺序执行,保证不会有并发执行的情况。
    • 适用于需要保证任务按照顺序执行的场景。
  4. ScheduledThreadPool (定时任务线程池):
    • ScheduledThreadPool 是一个支持定时以及周期性执行任务的线程池。
    • 可以用于执行定时任务,例如定时执行任务、周期性执行任务等。
    • 适用于需要按照一定规律执行任务的场景。

这些线程池实现都是通过 Executors 工厂类创建的,提供了方便的线程池创建和管理方式。

CompletableFuture

创建异步对象

public static CompletableFuture<Void> runAsync(Runnable runnable) 
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

runAsync没有返回值,supply有返回值

runAsync

public static ExecutorService service = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("start ...");
    CompletableFuture<Void> future =CompletableFuture.runAsync(()->{
        System.out.println("当前线程:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("i = " + i);
    },service);
    System.out.println("end ...");
}

supplyAsync

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("start ...");
    CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("当前线程:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("i = " + i);
        return i;
    }, service);
    Integer i = integerCompletableFuture.get();
    System.out.println("i = " + i);
    System.out.println("end2 ...");
}

完成时回调

whenComplete回调

public static ExecutorService service = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("start ...");
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("current thread: " + Thread.currentThread().getName());
        return 10 / 2;
    }, service).whenComplete((result, e) -> {
        System.out.println("current thread: " + Thread.currentThread().getName());
        if (e == null) {
            System.out.println("result: " + result);
        } else {
            System.out.println("exception: " + e);
        }
    }).exceptionally(e -> {
        System.out.println("exception: " + e);
        return 0;
    });
    Integer i = future.get();
    System.out.println("result2: " + i);
    System.out.println("end ...");
}

后续处理handle:

public static ExecutorService service = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("start ...");
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("current thread: " + Thread.currentThread().getName());
        return 10 / 2;
    }, service).handle((res, thr) -> {
        System.out.println("current thread: " + Thread.currentThread().getName());
        return res * 2;
    });
    Integer i = future.get();
    System.out.println("result2: " + i);
    System.out.println("end ...");
}

总结:

public CompletableFuture<T> whenComplete(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(null, action);
}

public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(defaultExecutor(), action);
}

public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action, Executor executor) {
    return uniWhenCompleteStage(screenExecutor(executor), action);
}

public CompletableFuture<T> exceptionally(
    Function<Throwable, ? extends T> fn) {
    return uniExceptionallyStage(null, fn);
}

whenComplete 处理正常和异常的结果,exceptionally 处理异常情况。

whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。

whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行

handle:和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。

线程串行化方法

    public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(defaultExecutor(), fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }

  • thenApply: 这个方法表示当当前的CompletableFuture完成时,将执行提供的函数,并返回一个新的CompletableFuture,其结果是应用该函数的结果。
  • thenApplyAsync: 这是异步版本的thenApply,它使用默认的Executor执行器执行提供的函数。
  • thenApplyAsync(带有Executor参数): 这是具有自定义Executor执行器的异步版本,允许你指定一个特定的执行器来执行提供的函数。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return uniAcceptStage(defaultExecutor(), action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                               Executor executor) {
    return uniAcceptStage(screenExecutor(executor), action);
}
  • thenAccept: 当当前的CompletableFuture完成时,将执行提供的Consumer函数,但不返回新的结果。相反,返回一个CompletableFuture<Void>,表示这个阶段的操作不产生结果。
  • thenAcceptAsync: 这是异步版本的thenAccept,使用默认的Executor执行器执行提供的Consumer函数。
  • thenAcceptAsync(带有Executor参数): 这是具有自定义Executor执行器的异步版本,允许你指定一个特定的执行器来执行提供的Consumer函数。
public CompletableFuture<Void> thenRun(Runnable action) {
    return uniRunStage(null, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(defaultExecutor(), action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action,
                                            Executor executor) {
    return uniRunStage(screenExecutor(executor), action);
}
  • thenRun: 当前CompletableFuture完成后,将执行提供的Runnable操作,但不返回新的结果。相反,返回一个CompletableFuture<Void>,表示这个阶段的操作不产生结果。
  • thenRunAsync: 这是thenRun的异步版本,使用默认的Executor执行器执行提供的Runnable操作。
  • thenRunAsync(带有Executor参数): 这是具有自定义Executor执行器的异步版本,允许你指定一个特定的执行器来执行提供的Runnable操作。

两任务组合

public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(null, other, fn);
}

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(defaultExecutor(), other, fn);
}

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
    return biApplyStage(screenExecutor(executor), other, fn);
}

public <U> CompletableFuture<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action) {
    return biAcceptStage(null, other, action);
}

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action) {
    return biAcceptStage(defaultExecutor(), other, action);
}

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action, Executor executor) {
    return biAcceptStage(screenExecutor(executor), other, action);
}

public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                            Runnable action) {
    return biRunStage(null, other, action);
}

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action) {
    return biRunStage(defaultExecutor(), other, action);
}

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action,
                                                 Executor executor) {
    return biRunStage(screenExecutor(executor), other, action);
}
  1. thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值
  2. thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。
  3. runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个future 处理完任务后,处理该任务。

两任务组合完成一个

把上面的both换成either,当两个任务中,任意一个 future 任务完成的时候,执行任务。

多任务组合

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 

allOf:等待所有任务完成

anyOf:只要有一个任务完成

商品详情

配置网关

- id: gulimall_host_route # gulimall.com
  uri: lb://gulimall-product
  predicates:
    - Host=gulimall.com,item.gulimall.com

配置线程池:

@ConfigurationProperties(prefix = "gulimall.thread")
// @Component
@Data
public class ThreadPoolConfigProperties {

    private Integer coreSize;

    private Integer maxSize;

    private Integer keepAliveTime;
}

在配置文件中输入这些值:

#config thread pool
gulimall.thread.coreSize=20
gulimall.thread.maxSize=200
gulimall.thread.keepAliveTime=10

配置线程池容器:

@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {
    @Bean
    public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
        return new ThreadPoolExecutor(
                pool.getCoreSize(),
                pool.getMaxSize(),
                pool.getKeepAliveTime(),
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(100000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
    }
}

使用:

@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {

    SkuItemVo skuItemVo = new SkuItemVo();

    CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
        //1、sku基本信息的获取  pms_sku_info
        SkuInfoEntity info = this.getById(skuId);
        skuItemVo.setInfo(info);
        return info;
    }, executor);


    CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {
        //3、获取spu的销售属性组合
        List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());
        skuItemVo.setSaleAttr(saleAttrVos);
    }, executor);


    CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
        //4、获取spu的介绍    pms_spu_info_desc
        SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
        skuItemVo.setDesc(spuInfoDescEntity);
    }, executor);


    CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {
        //5、获取spu的规格参数信息
        List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());
        skuItemVo.setGroupAttrs(attrGroupVos);
    }, executor);


    // Long spuId = info.getSpuId();
    // Long catalogId = info.getCatalogId();

    //2、sku的图片信息    pms_sku_images
    CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
        List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId);
        skuItemVo.setImages(imagesEntities);
    }, executor);
    //等到所有任务都完成
    CompletableFuture.allOf(saleAttrFuture,descFuture,baseAttrFuture,imageFuture).get();

    return skuItemVo;
}
上次编辑于:
贡献者: yunfeidog