国外的智慧城市建设网站,北京电商网站开发价格,淮南医院网站建设,山西省财政厅网站三基建设专栏文章目录 场景#xff1a;解决方案 场景#xff1a;
前段时间在做一个数据同步工具#xff0c;其中一个服务的任务是调用A服务的接口#xff0c;将数据库中指定数据请求过来#xff0c;交给kafka去判断哪些数据是需要新增#xff0c;哪些数据是需要修改的。
刚开始的设… 文章目录 场景解决方案 场景
前段时间在做一个数据同步工具其中一个服务的任务是调用A服务的接口将数据库中指定数据请求过来交给kafka去判断哪些数据是需要新增哪些数据是需要修改的。
刚开始的设计思路是,我创建多个服务同时去请求A服务的接口每个服务都请求到全量数据由于这些服务都注册在xxl-job上而且采用的是分片广播的路由策略那么每个服务就可以只处理请求到的所有数据中id%服务总数分片索引的部分数据然后交给kafka由kafka决定这条数据应该放到哪个分区上。
解决方案
最近学了线程池后回过头来思考认为之前的方案还有很大的优化空间。
1.当数据量很大时一次性查询所有数据会导致数据库的负载过大而使用分页查询每次只查询部分数据可以减轻数据库的负担从而提高数据库的性能和响应速度所以请求数据方每次分页查询少量数据这样可以整体降低请求数据的时间。第一次优化.之前是每个服务都要把全量数据请求过来假设全量数据1000w条一个服务请求数据需要100s我开5个服务那请求数据的总时长就是500s。现在把1000w条数据均分给5个服务那1个服务就只需要请求200w条数据耗时20s那所有服务的请求总时长就是100s。总体耗时缩小了5倍。上面说的分页查询就可以实现页面大小假设10w也就是将1000w/10w逻辑上分成了100页每个服务自己的分片索引作为页号每次请求完都给索引加上分片总数例如当前注册了五个服务那分片总数5对于分片索引为1的服务来说请求的页号为16111621。。。对于分片索引为2的服务来说请求的页号为271217。。。对于分片索引为3的服务来说请求的页号为381318。。。。对于分片索引为4的服务来说请求的页号为491419。。。。对于分片索引为5的服务来说请求的页号为5101520.。。这样1000w条数据就均分到每个服务上了。对于每个服务都是单线程去请求数据就可以将请求操作以及页号总服务数的操作写在一个while循环里一直请求数据直到请求的数据为空时也就是页号超过100了退出while。 //单线程情况下while(true){String body HttpUtil.get(remoteURL?pageSize100000pageNumshardIndex);
// logger.info(body:{},body);//2.获取返回结果的messageJSONObject jsonObject new JSONObject();
// if (StrUtil.isNotBlank(body)) {jsonObject JSONUtil.parseObj(body);
// logger.info(name:{},Thread.currentThread().getName());
// }
// logger.info(jsonObject:{},jsonObject);//3.从body中获取dataListTestPO tests JSONUtil.toList(jsonObject.getJSONArray(data), TestPO.class);if(CollectionUtil.isEmpty(tests)){break;}shardIndexshardTotal;}第二次优化 了解了线程池后还可以再优化。之前是一个服务单线程循环请求需要20s假设每次请求10w条需要请求200w/10w也就是20次那一次请求就需要1s。如果使用线程池的话那么耗时还会更小因为当你将任务都交给线程池去执行时多个线程会同时并行去请求各自页的数据假如你只设置了4个线程那这4个线程会同时发起请求获取数据1s会完成4次请求那分给服务的200w5s就请求完了。那5个服务从总耗时500s降到了总耗时5s*525s。 这次优化第一版代码只展示了请求数据的代码其他业务代码没有展示 一直向线程池里扔请求数据的任务当某个任务请求到的数据是空的时候意味着要请求的数据已经没了那就结束循环不再扔请求数据的任务。 //线程共享变量static volatile boolean flag true;XxlJob(value fenpian)public void fenpian() {int shardIndex XxlJobHelper.getShardIndex();
// int shardTotal XxlJobHelper.getShardTotal();//分片总数int shardTotal 4;AtomicInteger pageNum new AtomicInteger(shardIndex);//多线程情况下
// ListCompletableFuturecompletableFutureListnew ArrayList();while (flag){CompletableFutureVoid future CompletableFuture.runAsync(() - {String body HttpUtil.get(remoteURL ?pageSize1000pageNum pageNum.getAndAdd(shardTotal));JSONObject jsonObject new JSONObject();jsonObject JSONUtil.parseObj(body);ListTestPO tests JSONUtil.toList(jsonObject.getJSONArray(data), TestPO.class);logger.info(tests的size:{},tests.size());if(CollectionUtil.isEmpty(tests)){flagfalse;}},executorService);completableFutureList.add(future);}CompletableFuture[] completableFutures completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);CompletableFuture.allOf(completableFutures).join();logger.info(任务结束);executorService.shutdown();上面代码会有一个问题就是while循环往线程池里扔任务所有线程在执行时会在请求数据那里”停留“一段时间“停留期间”还会一直循环向线程池扔任务当线程执行完某次请求得到空数据结束循环时等待队列中还排着大堆任务等着去请求数据。
为了解决这个问题我改用了for循环提交任务提前根据请求数据总量、每次读取的条数以及服务总数得到每个服务需要执行的任务数。 第二版代码
XxlJob(value fenpian)public void fenpian() {int shardIndex XxlJobHelper.getShardIndex()1;int shardTotal XxlJobHelper.getShardTotal();//分片总数
// int shardTotal 4;AtomicInteger pageNum new AtomicInteger(shardIndex);//多线程情况下ListCompletableFuturecompletableFutureListnew ArrayList();//总条数double total 10000000;//读取的条数double pageSize1000;double tasks Math.ceil( total / (double) shardTotal / pageSize);logger.info(任务数{},tasks);for(double i0;itasks;i){CompletableFutureVoid future CompletableFuture.runAsync(() - {String url remoteURL ?pageSize1000pageNum pageNum.getAndAdd(shardTotal);logger.info(url:{},threadName:{},url,Thread.currentThread().getName());String body HttpUtil.get(url);JSONObject jsonObject new JSONObject();jsonObject JSONUtil.parseObj(body);ListTestPO tests JSONUtil.toList(jsonObject.getJSONArray(data), TestPO.class);logger.info(tests的size:{},tests.size());},executorService);completableFutureList.add(future);}CompletableFuture[] completableFutures completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);CompletableFuture.allOf(completableFutures).join();logger.info(任务结束);如有问题请求指正(^^ゞ