网站优化外包价格,免费咨询皮肤科医生在线,做网站找哪个,天津首页优化外包公司我们知道网站用户访问流量是不间断的#xff0c;基于网站的访问日志#xff0c;即 Web log 分析是典型的流式实时计算应用场景。比如百度统计#xff0c;它可以做流量分析、来源分析、网站分析、转化分析。另外还有特定场景分析#xff0c;比如安全分析#xff0c;用来识别…我们知道网站用户访问流量是不间断的基于网站的访问日志即 Web log 分析是典型的流式实时计算应用场景。比如百度统计它可以做流量分析、来源分析、网站分析、转化分析。另外还有特定场景分析比如安全分析用来识别 CC 攻击、 SQL 注入分析、脱库等。这个项目就简单实现一个类似于百度分析的系统。实验原理百度统计(tongji.baidu.com)是百度推出的一款免费的专业网站流量分析工具能够告诉用户访客是如何找到并浏览用户的网站的以及在网站上浏览了哪些页面。这些信息可以帮助用户改善访客在其网站上的使用体验不断提升网站的投资回报率。百度统计提供了几十种图形化报告包括趋势分析、来源分析、页面分析、访客分析、定制分析等多种统计分析服务。这里我们参考百度统计的功能基于 Spark Streaming 简单实现一个分析系统使之包括以下分析功能。流量分析。一段时间内用户网站的流量变化趋势针对不同的 IP 对用户网站的流量进行细分。常见指标是总 PV 和各 IP 的PV。来源分析。各种搜索引擎来源给用户网站带来的流量情况需要精确到具体搜索引擎、具体关键词。通过来源分析用户可以及时了解哪种类型的来源为其带来了更多访客。常见指标是搜索引擎、关键词和终端类型的 PV 。网站分析。各个页面的访问情况包括及时了解哪些页面最吸引访客以及哪些页面最容易导致访客流失从而帮助用户更有针对性地改善网站质量。常见指标是各页面的 PV 。1 日志实时采集Web log 一般在 HTTP 服务器收集比如 Nginx access 日志文件。一个典型的方案是 Nginx 日志文件 Flume Kafka Spark Streaming如下所述接收服务器用 Nginx 根据负载可以部署多台数据落地至本地日志文件每个 Nginx 节点上部署 Flume 使用 tail -f 实时读取 Nginx 日志发送至 KafKa 集群Spark Streaming 程序实时消费 Kafka 集群上的数据实时分析输出结果写入 MySQL 数据库。当然还可以进一步优化比如 CGI 程序直接发日志消息到 Kafka 节省了写访问日志的磁盘开销。这里主要专注 Spark Streaming 的应用所以我们不做详细论述。2 流式分析系统实现我们简单模拟一下数据收集和发送的环节用一个 Python 脚本随机生成 Nginx 访问日志并通过脚本的方式自动上传至 HDFS 然后移动至指定目录。 Spark Streaming 程序监控 HDFS 目录自动处理新的文件。生成 Nginx 日志的 Python 代码如下保存为文件 sample_web_log.py 。#!/usr/bin/env python# -*- coding: utf-8 -*-import randomimport timeclass WebLogGeneration(object):# 类属性由所有类的对象共享site_url_base http://www.xxx.com/# 基本构造函数def __init__(self): # 前面7条是IE,所以大概浏览器类型70%为IE 接入类型上20%为移动设备分别是7和8条,5% 为空# https://github.com/mssola/user_agent/blob/master/all_test.goself.user_agent_dist {0.0:Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0), 0.1:Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0), 0.2:Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727), 0.3:Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322), 0.4:Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko, 0.5:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:41.0) Gecko/20100101 Firefox/41.0, 0.6:Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322), 0.7:Mozilla/5.0 (iPhone; CPU iPhone OS 7_0_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53, 0.8:Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19, 0.9:Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.85 Safari/537.36, 1: ,} self.ip_slice_list [10, 29, 30, 46, 55, 63, 72, 87, 98,132,156,124,167,143,187,168,190,201,202,214,215,222] self.url_path_list [login.php,view.php,list.php,upload.php,admin/login.php,edit.php,index.html] self.http_refer [ http://www.baidu.com/s?wd{query},http://www.google.cn/search?q{query},http://www.sogou.com/web?query{query},http://one.cn.yahoo.com/s?p{query},http://cn.bing.com/search?q{query}] self.search_keyword [spark,hadoop,hive,spark mlib,spark sql] def sample_ip(self):slice random.sample(self.ip_slice_list, 4) #从ip_slice_list中随机获取4个元素作为一个片断返回return ..join([str(item) for item in slice]) # tododef sample_url(self): return random.sample(self.url_path_list,1)[0] def sample_user_agent(self):dist_uppon random.uniform(0, 1) return self.user_agent_dist[float(%0.1f % dist_uppon)] # 主要搜索引擎referrer参数def sample_refer(self): if random.uniform(0, 1) 0.2: # 只有20% 流量有referreturn -refer_strrandom.sample(self.http_refer,1)query_strrandom.sample(self.search_keyword,1) return refer_str[0].format(queryquery_str[0]) def sample_one_log(self,count 3):time_str time.strftime(%Y-%m-%d %H:%M:%S,time.localtime()) while count 1:query_log {ip} - - [{local_time}] \GET /{url} HTTP/1.1\ 200 0 \{refer}\ \{user_agent}\ \-\.format(ipself.sample_ip(),local_timetime_str,urlself.sample_url(),referself.sample_refer(),user_agentself.sample_user_agent())print query_logcount count -1if __name__ __main__:web_log_gene WebLogGeneration() #while True:# time.sleep(random.uniform(0, 3))web_log_gene.sample_one_log(random.uniform(10, 100))这是一条日志的示例为一行形式各字段间用空格分隔字符串类型的值用双引号包围46.202.124.63 - - [2015-11-26 09:54:27] GET /view.php HTTP/1.1 200 0 http://www.google.cn/search?qhadoop Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0) -然后需要一个简单的脚本来调用上面的脚本以随机生成日志上传至 HDFS 然后移动到目标目录#!/bin/bash# HDFS命令 HDFS/usr/local/myhadoop/hadoop-2.6.0/bin/hadoop fs# Streaming程序监听的目录注意跟后面Streaming程序的配置要保持一致 streaming_dir”/spark/streaming”# 清空旧数据 $HDFS -rm ${streaming_dir}/tmp/* /dev/null 21$HDFS -rm ${streaming_dir}/* /dev/null 21# 一直运行 while [ 1 ]; do./sample_web_log.py test.log# 给日志文件加上时间戳避免重名tmplogaccess.date %s.log# 先放在临时目录再move至Streaming程序监控的目录下确保原子性# 临时目录用的是监控目录的子目录因为子目录不会被监控$HDFS -put test.log ${streaming_dir}/tmp/$tmplog$HDFS -mv ${streaming_dir}/tmp/$tmplog ${streaming_dir}/echo date %F %T put $tmplog to HDFS succeedsleep 1doneSpark Streaming 程序代码如下所示可以在 bin/spark-shell 交互式环境下运行如果要以 Spark 程序的方式运行按注释中的说明调整一下 StreamingContext 的生成方式即可。启动 bin/spark-shell 时为了避免因 DEBUG 日志信息太多而影响观察输出可以将 DEBUG 日志重定向至文件屏幕上只显示主要输出方法是./bin/spark-shell 2spark-shell-debug.log// 导入类import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}// 设计计算的周期单位秒val batch 10/** 这是bin/spark-shell交互式模式下创建StreamingContext的方法* 非交互式请使用下面的方法来创建*/val ssc new StreamingContext(sc, Seconds(batch))/*// 非交互式下创建StreamingContext的方法val conf new SparkConf().setAppName(NginxAnay)val ssc new StreamingContext(conf, Seconds(batch))*//** 创建输入DStream是文本文件目录类型* 本地模式下也可以使用本地文件系统的目录比如 file:///home/spark/streaming*/val lines ssc.textFileStream(hdfs:///spark/streaming)/** 下面是统计各项指标调试时可以只进行部分统计方便观察结果*/// 1. 总PVlines.count().print()// 2. 各IP的PV按PV倒序// 空格分隔的第一个字段就是IPlines.map(line {(line.split( )(0), 1)}).reduceByKey(_ _).transform(rdd {rdd.map(ip_pv (ip_pv._2, ip_pv._1)).sortByKey(false).map(ip_pv (ip_pv._2, ip_pv._1))}).print()// 3. 搜索引擎PVval refer lines.map(_.split(\)(3))// 先输出搜索引擎和查询关键词避免统计搜索关键词时重复计算// 输出(host, query_keys)val searchEnginInfo refer.map(r {val f r.split(/)val searchEngines Map( www.google.cn - q, www.yahoo.com - p, cn.bing.com - q, www.baidu.com - wd, www.sogou.com - query) if (f.length 2) {val host f(2) if (searchEngines.contains(host)) {val query r.split(?)(1) if (query.length 0) {val arr_search_q query.split().filter(_.indexOf(searchEngines(host)) 0) if (arr_search_q.length 0)(host, arr_search_q(0).split()(1)) else(host, )} else {(host, )}} else(, )} else(, )})// 输出搜索引擎PVsearchEnginInfo.filter(_._1.length 0).map(p {(p._1, 1)}).reduceByKey(_ _).print()// 4. 关键词PVsearchEnginInfo.filter(_._2.length 0).map(p {(p._2, 1)}).reduceByKey(_ _).print()// 5. 终端类型PVlines.map(_.split(\)(5)).map(agent {val types Seq(iPhone, Android) var r Defaultfor (t r t}(r, 1)}).reduceByKey(_ _).print()// 6. 各页面PVlines.map(line {(line.split(\)(1).split( )(1), 1)}).reduceByKey(_ _).print()// 启动计算,等待执行结束(出错或Ctrl-C退出)ssc.start()ssc.awaitTermination()打开两个终端一个调用上面的 bash 脚本模拟提交日志一个在交互式环境下运行上面的 Streaming 程序。你可以看到各项指标的输出比如某个批次下的输出为(依次对应上面的 6 个计算项)总PV-------------------------------------------Time: 1448533850000 ms-------------------------------------------44374各IP的PV按PV倒序------------------------------------------- Time: 1448533850000 ms------------------------------------------- (72.63.87.30,30)(63.72.46.55,30)(98.30.63.10,29)(72.55.63.46,29)(63.29.10.30,29)(29.30.63.46,29)(55.10.98.87,27)(46.29.98.30,27)(72.46.63.30,27)(87.29.55.10,26)搜索引擎PV------------------------------------------- Time: 1448533850000 ms ------------------------------------------- (cn.bing.com,1745)(www.baidu.com,1773)(www.google.cn,1793)(www.sogou.com,1845)关键词PV-------------------------------------------Time: 1448533850000 ms-------------------------------------------(spark,1426)(hadoop,1455)(spark sql,1429)(spark mlib,1426)(hive,1420)终端类型PV-------------------------------------------Time: 1448533850000 ms-------------------------------------------(Android,4281)(Default,35745)(iPhone,4348)各页面PV-------------------------------------------Time: 1448533850000 ms-------------------------------------------(/edit.php,6435)(/admin/login.php,6271)(/login.php,6320)(/upload.php,6278)(/list.php,6411)(/index.html,6309)(/view.php,6350)查看数据更直观的做法是用图形来展示常见做法是将结果写入外部 DB 然后通过一些图形化报表展示系统展示出来。比如对于终端类型我们可以用饼图展示。对于连续的数据我们也可以用拆线图来展示趋势。比如某页面的PV。除了常规的每个固定周期进行一次统计我们还可以对连续多个周期的数据进行统计。以统计总 PV 为例上面的示例是每 10 秒统计一次可能还需要每分钟统计一次相当于 6 个 10 秒的周期。我们可以利用窗口方法实现不同的代码如下// 窗口方法必须配置checkpint可以这样配置 ssc.checkpoint(hdfs:///spark/checkpoint)// 这是常规每10秒一个周期的PV统计 lines.count().print()// 这是每分钟(连续多个周期)一次的PV统计 lines.countByWindow(Seconds(batch*6), Seconds(batch*6)).print()使用相同的办法运行程序之后我们首先会看到连续 6 次 10 秒周期的 PV 统计输出-------------------------------------------Time: 1448535090000 ms-------------------------------------------1101-------------------------------------------Time: 1448535100000 ms-------------------------------------------816-------------------------------------------Time: 1448535110000 ms-------------------------------------------892-------------------------------------------Time: 1448535120000 ms-------------------------------------------708-------------------------------------------Time: 1448535130000 ms-------------------------------------------881-------------------------------------------Time: 1448535140000 ms-------------------------------------------872在这之后有一个 1 分钟周期的 PV 统计输出它的值刚好是上面 6 次计算结果的总和-------------------------------------------Time: 1448535140000 ms-------------------------------------------5270最后以上内容截选自实验楼教程 【流式实时日志分析系统——《Spark 最佳实践》】教程主要是教你开发一个类似百度统计的系统文章主要截选了其实验原理部分后面还有具体的开发部分开发准备准备生成日志的Python代码启动Spark Shell实验步骤创建日志目录通过 bash 脚本生成日志在 Spark Streaming 中进行日志分析开始生成日志并查看结果作者实验楼链接https://www.jianshu.com/p/241bec487619