Our big data team works on data quality evaluation. The automated quality-check and monitoring platform is built with Django, and the MR jobs are implemented in Python as well. (We later ran into an ORC compression problem that we could not solve in Python, so that part is being rewritten in Java.) Below is an example of writing a MapReduce job in Python.

To borrow one line from the documentation: Hadoop Streaming is a utility that ships with Hadoop; it lets users run any executable or script file as the Mapper and Reducer.

1. Background. Our data lives in Hive; the table definition is shown below. We parse the table metadata and merge it with the data files on HDFS, which makes the records easier to handle. The partition keys are year/month/day.

hive (gulfstream_ods)> desc g_order;
OK
col_name                data_type       comment
order_id                bigint          order id
driver_id               bigint          driver id, 0 until a driver grabs the order
driver_phone            string          driver phone number
passenger_id            bigint          passenger id
passenger_phone         string          passenger phone number
car_id                  int             pickup vehicle id
area                    int             city id
district                string          city district code
type                    int             order type: 0 real-time, 1 reservation
current_lng             decimal(19,6)   passenger longitude when the order was placed
current_lat             decimal(19,6)   passenger latitude when the order was placed
starting_name           string          pickup point name
starting_lng            decimal(19,6)   pickup point longitude
starting_lat            decimal(19,6)   pickup point latitude
dest_name               string          destination name
dest_lng                decimal(19,6)   destination longitude
dest_lat                decimal(19,6)   destination latitude
driver_start_distance   int             road distance from driver to pickup point, in meters
start_dest_distance     int             road distance from pickup point to destination, in meters
departure_time          string          departure time (reserved time for reservation orders, order time for real-time orders)
strive_time             string          time the driver grabbed the order
consult_time            string          negotiation time
arrive_time             string          time the driver tapped "I have arrived"
setoncar_time           string          boarding time (unused for now)
begin_charge_time       string          time the driver tapped "start charging"
finish_time             string          completion time
year                    string
month                   string
day                     string

# Partition Information
# col_name              data_type       comment
year                    string
month                   string
day                     string

2. Parsing the metadata. The code below retrieves and parses the table structure. We then serialize the metadata and store it in the file desc.gulfstream_ods.g_order; this configuration file is shipped to the Hadoop cluster together with the MR scripts.

import subprocess
from subprocess import Popen


def desc_table(db, table):
    # Run "desc db.table" through the hive CLI and capture its output.
    process = Popen('hive -e "desc %s.%s"' % (db, table),
                    shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    is_column = True
    structure_list = list()
    column_list = list()
    for line in stdout.split('\n'):
        value_list = list()
        if not line or len(line.split()) < 2:
            break
        if is_column:
            # The first row holds the header (col_name, data_type, comment).
            column_list = line.split()
            is_column = False
            continue
        else:
            value_list = line.split()
        structure_dict = dict(zip(column_list, value_list))
        structure_list.append(structure_dict)
    return structure_list

3. The Hadoop Streaming launch script.

#!/bin/bash

source /etc/profile
source ~/.bash_profile

# hadoop home
echo "HADOOP_HOME: $HADOOP_HOME"
HADOOP=$HADOOP_HOME/bin/hadoop

DB=$1
TABLE=$2
YEAR=$3
MONTH=$4
DAY=$5
echo "$DB--$TABLE--$YEAR--$MONTH--$DAY"

if [ "$DB" == "gulfstream_ods" ]
then
    DB_NAME="gulfstream"
else
    DB_NAME=$DB
fi
TABLE_NAME=$TABLE

# input path
input_path="/user/xiaoju/data/bi/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY/*"
# marker file name
input_mark="_SUCCESS"
echo $input_path
# output path
output_path="/user/bigdata-t/QA/yangfan/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY"
output_mark="_SUCCESS"
echo $output_path

# performance constraints
capacity_mapper=500
capacity_reducer=200
map_num=10
reducer_num=10
queue_name="root.dashujudidiyanjiuyuan-zhinengpingtaibu.datapolicy-develop"
# job name
job_name="DW_Monitor_${DB_NAME}_${TABLE_NAME}_${YEAR}${MONTH}${DAY}"
mapper="python mapper.py $DB $TABLE_NAME"
reducer="python reducer.py"

$HADOOP fs -rmr $output_path
$HADOOP jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
    -jobconf mapred.job.name="$job_name" \
    -jobconf mapred.job.queue.name=$queue_name \
    -jobconf mapred.map.tasks=$map_num \
    -jobconf mapred.reduce.tasks=$reducer_num \
    -jobconf mapred.map.capacity=$capacity_mapper \
    -jobconf mapred.reduce.capacity=$capacity_reducer \
    -input $input_path \
    -output $output_path \
    -file ./mapper.py \
    -file ./reducer.py \
    -file ./utils.py \
    -file ./desc.${DB}.${TABLE_NAME} \
    -mapper "$mapper" \
    -reducer "$reducer"
if [ $? -ne 0 ]; then
    echo "$DB_NAME $TABLE_NAME $YEAR $MONTH $DAY run failed"
fi
$HADOOP fs -touchz ${output_path}/$output_mark
rm -rf ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
$HADOOP fs -get $output_path/part-00000 ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
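The launch script ships a file named desc.${DB}.${TABLE_NAME} with -file, and step 2 says the parsed metadata is serialized into desc.gulfstream_ods.g_order, but the writing step itself is never shown. A minimal sketch of what it could look like, assuming JSON serialization (the mapper's deserialize defaults to JSON) and assuming desc_table lives in the utils.py that the launch script already uploads; the generate_desc_file name is made up for this example:

import json

from utils import desc_table  # assumption: desc_table() from step 2 lives in utils.py


def generate_desc_file(db, table):
    # Hypothetical helper: dump the parsed table structure as JSON so that
    # index_columns() in mapper.py can read it back with deserialize().
    structure_list = desc_table(db, table)
    with open('desc.%s.%s' % (db, table), 'w') as fw:
        fw.write(json.dumps(structure_list))


if __name__ == '__main__':
    generate_desc_file('gulfstream_ods', 'g_order')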
4. This is a step up from WordCount: the first feature counts orders per area, and the second counts orders per hour of the day.

mapper script (mapper.py):

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')


# Match the fields of each row against the metadata and return an iterator of dicts.
def read_from_input(file, separator, columns):
    for line in file:
        if line is None or line == '':
            continue
        data_list = mapper_input(line, separator)
        if not data_list:
            continue
        item = None
        # The last 3 columns (year/month/day) are the partition key and are not in the data files.
        if len(data_list) == len(columns) - 3:
            item = dict(zip(columns, data_list))
        elif len(data_list) == len(columns):
            item = dict(zip(columns, data_list))
        if not item:
            continue
        yield item


def index_columns(db, table):
    with open('desc.%s.%s' % (db, table), 'r') as fr:
        structure_list = deserialize(fr.read())
    return [column.get('col_name') for column in structure_list]


# map entry point
def main(separator, columns):
    items = read_from_input(sys.stdin, separator, columns)
    mapper_result = {}
    for item in items:
        mapper_plugin_1(item, mapper_result)
        mapper_plugin_2(item, mapper_result)


def mapper_plugin_1(item, mapper_result):
    # In practice the key could be different app keys; it controls which reducer a
    # record is sent to -- records with the same route go to the same reducer.
    key = 'route1'
    area = item.get('area')
    district = item.get('district')
    order_id = item.get('order_id')
    if not area or not district or not order_id:
        return
    mapper_output(key, {'area': area, 'district': district, 'order_id': order_id, 'count': 1})


def mapper_plugin_2(item, mapper_result):
    key = 'route2'
    strive_time = item.get('strive_time')
    order_id = item.get('order_id')
    if not strive_time or not order_id:
        return
    try:
        day_hour = strive_time.split(':')[0]
        mapper_output(key, {'order_id': order_id, 'strive_time': strive_time, 'count': 1, 'day_hour': day_hour})
    except Exception, ex:
        pass


def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''


def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []


def mapper_input(line, separator='\t'):
    try:
        return line.split(separator)
    except Exception, ex:
        return None


def mapper_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s%s%s' % (key, separator, data)
    # print >> sys.stderr, '%s%s%s' % (key, separator, data)


if __name__ == '__main__':
    db = sys.argv[1]
    table = sys.argv[2]
    columns = index_columns(db, table)
    main('||', columns)

reducer script (reducer.py):

#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetter


def read_from_mapper(file, separator):
    for line in file:
        yield reducer_input(line)


def main(separator='\t'):
    reducer_result = {}
    line_list = read_from_mapper(sys.stdin, separator)
    for route_key, group in groupby(line_list, itemgetter(0)):
        if route_key is None:
            continue
        reducer_result.setdefault(route_key, {})
        if route_key == 'route1':
            reducer_plugin_1(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])
        if route_key == 'route2':
            reducer_plugin_2(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])


def reducer_plugin_1(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if not data.get('area') or not data.get('district') or not data.get('count'):
            continue
        key = '_'.join([data.get('area'), data.get('district')])
        reducer_result[route_key].setdefault(key, 0)
        reducer_result[route_key][key] += int(data.get('count'))
    # print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])


def reducer_plugin_2(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if not data.get('order_id') or not data.get('strive_time') or not data.get('count') or not data.get('day_hour'):
            continue
        key = data.get('day_hour')
        reducer_result[route_key].setdefault(key, {})
        reducer_result[route_key][key].setdefault('count', 0)
        reducer_result[route_key][key].setdefault('order_list', [])
        reducer_result[route_key][key]['count'] += int(data.get('count'))
        if len(reducer_result[route_key][key]['order_list']) <= 100:
            reducer_result[route_key][key]['order_list'].append(data.get('order_id'))
    # print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])


def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''


def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []


def reducer_input(data, separator='\t'):
    data_list = data.strip().split(separator, 2)
    key = data_list[0]
    data = deserialize(data_list[1])
    return [key, data]


def reducer_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s\t%s' % (key, data)
    # print >> sys.stderr, '%s\t%s' % (key, data)


if __name__ == '__main__':
    main()
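Before submitting the job to the cluster, the two scripts can be smoke-tested locally by reproducing the mapper | sort | reducer flow that Hadoop Streaming drives. This is a general debugging habit, not part of the original article; the sketch below assumes a local sample file sample.txt with ||-separated rows and the desc.gulfstream_ods.g_order file in the working directory, and a plain sort only approximates Hadoop's key partitioning:

# local_test.py -- a rough local stand-in for the Hadoop Streaming pipeline.
import subprocess


def local_run(db, table, sample_path):
    with open(sample_path) as fin:
        mapper = subprocess.Popen(['python', 'mapper.py', db, table],
                                  stdin=fin, stdout=subprocess.PIPE)
        # Hadoop sorts mapper output by key between the map and reduce phases;
        # a plain sort is close enough for a smoke test.
        sorter = subprocess.Popen(['sort'], stdin=mapper.stdout, stdout=subprocess.PIPE)
        reducer = subprocess.Popen(['python', 'reducer.py'],
                                   stdin=sorter.stdout, stdout=subprocess.PIPE)
        output, _ = reducer.communicate()
    return output


if __name__ == '__main__':
    print(local_run('gulfstream_ods', 'g_order', 'sample.txt'))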
5. The previous version suffered from a slow reduce phase, for two reasons. First, because of how the route keys are chosen, every record with the same route is sent to the same reducer, so a single reducer carries most of the load and throughput drops. Second, our cluster runs on virtual machines, so its baseline performance is poor. The improved version below addresses this: the mapper performs a first round of aggregation itself, which takes most of the computational pressure off the reducers.
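To get a feel for why in-mapper aggregation relieves the reducer, compare how many lines each strategy hands to the shuffle. The following self-contained sketch uses made-up numbers and is only an illustration, not part of the job:

# compare_shuffle.py -- rough illustration of map output volume, v1 vs v2.
import random

# Synthetic batch of orders handled by one mapper (numbers are made up).
orders = [{'area': str(random.randint(1, 30)), 'district': 'd%d' % random.randint(1, 5)}
          for _ in range(100000)]

# v1: one map output line per input record, all routed to the same reducer.
v1_lines = len(orders)

# v2: aggregate inside the mapper, one output line per distinct area_district key.
v2_counts = {}
for o in orders:
    key = '_'.join([o['area'], o['district']])
    v2_counts[key] = v2_counts.get(key, 0) + 1
v2_lines = len(v2_counts)

print('v1 emits %d lines, v2 emits %d lines' % (v1_lines, v2_lines))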
mapper script (mapper.py) -- only the functions that changed are shown; read_from_input, index_columns, serialize, deserialize, mapper_input, mapper_output and the __main__ block are identical to the first version:

# map entry point
def main(separator, columns):
    items = read_from_input(sys.stdin, separator, columns)
    mapper_result = {}
    for item in items:
        mapper_plugin_1(item, mapper_result)
        mapper_plugin_2(item, mapper_result)
    # Once all input is consumed, emit one line per aggregated key under a single route.
    for route_key, route_value in mapper_result.iteritems():
        for key, value in route_value.iteritems():
            ret_dict = dict()
            ret_dict['route_key'] = route_key
            ret_dict['key'] = key
            ret_dict.update(value)
            mapper_output('route_total', ret_dict)


def mapper_plugin_1(item, mapper_result):
    key = 'route1'
    area = item.get('area')
    district = item.get('district')
    order_id = item.get('order_id')
    if not area or not district or not order_id:
        return
    try:
        # running totals per area_district
        mapper_result.setdefault(key, {})
        mapper_result[key].setdefault('_'.join([area, district]), {})
        mapper_result[key]['_'.join([area, district])].setdefault('count', 0)
        mapper_result[key]['_'.join([area, district])].setdefault('order_id', [])
        mapper_result[key]['_'.join([area, district])]['count'] += 1
        if len(mapper_result[key]['_'.join([area, district])]['order_id']) <= 10:
            mapper_result[key]['_'.join([area, district])]['order_id'].append(order_id)
    except Exception, ex:
        pass


def mapper_plugin_2(item, mapper_result):
    key = 'route2'
    strive_time = item.get('strive_time')
    order_id = item.get('order_id')
    if not strive_time or not order_id:
        return
    try:
        day_hour = strive_time.split(':')[0]
        # running totals per day_hour
        mapper_result.setdefault(key, {})
        mapper_result[key].setdefault(day_hour, {})
        mapper_result[key][day_hour].setdefault('count', 0)
        mapper_result[key][day_hour].setdefault('order_id', [])
        mapper_result[key][day_hour]['count'] += 1
        if len(mapper_result[key][day_hour]['order_id']) <= 10:
            mapper_result[key][day_hour]['order_id'].append(order_id)
    except Exception, ex:
        pass

reducer script (reducer.py) -- again only the changed functions; read_from_mapper, serialize, deserialize, reducer_input, reducer_output and the __main__ block are identical to the first version:

def main(separator='\t'):
    reducer_result = {}
    line_list = read_from_mapper(sys.stdin, separator)
    for route_key, group in groupby(line_list, itemgetter(0)):
        if route_key is None:
            continue
        reducer_result.setdefault(route_key, {})
        if route_key == 'route_total':
            reducer_total(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])


def reducer_total(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if data.get('route_key') == 'route1':
            # merge the partial per-area counts coming from each mapper
            reducer_result[route_key].setdefault(data.get('key'), {})
            reducer_result[route_key][data.get('key')].setdefault('count', 0)
            reducer_result[route_key][data.get('key')].setdefault('order_id', [])
            reducer_result[route_key][data.get('key')]['count'] += data.get('count')
            for order_id in data.get('order_id'):
                if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:
                    reducer_result[route_key][data.get('key')]['order_id'].append(order_id)
        elif data.get('route_key') == 'route2':
            # merge the partial per-hour counts coming from each mapper
            reducer_result[route_key].setdefault(data.get('key'), {})
            reducer_result[route_key][data.get('key')].setdefault('count', 0)
            reducer_result[route_key][data.get('key')].setdefault('order_id', [])
            reducer_result[route_key][data.get('key')]['count'] += data.get('count')
            for order_id in data.get('order_id'):
                if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:
                    reducer_result[route_key][data.get('key')]['order_id'].append(order_id)
        else:
            pass

Problems encountered:

1. "The DiskSpace quota of /user/bigdata/qa is exceeded". We hit this error after the reducers finished; it means the disk quota of that HDFS path is used up, and freeing space under the path fixes it.
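When this error appears, it can help to check how much of the quota a path is actually consuming before deleting output. The helper below is not from the original article; it simply shells out to hadoop fs -count -q in the same Popen style as desc_table and prints the raw quota line:

import subprocess
from subprocess import Popen


def show_quota(path):
    # Print quota / remaining quota / space quota / content size for an HDFS path.
    process = Popen('hadoop fs -count -q %s' % path,
                    shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    print(stdout)


if __name__ == '__main__':
    show_quota('/user/bigdata/qa')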
Reposted from: https://www.cnblogs.com/kangoroo/p/6151104.html