网站开发规格,地方做什么网站,娄底网站建设79ld,俄罗斯最新消息今天简介#xff1a; 这个架构不光适用于游戏运营行业#xff0c;其实任何大数据采集传输的场景都是适用的#xff0c;目前也已经有很多客户正在基于Serverless的架构跑在生产环境#xff0c;或者正走在改造Serverless 架构的路上。 众所周知#xff0c;游戏行业在当今的互联网… 简介 这个架构不光适用于游戏运营行业其实任何大数据采集传输的场景都是适用的目前也已经有很多客户正在基于Serverless的架构跑在生产环境或者正走在改造Serverless 架构的路上。 众所周知游戏行业在当今的互联网行业中算是一棵常青树。在疫情之前的2019年中国游戏市场营收规模约2884.8亿元同比增长17.1%。2020年因为疫情游戏行业更是突飞猛进。玩游戏本就是中国网民最普遍的娱乐方式之一疫情期间更甚。据不完全统计截至2019年中国移动游戏用户规模约6.6亿人占中国总网民规模8.47亿的77.92%可见游戏作为一种低门槛、低成本的娱乐手段已成为大部分人生活中习以为常的一部分。
对于玩家而言市面上的游戏数量多如牛毛那么玩家如何能发现和认知到一款游戏并且持续的玩下去恐怕是所有游戏厂商需要思考的问题。加之2018年游戏版号停发事件游戏厂商更加珍惜每一个已获得版号的游戏产品所以这也使得“深度打磨产品质量”和“提高运营精细程度”这两个游戏产业发展方向成为广大游戏厂商的发展思路无论是新游戏还是老游戏都在努力落实这两点
• 新游戏面向玩家需要提供更充足的推广资源和更完整的游戏内容。 • 老游戏通过用户行为分析投入更多的精力和成本制作更优质的版本内容。
这里我们重点来看新游戏。一家游戏企业辛辛苦苦研发三年等着新游戏发售时一飞冲天。那么问题来了新游戏如何被广大玩家看到
首先来看看游戏行业公司的分类 • 游戏研发商研发游戏的公司生产和制作游戏内容。比如王者荣耀的所有英雄设计、游戏战斗场景、战斗逻辑等全部由游戏研发公司提供。 • 游戏发行商游戏发行商的主要工作分三大块市场工作、运营工作、客服工作。游戏发行商把控游戏命脉市场工作核心是导入玩家运营工作核心是将用户价值最大化、赚取更多利益。 • 游戏平台/渠道商游戏平台和渠道商的核心目的就是曝光游戏让尽量多的人能发现你的游戏。
这三种类型的业务有专注于其中某一领域的独立公司也有能承接全部业务的公司但无论那一种这三者之间的关系是不会变的 所以不难理解想让更多的玩家看到你的游戏游戏发行和运营是关键。通俗来讲如果你的游戏出现在目前所有大家熟知的平台广告中那么最起码游戏的新用户注册数量是很可观的。因此这就引入了一个关键词买量。
根据数据显示2019年月均买量手游数达6000款而2018年仅为4200款。另一方面随着抖音、微博等超级APP在游戏买量市场的资源倾斜也助推手游买量的效果和效率有所提升游戏厂商更愿意使用买量的方式来吸引用户。
但需要注意的是在游戏买量的精准化程度不断提高的同时买量的成本也在节节攀升唯有合理配置买量、渠道与整合营销之间的关系才能将宣发资源发挥到最大的效果。
通俗来讲买量其实就是在各大主流平台投放广告广大用户看到游戏广告后有可能会点击广告然后进入游戏厂商的宣传页面同时会采集用户的一些信息然后游戏厂商对采集到的用户信息进行大数据分析进行进一步的定向推广。
游戏运营核心诉求
游戏厂商花钱买量换来的用户信息以及新用户注册信息是为持续的游戏运营服务的那么这个场景的核心诉求就是采集用户信息的完整性。比如说某游戏厂商一天花5000w投放广告在某平台某时段产生了每秒1w次的广告点击率那么在这个时段内每一个点击广告的用户信息要完整的被采集到然后入库进行后续分析。这就对数据采集系统提出了很高的要求。这其中最核心的一点就是系统暴露接口的环节要能够平稳承载买量期间不定时的流量脉冲。在买量期间游戏厂商通常会在多个平台投放广告每个平台投放广告的时间是不一样的所以就会出现全天不定时的流量脉冲现象。如果这个环节出现问题那么相当于买量的钱就打水漂了。
数据采集系统传统架构 上图是一个相对传统的数据采集系统架构最关键的就是暴露HTTP接口回传数据这部分这部分如果出问题那么采集数据的链路就断了。但这部分往往会面临两个挑战
• 当流量脉冲来的时候这部分是否可以快速扩容以应对流量冲击。 • 游戏运营具备潮汐特性并非天天都在进行这就需要考虑如何优化资源利用率。
通常情况下在游戏有运营活动之前会提前通知运维同学对这个环节的服务增加节点但要增加多少其实是无法预估的只能大概拍一个数字。这是在传统架构下经常会出现的场景这就会导致两个问题
• 流量太大节点加少了导致一部分流量的数据没有采集到。 • 流量没有预期那么大节点加多了导致资源浪费。
数据采集系统Serverless架构
我们可以通过 函数计算函数计算的基本概念可以参考这篇文章来取代传统架构中暴露HTTP回传数据这部分从而完美解决传统架构中存在问题先来看架构图 传统架构中的两个问题均可以通过函数计算百毫秒弹性的特性来解决。我们并不需要去估算营销活动会带来多大的流量也不需要去担心和考虑对数据采集系统的性能运维同学更不需要提前预备ECS。
因为函数计算的极致弹性特性当没有买量、没有营销活动的时候函数计算的运行实例是零。有买量活动时在流量脉冲的情况下函数计算会快速拉起实例来承载流量压力当流量减少时函数计算会及时释放没有请求的实例进行缩容。所以Serverless架构带来的优势有以下三点
• 无需运维介入研发同学就可以很快的搭建出来。 • 无论流量大小均可以平稳的承接。 • 函数计算拉起的实例数量可以紧贴流量大小的曲线做到资源利用率最优化再加上按量计费的模式可以最大程度优化成本。
架构解析
从上面的架构图可以看到整个采集数据阶段分了两个函数来实现第一个函数的作用是单纯的暴露HTTP接口接收数据第二个函数用于处理数据然后将数据发送至消息队列Kafka和数据库RDS。
1.接收数据函数
我们打开函数计算控制台创建一个函数 • 函数类型HTTP即触发器为HTTP • 函数名称receiveData • 运行环境Python3 • 函数实例类型弹性实例 • 函数执行内存512MB • 函数运行超时时间60秒 • 函数单实例并发度1 • 触发器类型HTTP触发器 • 触发器名称defaultTrigger • 认证方式anonymous即无需认证 • 请求方式GETPOST 创建好函数之后我们通过在线编辑器编写代码
# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
HELLO_WORLD bHello world!\n
def handler(environ, start_response):logger logging.getLogger() context environ[fc.context]request_uri environ[fc.request_uri]for k, v in environ.items():if k.startswith(HTTP_):# process custom request headerspasstry: request_body_size int(environ.get(CONTENT_LENGTH, 0)) except (ValueError): request_body_size 0 # 接收回传的数据request_body environ[wsgi.input].read(request_body_size) request_body_str urllib.parse.unquote(request_body.decode(GBK))request_body_obj json.loads(request_body_str)logger.info(request_body_obj[action])logger.info(request_body_obj[articleAuthorId])status 200 OKresponse_headers [(Content-type, text/plain)]start_response(status, response_headers)return [HELLO_WORLD]
此时的代码非常简单就是接收用户传来的参数我们可以调用接口进行验证 可以在函数的日志查询中看到此次调用的日志 同时我们也可以查看函数的链路追踪来分析每一个步骤的调用耗时比如函数接到请求→冷启动无活跃实例时→准备代码→执行初始化方法→执行入口函数逻辑这个过程 从调用链路图中可以看到刚才的那次请求包含了冷启动的时间因为当时没有活跃实例整个过程耗时418毫秒真正执行入口函数代码的时间为8毫秒。
当再次调用接口时可以看到就直接执行了入口函数的逻辑因为此时已经有实例在运行整个耗时只有2.3毫秒 2.处理数据的函数
第一个函数是通过在函数计算控制台在界面上创建的选择了运行环境是Python3我们可以在官方文档中查看预置的Python3运行环境内置了哪些模块因为第二个函数要操作Kafka和RDS所以需要我们确认对应的模块。
从文档中可以看到内置的模块中包含RDS的SDK模块但是没有Kafka的SDK模块此时就需要我们手动安装Kafka SDK模块并且创建函数也会使用另一种方式。
Funcraft
Funcraft是一个用于支持 Serverless 应用部署的命令行工具能帮助我们便捷地管理函数计算、API 网关、日志服务等资源。它通过一个资源配置文件template.yml协助我们进行开发、构建、部署操作。
所以第二个函数我们需要使用Fun来进行操作整个操作分为四个步骤
• 安装fun工具。 • 编写template.yml模板文件用来描述函数。 • 安装我们需要的第三方依赖。 • 上传部署函数。
安装Fun
Fun提供了三种安装方式
• 通过 npm 包管理安装 —— 适合所有平台Windows/Mac/Linux且已经预装了 npm 的开发者。 • 通过下载二进制安装 —— 适合所有平台Windows/Mac/Linux。 • 通过 Homebrew 包管理器安装 —— 适合 Mac 平台更符合 MacOS 开发者习惯。
文本示例环境为Mac所以使用npm方式安装非常的简单一行命令搞定sudo npm install alicloud/fun -g
安装完成之后。在控制终端输入 fun 命令可以查看版本信息
$ fun --version
3.6.20
在第一次使用 fun 之前需要先执行 fun config 命令进行配置按照提示依次配置 Account ID、Access Key Id、Secret Access Key、 Default Region Name 即可。其中 Account ID、Access Key Id 你可以从函数计算控制台首页的右上方获得
fun config ? Aliyun Account ID 01 ? Aliyun Access Key ID qef6j ? Aliyun Access Key Secret ***UFJG ? Default region name cn-hangzhou ? The timeout in seconds for each SDK client invoking 60 ? The maximum number of retries for each SDK client 3
编写template.yml
新建一个目录在该目录下创建一个名为template.yml的YAML文件该文件主要描述要创建的函数的各项配置说白了就是将函数计算控制台上配置的那些配置信息以YAML格式写在文件里
ROSTemplateFormatVersion: 2015-09-01
Transform: Aliyun::Serverless-2018-04-03
Resources:
FCBigDataDemo:
Type: Aliyun::Serverless::Service
Properties:
Description: local invoke demo
VpcConfig:
VpcId: vpc-xxxxxxxxxxx
VSwitchIds: [ vsw-xxxxxxxxxx ]
SecurityGroupId: sg-xxxxxxxxx
LogConfig:
Project: fcdemo
Logstore: fc_demo_store
dataToKafka:
Type: Aliyun::Serverless::Function
Properties:
Initializer: index.my_initializer
Handler: index.handler
CodeUri: ./
Description:
Runtime: python3
我们来解析以上文件的核心内容
• FCBigDataDemo自定义的服务名称。通过下面的Type属性标明是服务即Aliyun::Serverless::Service。 • PropertiesProperties下的属性都是该服务的各配置项。 • VpcConfig服务的VPC配置包含
VpcIdVPC ID。VSwitchIds交换机ID这里是数组可以配置多个交换机。SecurityGroupId安全组ID。
• LogConfig服务绑定的日志服务SLS配置包含
Project日志服务项目。LogstoreLogStore名称。
• dataToKafka该服务下自定义的函数名称。通过下面的Type属性标明是函数即Aliyun::Serverless::Function。 • PropertiesProperties下的属性都是该函数的各配置项。 • Initializer配置初始化函数。 • Handler配置入口函数。 • Runtime函数运行环境。
目录结构为 安装第三方依赖
服务和函数的模板创建好之后我们来安装需要使用的第三方依赖。在这个示例的场景中第二个函数需要使用Kafka SDK所以可以通过fun工具结合Python包管理工具pip进行安装
fun install --runtime python3 --package-type pip kafka-python
执行命令后有如下提示信息 此时我们会发现在目录下会生成一个.fun文件夹 我们安装的依赖包就在该目录下 部署函数
现在编写好了模板文件以及安装好了我们需要的Kafka SDK后还需要添加我们的代码文件index.py代码内容如下
# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
from kafka import KafkaProducer
producer None
def my_initializer(context): logger logging.getLogger() logger.info(init kafka producer)global producerproducer KafkaProducer(bootstrap_serversXX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092)
def handler(event, context):logger logging.getLogger() # 接收回传的数据event_str json.loads(event)event_obj json.loads(event_str)logger.info(event_obj[action])logger.info(event_obj[articleAuthorId])# 向Kafka发送消息global producerproducer.send(ikf-demo, json.dumps(event_str).encode(utf-8))producer.close()return hello world
代码很简单这里做以简单的解析
• my_initializer函数实例被拉起时会先执行该函数然后再执行handler函数 当函数实例在运行时之后的请求都不会执行my_initializer函数 。一般用于各种连接的初始化工作这里将初始化Kafka Producer的方法放在了这里避免反复初始化Produer。 • handler该函数只有两个逻辑接收回传的数据和将数据发送至Kafka的指定Topic。 下面通过fun deploy命令部署函数该命令会做两件事 • 根据template.yml中的配置创建服务和函数。 • 将index.py和.fun上传至函数中。 登录函数计算控制台可以看到通过fun命令部署的服务和函数 进入函数也可以清晰的看到第三方依赖包的目录结构 3.函数之间调用
目前两个函数都创建好了下面的工作就是由第一个函数接收到数据后拉起第二个函数发送消息给Kafka。我们只需要对第一个函数做些许改动即可
# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
import fc2
HELLO_WORLD bHello world!\n
client None
def my_initializer(context): logger logging.getLogger() logger.info(init fc client)global clientclient fc2.Client(endpointhttp://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com,accessKeyIDyour_ak,accessKeySecretyour_sk)
def handler(environ, start_response):logger logging.getLogger() context environ[fc.context]request_uri environ[fc.request_uri]for k, v in environ.items():if k.startswith(HTTP_):# process custom request headerspasstry: request_body_size int(environ.get(CONTENT_LENGTH, 0)) except (ValueError): request_body_size 0 # 接收回传的数据request_body environ[wsgi.input].read(request_body_size) request_body_str urllib.parse.unquote(request_body.decode(GBK))request_body_obj json.loads(request_body_str)logger.info(request_body_obj[action])logger.info(request_body_obj[articleAuthorId])global clientclient.invoke_function(FCBigDataDemo,dataToKafka,payloadjson.dumps(request_body_str),headers {x-fc-invocation-type: Async})status 200 OKresponse_headers [(Content-type, text/plain)]start_response(status, response_headers)return [HELLO_WORLD]
如上面代码所示对第一个函数的代码做了三个地方的改动
• 导入函数计算的库import fc2 • 添加初始化方法用于创建函数计算Client
def my_initializer(context):logger logging.getLogger()logger.info(init fc client)global clientclient fc2.Client(endpointhttp://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com,accessKeyIDyour_ak,accessKeySecretyour_sk
)
这里需要注意的时当我们在代码里增加了初始化方法后需要在函数配置中指定初始化方法的入口 • 通过函数计算Client调用第二个函数
global clientclient.invoke_function(FCBigDataDemo,dataToKafka,payloadjson.dumps(request_body_str),headers {x-fc-invocation-type: Async}
)
invoke_function函数有四个参数
• 第一个参数调用函数所在的服务名称。 • 第二个参数调用函数的函数名称。 • 第三个参数向调用函数传的数据。 • 第四个参数调用第二个函数Request Header信息。这里主要通过x-fc-invocation-type这个Key来设置是同步调用还是异步调用。这里设置Async为异步调用。
如此设置我们便可以验证通过第一个函数提供的HTTP接口发起请求→采集数据→调用第二个函数→将数据作为消息传给Kafka这个流程了。
使用两个函数的目的
到这里有些同学可能会有疑问为什么需要两个函数而不在第一个函数里直接向Kafka发送数据呢我们先来看这张图 当我们使用异步调用函数时在函数内部会默认先将请求的数据放入消息队列进行第一道削峰填谷然后每一个队列在对应函数实例通过函数实例的弹性拉起多个实例进行第二道削峰填谷。所以这也就是为什么这个架构能稳定承载大并发请求的核心原因之一。
4.配置Kafka
在游戏运营这个场景中数据量是比较大的所以对Kafka的性能要求也是比较高的相比开源自建使用云上的Kafka省去很多的运维操作比如
• 我们不再需要再维护Kafka集群的各个节点。 • 不需要关心主从节点数据同步问题。 • 可以快速、动态扩展Kafka集群规格动态增加Topic动态增加分区数。 • 完善的指标监控功能消息查询功能。
总的来说就是一切SLA都有云上兜底我们只需要关注在消息发送和消息消费即可。
所以我们可以打开Kafka开通界面根据实际场景的需求一键开通Kafka实例开通Kafka后登录控制台在基本信息中可以看到Kafka的接入点
• 默认接入点走VPC内网场景的接入点。 • SSL接入点走公网场景的接入点。
将默认接入点配置到函数计算的第二个函数中即可。
....
producer KafkaProducer(bootstrap_serversXX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092)
....
然后点击左侧控制台Topic管理创建Topic 将创建好的Topic配置到函数计算的第二个函数中即可。
...
# 第一个参数为Topic名称
producer.send(ikf-demo, json.dumps(event_str).encode(utf-8))
...
上文已经列举过云上Kafka的优势比如动态增加Topic的分区数我们可以在Topic列表中对Topic的分区数进行动态调整 单Topic最大支持到360个分区这是开源自建无法做到的。
接下来点击控制台左侧Consumer Group管理创建Consumer Group 至此云上的Kafka就算配置完毕了即Producer可以往刚刚创建的Topic中发消息了Consumer可以设置刚刚创建的GID以及订阅Topic进行消息接受和消费。
Flink Kafka消费者
在这个场景中Kafka后面往往会跟着Flink所以这里简要给大家介绍一下在Flink中如何创建Kafka Consumer并消费数据。代码片段如下
final ParameterTool parameterTool ParameterTool.fromArgs(args);
String kafkaTopic parameterTool.get(kafka-topic,ikf-demo);
String brokers parameterTool.get(brokers, XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092);
Properties kafkaProps new Properties();
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, ikf-demo);
FlinkKafkaConsumerUserBehaviorEvent kafka new FlinkKafkaConsumer(kafkaTopic, new UserBehaviorEventSchema(), kafkaProps);
kafka.setStartFromLatest();
kafka.setCommitOffsetsOnCheckpoints(false);
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSourceUserBehaviorEvent dataStreamByEventTime env.addSource(kafka);
以上就是构建Flink Kafka Consumer和添加Kafka Source的代码片段还是非常简单的。
压测验证
至此整个数据采集的架构就搭建完毕了下面我们通过压测来检验一下整个架构的性能。这里使用阿里云PTS来进行压测。
创建压测场景
打开PTS控制台点击左侧菜单创建压测/创建PTS场景 在场景配置中将第一个函数计算函数暴露的HTTP接口作为串联链路配置如下图所示 接口配置完后我们来配置施压 • 压力模式 • 并发模式指定有多少并发用户同时发请求。 • RPS模式指定每秒有多少请求数。 • 递增模式在压测过程中可以通过手动调节压力也可以自动按百分比递增压力。 • 最大并发同时有多少个虚拟用户发起请求。 • 递增百分比如果是自动递增的话按这里的百分比递增。 • 单量级持续时长在未完全达到压力全量的时候每一级梯度的压力保持的时长。 • 压测总时长一共需要压测的时长。
这里因为资源成本原因并发用户数设置为2500来进行验证。 从上图压测中的情况来看TPS达到了2w的封顶549w的请求99.99%的请求是成功的那369个异常也可以点击查看都是压测工具请求超时导致的。
总结
至此整个基于Serverless搭建的大数据采集传输的架构就搭建好了并且进行了压测验证整体的性能也是不错的并且整个架构搭建起来也非常简单和容易理解。这个架构不光适用于游戏运营行业其实任何大数据采集传输的场景都是适用的目前也已经有很多客户正在基于Serverless的架构跑在生产环境或者正走在改造Serverless 架构的路上。
作者计缘阿里云解决方案架构师
原文链接
本文为阿里云原创内容未经允许不得转载