衡阳网站推广优化公司,软件开发需要用什么软件,宁波网络推广专员,wordpress图像背景
量级庞大的日志通过mysql不足以支撑业务需求#xff0c;以前通过任务调度定时跑批从mysql同步到hive存储#xff0c;这种方式时效性为T1#xff0c;也就是说今天的日志#xff0c;明天才能同步到hive#xff0c;总而言之时效性不高。为了提高时效性#xff0c;改为…背景
量级庞大的日志通过mysql不足以支撑业务需求以前通过任务调度定时跑批从mysql同步到hive存储这种方式时效性为T1也就是说今天的日志明天才能同步到hive总而言之时效性不高。为了提高时效性改为流式计算flink实时同步
那么作为测试人员我们如何保证切换同步方式后的数据正确性呢通过对比新旧表数据是否一致显然是最简单的方法这次改动涉及600多张表每一张表的字段数基本在千以上甚至部分表字段数达万以上面对如此庞大的数据量通过人眼一个个去对比显然不太现实
探索与实践
方案一sql脚本
SELECT column_names, COUNT(*) AS count_diff
FROM (SELECT CONCAT_WS(,,A,B) FROM udc_test.s000 WHERE dt20230814UNION ALL SELECT CONCAT_WS(,,A,B) FROM test.s000 WHERE dt 20230814 and rule_log_id in (select rule_log_id from udc_test.s000)
) AS combined
GROUP BY column_names
HAVING COUNT(column_names) 1select * from (select table1,A,B from udc_test.s000 WHERE dt20230814 and rule_log_id in (123456)union all select table2,A,B from test.s000 WHERE dt20230814 and rule_log_id in (123456)
)a order by a.table1 asc方案二python脚本
from pyhive import hive
from datetime import datetimeif __name__ __main__:#换成生产的连接conn hive.Connection(hostxxx, portxxx, authxxx, databasexxx, usernamexxx,passwordxxx)#这里换成需要比较的表名tableName1 test.ssc_python_compare_fields1tableName2 test.ssc_python_compare_fields2current_time datetime.now()hash_code str(hash(current_time))# 获取表结构query1 desc tableName1query2 desc tableName2cursor conn.cursor()cursor.execute(query1)columns1 [row[0] for row in cursor.fetchall()]cursor.execute(query2)columns2 [row[0] for row in cursor.fetchall()]# 去除掉不需要比较的字段columns1.remove(# Partition Information)columns1.remove(# col_name)columns1.remove(dt)columns2.remove(# Partition Information)columns2.remove(# col_name)columns2.remove(dt)set1 set(columns1)set2 set(columns2)# 取出来表1特有的字段可以保存到文件diffrence1 set1 - set2print(diffrence1)# 取出来表2特有的字段可以保存到文件diffrence2 set2 - set1print(diffrence2)# 取表1和表2共有的字段用于比较差异intersection set1 set2# 生成比较的sqlsql select for element in intersection:sql sql if( nvl(t1. element , hash_code )! nvl( t2. element , hash_code ) , \no\,\yes\) as element , #print(sql)sql sql[:-2]#print(sql)#sql中的dt可以改成具体需要比较的日期sql sql from tableName1 as t1 left join tableName2 \ as t2 on t1.rule_log_idt2.rule_log_id \ and t1.dt \20230815\ and t2.dt \20230815\ and t1.apply_typet2.apply_type where for element in intersection:sql sql t1. element !t2. element or sql sql[:-3]print(sql)sql sql limit 1 # 执行sql,获取到结果如果两列不相等的话值为no相等的话值为yescursor.execute(sql)result cursor.fetchone()# print(result)# 获取上述sql的元数据信息metadatas cursor.descriptionprint()# 遍历结果集查找出比较结果不相同的数据拿到列名index 0while index len(metadatas):if (result[index] ! yes):print(metadatas[index][0])index 1print()