HBase模糊查询优化 - 并发查询
HBase查询优化续集，继上次优化后查询速度还是很慢，这次优化我们使用并发查询，查询HBase库里的各region拆分情况，然后对查询的rowkey切分成多段，每一段单独去不同的region中查询，……
HBase查询优化续集：继上次优化后，查询速度还是很慢。这次优化我们使用并发查询：先查看HBase表中各region的拆分情况，然后将查询的rowkey范围切分成多段，每一段单独去不同的region中查询，从而通过并发查询来提升查询速度。（注：下面的代码中，rowkey的切分范围由调用者手动指定，并不会自动读取region的拆分信息。）
优化后经过测试，查询速度大大提升。
代码如下：
package query;

import main.TaskExecutors;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;/*** p* 用来作为并发查询HBase使用原理是使用scan来设置startrow和endrow。* 因为HBase存储rowkey是字典序排序;* 所以如果是单线程查询我们一般设置startrow为rowkey0 , endrow为rowkeyz。* 我们并发scan可以将startrow和endrow拆分成多份例如:* 分为rowkey0 - rowkey9 和 rowkeyA - rowkeyZ 和 rowkeya - rowkeyz这样就可以分为三个线程并发查询* p/** author chun* date 2022/7/21 16:48*/public class ConcurrentScanHBase {private ExecutorService pool null;private String rowKey;//Pair自定了也可以使用javafx.util.Pair需要看服务器上的jdk是oracle还是openjdk,openjdk没有,可以把oracleJDK的Pair类直接复制过来使用private ListPairCharacter, Character rowkeyRanges;private ListScan scanList new ArrayList();private SetScan setScan null;private CountDownLatch countDownLatch;private static final String format yyyyMMddHH;public ConcurrentScanHBase(ExecutorService pool, String rowkey, ListPairCharacter, Character rowkeyRanges, SetScan setScan) {if (setScan null) {throw new NullPointerException(SetScan is NULL);}this.pool pool;this.rowKey rowkey;this.rowkeyRanges rowkeyRanges;this.setScan setScan;init();}private void init() {for (PairCharacter, Character rowkeyRange : rowkeyRanges) {scanList.add(getScann(rowKey.getBytes(), (rowKey rowkeyRange.getKey()).getBytes(), (rowKey rowkeyRange.getValue()).getBytes()));}countDownLatch new CountDownLatch(rowkeyRanges.size());}//如果需要主线程等待此次任务结束调用await()方法;public void await() {try {this.countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}public void exec(ExecScan execScan) {for (Scan scan : scanList) {pool.execute(() - {execScan.exec(scan);countDownLatch.countDown();});}}//只设置startrow和endrow其他设置需要返回后自己设置private Scan getScann(byte[] rowkey, byte[] startRow, byte[] endRow) {Scan scan new Scan();scan.withStartRow(startRow);scan.withStopRow(endRow);PrefixFilter prefixFilter new PrefixFilter(rowkey);scan.setFilter(prefixFilter);setScan.initScan(scan);return scan;}
}匿名内部类接口
package query;

import org.apache.hadoop.hbase.client.Scan;

/**
 * Callback that performs the actual scan of the table.
 * Callers supply the implementation, for example as an anonymous inner class or a lambda.
 * It receives one {@link Scan} built by {@code ConcurrentScanHBase}.
 *
 * @author chun
 * date 2022/7/21 17:52
 */
@FunctionalInterface
public interface ExecScan {
    /**
     * Runs a single scan.
     *
     * @param scan scan whose start row, stop row and prefix filter have already been set
     */
    void exec(Scan scan);
}
package query;

import org.apache.hadoop.hbase.client.Scan;

/**
 * Hook passed to the {@code ConcurrentScanHBase} constructor so that each caller can configure the
 * {@link Scan} objects it builds. It can be implemented as an anonymous inner class or a lambda.
 * It is invoked once per row-key range, after the start row, stop row and prefix filter have been set.
 * It may configure anything else, such as column families, block caching or batch size.
 * Note that calling {@code scan.setFilter(...)} here replaces the prefix filter.
 *
 * @author chun
 * date 2022/7/21 17:52
 */
