Our big data team works on data quality evaluation. The automated quality-check and monitoring platform is built with Django, and the MR jobs are implemented in Python as well. (We later ran into an ORC compression problem that we could not solve in Python, so that part is being rewritten in Java.) Here is an example of writing MapReduce in Python.

To quote one line: Hadoop Streaming is a programming utility provided by Hadoop that allows users to use any executable or script file as the Mapper and Reducer.
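Before the real job, a minimal sketch of what "any script as Mapper and Reducer" means in practice may help: a streaming word count, where the mapper reads raw lines from stdin and prints tab-separated key/value pairs, and the reducer receives those pairs already sorted by key. This sketch is illustrative only and not part of the original pipeline; the file names wc_mapper.py and wc_reducer.py are made up.

wc_mapper.py (illustrative):

#!/usr/bin/env python
import sys

for line in sys.stdin:
    for word in line.strip().split():
        # emit "word<TAB>1"; the streaming framework sorts these by key before the reduce phase
        print('%s\t%s' % (word, 1))

wc_reducer.py (illustrative):

#!/usr/bin/env python
import sys

# input arrives sorted by key, so equal words are adjacent
current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.rstrip('\n').split('\t', 1)
    if word == current_word:
        current_count += int(count)
        continue
    if current_word is not None:
        print('%s\t%d' % (current_word, current_count))
    current_word, current_count = word, int(count)
if current_word is not None:
    print('%s\t%d' % (current_word, current_count))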
1. First, some background: our data is stored in Hive, and the table definition is shown below. We will parse the table metadata and merge it with the data on HDFS for easier processing. The partition keys here are year/month/day.

hive (gulfstream_ods)> desc g_order;
OK
col_name                data_type       comment
order_id                bigint          order ID
driver_id               bigint          driver ID (0 before a driver takes the order)
driver_phone            string          driver phone number
passenger_id            bigint          passenger ID
passenger_phone         string          passenger phone number
car_id                  int             pickup vehicle ID
area                    int             city ID
district                string          city district code
type                    int             order type: 0 = real-time, 1 = reservation
current_lng             decimal(19,6)   passenger longitude when placing the order
current_lat             decimal(19,6)   passenger latitude when placing the order
starting_name           string          pickup point name
starting_lng            decimal(19,6)   pickup point longitude
starting_lat            decimal(19,6)   pickup point latitude
dest_name               string          destination name
dest_lng                decimal(19,6)   destination longitude
dest_lat                decimal(19,6)   destination latitude
driver_start_distance   int             road distance from driver to pickup point, in meters
start_dest_distance     int             road distance from pickup point to destination, in meters
departure_time          string          departure time (the reservation time for reservation orders, the order-placing time for real-time orders)
strive_time             string          time the driver successfully grabbed the order
consult_time            string          negotiation time
arrive_time             string          time the driver tapped "I have arrived"
setoncar_time           string          passenger pickup time (currently unused)
begin_charge_time       string          time the driver tapped "start charging"
finish_time             string          completion time
year                    string
month                   string
day                     string

# Partition Information
# col_name              data_type       comment
year                    string
month                   string
day                     string

2. Parsing the metadata. The code below parses the table metadata. We then serialize the result and store it in the file desc.gulfstream_ods.g_order; this configuration file is uploaded to the Hadoop cluster together with the MR scripts.

import subprocess
from subprocess import Popen

def desc_table(db, table):
    # run "hive -e 'desc db.table'" and capture its output
    process = Popen('hive -e "desc %s.%s"' % (db, table),
                    shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    is_column = True
    structure_list = list()
    column_list = list()
    for line in stdout.split('\n'):
        value_list = list()
        if not line or len(line.split()) < 2:
            break
        if is_column:
            # the first line holds the header: col_name data_type comment
            column_list = line.split()
            is_column = False
            continue
        else:
            value_list = line.split()
        structure_dict = dict(zip(column_list, value_list))
        structure_list.append(structure_dict)
    return structure_list
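The original post does not show the serialization step itself; below is a minimal sketch of what it could look like, assuming JSON serialization and the desc.<db>.<table> file naming that index_columns() relies on later. The helper name dump_table_desc is made up for illustration.

import json

def dump_table_desc(db, table):
    # serialize the parsed metadata so it can be shipped with the MR job
    # (via the streaming -file option) and read back by index_columns()
    structure_list = desc_table(db, table)  # desc_table as defined above
    with open('desc.%s.%s' % (db, table), 'w') as fw:
        fw.write(json.dumps(structure_list))

# hypothetical usage:
# dump_table_desc('gulfstream_ods', 'g_order')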
3. Below is the Hadoop Streaming launch script.

#!/bin/bash
source /etc/profile
source ~/.bash_profile

# Hadoop home
echo "HADOOP_HOME: $HADOOP_HOME"
HADOOP=$HADOOP_HOME/bin/hadoop

DB=$1
TABLE=$2
YEAR=$3
MONTH=$4
DAY=$5
echo "$DB--$TABLE--$YEAR--$MONTH--$DAY"

if [ $DB == "gulfstream_ods" ]
then
    DB_NAME="gulfstream"
else
    DB_NAME=$DB
fi
TABLE_NAME=$TABLE

# input path
input_path="/user/xiaoju/data/bi/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY/*"
# marker file suffix
input_mark="_SUCCESS"
echo $input_path
# output path
output_path="/user/bigdata-t/QA/yangfan/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY"
output_mark="_SUCCESS"
echo $output_path

# performance-related parameters
capacity_mapper=500
capacity_reducer=200
map_num=10
reducer_num=10
queue_name="root.dashujudidiyanjiuyuan-zhinengpingtaibu.datapolicy-develop"

# job name
job_name="DW_Monitor_${DB_NAME}_${TABLE_NAME}_${YEAR}${MONTH}${DAY}"
mapper="python mapper.py $DB $TABLE_NAME"
reducer="python reducer.py"

$HADOOP fs -rmr $output_path
$HADOOP jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
    -jobconf mapred.job.name="$job_name" \
    -jobconf mapred.job.queue.name=$queue_name \
    -jobconf mapred.map.tasks=$map_num \
    -jobconf mapred.reduce.tasks=$reducer_num \
    -jobconf mapred.map.capacity=$capacity_mapper \
    -jobconf mapred.reduce.capacity=$capacity_reducer \
    -input $input_path \
    -output $output_path \
    -file ./mapper.py \
    -file ./reducer.py \
    -file ./utils.py \
    -file ./desc.${DB}.${TABLE_NAME} \
    -mapper "$mapper" \
    -reducer "$reducer"
if [ $? -ne 0 ]; then
    echo "$DB_NAME $TABLE_NAME $YEAR $MONTH $DAY run failed"
fi
$HADOOP fs -touchz ${output_path}/$output_mark
rm -rf ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
$HADOOP fs -get $output_path/part-00000 ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
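The launcher takes the database, the table and the date partition as positional arguments ($1 through $5). A hypothetical invocation, assuming the script above is saved as run_mr.sh (the script name and the date are placeholders, not from the original post):

sh run_mr.sh gulfstream_ods g_order 2016 12 01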
4. Here is an advanced version of Wordcount. The first feature counts orders by area (city and district); the second counts orders by hour of the day.

Mapper script:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')

# match the fields against the table metadata and return an iterator of dicts
def read_from_input(file, separator, columns):
    for line in file:
        if line is None or line == '':
            continue
        data_list = mapper_input(line, separator)
        if not data_list:
            continue
        item = None
        # the last 3 columns (year/month/day) are partition keys and are not needed
        if len(data_list) == len(columns) - 3:
            item = dict(zip(columns, data_list))
        elif len(data_list) == len(columns):
            item = dict(zip(columns, data_list))
        if not item:
            continue
        yield item

def index_columns(db, table):
    with open('desc.%s.%s' % (db, table), 'r') as fr:
        structure_list = deserialize(fr.read())
    return [column.get('col_name') for column in structure_list]

# map entry point
def main(separator, columns):
    items = read_from_input(sys.stdin, separator, columns)
    mapper_result = {}
    for item in items:
        mapper_plugin_1(item, mapper_result)
        mapper_plugin_2(item, mapper_result)

def mapper_plugin_1(item, mapper_result):
    # in practice the key could be a different appkey; it decides which reducer
    # a record goes to, and identical route keys end up on the same reducer
    key = 'route1'
    area = item.get('area')
    district = item.get('district')
    order_id = item.get('order_id')
    if not area or not district or not order_id:
        return
    mapper_output(key, {'area': area, 'district': district, 'order_id': order_id, 'count': 1})

def mapper_plugin_2(item, mapper_result):
    key = 'route2'
    strive_time = item.get('strive_time')
    order_id = item.get('order_id')
    if not strive_time or not order_id:
        return
    try:
        day_hour = strive_time.split(':')[0]
        mapper_output(key, {'order_id': order_id, 'strive_time': strive_time, 'count': 1, 'day_hour': day_hour})
    except Exception, ex:
        pass

def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''

def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []

def mapper_input(line, separator='\t'):
    try:
        return line.split(separator)
    except Exception, ex:
        return None

def mapper_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s%s%s' % (key, separator, data)
    # print >> sys.stderr, '%s%s%s' % (key, separator, data)

if __name__ == '__main__':
    db = sys.argv[1]
    table = sys.argv[2]
    columns = index_columns(db, table)
    main('||', columns)
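For reference, the mapper emits a tab-separated pair of route key and JSON payload. Lines like the following (the values are made up for illustration, and the strive_time format is an assumption) are what the reducer receives after the framework has sorted everything by key:

route1	{"area": "1", "district": "010", "order_id": "123456", "count": 1}
route2	{"order_id": "123456", "strive_time": "2016-12-01 08:10:23", "day_hour": "2016-12-01 08", "count": 1}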
Reducer script:

#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetter

def read_from_mapper(file, separator):
    for line in file:
        yield reducer_input(line)

def main(separator='\t'):
    reducer_result = {}
    line_list = read_from_mapper(sys.stdin, separator)
    for route_key, group in groupby(line_list, itemgetter(0)):
        if route_key is None:
            continue
        reducer_result.setdefault(route_key, {})
        if route_key == 'route1':
            reducer_plugin_1(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])
        if route_key == 'route2':
            reducer_plugin_2(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])

def reducer_plugin_1(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if not data.get('area') or not data.get('district') or not data.get('count'):
            continue
        key = '_'.join([data.get('area'), data.get('district')])
        reducer_result[route_key].setdefault(key, 0)
        reducer_result[route_key][key] += int(data.get('count'))
    # print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])

def reducer_plugin_2(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if not data.get('order_id') or not data.get('strive_time') or not data.get('count') or not data.get('day_hour'):
            continue
        key = data.get('day_hour')
        reducer_result[route_key].setdefault(key, {})
        reducer_result[route_key][key].setdefault('count', 0)
        reducer_result[route_key][key].setdefault('order_list', [])
        reducer_result[route_key][key]['count'] += int(data.get('count'))
        if len(reducer_result[route_key][key]['order_list']) <= 100:
            reducer_result[route_key][key]['order_list'].append(data.get('order_id'))
    # print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])

def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''

def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []

def reducer_input(data, separator='\t'):
    data_list = data.strip().split(separator, 2)
    key = data_list[0]
    data = deserialize(data_list[1])
    return [key, data]

def reducer_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s\t%s' % (key, data)
    # print >> sys.stderr, '%s\t%s' % (key, data)

if __name__ == '__main__':
    main()
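Before submitting to the cluster, this mapper/reducer pair can be smoke-tested locally by piping a small data sample through it and simulating Hadoop's shuffle with sort. This is a standard Hadoop Streaming debugging trick rather than something from the original post; sample_g_order.txt is a placeholder for a small extract of the input data, and the desc.gulfstream_ods.g_order file must sit in the working directory so that index_columns() can read it:

cat sample_g_order.txt | python mapper.py gulfstream_ods g_order | sort | python reducer.py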
5. The previous version suffered from slow reducers, for two reasons. First, because of how the route keys are set up, all records with the same route go to a single reducer, which puts heavy processing pressure on that one reducer and degrades performance. Second, the cluster runs on virtual machines, so its performance is poor to begin with. The improved version below addresses this by pre-aggregating the data in the mapper stage, which relieves the computational pressure on the reducer.

Mapper script:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')

# match the fields against the table metadata and return an iterator of dicts
def read_from_input(file, separator, columns):
    for line in file:
        if line is None or line == '':
            continue
        data_list = mapper_input(line, separator)
        if not data_list:
            continue
        item = None
        # the last 3 columns (year/month/day) are partition keys and are not needed
        if len(data_list) == len(columns) - 3:
            item = dict(zip(columns, data_list))
        elif len(data_list) == len(columns):
            item = dict(zip(columns, data_list))
        if not item:
            continue
        yield item

def index_columns(db, table):
    with open('desc.%s.%s' % (db, table), 'r') as fr:
        structure_list = deserialize(fr.read())
    return [column.get('col_name') for column in structure_list]

# map entry point
def main(separator, columns):
    items = read_from_input(sys.stdin, separator, columns)
    mapper_result = {}
    for item in items:
        mapper_plugin_1(item, mapper_result)
        mapper_plugin_2(item, mapper_result)
    for route_key, route_value in mapper_result.iteritems():
        for key, value in route_value.iteritems():
            ret_dict = dict()
            ret_dict['route_key'] = route_key
            ret_dict['key'] = key
            ret_dict.update(value)
            mapper_output('route_total', ret_dict)

def mapper_plugin_1(item, mapper_result):
    # in practice the key could be a different appkey; identical route keys
    # are dispatched to the same reducer
    key = 'route1'
    area = item.get('area')
    district = item.get('district')
    order_id = item.get('order_id')
    if not area or not district or not order_id:
        return
    try:
        # aggregate totals inside the mapper
        mapper_result.setdefault(key, {})
        mapper_result[key].setdefault('_'.join([area, district]), {})
        mapper_result[key]['_'.join([area, district])].setdefault('count', 0)
        mapper_result[key]['_'.join([area, district])].setdefault('order_id', [])
        mapper_result[key]['_'.join([area, district])]['count'] += 1
        if len(mapper_result[key]['_'.join([area, district])]['order_id']) <= 10:
            mapper_result[key]['_'.join([area, district])]['order_id'].append(order_id)
    except Exception, ex:
        pass

def mapper_plugin_2(item, mapper_result):
    key = 'route2'
    strive_time = item.get('strive_time')
    order_id = item.get('order_id')
    if not strive_time or not order_id:
        return
    try:
        day_hour = strive_time.split(':')[0]
        # aggregate totals inside the mapper
        mapper_result.setdefault(key, {})
        mapper_result[key].setdefault(day_hour, {})
        mapper_result[key][day_hour].setdefault('count', 0)
        mapper_result[key][day_hour].setdefault('order_id', [])
        mapper_result[key][day_hour]['count'] += 1
        if len(mapper_result[key][day_hour]['order_id']) <= 10:
            mapper_result[key][day_hour]['order_id'].append(order_id)
    except Exception, ex:
        pass

def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''

def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []

def mapper_input(line, separator='\t'):
    try:
        return line.split(separator)
    except Exception, ex:
        return None

def mapper_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s%s%s' % (key, separator, data)
    # print >> sys.stderr, '%s%s%s' % (key, separator, data)

if __name__ == '__main__':
    db = sys.argv[1]
    table = sys.argv[2]
    columns = index_columns(db, table)
    main('||', columns)
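With this change, each mapper emits one pre-aggregated record per (route, key) combination instead of one record per input row, so the reducer only has to merge partial sums. The emitted lines now look roughly like this (values are made up for illustration):

route_total	{"route_key": "route1", "key": "1_010", "count": 2350, "order_id": ["123456", "123457"]}
route_total	{"route_key": "route2", "key": "2016-12-01 08", "count": 180, "order_id": ["123458"]}

Note that everything is still emitted under the single key route_total, so the aggregated records all land on one reducer; the gain comes from the much smaller record count, not from better key distribution.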
Reducer script:

#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetter

def read_from_mapper(file, separator):
    for line in file:
        yield reducer_input(line)

def main(separator='\t'):
    reducer_result = {}
    line_list = read_from_mapper(sys.stdin, separator)
    for route_key, group in groupby(line_list, itemgetter(0)):
        if route_key is None:
            continue
        reducer_result.setdefault(route_key, {})
        if route_key == 'route_total':
            reducer_total(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])

def reducer_total(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if data.get('route_key') == 'route1':
            reducer_result[route_key].setdefault(data.get('key'), {})
            reducer_result[route_key][data.get('key')].setdefault('count', 0)
            reducer_result[route_key][data.get('key')].setdefault('order_id', [])
            reducer_result[route_key][data.get('key')]['count'] += data.get('count')
            for order_id in data.get('order_id'):
                if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:
                    reducer_result[route_key][data.get('key')]['order_id'].append(order_id)
        elif data.get('route_key') == 'route2':
            reducer_result[route_key].setdefault(data.get('key'), {})
            reducer_result[route_key][data.get('key')].setdefault('count', 0)
            reducer_result[route_key][data.get('key')].setdefault('order_id', [])
            reducer_result[route_key][data.get('key')]['count'] += data.get('count')
            for order_id in data.get('order_id'):
                if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:
                    reducer_result[route_key][data.get('key')]['order_id'].append(order_id)
        else:
            pass

def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''

def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []

def reducer_input(data, separator='\t'):
    data_list = data.strip().split(separator, 2)
    key = data_list[0]
    data = deserialize(data_list[1])
    return [key, data]

def reducer_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s\t%s' % (key, data)
    # print >> sys.stderr, '%s\t%s' % (key, data)

if __name__ == '__main__':
    main()

Problems encountered

1. "The DiskSpace quota of /user/bigdata/qa is exceeded." We hit this error after the reducer finished. It happens because the disk quota of that HDFS path is already used up; freeing space under the path resolves it.

Reposted from: https://www.cnblogs.com/kangoroo/p/6151104.html