Building a Real-Time Big Data Search Project with HBase and ElasticSearch

Contents: 1. Project Overview  2. Environment Setup  3. Writing the Program  4. Testing Process

1. Project Overview
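The project uses two stores for different jobs:
HBase stores the massive data set, which solves the problem of storing huge volumes of data and updating and querying them in real time.
ElasticSearch serves as an index over HBase, which speeds up real-time queries against the large data set.

Big data components used: Hadoop-2.7.3, HBase-1.3.1, zookeeper-3.4.5, ElasticSearch-7.8.0.

Lab environment:
VM operating system: CentOS 7.6.
Personal PC: Windows, with Eclipse or IDEA.
Big data environment: a fully distributed cluster of 3 nodes.

The project's system architecture is shown below:

[Figure: project system architecture]

The project uses the HBase and ElasticSearch client APIs to write and retrieve the data.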
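To make the division of labor concrete, here is a minimal in-memory sketch of the pattern. It assumes that plain Java maps stand in for the HBase table and the ES index. The class `SecondaryIndexSketch`, its methods and the sample values are hypothetical and not part of the project.

- **Write path:** each record is stored whole under one rowkey, and only the searchable fields go into the index, which maps a field value back to rowkeys.
- **Read path:** a query consults the index first and then fetches each full row by key. This is the same two-step lookup the Query class performs later.

Real ElasticSearch also analyzes text and scores matches; this exact-match map does not model that.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical in-memory model of "HBase stores the rows, ElasticSearch indexes them". */
public class SecondaryIndexSketch {
    // stands in for the HBase table: rowkey -> (column -> value)
    private final Map<String, Map<String, String>> table = new HashMap<>();
    // stands in for the ES index: exact field value -> rowkeys that contain it
    private final Map<String, Set<String>> index = new HashMap<>();

    /** Write path: store the full row, then index the chosen fields under the same rowkey. */
    public void put(String rowKey, Map<String, String> row, List<String> indexedFields) {
        table.put(rowKey, new HashMap<>(row));
        for (String field : indexedFields) {
            String value = row.get(field);
            if (value != null) {
                index.computeIfAbsent(value, k -> new TreeSet<>()).add(rowKey);
            }
        }
    }

    /** Read path: look up rowkeys in the index, then fetch each full row by its key. */
    public List<Map<String, String>> search(String term) {
        List<Map<String, String>> rows = new ArrayList<>();
        Set<String> rowKeys = index.getOrDefault(term, Collections.emptySet());
        for (String rowKey : rowKeys) {
            Map<String, String> row = table.get(rowKey);
            if (row != null) {
                rows.add(row);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        SecondaryIndexSketch store = new SecondaryIndexSketch();
        // hypothetical sample record
        Map<String, String> row = new HashMap<>();
        row.put("name", "Zhang San");
        row.put("uid", "100004");
        row.put("age", "30");
        // index name and uid only; age is stored but not searchable
        store.put("row-1", row, Arrays.asList("name", "uid"));
        System.out.println(store.search("100004").size()); // 1
        System.out.println(store.search("30").size());     // 0, age was not indexed
    }
}
```

Keeping only the searchable fields in the index and the complete record in the row store is what lets the index stay small while every hit can still be expanded to the full row.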
2. Environment Setup
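Create 3 virtual machines (3 nodes). Give the master node 4 GB of memory and each worker node 3 GB; adjust these figures to your own machine. Then install the components:
(1) Install fully distributed Hadoop (see "Setting Up a Fully Distributed Hadoop 2.7.3 Environment").
(2) Install a fully distributed ZooKeeper cluster (see "ZooKeeper Cluster Installation").
(3) Install fully distributed HBase (see "Several Ways to Install HBase"). Note: ZooKeeper must be installed and running before you install and start HBase.
(4) Install an ElasticSearch cluster (see "Installing an ElasticSearch Cluster on Linux"). Note: the cluster must be started as the ordinary user es, because ElasticSearch refuses to run as root.
After installation succeeds, start the services on every node.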
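Before writing any code, it can help to confirm that each service is actually listening. Below is a minimal Java sketch that opens a TCP connection to each host:port pair.

The targets are assumptions:
- Ports 2181 and 9300 and the hosts come from the conf.properties file used later in this post.
- 9200 (ES HTTP) and 16000 (HBase Master RPC in HBase 1.x) are the usual defaults and may differ in your cluster.
- Running the HBase Master on hostname01 is a guess.

The class `ClusterPortCheck` is hypothetical. It only shows that a port accepts connections, not that the service is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: reports whether each cluster service accepts TCP connections. */
public class ClusterPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs milliseconds. */
    public static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // connection refused, timed out, or the host name could not be resolved
            return false;
        }
    }

    public static void main(String[] args) {
        // {service, host, port}: assumed values, adjust them to your own cluster
        String[][] targets = {
            {"ZooKeeper", "hostname01", "2181"},
            {"ZooKeeper", "hostname02", "2181"},
            {"ZooKeeper", "hostname03", "2181"},
            {"HBase Master", "hostname01", "16000"},
            {"ES transport", "192.168.1.109", "9300"},
            {"ES HTTP", "192.168.1.109", "9200"}
        };
        for (String[] t : targets) {
            boolean up = isListening(t[1], Integer.parseInt(t[2]), 2000);
            System.out.printf("%-13s %s:%s %s%n", t[0], t[1], t[2], up ? "UP" : "DOWN");
        }
    }
}
```

A DOWN entry for ZooKeeper is worth fixing first, since HBase depends on it.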
3. Writing the Program
This project was written in Eclipse. Create a Maven project and configure settings.xml; you can point it at the Aliyun or Huawei Maven mirror, as follows:

<?xml version="1.0" encoding="utf-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
    <mirrors>
        <mirror>
            <id>nexus-aliyun</id>
            <mirrorOf>central</mirrorOf>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </mirror>
        <mirror>
            <id>net-cn</id>
            <mirrorOf>central</mirrorOf>
            <name>Nexus net</name>
            <url>http://maven.net.cn/content/groups/public/</url>
        </mirror>
    </mirrors>
    <profiles>
        <profile>
            <!-- the profile id must match the activeProfile below, otherwise the profile is never activated -->
            <id>nexus</id>
            <repositories>
                <repository>
                    <id>nexus</id>
                    <name>local private nexus</name>
                    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
                    <releases><enabled>true</enabled></releases>
                    <snapshots><enabled>false</enabled></snapshots>
                </repository>
            </repositories>
            <pluginRepositories>
                <pluginRepository>
                    <id>nexus</id>
                    <name>local private nexus</name>
                    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
                    <releases><enabled>true</enabled></releases>
                    <snapshots><enabled>false</enabled></snapshots>
                </pluginRepository>
            </pluginRepositories>
        </profile>
    </profiles>
    <activeProfiles>
        <activeProfile>nexus</activeProfile>
    </activeProfiles>
</settings>

Add the dependencies to pom.xml, as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.bigdata</groupId>
    <artifactId>realtimesearch</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Spring Boot parent -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.0.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <!-- Repository -->
    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases><enabled>true</enabled></releases>
            <snapshots><enabled>false</enabled></snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Spring Boot core web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- relaxes the Thymeleaf template engine's overly strict checking of HTML5 pages -->
        <dependency>
            <groupId>net.sourceforge.nekohtml</groupId>
            <artifactId>nekohtml</artifactId>
            <version>1.9.22</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <!-- HBase -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>hadoop-mapreduce-client-core</artifactId>
                    <groupId>org.apache.hadoop</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-protocol</artifactId>
            <version>1.3.1</version>
        </dependency>
        <!-- ElasticSearch -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!-- fixes the "no such method" error ElasticSearch throws at runtime -->
        <dependency>
            <groupId>org.locationtech.spatial4j</groupId>
            <artifactId>spatial4j</artifactId>
            <version>0.6</version>
        </dependency>
        <!-- ZooKeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.9</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- resolves the io.netty package conflict between ES and HBase -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.16.Final</version>
        </dependency>
        <!-- JSON -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.13</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>jdk.tools</groupId>
                <artifactId>jdk.tools</artifactId>
                <version>1.8</version>
                <scope>system</scope>
                <!-- adjust to the location of your local JDK -->
                <systemPath>C:\Program Files\Java\jdk1.8.0_301\lib\tools.jar</systemPath>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
    </build>
</project>

Create a data directory and put the test data (see the test data download) in it, as shown below:

[Figure: the data directory with the test data files]

Add the configuration files conf.properties, application.properties, log4j.properties and log4j2.properties under src/main/resources, as shown below:

[Figure: the configuration files in the project]

Configure conf.properties as follows:

# path of the raw data
inputPath=data/
# HBase settings
# ZK connection address (obtained from the CloudTable service list); after running, the log prints the actual internal address
ZKServer=hostname01:2181,hostname02:2181,hostname03:2181
# HBase table name
tableName=PublicSecurity
# HBase column families
columnFamily1=Basic
columnFamily2=OtherInfo
# ElasticSearch settings: ES cluster name, VM IP, default transport port
clusterName=Es-cluster
hostName=192.168.1.109
tcpPort=9300
indexName=publicsecurity
typeName=info

Configure application.properties as follows:

server.port=8084
server.contextPath=/bigdata
# hot reload of web pages (disable the template cache)
spring.thymeleaf.cache=false
# relax the overly strict HTML5 checks
spring.thymeleaf.mode=LEGACYHTML5

Configure log4j.properties as follows:

log4j.rootLogger=INFO,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

Configure log4j2.properties as follows:

name = PropertiesConfig
property.filename = target/logs
#appenders = console, file
# the values are appender identifiers (the "rolling" in appender.rolling.*), not the name of an appender instance
appenders = rolling
appender.rolling.type = RollingFile
appender.rolling.name = RollingLogFile
appender.rolling.fileName = ${filename}/automationlogs.log
appender.rolling.filePattern = ${filename}/automationlogs-%d{MM-dd-yy-HH-mm-ss}-%i.log
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 5
# the level must be a single value; no "console" appender is defined in this file
rootLogger.level = INFO
rootLogger.appenderRefs = rolling
rootLogger.appenderRef.rolling.ref = RollingLogFile

Write the utility class ConstantUtil, which reads the configuration files:

package com.bigdata.utils;

import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

/**
 * Reads the configuration values from conf.properties.
 *
 * @author suben
 */
public class ConstantUtil {
    public static final Properties PROPS = new Properties();
    public static final Logger LOG = LoggerFactory.getLogger(ConstantUtil.class);
    public static final String INPUT_PATH;
    public static final String ZK_SERVER;
    public static final String TABLE_NAME;
    public static final String COLUMN_FAMILY_1;
    public static final String COLUMN_FAMILY_2;
    public static final String INDEX_NAME;
    public static final String TYPE_NAME;
    // ES cluster name (the ES default is "elasticsearch")
    public static final String CLUSTER_NAME;
    // one node of the ES cluster
    public static final String HOSTNAME;
    // ES transport port
    public static final int TCP_PORT;

    static {
        try {
            // load the log4j configuration
            PropertyConfigurator.configure(ConstantUtil.class.getClassLoader().getResource("log4j.properties").getPath());
            // load the connection settings
            PROPS.load(new FileInputStream(ConstantUtil.class.getClassLoader().getResource("conf.properties").getPath()));
        } catch (IOException e) {
            e.printStackTrace();
        }
        INPUT_PATH = PROPS.getProperty("inputPath");
        ZK_SERVER = PROPS.getProperty("ZKServer");
        TABLE_NAME = PROPS.getProperty("tableName");
        // ES index names must be lowercase
        INDEX_NAME = PROPS.getProperty("indexName").toLowerCase();
        TYPE_NAME = PROPS.getProperty("typeName");
        COLUMN_FAMILY_1 = PROPS.getProperty("columnFamily1");
        COLUMN_FAMILY_2 = PROPS.getProperty("columnFamily2");
        CLUSTER_NAME = PROPS.getProperty("clusterName");
        HOSTNAME = PROPS.getProperty("hostName");
        TCP_PORT = Integer.parseInt(PROPS.getProperty("tcpPort"));
    }
}

Write the HBase utility class:

package com.bigdata.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HBaseUtil {
    public static Admin admin = null;
    public static Configuration conf = null;
    public static Connection conn = null;
    // cache of opened Table handles, keyed by table name
    private HashMap<String, Table> tables = null;
    private static final Logger LOG = ConstantUtil.LOG;

    public HBaseUtil() {
        this(ConstantUtil.ZK_SERVER);
    }

    public HBaseUtil(String zkServer) {
        init(zkServer);
    }

    private void ifNotConnTableJustConn(String tableName) {
        if (!tables.containsKey(tableName)) {
            this.addTable(tableName);
        }
    }

    public Table getTable(String tableName) {
        ifNotConnTableJustConn(tableName);
        return tables.get(tableName);
    }

    public void addTable(String tableName) {
        try {
            tables.put(tableName, conn.getTable(TableName.valueOf(tableName)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public boolean put(String tableName, List<Put> putList) throws Exception {
        boolean res = false;
        ifNotConnTableJustConn(tableName);
        try {
            getTable(tableName).put(putList);
            res = true;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return res;
    }

    public Result get(String tableName, String row) throws IOException {
        Result result = null;
        ifNotConnTableJustConn(tableName);
        Table newTable = getTable(tableName);
        Get get = new Get(Bytes.toBytes(row));
        try {
            result = newTable.get(get);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    public boolean createTable(String tableName, String... columnFamilys) {
        boolean result = false;
        try {
            if (admin.tableExists(TableName.valueOf(tableName))) {
                LOG.info(tableName + " table already exists");
            } else {
                HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
                for (String columnFamily : columnFamilys) {
                    tableDesc.addFamily(new HColumnDescriptor(columnFamily.getBytes()));
                }
                admin.createTable(tableDesc);
                result = true;
                LOG.info(tableName + " table created");
            }
        } catch (IOException e) {
            e.printStackTrace();
            LOG.info(tableName + " table creation failed");
        }
        return result;
    }

    public boolean tableExists(String tableName) throws IOException {
        return admin.tableExists(TableName.valueOf(tableName));
    }

    public void disableTable(String tableName) throws IOException {
        if (tableExists(tableName)) {
            admin.disableTable(TableName.valueOf(tableName));
        }
    }

    /**
     * Deletes a table.
     *
     * @param tableName name of the table to delete
     */
    public void deleteTable(String tableName) throws IOException {
        disableTable(tableName);
        admin.deleteTable(TableName.valueOf(tableName));
    }

    /**
     * Lists the names of all tables.
     *
     * @return the table names
     * @throws Exception if the tables cannot be listed
     */
    public List<String> getALLTableName() throws Exception {
        ArrayList<String> tableNames = new ArrayList<String>();
        if (admin != null) {
            HTableDescriptor[] listTables = admin.listTables();
            if (listTables.length > 0) {
                for (HTableDescriptor tableDesc : listTables) {
                    tableNames.add(tableDesc.getNameAsString());
                }
            }
        }
        return tableNames;
    }

    /**
     * Deletes all tables. Use with care: for test environments only!
     */
    public void deleteAllTable() throws Exception {
        List<String> allTbName = getALLTableName();
        for (String s : allTbName) {
            LOG.info("Start delete table : " + s + " ......");
            deleteTable(s);
            LOG.info("done delete table : " + s);
        }
    }

    /**
     * Initializes the configuration and opens the connection.
     *
     * @param zkServer ZooKeeper quorum, e.g. host1:2181,host2:2181
     */
    public void init(String zkServer) {
        tables = new HashMap<String, Table>();
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zkServer);
        try {
            conn = ConnectionFactory.createConnection(conf);
            admin = conn.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes all connections.
     *
     * @throws IOException if closing fails
     */
    public void clear() throws IOException {
        for (Map.Entry<String, Table> m : tables.entrySet()) {
            m.getValue().close();
        }
        admin.close();
        conn.close();
        conf.clear();
    }

    /**
     * Checkpoint registration (bayonet): name, ID number, age, gender, checkpoint number, date, time, crossing type
     * Hotel registration (hotel): name, ID number, age, gender, check-in date, check-out date, companion
     * Internet café registration (internet): name, ID number, age, gender, café name, date, length of stay
     */
    // creates the table and its column families in advance
    public static void preDeal() throws Exception {
        HBaseUtil hBaseUtils = new HBaseUtil();
        hBaseUtils.createTable(ConstantUtil.TABLE_NAME, ConstantUtil.COLUMN_FAMILY_1, ConstantUtil.COLUMN_FAMILY_2);
    }

    // test
    public static void test() throws Exception {
        HBaseUtil hBaseUtils = new HBaseUtil();
        long startTime = System.currentTimeMillis();
        String tb = "testTb";
        String colFamily = "info";
        String col = "name";
        String row = "100000";
        String value = "张三";
        hBaseUtils.createTable(tb, colFamily);
        List<Put> listPut = new ArrayList<Put>();
        Put put = new Put(Bytes.toBytes(row));
        put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes(value));
        listPut.add(put);
        hBaseUtils.put(tb, listPut);
        Result res = hBaseUtils.get("testTb", "100000");
        List<Cell> list = res.getColumnCells(Bytes.toBytes("info"), Bytes.toBytes("name"));
        for (Cell c : list) {
            LOG.info(Bytes.toString(CellUtil.cloneFamily(c)));
            LOG.info(Bytes.toString(CellUtil.cloneQualifier(c)));
            LOG.info(Bytes.toString(CellUtil.cloneValue(c)));
        }
        long endTime =
                System.currentTimeMillis();
        float seconds = (endTime - startTime) / 1000F;
        LOG.info("Elapsed: " + Float.toString(seconds) + " seconds.");
    }

    public static void main(String[] args) throws Exception {
        test();
        preDeal();
    }
}

Write the ElasticSearch utility class:

package com.bigdata.utils;

import com.alibaba.fastjson.JSONObject;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
//import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class ElasticSearchUtil {
    // build the Settings object
    private static Settings settings = Settings.builder()
            .put("cluster.name", ConstantUtil.CLUSTER_NAME)
            .put("client.transport.sniff", false)
            .build();
    // TransportClient used to connect to the ES cluster
    private volatile TransportClient client;
    private final static Logger LOG =
            ConstantUtil.LOG;

    public ElasticSearchUtil() {
        init();
    }

    /**
     * A synchronized(*.class) block has the same effect as a synchronized static method:
     * both lock on a class object, so every caller contends for the same monitor.
     *
     * @return the shared TransportClient
     */
    public TransportClient getClient() {
        if (client == null) {
            synchronized (TransportClient.class) {
                // check again inside the lock so that only one thread creates the client
                if (client == null) {
                    try {
                        client = new PreBuiltTransportClient(settings)
                                .addTransportAddress(new TransportAddress(InetAddress.getByName(ConstantUtil.HOSTNAME), ConstantUtil.TCP_PORT));
                    } catch (UnknownHostException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        return client;
    }

    /**
     * Gets the IndicesAdminClient used for index management.
     */
    public IndicesAdminClient getAdminClient() {
        return getClient().admin().indices();
    }

    /**
     * Checks whether an index exists.
     *
     * @param indexName index name
     * @return true if the index exists
     */
    public boolean isExistsIndex(String indexName) {
        IndicesExistsResponse response = getAdminClient().prepareExists(indexName).get();
        return response.isExists();
    }

    /**
     * Creates an index.
     *
     * @param indexName index name (converted to lowercase, as ES requires)
     * @return true if the cluster acknowledged the request
     */
    public boolean createIndex(String indexName) {
        CreateIndexResponse createIndexResponse = getAdminClient().prepareCreate(indexName.toLowerCase()).get();
        return createIndexResponse.isAcknowledged();
    }

    /**
     * Deletes an index.
     *
     * @param indexName index name
     * @return true if the cluster acknowledged the request
     */
    public boolean deleteIndex(String indexName) {
        AcknowledgedResponse deleteResponse = getAdminClient().prepareDelete(indexName.toLowerCase()).execute().actionGet();
        return deleteResponse.isAcknowledged();
    }

    /**
     * Sets the mapping of type typeName in index indexName.
     *
     * @param indexName index name
     * @param typeName  type name
     * @param mapping   mapping as a JSON string
     */
    public void setMapping(String indexName, String typeName, String mapping) {
        getAdminClient().preparePutMapping(indexName).setType(typeName).setSource(mapping, XContentType.JSON).get();
    }

    /**
     * Creates a document, the equivalent of inserting a row into a table.
     *
     * @param indexName index name
     * @param typeName  type name
     * @param id        document id
     * @param document  field name -> value
     * @return the version of the indexed document
     * @throws IOException if the JSON source cannot be built
     */
    public long addDocument(String indexName, String typeName, String id, Map<String, Object> document) throws IOException {
        Set<Map.Entry<String, Object>> documentSet = document.entrySet();
        IndexRequestBuilder builder = getClient().prepareIndex(indexName, typeName, id);
        XContentBuilder xContentBuilder = jsonBuilder().startObject();
        for (Map.Entry<String, Object> e : documentSet) {
            xContentBuilder = xContentBuilder.field(e.getKey(), e.getValue());
        }
        IndexResponse response = builder.setSource(xContentBuilder.endObject()).get();
        return response.getVersion();
    }

    public List<Map<String, Object>> queryStringQuery(String text) {
        List<Map<String, Object>> resListMap = null;
        QueryBuilder match = QueryBuilders.queryStringQuery(text);
        // searches all indices; by default ES returns only the top 10 hits (pagination via setFrom/setSize is optional)
        SearchRequestBuilder search = getClient().prepareSearch().setQuery(match);
        // run the search
        SearchResponse response = search.get();
        // matching documents
        SearchHits hits = response.getHits();
        // total number of hits
        TotalHits total = hits.getTotalHits();
        SearchHit[] hitArr = hits.getHits();
        // collect the hits
        resListMap = new ArrayList<Map<String, Object>>();
        for (SearchHit hit : hitArr) {
            // document metadata
            String index = hit.getIndex();
            // the document's _source
            Map<String, Object> resultMap = hit.getSourceAsMap();
            resListMap.add(resultMap);
        }
        return resListMap;
    }

    private void init() {
        try {
            client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName(ConstantUtil.HOSTNAME), ConstantUtil.TCP_PORT));
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }

    // creates the index in advance (comparable to a database in a relational DBMS)
    public static void preDealCreatIndex() {
        ElasticSearchUtil esUtils = new ElasticSearchUtil();
        LOG.info("start create index..............");
        esUtils.createIndex(ConstantUtil.INDEX_NAME);
        LOG.info("finished create index !");
    }

    /**
     * Checkpoint registration (bayonet): name, ID number, age, gender, checkpoint number, date, time, crossing type
     * Hotel registration (hotel): name, ID number, age, gender, check-in date, check-out date, companion
     * Internet café registration (internet): name, ID number, age, gender, café name, date, length of stay
     * name,id,age,gender,
     * hotelAddr,hotelInTime,hotelOutTime,acquaintancer,
     * barAddr,internetDate,timeSpent,
     * bayonetAddr,crossDate,tripType
     */
    public static void preDealSetMapping() {
        JSONObject mappingTypeJson = new JSONObject();
        JSONObject propertiesJson = new JSONObject();

        JSONObject idJson = new JSONObject();
        idJson.put("type", "keyword");
        idJson.put("store", true);
        propertiesJson.put("id", idJson);

        JSONObject nameJson = new JSONObject();
        nameJson.put("type", "keyword");
        propertiesJson.put("name", nameJson);

        JSONObject uidJson = new JSONObject();
        uidJson.put("type", "keyword");
        uidJson.put("store", false);
        propertiesJson.put("uid", uidJson);

        JSONObject hotelAddr = new JSONObject();
        hotelAddr.put("type", "text");
        propertiesJson.put("address", hotelAddr);

        JSONObject happenedDate = new JSONObject();
        happenedDate.put("type", "date");
        happenedDate.put("format", "yyyy-MM-dd");
        propertiesJson.put("happenedDate", happenedDate);

        JSONObject endDate = new JSONObject();
        endDate.put("type", "date");
        endDate.put("format", "yyyy-MM-dd");
        propertiesJson.put("endDate", endDate);

        JSONObject acquaintancer = new JSONObject();
        acquaintancer.put("type", "keyword");
        propertiesJson.put("acquaintancer", acquaintancer);

        mappingTypeJson.put("properties", propertiesJson);
        LOG.info("start set mapping to " + ConstantUtil.INDEX_NAME + " " + ConstantUtil.TYPE_NAME + " .....");
        LOG.info(mappingTypeJson.toString());
        ElasticSearchUtil esUtils = new ElasticSearchUtil();
        esUtils.setMapping(ConstantUtil.INDEX_NAME, ConstantUtil.TYPE_NAME, mappingTypeJson.toString());
        LOG.info("set mapping done!!!");
    }

    // for testing
    public static void test() {
        String index = "esindex";
        System.out.println("createIndex..............");
        ElasticSearchUtil esUtils = new ElasticSearchUtil();
        esUtils.createIndex(index);
        System.out.println("createIndex done!!!!!!!!!!!");
        System.out.println("isExists " +
                esUtils.isExistsIndex(index));
        System.out.println("deleteIndex...............");
        esUtils.deleteIndex(index);
        System.out.println("deleteIndex done!!!!");
    }

    public static void main(String[] args) throws IOException {
        preDealCreatIndex();
        preDealSetMapping();
        test();
    }
}

Write the class that loads the data into HBase and ES:

package com.bigdata.insert;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import com.bigdata.utils.ConstantUtil;
import com.bigdata.utils.ElasticSearchUtil;
import com.bigdata.utils.HBaseUtil;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.*;

/**
 * Reads the local files, parses the records and inserts them into HBase and ElasticSearch.
 */
public class LoadDataToHBaseAndES {
    private HBaseUtil hBaseUtil;
    private ElasticSearchUtil elasticSearchUtil;

    public LoadDataToHBaseAndES() {
    }

    /**
     * Checkpoint registration (bayonet): name, ID number, age, gender, checkpoint number, date, time, crossing type
     * Hotel registration (hotel): name, ID number, age, gender, check-in date, check-out date, companion
     * Internet café registration (internet): name, ID number, age, gender, café name, date, length of stay
     * name,uid,age,gender,
     * hotelAddr,happenedDate,endDate,acquaintancer,
     * barAddr,happenedDate,duration,
     * bayonetAddr,happenedDate,tripType
     */
    public void insert() throws Exception {
        hBaseUtil = new HBaseUtil();
        elasticSearchUtil = new ElasticSearchUtil();
        String filePath = ConstantUtil.INPUT_PATH;
        File dir = new File(filePath);
        File[] files = dir.listFiles();
        if (files != null) {
            for (File file : files) {
                if (file.isDirectory()) {
                    System.out.println(file.getName() + " This is a directory!");
                } else {
                    // hotel registration records
                    if (file.getName().contains("hotel")) {
                        BufferedReader reader = null;
                        reader = new BufferedReader(new FileReader(filePath + file.getName()));
                        String tempString = null;
                        while ((tempString = reader.readLine()) !=
                                null) {
                            // skip blank lines
                            if (!tempString.isEmpty()) {
                                List<Put> putList = new ArrayList<Put>();
                                String[] elements = tempString.split(",");
                                // generate a unique ID, used as both the HBase rowkey and the ES document id
                                String id = UUID.randomUUID().toString();
                                Put put = new Put(Bytes.toBytes(id));
                                // add the record to HBase
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("name"), Bytes.toBytes(elements[0]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("uid"), Bytes.toBytes(elements[1]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("age"), Bytes.toBytes(elements[2]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("gender"), Bytes.toBytes(elements[3]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("event"), Bytes.toBytes("hotel"));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("address"), Bytes.toBytes(elements[4]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("happenedDate"), Bytes.toBytes(elements[5]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("endDate"), Bytes.toBytes(elements[6]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("acquaintancer"), Bytes.toBytes(elements[7]));
                                putList.add(put);
                                ConstantUtil.LOG.info("hotel_info start putting to HBase ....: " + id + " " + tempString);
                                hBaseUtil.put(ConstantUtil.TABLE_NAME, putList);
                                // add the record to ES
                                Map<String, Object> esMap = new HashMap<String, Object>();
                                esMap.put("id", id);
                                esMap.put("name", elements[0]);
                                esMap.put("uid", elements[1]);
                                esMap.put("address", elements[4]);
                                esMap.put("happenedDate", elements[5]);
                                esMap.put("endDate", elements[6]);
                                esMap.put("acquaintancer", elements[7]);
                                elasticSearchUtil.addDocument(ConstantUtil.INDEX_NAME, ConstantUtil.TYPE_NAME, id, esMap);
                                ConstantUtil.LOG.info("start add document to ES... " + ConstantUtil.INDEX_NAME + " " + ConstantUtil.TYPE_NAME + " " + id + " " + esMap);
                            }
                        }
                        reader.close();
                    }
                    // internet café registration records
                    else if (file.getName().contains("internet")) {
                        BufferedReader reader = null;
                        reader = new BufferedReader(new FileReader(filePath + file.getName()));
                        String tempString = null;
                        while ((tempString = reader.readLine()) !=
                                null) {
                            // skip blank lines
                            if (!tempString.isEmpty()) {
                                List<Put> putList = new ArrayList<Put>();
                                String[] elements = tempString.split(",");
                                // generate a unique ID, used as both the HBase rowkey and the ES document id
                                String id = UUID.randomUUID().toString();
                                Put put = new Put(Bytes.toBytes(id));
                                // add the record to HBase
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("name"), Bytes.toBytes(elements[0]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("uid"), Bytes.toBytes(elements[1]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("age"), Bytes.toBytes(elements[2]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("gender"), Bytes.toBytes(elements[3]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("event"), Bytes.toBytes("internetBar"));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("address"), Bytes.toBytes(elements[4]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("happenedDate"), Bytes.toBytes(elements[5]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("duration"), Bytes.toBytes(elements[6]));
                                putList.add(put);
                                ConstantUtil.LOG.info("internet_info start putting to HBase ... : " + id + " " + tempString);
                                hBaseUtil.put(ConstantUtil.TABLE_NAME, putList);
                                // add the record to ES
                                Map<String, Object> esMap = new HashMap<String, Object>();
                                esMap.put("id", id);
                                esMap.put("name", elements[0]);
                                esMap.put("uid", elements[1]);
                                esMap.put("address", elements[4]);
                                esMap.put("happenedDate", elements[5]);
                                elasticSearchUtil.addDocument(ConstantUtil.INDEX_NAME, ConstantUtil.TYPE_NAME, id, esMap);
                                ConstantUtil.LOG.info("start add document to ES... " + ConstantUtil.INDEX_NAME + " " + ConstantUtil.TYPE_NAME + " " + id + " " + esMap);
                            }
                        }
                        reader.close();
                    }
                    // checkpoint registration records
                    else if (file.getName().contains("bayonet")) {
                        BufferedReader reader = null;
                        reader = new BufferedReader(new FileReader(filePath + file.getName()));
                        String tempString = null;
                        while ((tempString = reader.readLine()) !=
                                null) {
                            // skip blank lines
                            if (!tempString.isEmpty()) {
                                List<Put> putList = new ArrayList<Put>();
                                String[] elements = tempString.split(",");
                                // generate a unique ID, used as both the HBase rowkey and the ES document id
                                String id = UUID.randomUUID().toString();
                                Put put = new Put(Bytes.toBytes(id));
                                // add the record to HBase
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("name"), Bytes.toBytes(elements[0]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("uid"), Bytes.toBytes(elements[1]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("age"), Bytes.toBytes(elements[2]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("gender"), Bytes.toBytes(elements[3]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("event"), Bytes.toBytes("bayonet"));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("address"), Bytes.toBytes(elements[4]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("happenedDate"), Bytes.toBytes(elements[5]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("tripType"), Bytes.toBytes(elements[6]));
                                putList.add(put);
                                hBaseUtil.put(ConstantUtil.TABLE_NAME, putList);
                                ConstantUtil.LOG.info("bayonet_info start putting to HBase....: " + id + " " + tempString);
                                // add the record to ES
                                Map<String, Object> esMap = new HashMap<String, Object>();
                                esMap.put("id", id);
                                esMap.put("name", elements[0]);
                                esMap.put("uid", elements[1]);
                                esMap.put("address", elements[4]);
                                esMap.put("happenedDate", elements[5]);
                                elasticSearchUtil.addDocument(ConstantUtil.INDEX_NAME, ConstantUtil.TYPE_NAME, id, esMap);
                                ConstantUtil.LOG.info("start add document to ES... " +
                                        ConstantUtil.INDEX_NAME + " " + ConstantUtil.TYPE_NAME + " " + id + " " + esMap);
                            }
                        }
                        reader.close();
                    }
                    // data description files are skipped
                    else {
                        continue;
                    }
                }
            }
            ConstantUtil.LOG.info("load and insert done !!!!!!!!!!!!!!!!!!");
        }
    }

    public static void start() throws Exception {
        LoadDataToHBaseAndES load2DB = new LoadDataToHBaseAndES();
        load2DB.insert();
    }

    public static void main(String[] args) throws Exception {
        start();
    }
}

Write the Query class:

package com.bigdata.query;

import com.alibaba.fastjson.JSONObject;
import com.bigdata.utils.ConstantUtil;
import com.bigdata.utils.ElasticSearchUtil;
import com.bigdata.utils.HBaseUtil;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * Search logic: query ElasticSearch first, then fetch the full rows from HBase.
 */
public class Query {
    private HBaseUtil hBaseUtil = new HBaseUtil();
    private ElasticSearchUtil elasticSearchUtil = new ElasticSearchUtil();

    public String query(String target) {
        // local result object: the Query instance is shared, so concurrent requests must not share state
        JSONObject result = new JSONObject();
        long startTime = System.currentTimeMillis();
        List<Map<String, Object>> listMap = elasticSearchUtil.queryStringQuery(target);
        long endTime = System.currentTimeMillis();
        float seconds = (endTime - startTime) / 1000F;
        ConstantUtil.LOG.info("ElasticSearch query took " + Float.toString(seconds) + " seconds.");
        for (Map<String, Object> m : listMap) {
            String id = m.get("id").toString();
            JSONObject tmpJS = new JSONObject();
            tmpJS.put("id", id);
            Result res = null;
            try {
                long s1 = System.currentTimeMillis();
                res = hBaseUtil.get(ConstantUtil.TABLE_NAME, id);
                long e1 = System.currentTimeMillis();
                float se1 = (e1 - s1) / 1000F;
                ConstantUtil.LOG.info("HBase query took " + Float.toString(se1) + " seconds.");
                Cell[] cells = res.rawCells();
                for (Cell cell : cells) {
                    String col = Bytes.toString(CellUtil.cloneQualifier(cell));
                    System.out.println(col);
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    System.out.println(value);
                    tmpJS.put(col, value);
                }
                result.put(id, tmpJS);
            } catch
            (IOException e) {
                e.printStackTrace();
                result.put(id, "Query failed!");
            }
        }
        return result.toString();
    }

    public static void main(String[] args) throws Exception {
        Query query = new Query();
        long startTime = System.currentTimeMillis();
        System.out.println(query.query("100004"));
        long endTime = System.currentTimeMillis();
        float seconds = (endTime - startTime) / 1000F;
        ConstantUtil.LOG.info("Elapsed: " + Float.toString(seconds) + " seconds.");
    }
}

Write the ManagerQuery class:

package com.bigdata.manager;

import org.springframework.stereotype.Component;
import com.bigdata.query.Query;

@Component
public class ManagerQuery {
    private static Query query = new Query();

    public static String getQueryResult(String target) {
        try {
            String result = query.query(target);
            System.out.println(result);
            return result;
        } catch (Exception e) {
            e.printStackTrace();
            return "The query failed; please notify the developers!";
        }
    }

    public static void main(String[] args) {
        String target = "牧之桃";
        String result = ManagerQuery.getQueryResult(target);
        System.out.println(result);
    }
}

Write the SearchService class (written in the Spring MVC style):

package com.bigdata.service;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.bigdata.manager.ManagerQuery;

@RestController
@EnableAutoConfiguration
@ComponentScan(basePackages = {"com.bigdata"})
public class SearchService {
    @RequestMapping("/search")
    public String search(String target) {
        try {
            return ManagerQuery.getQueryResult(target);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "Oops, something went wrong!";
    }

    // main method: like any ordinary Java class, it runs when you right-click and choose Run As > Java Application
    public static void main(String[] args) throws Exception {
        SpringApplication.run(SearchService.class, args);
    }
}

Write the SearchController class (also in the Spring MVC style):

package com.bigdata.controller;

import org.springframework.boot.SpringApplication;
import org.springframework.stereotype.Controller;
import org.springframework.ui.ModelMap;
import org.springframework.web.bind.annotation.RequestMapping;

/**
 * The annotation declares this class as a controller; Spring wires the other classes it needs automatically.
 */
@Controller
public class SearchController {
    @RequestMapping("/index")
    String testdo(ModelMap map) {
        // return the name of the HTML template to render
        return "index_search";
    }

    // main method: like any ordinary Java class, it runs when you right-click and choose Run As > Java Application
    public static void main(String[] args) {
        SpringApplication.run(SearchController.class, args);
    }
}

Write the ApplicationBootSystem startup class:

package com.bigdata.boot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
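import org.springframework.context.annotation.ComponentScan;

/**
 * Root startup class.
 */
@SpringBootApplication
@ComponentScan(basePackages = "com.bigdata")
public class ApplicationBootSystem {
    public static void main(String[] args) {
        SpringApplication.run(ApplicationBootSystem.class, args);
    }
}

Under src/main/resources, create a static directory with a plugins directory inside it, and copy the bootstrap-3.3.7 and bootstrap-table packages into plugins. Then create a templates directory (the default location where Spring Boot's Thymeleaf support looks for templates) and add an index_search.html file to it. The code is as follows:

<!DOCTYPE html>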
<html lang="en">
<head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
    <title>Realtime Search</title>
    <!-- Bootstrap -->
    <link href="plugins/bootstrap-3.3.7/css/bootstrap.min.css" rel="stylesheet">
    <link href="plugins/bootstrap-table/bootstrap-table.min.css" rel="stylesheet">
    <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
    <!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
    <!--[if lt IE 9]>
    <script src="https://oss.maxcdn.com/html5shiv/3.7.3/html5shiv.min.js"></script>
    <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
    <![endif]-->
</head>
body
div classcontainerdiv classrow!-- onsubmit设置成return false,不再显式提交form --div classcol-md-8 col-md-offset-2 text-centeronsubmitreturn falseform classform-inlinediv classform-grouplabel fortarget请输入条件/label input typetextclassform-control idtarget nametarget placeholder请输入条件/divbutton typesubmit idsubmit classbtn btn-primary搜一下/button/form/div/div!-- 在下一行中添加一个bs系统自带的表格 --div classrowtable idtable/table/div
/div
!-- jQuery (necessary for Bootstraps JavaScript plugins) --
script srchttp://code.jquery.com/jquery-1.12.1.min.js ;/script
!-- Include all compiled plugins (below), or include individual files as needed --
script srcplugins/bootstrap-3.3.7/js/bootstrap.min.js/script
!-- 加入bootstrap table依赖 --
script srcplugins/bootstrap-table/bootstrap-table.min.js/script
script srcplugins/bootstrap-table/bootstrap-table-locale-all.min.js/script
script typetext/javascript$(function () {!--初始化表格的样式 --$(#table).bootstrapTable({columns: [{field: id,title: 记录id,formatter: function (value, row, index) {var a ;if (value $(#target).val()) {a span stylecolor:#5858FA value /span;} else {a span stylecolor#190707 value /span;}return a;}},{field: name,title: 姓名,formatter: function (value, row, index) {var a ;if (value $(#target).val()) {a span stylecolor:#5858FA value /span;} else {a span stylecolor#190707 value /span;}return a;}}, {field: uid,title: 用户id,formatter: function (value, row, index) {var a ;if (value $(#target).val()) {a span stylecolor:#5858FA value /span;} else {a span stylecolor#190707 value /span;}return a;}}, {field: age,title: 年龄,formatter: function (value, row, index) {var a ;if (value $(#target).val()) {a span stylecolor:#5858FA value /span;} else {a span stylecolor#190707 value /span;}return a;}}, {field: gender,title: 性别,formatter: function (value, row, index) {var a ;if (value $(#target).val()) {a span stylecolor:#5858FA value /span;} else {a span stylecolor#190707 value /span;}return a;}}, {field: event,title: 事件,formatter: function (value, row, index) {var a ;if (value $(#target).val()) {a span stylecolor:#5858FA value /span;} else {a span stylecolor#190707 value /span;}return a;}},{field: address,title: 地址,formatter: function (value, row, index) {var a ;if (value $(#target).val()) {a span stylecolor:#5858FA value /span;} else {a span stylecolor#190707 value /span;}return a;}}, {field: happenedDate,title: 发生时间,formatter: function (value, row, index) {var a ;if (value $(#target).val()) {a span stylecolor:#5858FA value /span;} else {a span stylecolor#190707 value /span;}return a;}}, {field: acquaintancer,title: 同行人,formatter: function (value, row, index) {var a ;if (value $(#target).val()) {a span stylecolor:#5858FA value /span;} else {a span stylecolor#190707 value /span;}return a;}},{field: endDate,title: 结束时间,formatter: function (value, row, index) {var a ;if (value $(#target).val()) {a span 
stylecolor:#5858FA value /span;} else {a span stylecolor#190707 value /span;}return a;}}]});//为submit按钮绑定click事件,填充点击查询后的数据查询$(#submit).click(function () {$.ajax({url: /bigdata/search,data: target $(#target).val(),cache: false,//false是不缓存true为缓存async: true,//true为异步false为同步beforeSend: function () {//请求前},success: function (result) {try {var resultArray new Array();js JSON.parse(result);for (var p in js) {resultArray.push(js[p])console.log(js[p]);}console.log(resultArray);$(#table).bootstrapTable(load, resultArray);} catch (e) {window.alert(result);$(#table).bootstrapTable(load, [{result: 什么也没有找到}]);}},complete: function () {//请求结束时},error: function () {//请求失败时}})});});/script
/body
/html写完成后项目结构如下所示
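Query.main above times a single call with System.currentTimeMillis(). Because the first call may include one-off costs such as class loading or connection setup, a single number can be misleading. The following is a minimal sketch, not part of the original project, that times several runs with System.nanoTime(), the JDK clock intended for measuring elapsed time. The class QueryTimer and its method timeRuns are hypothetical names; the stand-in lambda would be replaced by something like `() -> ManagerQuery.getQueryResult("100004")` once the cluster is running (that method catches its own exceptions, so it fits a Supplier).

```java
import java.util.function.Supplier;

/**
 * Hypothetical helper (not part of the original project): calls a query
 * several times and reports each call's latency, because the first call
 * may also pay one-off costs such as class loading or connection setup.
 */
public class QueryTimer {

    /** Runs the call `runs` times and returns each elapsed time in seconds. */
    public static double[] timeRuns(Supplier<String> call, int runs) {
        if (runs <= 0) {
            throw new IllegalArgumentException("runs must be positive");
        }
        double[] seconds = new double[runs];
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();   // monotonic clock, meant for elapsed time
            call.get();
            seconds[i] = (System.nanoTime() - start) / 1_000_000_000.0;
        }
        return seconds;
    }

    public static void main(String[] args) {
        // Stand-in for ManagerQuery.getQueryResult("100004"), which needs HBase and ElasticSearch running
        Supplier<String> call = () -> "{}";
        double[] seconds = timeRuns(call, 5);
        double total = 0;
        for (int i = 0; i < seconds.length; i++) {
            System.out.printf("run %d: %.6f s%n", i + 1, seconds[i]);
            total += seconds[i];
        }
        System.out.printf("average: %.6f s%n", total / seconds.length);
    }
}
```

Comparing the first run with the later ones shows how much of the measured time is warm-up rather than the search itself.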
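IV. Testing Procedure

1. Run the main method of the HBaseUtil utility class. This creates the HBase test table and target table and verifies that the program can connect to HBase.
2. Run the main method of the ElasticSearch utility class. This creates the ElasticSearch test index and target index and verifies that the program can connect to ElasticSearch.
3. Run the LoadDataToHBaseAndES class to write the data into both HBase and ElasticSearch.
4. Run the ApplicationBootSystem startup class to launch the Spring Boot entry program.
5. Open a browser at http://localhost:8084/bigdata/index, type a keyword into the search box (for example, 3), and click the [Search] button. If everything is working, you should see results like the following:

Try different search terms to see different results. Note how fast the retrieval is, close to real time. You can also enlarge the test dataset and check the retrieval speed again; readers are encouraged to try this on their own.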
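Besides the browser page, the /search endpoint can be called directly, which makes it easier to inspect the raw response and its latency. Below is a minimal sketch using only JDK classes (HttpURLConnection, URLEncoder), assuming the application runs at http://localhost:8084/bigdata as in step 5 and answers in UTF-8. SearchClient, buildSearchUrl, and get are hypothetical names, not part of the original project.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical command-line client for the /search endpoint (not part of the
 * original project). Assumes the app is reachable at http://localhost:8084/bigdata.
 */
public class SearchClient {

    /** Builds baseUrl + "/search?target=..." with the term URL-encoded as UTF-8. */
    public static String buildSearchUrl(String baseUrl, String target) {
        try {
            return baseUrl + "/search?target=" + URLEncoder.encode(target, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            // Every JVM supports UTF-8, so this should never happen
            throw new IllegalStateException(e);
        }
    }

    /** Sends a GET request and returns the response body, read as UTF-8 (an assumption). */
    public static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);   // milliseconds
        conn.setReadTimeout(10000);     // milliseconds
        StringBuilder body = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
        } finally {
            conn.disconnect();
        }
        return body.toString();
    }

    public static void main(String[] args) throws IOException {
        String target = args.length > 0 ? args[0] : "3";
        String url = buildSearchUrl("http://localhost:8084/bigdata", target);
        long start = System.nanoTime();
        String result = get(url);
        double seconds = (System.nanoTime() - start) / 1_000_000_000.0;
        System.out.println(result);
        System.out.printf("GET %s took %.3f seconds.%n", url, seconds);
    }
}
```

Run it with the search term as the first argument, for example `java SearchClient 3`. Because the term is URL-encoded, input containing spaces, `&`, or Chinese characters such as 牧之桃 reaches the server intact.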