当前位置: 首页 > news >正文

网站建站系统程序长宁区网站建设网站制

网站建站系统程序,长宁区网站建设网站制,wordpress安装主题要多久,动态asp.net网站开发在工作中遇到异构数据库同步的问题,从Oracle数据库同步数据到Postgres#xff0c;其中的很多数据库表超过百万#xff0c;并且包含空间字段。经过筛选#xff0c;选择了开源的DataXDataX Web作为基础框架。DataX 是阿里云的开源产品#xff0c;大厂的产品值得信赖#xff…        在工作中遇到异构数据库同步的问题,从Oracle数据库同步数据到Postgres其中的很多数据库表超过百万并且包含空间字段。经过筛选选择了开源的DataXDataX Web作为基础框架。DataX 是阿里云的开源产品大厂的产品值得信赖而且DataX在阿里巴巴集团内被广泛使用承担了所有大数据的离线同步业务并已持续稳定运行了6年之久每天完成同步8w多道作业每日传输数据量超过300TB经过了时间、实践的检验。这里顺便分析一下源码看看大厂的程序员是怎么实现数据库的快速全表查询、写入操作怎么进行多线程管理的。 部分内容参见         GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。         DataX/introduction.md at master · alibaba/DataX · GitHub DataX/dataxPluginDev.md at master · alibaba/DataX · GitHub 一、DataX介绍 DataX 是阿里云 DataWorks数据集成 的开源版本在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。 ​        DataX 是一个异构数据源离线同步工具致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 为了解决异构数据源同步问题DataX将复杂的网状的同步链路变成了星型数据链路DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候只需要将此数据源对接到DataX便能跟已有的数据源做到无缝数据同步。 二、源码解析基于DataX v202309版本 DataX本身作为离线数据同步框架采用Framework plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件纳入到整个同步框架中。 ReaderReader为数据采集模块负责采集数据源的数据将数据发送给Framework。Writer Writer为数据写入模块负责不断向Framework取数据并将数据写入到目的端。FrameworkFramework用于连接reader和writer作为两者的数据传输通道并处理缓冲流控并发数据转换等核心技术问题。 2.1 Reader 源码解析 oraclereader插件包括Constant.java、OracleReader.java和OracleReaderErrorCode.java三个Java类。先关注一下OracleReaderOracleReader继承Reader基类在其中通过内部类Task实现读取数据库操作将读取的数据交由框架处理。具体为CommonRdbmsReader.Task来实现。在代码中包含了commonRdbmsReaderTask的初始化及读取数据操作等内容。核心为this.commonRdbmsReaderTask.startRead。 public static class Task extends Reader.Task {private Configuration readerSliceConfig;private CommonRdbmsReader.Task commonRdbmsReaderTask;Overridepublic void init() {this.readerSliceConfig super.getPluginJobConf();this.commonRdbmsReaderTask new CommonRdbmsReader.Task(DATABASE_TYPE ,super.getTaskGroupId(), super.getTaskId());this.commonRdbmsReaderTask.init(this.readerSliceConfig);}Overridepublic void startRead(RecordSender recordSender) {int fetchSize this.readerSliceConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE);this.commonRdbmsReaderTask.startRead(this.readerSliceConfig,recordSender, super.getTaskPluginCollector(), fetchSize);}Overridepublic void post() {this.commonRdbmsReaderTask.post(this.readerSliceConfig);}Overridepublic void destroy() {this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);}} CommonRdbmsReader.Task的startRead方法如下 public void startRead(Configuration readerSliceConfig,RecordSender recordSender,TaskPluginCollector taskPluginCollector, int fetchSize) {String querySql readerSliceConfig.getString(Key.QUERY_SQL);String table readerSliceConfig.getString(Key.TABLE);PerfTrace.getInstance().addTaskDetails(taskId, table , basicMsg);LOG.info(Begin to read record by Sql: [{}\n] {}.,querySql, basicMsg);PerfRecord queryPerfRecord new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.SQL_QUERY);queryPerfRecord.start();Connection conn DBUtil.getConnection(this.dataBaseType, jdbcUrl,username, password);// session config .etc relatedDBUtil.dealWithSessionConfig(conn, readerSliceConfig,this.dataBaseType, basicMsg);int columnNumber 0;ResultSet rs null;try {rs DBUtil.query(conn, querySql, fetchSize);queryPerfRecord.end();ResultSetMetaData metaData rs.getMetaData();columnNumber metaData.getColumnCount();//这个统计干净的result_Next时间PerfRecord allResultPerfRecord new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);allResultPerfRecord.start();long rsNextUsedTime 0;long lastTime System.nanoTime();while (rs.next()) {rsNextUsedTime (System.nanoTime() - lastTime);this.transportOneRecord(recordSender, rs,metaData, columnNumber, mandatoryEncoding, taskPluginCollector);lastTime System.nanoTime();}allResultPerfRecord.end(rsNextUsedTime);//目前大盘是依赖这个打印而之前这个Finish read record是包含了sql查询和result next的全部时间LOG.info(Finished read record by Sql: [{}\n] {}.,querySql, basicMsg);} catch (Exception e) {throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);} finally {DBUtil.closeDBResources(null, conn);}} 上述代码可见查询数据库的常规步骤。 1.建立数据库链接 Connection conn DBUtil.getConnection(this.dataBaseType, jdbcUrl,username, password); DBUtil内部通过原生Jdbc实现代码如下 private static synchronized Connection connect(DataBaseType dataBaseType,String url, Properties prop) {try {Class.forName(dataBaseType.getDriverClassName());DriverManager.setLoginTimeout(Constant.TIMEOUT_SECONDS);return DriverManager.getConnection(url, prop);} catch (Exception e) {throw RdbmsException.asConnException(dataBaseType, e, prop.getProperty(user), null);}} 2.执行查询操作返回ResultSet ResultSet rs null; try {rs DBUtil.query(conn, querySql, fetchSize); }catch (Exception e) {throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username); } finally {DBUtil.closeDBResources(null, conn); } DBUtil内部查询实现代码如下。 /*** a wrapped method to execute select-like sql statement .** param conn Database connection .* param sql sql statement to be executed* return a {link ResultSet}* throws SQLException if occurs SQLException.*/public static ResultSet query(Connection conn, String sql, int fetchSize)throws SQLException {// 默认3600 s 的query Timeoutreturn query(conn, sql, fetchSize, Constant.SOCKET_TIMEOUT_INSECOND);}/*** a wrapped method to execute select-like sql statement .** param conn Database connection .* param sql sql statement to be executed* param fetchSize* param queryTimeout unit:second* return* throws SQLException*/public static ResultSet query(Connection conn, String sql, int fetchSize, int queryTimeout)throws SQLException {// make sure autocommit is offconn.setAutoCommit(false);Statement stmt conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY);stmt.setFetchSize(fetchSize);stmt.setQueryTimeout(queryTimeout);return query(stmt, sql);}/*** a wrapped method to execute select-like sql statement .** param stmt {link Statement}* param sql sql statement to be executed* return a {link ResultSet}* throws SQLException if occurs SQLException.*/public static ResultSet query(Statement stmt, String sql)throws SQLException {return stmt.executeQuery(sql);} 3.获取数据元数据信息 ResultSetMetaData metaData rs.getMetaData(); 4.遍历数据对数据进行转换并传递给框架 while (rs.next()) {rsNextUsedTime (System.nanoTime() - lastTime);this.transportOneRecord(recordSender, rs,metaData, columnNumber, mandatoryEncoding, taskPluginCollector);lastTime System.nanoTime(); } 5.在finally块中关闭数据链接 finally {DBUtil.closeDBResources(null, conn); } DBUtil内部关闭链接代码如下。很明显上述代码调用时只传入了connection会造成只关闭链接未关闭ResultSet和Statement有瑕疵。 public static void closeDBResources(ResultSet rs, Statement stmt,Connection conn) {if (null ! rs) {try {rs.close();} catch (SQLException unused) {}}if (null ! stmt) {try {stmt.close();} catch (SQLException unused) {}}if (null ! conn) {try {conn.close();} catch (SQLException unused) {}}}public static void closeDBResources(Statement stmt, Connection conn) {closeDBResources(null, stmt, conn);} 在第2步DBUtil内部的查询代码部分指定了fetchSize参数。 stmt.setFetchSize(fetchSize); fetchSize是实现读取数据源表的关键点之一。简单理解fetchSize定义了本地缓存大小例如fetchSize1000即可简单理解为本地缓存区大小为1000条数据大小当执行ResultSet.next取数据时如果本地缓存中没有数据会从数据库中取出1000条剩余数据大于1000时为1000小于1000时为剩余数据数据放到缓存中接下来的rs.next操作就是从本地缓存中读取数据直至缓存区为空才再次请求数据库。通过减少与数据库的交互次数提升性能。 如果 fetchsize 设置的太小会导致程序频繁地访问数据库影响性能如果 fetchsize 设置的太大则可能会导致内存不足。在oraclereader插件的代码Constant.java中定义了fetchSize的默认值。 package com.alibaba.datax.plugin.reader.oraclereader;public class Constant {public static final int DEFAULT_FETCH_SIZE 1024;} 接下来我们看一下transportOneRecord的代码该代码将一条数据进行转换后传递给Writer。 protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, TaskPluginCollector taskPluginCollector) {Record record buildRecord(recordSender,rs,metaData,columnNumber,mandatoryEncoding,taskPluginCollector); recordSender.sendToWriter(record);return record; } buildRecord方法将一条数据的各个字段按照类型转换为标准数据方便后续各类数据库写入插件使用实现数据插入。如果数据中包含了不支持的其他字段类型需要在SQL中通过转换函数进行转换否则对于不支持的其他字段类型或在转换过程中出现其他错误这条数据将被作为脏数据扔掉。当然也可以修改buildRecord方法代码让DataX支持更多数据类型的查询和写入。代码如下 protected Record buildRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding,TaskPluginCollector taskPluginCollector) {Record record recordSender.createRecord();try {for (int i 1; i columnNumber; i) {switch (metaData.getColumnType(i)) {case Types.CHAR:case Types.NCHAR:case Types.VARCHAR:case Types.LONGVARCHAR:case Types.NVARCHAR:case Types.LONGNVARCHAR:String rawData;if (StringUtils.isBlank(mandatoryEncoding)) {rawData rs.getString(i);} else {rawData new String((rs.getBytes(i) null ? EMPTY_CHAR_ARRAY :rs.getBytes(i)), mandatoryEncoding);}record.addColumn(new StringColumn(rawData));break;case Types.CLOB:case Types.NCLOB:record.addColumn(new StringColumn(rs.getString(i)));break;case Types.SMALLINT:case Types.TINYINT:case Types.INTEGER:case Types.BIGINT:record.addColumn(new LongColumn(rs.getString(i)));break;case Types.NUMERIC:case Types.DECIMAL:record.addColumn(new DoubleColumn(rs.getString(i)));break;case Types.FLOAT:case Types.REAL:case Types.DOUBLE:record.addColumn(new DoubleColumn(rs.getString(i)));break;case Types.TIME:record.addColumn(new DateColumn(rs.getTime(i)));break;// for mysql bug, see http://bugs.mysql.com/bug.php?id35115case Types.DATE:if (metaData.getColumnTypeName(i).equalsIgnoreCase(year)) {record.addColumn(new LongColumn(rs.getInt(i)));} else {record.addColumn(new DateColumn(rs.getDate(i)));}break;case Types.TIMESTAMP:record.addColumn(new DateColumn(rs.getTimestamp(i)));break;case Types.BINARY:case Types.VARBINARY:case Types.BLOB:case Types.LONGVARBINARY:record.addColumn(new BytesColumn(rs.getBytes(i)));break;// warn: bit(1) - Types.BIT 可使用BoolColumn// warn: bit(1) - Types.VARBINARY 可使用BytesColumncase Types.BOOLEAN:case Types.BIT:record.addColumn(new BoolColumn(rs.getBoolean(i)));break;case Types.NULL:String stringData null;if (rs.getObject(i) ! null) {stringData rs.getObject(i).toString();}record.addColumn(new StringColumn(stringData));break;case Types.OTHER:if (dataBaseType DataBaseType.PostgreSQL) {record.addColumn(new StringColumn(rs.getString(i)));break;}default:throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE,String.format(您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], 字段名称:[%s], 字段Java类型:[%s]. 请尝试使用数据库函数将其转换datax支持的类型 或者不同步该字段 .,metaData.getColumnName(i),metaData.getColumnType(i),metaData.getColumnClassName(i)));}}} catch (Exception e) {if (IS_DEBUG) {LOG.debug(read data record.toString() occur exception:, e);}//TODO 这里识别为脏数据靠谱吗taskPluginCollector.collectDirtyRecord(record, e);if (e instanceof DataXException) {throw (DataXException) e;}}return record;} } 未完待续...
http://www.zqtcl.cn/news/834197/

