网站建设不能使用的广告违禁词,北海百度seo,北京网站seowyhseo,通辽网站开发招聘实现HBase表和RDB表的转化 一、引入
转化为HBase表的三大来源#xff1a;RDB Table、Client API、Files 如何构造通用性的代码模板实现向HBase表的转换#xff0c;是一个值得考虑的问题。这篇文章着重讲解RDB表向HBase表的转换。
首先#xff0c;我们需要分别构造rdb和hba…实现HBase表和RDB表的转化 一、引入
转化为HBase表的三大来源RDB Table、Client API、Files 如何构造通用性的代码模板实现向HBase表的转换是一个值得考虑的问题。这篇文章着重讲解RDB表向HBase表的转换。
首先我们需要分别构造rdb和hbase的对象根据批处理的思想我们可以考虑批量将rdb中的数据导出并且转化为ListPut的格式直接导入HBase表中最后释放资源伪代码模板如下
rdb...
hbase...
rdb.init();
hbase.init();
while(rdb.hasNextBatch()){ListPut batch rdb.nextBatch();hbase.putBatch(batch);
}
hbase.close();
rdb.close();二、代码讲解
1. 目录结构 2. 具体实现
transfer.properties
内含HBase和RDB转换所有配置信息的配置文件因为该配置文件是在启动时就需要进行配置因此我们需要按以下图片进行配置导入配置文件
在Run/Debug Configurations中新建一个Application配置好主类配置好配置文件的具体路径
RDB 接口
public interface RDB extends Com {// 要提升性能需要使用批处理boolean hasNextBatch() throws SQLException;// 是否存在下一个批次ListPut nextBatch() throws SQLException;// 一个put代表往一个hbase表的一行的一个列族的一个列插入一条数据对Hbase来说批次就是ListPut
}RDB 实现类
public class RDBImpl implements RDB {private static Logger logger Logger.getLogger(RDBImpl.class);// JDBC 的基本元素连接对象(装载[驱动]、[URL]、[账号]、[密码])-执行对象(SQL语句)-结果集private Properties config;/*** 它们需要设置成全局变量的原因是它们需要共享*/private Connection con;private PreparedStatement pst;private ResultSet rst;// 定义每个批次处理的记录数的最大数量private int batchSize;// hbase的行键对应rdb的列的列名private String hbaseRowKeyRdbCol;private MapString,MapString,String hbaseRdbColMapping;// RDB配置可以灵活地从外部传入(构造方法)从内部读取(config())public RDBImpl(Properties config) {this.config config;}Overridepublic Properties config() {return config;}/*** 内部资源初始化*/Overridepublic void init() throws Exception{con getConnection();logger.info(RDB 创建 [ 连接 ] 对象成功);pst getStatement(con);logger.info(RDB 创建 [ 执行 ] 对象成功);rst getResult(pst);logger.info(RDB 创建 [ 结果集 ] 成功);batchSize batchSize();hbaseRdbColMapping hbaseRdbColumnsMapping();}Overridepublic void close() {closeAll(rst,pst,con);}private String driver(){return checkAndGetConfig(rdb.driver);}private String url(){return checkAndGetConfig(rdb.url);}private String username(){return checkAndGetConfig(rdb.username);}private String password(){return checkAndGetConfig(rdb.password);}private String sql(){return checkAndGetConfig(rdb.sql);}private int batchSize(){return Integer.parseInt(checkAndGetConfig(rdb.batchSize));}// java.sql下的Connectionprivate Connection getConnection() throws ClassNotFoundException, SQLException {// 装载驱动Class.forName(driver());// 获取并返回连接对象return DriverManager.getConnection(url(),username(),password());}private PreparedStatement getStatement(Connection con) throws SQLException {return con.prepareStatement(sql());}private ResultSet getResult(PreparedStatement statement) throws SQLException {return statement.executeQuery();}/*** hbase 列族和列与rdb中列的映射关系* hbase列族 hbase列 rdb列* return MapString,MapString,String*/private MapString, MapString,String hbaseRdbColumnsMapping(){String mapping checkAndGetConfig(rdb.hbase.columns.mapping);MapString,MapString,String map new HashMap();String[] pss mapping.split(,);for(String ps : pss){String[] pp ps.split(-);String[] p pp[0].split(:);String rdbCol pp[1],hbaseColFamily,hbaseColName;if(p.length1){hbaseRowKeyRdbCol pp[1];}else {hbaseColFamily p[0];hbaseColName p[1];if(!map.containsKey(hbaseColFamily)){map.put(hbaseColFamily,new HashMap());}map.get(hbaseColFamily).put(hbaseColName,rdbCol);}}return map;}/*** 将RDB的列转化为字节数组(需要确定列的数据类型)* param rdbColumn* return* throws SQLException*/private byte[] toBytesFromRdb(String rdbColumn) throws SQLException {Object obj rst.getObject(rdbColumn);if(obj instanceof String){return Bytes.toBytes((String)obj);} else if(obj instanceof Float){return Bytes.toBytes(((Float)obj).floatValue());} else if(obj instanceof Double){return Bytes.toBytes(((Double)obj).doubleValue());} else if(obj instanceof BigDecimal){return Bytes.toBytes((BigDecimal)obj);} else if(obj instanceof Short){return Bytes.toBytes(((Short) obj).shortValue());} else if(obj instanceof Integer){return Bytes.toBytes(((Integer)obj).intValue());} else if(obj instanceof Boolean){return Bytes.toBytes((Boolean)((Boolean) obj).booleanValue());} else {throw new SQLException(HBase不支持转化为字节数组的类型:obj.getClass().getName());}}/*** 将HBase的列名或列族名转化为字节数组* param name* return*/private byte[] toBytes(String name){return Bytes.toBytes(name);}// 最后一个批次的数据最少有一条Overridepublic boolean hasNextBatch() throws SQLException{return rst.next();}Overridepublic ListPut nextBatch() throws SQLException{// 预先分配容量ListPut list new ArrayList(batchSize);int count 0;do{/*** 如何将一行解析为多个put(结合配置文件)* 对每条数据创建一个带行键的put向put中放入HBase列族名HBase列名RDB列名*/Put put new Put(toBytesFromRdb(hbaseRowKeyRdbCol));for (Map.EntryString, MapString, String e : hbaseRdbColMapping.entrySet()) {String columnFamily e.getKey();for (Map.EntryString, String s : e.getValue().entrySet()) {String hbaseColumn s.getKey();String rdbColumn s.getValue();// 需要将内容转变为字节数组传入方法put.addColumn(toBytes(columnFamily),toBytes(hbaseColumn),toBytesFromRdb(rdbColumn));}}list.add(put);}while(countbatchSize rst.next());return list;}}如何理解一行转化为多个put? 结果集的实质 rst.next() 的两个作用
rst.next();
// 1.判定是否存在下一个有效行
// 2.若存在下一个有效行则指向该有效行a. 只通过config作为参数构造rdb b. 以JDBC为核心需要连接对象(驱动URL账号密码)执行对象(SQL)结果集这些都需要被设计为全局变量(因为需要被共享) c. 既实现了RDB接口还实现了RDB的继承接口Com中的init()、close()进行资源的初始化和释放checkAndGetConfig()根据传入的配置文件获取配置信息并且赋值给全局变量。 d. 重点我们还需要对RDB和HBase的映射关系进行解析最终解析出RDB列名HBase列族名HBase列名具体如何解析参考配置文件transfer.properties并将解析出来的名字构造成一个Put对象由于构造Put对象只能放字节数组所以需要转化为字节数组的方法又因为解析RDB的列名需要考虑列的数据类型而解析HBase的列族或列名不需要考虑因此需要有两个转换方法ToBytesFromRDB()和ToBytes()分别实现两种情况的字节数组转化。
HBase接口
public interface HBase extends Com {// RDBImpl的nextBatch()返回的就是ListPut直接放入HBase表即可。void putBatch(ListPut batch) throws IOException;
}HBase实现类
public class HBaseImpl implements HBase {private static Logger loggerHBase Logger.getLogger(HBaseImpl.class);private Properties config;private Connection con;private Table hbaseTable;public HBaseImpl(Properties config) {this.config config;}Overridepublic Properties config() {return config;}Overridepublic void init() throws Exception {con getCon();loggerHBase.info(HBase 创建 [ 连接 ] 成功);hbaseTable checkAndGetTable(con);loggerHBase.info(HBase 创建 [ 数据表 ] 成功);}Overridepublic void close() {closeAll(hbaseTable,con);}private String tableName(){return checkAndGetConfig(hbase.table.name);}private String zkUrl(){return checkAndGetConfig(hbase.zk);}private Connection getCon() throws IOException {// hadoop.conf的configurationConfiguration config HBaseConfiguration.create();config.set(hbase.zookeeper.quorum,zkUrl());return ConnectionFactory.createConnection(config);}private Table checkAndGetTable(Connection con) throws IOException {/*** Admin : HBase DDL*/Admin admin con.getAdmin();TableName tableName TableName.valueOf(tableName());// 通过tableName判定表是否存在if(!admin.tableExists(tableName)){throw new IOException(HBase表不存在异常tableName);}/*** Table : HBase DML DQL*/// 传入的参数可以是TableName tableName,ExecutorService pool(表操作可以并发)return con.getTable(tableName);}Overridepublic void putBatch(ListPut batch) throws IOException{hbaseTable.put(batch);}
}HBase的实现类和RDB的实现类也非常类似 先重写HBase接口中的方法和Com接口中的方法发现往里放数据需要构造一个Table对象而Table对象的构建需要一个连接对象和TableName因此在构造了两个方法tableName()获取配置信息中的TableName(注意此时的TableName是字符串类型)zkUrl()获取zk.url作为配置构造连接对象。
Com接口
public interface Com {Logger logger Logger.getLogger(Com.class);// 获取配置对象Properties config();// 初始化资源void init() throws Exception;// 释放资源void close();default String checkAndGetConfig(String key){if(!config().containsKey(key)){// 因为该方法可能被用于HBase和RDBthrow new RuntimeException(配置项缺失异常key);}String item config().getProperty(key);logger.info(String.format(获取配置项 %s : %s,key,item));return item;}default void closeAll(AutoCloseable...acs){for (AutoCloseable ac : acs) {if (Objects.nonNull(ac)) {try {ac.close();logger.info(String.format(释放 %s 成功,ac.getClass().getName()));} catch (Exception e) {logger.error(释放资源异常e);}}}}
}在Com接口中设计了一些普通方法config()实现配置的导出init()、close()资源的初始化和关闭同样还设计了一些无需实现的默认方法便于实现init()和close()方法。这些方法适用于RDB和HBase的实现类。
RDBToHBase接口
public interface RDBToHBase {// 创建一个RDB对象void setRDB(RDB rdb);// 创建一个HBase对象void setHBase(HBase hbase);// 进行数据的传输void startTransfer();
}RDBToHBase实现类
public class RDBToHBaseImpl implements RDBToHBase {// 日志显示private static Logger loggerRH Logger.getLogger(RDBToHBaseImpl.class);private RDB rdb;private HBase hbase;Overridepublic void setRDB(RDB rdb) {this.rdb rdb;}Overridepublic void setHBase(HBase hbase) {this.hbase hbase;}Overridepublic void startTransfer() {try {rdb.init();loggerRH.info(RDB 初始化成功);hbase.init();loggerRH.info(HBase 初始化成功);loggerRH.info(数据从 RDB 迁移至 HBase 开始...);int count 0;while (rdb.hasNextBatch()) {final ListPut batch rdb.nextBatch();hbase.putBatch(batch);loggerRH.info(String.format(第 %d 批%d 条数据插入成功,count,batch.size()));}loggerRH.info(数据从 RDB 迁移至 HBase 结束...);} catch (Exception e){loggerRH.error(将 RDB 数据批量迁移至 HBase 异常,e);} finally{hbase.close();rdb.close();}}
}AppRDBToHBase 实现类
public class AppRDBToHBase
{private static Logger logger Logger.getLogger(AppRDBToHBase.class);private static void start(String[] args){try {if (Objects.isNull(args) || args.length 0 || Objects.isNull(args[0])) {throw new NullPointerException(配置文件路径空指针异常);}final String PATH args[0];final File file new File(PATH);if (!file.exists() || file.length() 0 || !file.canRead()) {throw new IOException(配置文件不存在、不可读、空白);}Properties config new Properties();// final String path args[0];config.load(new FileReader(file));RDB rdb new RDBImpl(config);HBase hBase new HBaseImpl(config);RDBToHBase rdbToHBase new RDBToHBaseImpl();rdbToHBase.setRDB(rdb);rdbToHBase.setHBase(hBase);rdbToHBase.startTransfer();}catch(Exception e){logger.error(配置异常,e);}}public static void main( String[] args ) {start(args);}
}对于传入的配置文件路径既要检查路径本身也要检查路径代表的文件本身。 通过流的方式将文件进行配置并且利用该配置构造RDB和HBase并进行数据的传输
其他日志文件系统Log.4j的应用
准备需要在Resources模块下配置log4j.properties文件注意 日志文件信息的输出方式有三种logger.error()、logger.info()、logger.warn() 除了对错误信息进行输出之外也要习惯于补充正常信息的输出以增强代码的可读性。log.4j除了在控制台打印日志信息之外还能在磁盘下的日志文件中打印日志信息因此在导入log4j.properties文件之后需要修改日志文件的路径。对于不同类或接口下的logger需要注意进行名字的区分。