建设眼镜网站风格,太原seo排名收费,中国建设银行官企业网站,出名的网站制作正规公司本笔记内容为黑马头条项目的热点文章-实时计算部分
目录
一、实时流式计算
1、概念
2、应用场景
3、技术方案选型
二、Kafka Stream
1、概述
2、Kafka Streams的关键概念
3、KStream
4、Kafka Stream入门案例编写
5、SpringBoot集成Kafka Stream
三、app端热点文章…本笔记内容为黑马头条项目的热点文章-实时计算部分
目录
一、实时流式计算
1、概念
2、应用场景
3、技术方案选型
二、Kafka Stream
1、概述
2、Kafka Streams的关键概念
3、KStream
4、Kafka Stream入门案例编写
5、SpringBoot集成Kafka Stream
三、app端热点文章计算
1、思路说明
2、功能实现 一、实时流式计算 1、概念 一般流式计算会与批量计算相比较。在流式计算模型中输入是持续的可以认为在时间上是无界的也就意味着永远拿不到全量数据去做计算。同时计算结果是持续输出的也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高同时一般是先定义目标计算然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率往往尽可能采用增量计算代替全量计算。 流式计算就相当于上图的右侧扶梯是可以源源不断的产生数据源源不断的接收数据没有边界。
2、应用场景 日志分析 网站的用户访问日志进行实时的分析计算访问量用户画像留存率等等实时的进行数据分析帮助企业进行决策 大屏看板统计 可以实时的查看网站注册数量订单数量购买数量金额等。 公交实时数据 可以随时更新公交车方位计算多久到达站牌等 实时文章分值计算 头条类文章的分值计算通过用户的行为实时文章的分值分值越高就越被推荐。 3、技术方案选型 Hadoop Apche Storm Storm 是一个分布式实时大数据处理系统可以帮助我们方便地处理海量数据具有高可靠、高容错、高扩展的特点。是流式框架有很高的数据吞吐能力。 Kafka Stream 可以轻松地将其嵌入任何Java应用程序中并与用户为其流应用程序所拥有的任何现有打包部署和操作工具集成。
二、Kafka Stream 1、概述 Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。 Kafka Stream的特点如下 Kafka Stream提供了一个非常简单而轻量的Library它可以非常方便地嵌入任意Java应用中也可以任意方式打包和部署 除了Kafka外无任何外部依赖 充分利用Kafka分区机制实现水平扩展和顺序性保证 通过可容错的state store实现高效的状态操作如windowed join和aggregation 支持正好一次处理语义 提供记录级的处理能力从而实现毫秒级的低延迟 支持基于事件时间的窗口操作并且可处理晚到的数据late arrival of records 同时提供底层的处理原语Processor类似于Storm的spout和bolt以及高层抽象的DSL类似于Spark的map/group/reduce 2、Kafka Streams的关键概念 源处理器Source Processor源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。 Sink处理器sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。 3、KStream
1数据结构类似于map,如下图key-value键值对 2KStream KStream数据流data stream即是一段顺序的可以无限长不断更新的数据集。 数据流中比较常记录的是事件这些事件可以是一次鼠标点击click一次交易或是传感器记录的位置数据。 KStream负责抽象的就是数据流。与Kafka自身topic中的数据一样类似日志每一次操作都是向其中插入insert新数据。 为了说明这一点让我们想象一下以下两个数据记录正在发送到流中 “ alice”1-“” alice“3 如果您的流处理应用是要总结每个用户的价值它将返回4了alice。为什么因为第二条数据记录将不被视为先前记录的更新。insert新数据 4、Kafka Stream入门案例编写
1需求分析求单词个数word count 2引入依赖
在之前的kafka-demo工程的pom文件中引入
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-streams/artifactIdexclusionsexclusionartifactIdconnect-json/artifactIdgroupIdorg.apache.kafka/groupId/exclusionexclusiongroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/exclusion/exclusions
/dependency
3创建原生的kafka staream入门案例
package com.heima.kafka.sample;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** 流式处理*/
public class KafkaStreamQuickStart {public static void main(String[] args) {//kafka的配置信心Properties prop new Properties();prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.200.130:9092);prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.APPLICATION_ID_CONFIG,streams-quickstart);//stream 构建器StreamsBuilder streamsBuilder new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);//创建kafkaStream对象KafkaStreams kafkaStreams new KafkaStreams(streamsBuilder.build(),prop);//开启流式计算kafkaStreams.start();}/*** 流式计算* 消息的内容hello kafka hello itcast* param streamsBuilder*/private static void streamProcessor(StreamsBuilder streamsBuilder) {//创建kstream对象同时指定从那个topic中接收消息KStreamString, String stream streamsBuilder.stream(itcast-topic-input);/*** 处理消息的value*/stream.flatMapValues(new ValueMapperString, IterableString() {Overridepublic IterableString apply(String value) {return Arrays.asList(value.split( ));}})//按照value进行聚合处理.groupBy((key,value)-value)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词的个数.count()//转换为kStream.toStream().map((key,value)-{System.out.println(key:key,vlaue:value);return new KeyValue(key.key().toString(),value.toString());})//发送消息.to(itcast-topic-out);}
}
4测试准备 使用生产者在topic为itcast_topic_input中发送多条消息 使用消费者接收topic为itcast_topic_out 结果 通过流式计算会把生产者的多条消息汇总成一条发送到消费者中输出
5、SpringBoot集成Kafka Stream
1自定配置参数
package com.heima.kafka.config;import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap;
import java.util.Map;/*** 通过重新注册KafkaStreamsConfiguration对象设置自定配置参数*/Setter
Getter
Configuration
EnableKafkaStreams
ConfigurationProperties(prefixkafka)
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE 16* 1024 * 1024;private String hosts;private String group;Bean(name KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {MapString, Object props new HashMap();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()_stream_aid);props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()_stream_cid);props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}
修改application.yml文件在最下方添加自定义配置
kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}
2新增配置类创建KStream对象进行聚合
package com.heima.kafka.stream;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;
import java.util.Arrays;Configuration
Slf4j
public class KafkaStreamHelloListener {Beanpublic KStreamString,String kStream(StreamsBuilder streamsBuilder){//创建kstream对象同时指定从那个topic中接收消息KStreamString, String stream streamsBuilder.stream(itcast-topic-input);stream.flatMapValues(new ValueMapperString, IterableString() {Overridepublic IterableString apply(String value) {return Arrays.asList(value.split( ));}})//根据value进行聚合分组.groupBy((key,value)-value)//聚合计算时间间隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求单词的个数.count().toStream()//处理后的结果转换为string字符串.map((key,value)-{System.out.println(key:key,value:value);return new KeyValue(key.key().toString(),value.toString());})//发送消息.to(itcast-topic-out);return stream;}
}
测试
启动微服务正常发送消息可以正常接收到消息
三、app端热点文章计算 1、思路说明 2、功能实现
1.用户行为阅读量评论点赞收藏发送消息以阅读和点赞为例
①在heima-leadnews-behavior微服务中集成kafka生产者配置
修改nacos新增内容
spring:application:name: leadnews-behaviorkafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
②修改ApLikesBehaviorServiceImpl新增发送消息
定义消息发送封装类UpdateArticleMess
package com.heima.model.mess;import lombok.Data;Data
public class UpdateArticleMess {/*** 修改文章的字段类型*/private UpdateArticleType type;/*** 文章ID*/private Long articleId;/*** 修改数据的增量可为正负*/private Integer add;public enum UpdateArticleType{COLLECTION,COMMENT,LIKES,VIEWS;}
}
topic常量类
package com.heima.common.constants;public class HotArticleConstants {public static final String HOT_ARTICLE_SCORE_TOPIChot.article.score.topic;}
完整代码如下
package com.heima.behavior.service.impl;import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApLikesBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.LikesBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;Service
Transactional
Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {Autowiredprivate CacheService cacheService;Autowiredprivate KafkaTemplateString,String kafkaTemplate;Overridepublic ResponseResult like(LikesBehaviorDto dto) {//1.检查参数if (dto null || dto.getArticleId() null || checkParam(dto)) {return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.是否登录ApUser user AppThreadLocalUtil.getUser();if (user null) {return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);}UpdateArticleMess mess new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);//3.点赞 保存数据if (dto.getOperation() 0) {Object obj cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR dto.getArticleId().toString(), user.getId().toString());if (obj ! null) {return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, 已点赞);}// 保存当前keylog.info(保存当前key:{} ,{}, {}, dto.getArticleId(), user.getId(), dto);cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));mess.setAdd(1);} else {// 删除当前keylog.info(删除当前key:{}, {}, dto.getArticleId(), user.getId());cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR dto.getArticleId().toString(), user.getId().toString());mess.setAdd(-1);}//发送消息数据聚合kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}/*** 检查参数** return*/private boolean checkParam(LikesBehaviorDto dto) {if (dto.getType() 2 || dto.getType() 0 || dto.getOperation() 1 || dto.getOperation() 0) {return true;}return false;}
}
③修改阅读行为的类ApReadBehaviorServiceImpl发送消息
完整代码
package com.heima.behavior.service.impl;import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApReadBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.ReadBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;Service
Transactional
Slf4j
public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {Autowiredprivate CacheService cacheService;Autowiredprivate KafkaTemplateString,String kafkaTemplate;Overridepublic ResponseResult readBehavior(ReadBehaviorDto dto) {//1.检查参数if (dto null || dto.getArticleId() null) {return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.是否登录ApUser user AppThreadLocalUtil.getUser();if (user null) {return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);}//更新阅读次数String readBehaviorJson (String) cacheService.hGet(BehaviorConstants.READ_BEHAVIOR dto.getArticleId().toString(), user.getId().toString());if (StringUtils.isNotBlank(readBehaviorJson)) {ReadBehaviorDto readBehaviorDto JSON.parseObject(readBehaviorJson, ReadBehaviorDto.class);dto.setCount((short) (readBehaviorDto.getCount() dto.getCount()));}// 保存当前keylog.info(保存当前key:{} {} {}, dto.getArticleId(), user.getId(), dto);cacheService.hPut(BehaviorConstants.READ_BEHAVIOR dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));//发送消息数据聚合UpdateArticleMess mess new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);mess.setAdd(1);kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}
}
2.使用kafkaStream实时接收消息聚合内容
①在leadnews-article微服务中集成kafkaStream (参考kafka-demo)
②定义实体类用于聚合之后的分值封装
package com.heima.model.article.mess;import lombok.Data;Data
public class ArticleVisitStreamMess {/*** 文章id*/private Long articleId;/*** 阅读*/private int view;/*** 收藏*/private int collect;/*** 评论*/private int comment;/*** 点赞*/private int like;
}
修改常量类增加常量
package com.heima.common.constans;public class HotArticleConstants {public static final String HOT_ARTICLE_SCORE_TOPIChot.article.score.topic;public static final String HOT_ARTICLE_INCR_HANDLE_TOPIChot.article.incr.handle.topic;
}
③ 定义stream,接收消息并聚合
package com.heima.article.stream;import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;Configuration
Slf4j
public class HotArticleStreamHandler {Beanpublic KStreamString,String kStream(StreamsBuilder streamsBuilder){//接收消息KStreamString,String stream streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);//聚合流式处理stream.map((key,value)-{UpdateArticleMess mess JSON.parseObject(value, UpdateArticleMess.class);//重置消息的key:1234343434 和 value: likes:1return new KeyValue(mess.getArticleId().toString(),mess.getType().name():mess.getAdd());})//按照文章id进行聚合.groupBy((key,value)-key)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))/*** 自行的完成聚合的计算*/.aggregate(new InitializerString() {/*** 初始方法返回值是消息的value* return*/Overridepublic String apply() {return COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0;}/*** 真正的聚合操作返回值是消息的value*/}, new AggregatorString, String, String() {Overridepublic String apply(String key, String value, String aggValue) {if(StringUtils.isBlank(value)){return aggValue;}String[] aggAry aggValue.split(,);int col 0,com0,lik0,vie0;for (String agg : aggAry) {String[] split agg.split(:);/*** 获得初始值也是时间窗口内计算之后的值*/switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:col Integer.parseInt(split[1]);break;case COMMENT:com Integer.parseInt(split[1]);break;case LIKES:lik Integer.parseInt(split[1]);break;case VIEWS:vie Integer.parseInt(split[1]);break;}}/*** 累加操作*/String[] valAry value.split(:);switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){case COLLECTION:col Integer.parseInt(valAry[1]);break;case COMMENT:com Integer.parseInt(valAry[1]);break;case LIKES:lik Integer.parseInt(valAry[1]);break;case VIEWS:vie Integer.parseInt(valAry[1]);break;}String formatStr String.format(COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d, col, com, lik, vie);System.out.println(文章的id:key);System.out.println(当前时间窗口内的消息处理结果formatStr);return formatStr;}}, Materialized.as(hot-atricle-stream-count-001)).toStream().map((key,value)-{return new KeyValue(key.key().toString(),formatObj(key.key().toString(),value));})//发送消息.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);return stream;}/*** 格式化消息的value数据* param articleId* param value* return*/public String formatObj(String articleId,String value){ArticleVisitStreamMess mess new ArticleVisitStreamMess();mess.setArticleId(Long.valueOf(articleId));//COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0String[] valAry value.split(,);for (String val : valAry) {String[] split val.split(:);switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:mess.setCollect(Integer.parseInt(split[1]));break;case COMMENT:mess.setComment(Integer.parseInt(split[1]));break;case LIKES:mess.setLike(Integer.parseInt(split[1]));break;case VIEWS:mess.setView(Integer.parseInt(split[1]));break;}}log.info(聚合消息处理之后的结果为:{},JSON.toJSONString(mess));return JSON.toJSONString(mess);}
}
3.重新计算文章的分值更新到数据库和缓存中
①在ApArticleService添加方法用于更新数据库中的文章分值
/*** 更新文章的分值 同时更新缓存中的热点文章数据* param mess*/
public void updateScore(ArticleVisitStreamMess mess);
实现类方法
/*** 更新文章的分值 同时更新缓存中的热点文章数据* param mess*/
Override
public void updateScore(ArticleVisitStreamMess mess) {//1.更新文章的阅读、点赞、收藏、评论的数量ApArticle apArticle updateArticle(mess);//2.计算文章的分值Integer score computeScore(apArticle);score score * 3;//3.替换当前文章对应频道的热点数据replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE apArticle.getChannelId());//4.替换推荐对应的热点数据replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE ArticleConstants.DEFAULT_TAG);}/*** 替换数据并且存入到redis* param apArticle* param score* param s*/
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {String articleListStr cacheService.get(s);if (StringUtils.isNotBlank(articleListStr)) {ListHotArticleVo hotArticleVoList JSON.parseArray(articleListStr, HotArticleVo.class);boolean flag true;//如果缓存中存在该文章只更新分值for (HotArticleVo hotArticleVo : hotArticleVoList) {if (hotArticleVo.getId().equals(apArticle.getId())) {hotArticleVo.setScore(score);flag false;break;}}//如果缓存中不存在查询缓存中分值最小的一条数据进行分值的比较如果当前文章的分值大于缓存中的数据就替换if (flag) {if (hotArticleVoList.size() 30) {hotArticleVoList hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());HotArticleVo lastHot hotArticleVoList.get(hotArticleVoList.size() - 1);if (lastHot.getScore() score) {hotArticleVoList.remove(lastHot);HotArticleVo hot new HotArticleVo();BeanUtils.copyProperties(apArticle, hot);hot.setScore(score);hotArticleVoList.add(hot);}} else {HotArticleVo hot new HotArticleVo();BeanUtils.copyProperties(apArticle, hot);hot.setScore(score);hotArticleVoList.add(hot);}}//缓存到redishotArticleVoList hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());cacheService.set(s, JSON.toJSONString(hotArticleVoList));}
}/*** 更新文章行为数量* param mess*/
private ApArticle updateArticle(ArticleVisitStreamMess mess) {ApArticle apArticle getById(mess.getArticleId());apArticle.setCollection(apArticle.getCollection()null?0:apArticle.getCollection()mess.getCollect());apArticle.setComment(apArticle.getComment()null?0:apArticle.getComment()mess.getComment());apArticle.setLikes(apArticle.getLikes()null?0:apArticle.getLikes()mess.getLike());apArticle.setViews(apArticle.getViews()null?0:apArticle.getViews()mess.getView());updateById(apArticle);return apArticle;}/*** 计算文章的具体分值* param apArticle* return*/
private Integer computeScore(ApArticle apArticle) {Integer score 0;if(apArticle.getLikes() ! null){score apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;}if(apArticle.getViews() ! null){score apArticle.getViews();}if(apArticle.getComment() ! null){score apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;}if(apArticle.getCollection() ! null){score apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;}return score;
}
②定义监听接收聚合之后的数据文章的分值重新进行计算
package com.heima.article.listener;import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleService;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;Component
Slf4j
public class ArticleIncrHandleListener {Autowiredprivate ApArticleService apArticleService;KafkaListener(topics HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)public void onMessage(String mess){if(StringUtils.isNotBlank(mess)){ArticleVisitStreamMess articleVisitStreamMess JSON.parseObject(mess, ArticleVisitStreamMess.class);apArticleService.updateScore(articleVisitStreamMess);}}
}
结束