@FunctionalInterface
public interface SetScan {
    /**
     * Applies caller-specific settings to a freshly created scan.
     *
     * @param scan scan to configure in place
     */
    void initScan(Scan scan);
}
Pair类（自定义的键值对类，用于替代OpenJDK中没有的javafx.util.Pair）：
package query;import javafx.beans.NamedArg;import java.io.Serializable;/*** author chun* date 2022/7/22 10:10*/
public class PairK, V implements Serializable {/*** Key of this codePair/code.*/private K key;/*** Gets the key for this pair.** return key for this pair*/public K getKey() {return key;}/*** Value of this this codePair/code.*/private V value;/*** Gets the value for this pair.** return value for this pair*/public V getValue() {return value;}/*** Creates a new pair** param key The key for this pair* param value The value to use for this pair*/public Pair(NamedArg(key) K key, NamedArg(value) V value) {this.key key;this.value value;}/*** pcodeString/code representation of this* codePair/code./p** pThe default name/value delimiter is always used./p** return codeString/code representation of this codePair/code*/Overridepublic String toString() {return key value;}/*** pGenerate a hash code for this codePair/code./p** pThe hash code is calculated using both the name and* the value of the codePair/code./p** return hash code for this codePair/code*/Overridepublic int hashCode() {// names hashCode is multiplied by an arbitrary prime number (13)// in order to make sure there is a difference in the hashCode between// these two parameters:// name: a value: aa// name: aa value: areturn key.hashCode() * 13 (value null ? 0 : value.hashCode());}/*** pTest this codePair/code for equality with another* codeObject/code./p** pIf the codeObject/code to be tested is not a* codePair/code or is codenull/code, then this method* returns codefalse/code./p** pTwo codePair/codes are considered equal if and only if* both the names and values are equal./p** param o the codeObject/code to test for* equality with this codePair/code* return codetrue/code if the given codeObject/code is* equal to this codePair/code else codefalse/code*/Overridepublic boolean equals(Object o) {if (this o) return true;if (o instanceof Pair) {Pair pair (Pair) o;if (key ! null ? !key.equals(pair.key) : pair.key ! null) return false;if (value ! null ? !value.equals(pair.value) : pair.value ! 
null) return false;return true;}return false;}
}
使用示例：
public static void main(String[] args) {ArrayListPairCharacter, Character pairs new ArrayList();pairs.add(new Pair(-, 3));pairs.add(new Pair(4, 9));pairs.add(new Pair(A, J));pairs.add(new Pair(K, T));pairs.add(new Pair(U, Z));pairs.add(new Pair(a, j));pairs.add(new Pair(k, t));pairs.add(new Pair(u, z));ConcurrentScanHBase concurrentScanHBase new ConcurrentScanHBase(TaskExecutors.getPool(), baidu.com, pairs, new SetScan() {Overridepublic void initScan(Scan scan) {scan.setCacheBlocks(false);scan.setBatch(6000);scan.addFamily(Bytes.toBytes(D));}});concurrentScanHBase.exec(new ExecScan() {Overridepublic void exec(Scan scan) {try (Table table HBaseHelper.getConnection().getTable(TableName.valueOf(HBaseHelper.TABLE_NAME));ResultScanner scanner table.getScanner(scan)) {int index 0;for (Result[] results scanner.next(6000); results.length ! 0 index 3; results scanner.next(6000)) {for (Result result : results) {for (Cell cell : result.rawCells()) {try {byte[] bytes Bytes.copy(cell.getValueArray(), cell.getValueOffset(),cell.getValueLength());System.out.println(bytes);} catch (Exception e) {e.printStackTrace();}}}}} catch (IOException e) {e.printStackTrace();}}});try {concurrentScanHBase.await();} catch (RuntimeException e) {e.printStackTrace();}}