网站建设与网页设计的论文,郴州网站制作公司地址,织梦网站文章发布模板下载,苏中建设集团网站网址HBase数据刷写 之前提到过这个方法#xff0c;那么BufferedMutator是什么#xff1f;又应该如何实现呢#xff1f;
写缓存
HBase的每一个put操作实际上是一个RPC操作#xff0c;将客户端的数据传输到服务器再返回结果#xff0c;这只适用于小数据量的操作#xff0c;如…HBase数据刷写 之前提到过这个方法那么BufferedMutator是什么又应该如何实现呢
写缓存
HBase的每一个put操作实际上是一个RPC操作将客户端的数据传输到服务器再返回结果这只适用于小数据量的操作如果数据量多的话每次put都需要建立一次RPC的连接TCP连接而建立连接传输数据是需要时间的因此减少RPC的调用可以提高数据传输的效率减少建立连接的时间和IO消耗。
HBase的客户端API提供了写缓存区put的数据一开始放在缓存区内当数量到达指定的容量或者用户强制提交是才将数据一次性提交到HBase的服务器。这个缓冲区可以通过调用 HTable.setAutoFlush(false) 来开启。而新版HBbase的API中使用了BufferedMutator替换了老版的缓冲区通过BufferedMutator对象提交的数据自动存放在缓冲区中。
BufferedMutator
通过获取 BufferedMutator 对象并调用 mutator.mutate(ListMutation mutations) 方法来进行批量插入数据。可以使用 Put 类型的对象列表作为 mutations 参数进行插入。BufferedMutator 提供了自动管理缓冲区和写入操作的功能可以提高插入数据的性能。
单次一张表批量写入 Configuration conf HBaseConfiguration.create();conf.set(hbase.zookeeper.quorum, zookeeperHost);final BufferedMutator.ExceptionListener listener new BufferedMutator.ExceptionListener() {Overridepublic void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {for (int i 0; i e.getNumExceptions(); i) {LOG.info(Failed to sent put e.getRow(i) .);}}};BufferedMutatorParams params new BufferedMutatorParams(TABLE).listener(listener);params.writeBufferSize(123123L);try {Connection conn ConnectionFactory.createConnection(conf);BufferedMutator mutator conn.getBufferedMutator(params);Put p new Put(Bytes.toBytes(someRow));p.addColumn(FAMILY, Bytes.toBytes(someQualifier), Bytes.toBytes(some value));mutator.mutate(p);mutator.close();conn.close();} catch (IOException e1) {// TODO Auto-generated catch blocke1.printStackTrace();}
单次多张表批量写入
private static MapString, BufferedMutator tableConnectionMgr new ConcurrentHashMap();
private BufferedMutator getTableConnection(String tableName) throws IOException {if (tableConnectionMgr.get(tableName) ! null) {return tableConnectionMgr.get(tableName);}Connection connection ConnectionFactory.createConnection(config);BufferedMutator table connection.getBufferedMutator(TableName.valueOf(tableName));tableConnectionMgr.put(tableName, table);log.info(hbase table: {} connect established!, tableName);return tableConnectionMgr.get(tableName);
}