相关文章:

  • 合肥网站建设合肥做网站wordpress 关于页面
  • 软件开发公司赚钱吗北京网站优化解决方案
  • 泰安的网站建设公司哪家好国外ps网站
  • 网站建设制作方案做字典网站开发
  • 安徽道遂建设工程有限公司网站汽车之家网页
  • 仙居网站建设贴吧马鞍山钢铁建设集团有限公司网站
  • 编写网站 语言微网站开发语言
  • 深圳网站建设优化网站建设与维护培训
  • 张家港网站开发wordpress后台登录地址改
  • 郑州做网站的公司哪家好做网站运营工资是不是很低
  • 做网站电销公司开发个网站怎么做
  • 廊坊做网站哪家好深圳快速网站制
  • 网站开发文档实训小结与讨论做网站建设业务员好吗
  • 网站开发知识产权归属好看的个人网站设计
  • 怎么学习企业网站维护江西省城乡建设培训网站官方网站
  • 电脑网站 源码php网站数据库修改
  • 做网站系统的答辩ppt范文商品关键词优化的方法
  • 长沙网站设计公司怎么样如何在网站上推广自己的产品
  • 龙岗网站设计农业网站模板WordPress
  • 摄像头监控设备企业网站模板聊城网站设计公司
  • 做英文网站賺钱建筑设计资料网站
  • 上海专业网站建设平台百度sem认证
  • 个人房产查询系统网站官网推广普通话 奋进新征程
  • 网站设计理念介绍石家庄业之峰装饰公司怎么样
  • 博乐建设工程信息网站ppt软件下载免费版
  • 宿州公司网站建设企业管理培训课程讲座大全
  • 企业网站营销的优缺点Vs做的网站调试时如何适应网页
  • 策划案网站构成怎么写wordpress建个人博客
  • 自己做的网站别人怎么访问美容行业网站建设多少价格
  • 网站建设与运营 教材 崔海口个人建站模板