网页升级访问每天,北京网络推广优化公司,网站seo优化如何做,小企业怎么推广1 介绍Apache Calcite是一款开源的动态数据管理框架#xff0c;它提供了标准的 SQL 语言、多种查询优化和连接各种数据源的能力#xff0c;但不包括数据存储、处理数据的算法和存储元数据的存储库。Calcite 之前的名称叫做optiq#xff0c;optiq 起初在 Hive 项目中#xf…1 介绍Apache Calcite是一款开源的动态数据管理框架它提供了标准的 SQL 语言、多种查询优化和连接各种数据源的能力但不包括数据存储、处理数据的算法和存储元数据的存储库。Calcite 之前的名称叫做optiqoptiq 起初在 Hive 项目中为 Hive 提供基于成本模型的优化即CBO(Cost Based Optimizatio)。2014 年 5 月 optiq 独立出来成为 Apache 社区的孵化项目2014 年 9 月正式更名为 Calcite。Calcite 的目标是“one size fits all(一种方案适应所有需求场景)”希望能为不同计算平台和数据源提供统一的查询引擎。2 架构与解析步骤一般来说Calcite解析SQL有以下几步:Parser. 此步中Calcite通过Java CC将SQL解析成未经校验的ASTValidate. 该步骤主要作用是校证Parser步骤中的AST是否合法,如验证SQL scheme、字段、函数等是否存在; SQL语句是否合法等. 此步完成之后就生成了RelNode树(关于RelNode树, 请参考下文)Optimize. 该步骤主要的作用优化RelNode树, 并将其转化成物理执行计划。主要涉及SQL规则优化如:基于规则优化(RBO)及基于代价(CBO)优化; Optimze 这一步原则上来说是可选的, 通过Validate后的RelNode树已经可以直接转化物理执行计划但现代的SQL解析器基本上都包括有这一步目的是优化SQL执行计划。此步得到的结果为物理执行计划。Execute即执行阶段。此阶段主要做的是:将物理执行计划转化成可在特定的平台执行的程序。如Hive与Flink都在在此阶段将物理执行计划CodeGen生成相应的可执行代码。2.1 查询优化INSERT INTO tmp_nodeSELECT s1.id1, s1.id2, s2.val1FROM source1 as s1 INNER JOIN source2 AS s2ON s1.id1 s2.id1 and s1.id2 s2.id2 where s1.val1 5 and s2.val2 3;2.2 Parser解析LogicalTableModify(table[[TMP_NODE]], operation[INSERT], flattened[false])LogicalProject(ID1[$0], ID2[$1], VAL1[$7])LogicalFilter(condition[AND(($2, 5), ($8, 3))])LogicalJoin(condition[AND(($0, $5), ($1, $6))], joinType[INNER])LogicalTableScan(table[[SOURCE1]])LogicalTableScan(table[[SOURCE2]])2.3 Optimize优化谓词下推投影下推关系代数定律优化LogicalTableModify(table[[TMP_NODE]], operation[INSERT], flattened[false])LogicalProject(ID1[$0], ID2[$1], VAL1[$7])LogicalJoin(condition[AND(($0, $5), ($1, $6))], joinType[inner])LogicalFilter(condition[($4, 3)])LogicalProject(ID1[$0], ID2[$1], ID3[$2], VAL1[$3], VAL2[$4],VAL3[$5])LogicalTableScan(table[[SOURCE1]])LogicalFilter(condition[($3,5)])LogicalProject(ID1[$0], ID2[$1], ID3[$2], VAL1[$3], VAL2[$4],VAL3[$5])LogicalTableScan(table[[SOURCE2]])3 LogicalTableScan查询如上节点树中的最后节点均为LogicalTableScan假设我们不参与(LogicalTableScan)Calcite的查询过程即不做SQL解析不做优化只要把它接入进来实际Calcite是可以工作的无非就是可能会有扫全表、数据全部加载到内存里等问题所以实际中我们可能会参与全部(Translatable)或部分工作(FilterableTable)覆盖Calcite的一些执行计划或过滤条件让它能更高效的工作。值得一提的是Calcite支持异构数据源查询比如数据存在es和mysql可以通过写sql join之类的操作让calcite分别先从不同的数据源查询数据然后再在内存里进行合并计算另外它本身提供了许多优化规则也支持我们自定义优化规则来优化整个查询。3.1 ScannableTablea simple implementation of Table, using the ScannableTable interface, that enumerates all rows directly这种方式基本不会用原因是查询数据库的时候没有任何条件限制默认会先把全部数据拉到内存然后再根据filter条件在内存中过滤。使用方式实现Enumerable scan(DataContext root);该函数返回Enumerable对象通过该对象可以一行行的获取这个Table的全部数据。3.2 FilterableTablea more advanced implementation that implements FilterableTable, and can filter out rows according to simple predicates初级用法我们能拿到filter条件即能再查询底层DB时进行一部分的数据过滤一般开始介入calcite可以用这种方式(translatable方式学习成本较高)。使用方式实现Enumerable scan(DataContext root, List filters )。如果当前类型的“表”能够支持我们自己写代码优化这个过滤器那么执行完自定义优化器可以把该过滤条件从集合中移除否则就让calcite来过滤简言之就是如果我们不处理List filters Calcite也会根据自己的规则在内存中过滤无非就是对于查询引擎来说查的数据多了但如果我们可以写查询引擎支持的过滤器(比如写一些hbase、es的filter)这样在查的时候引擎本身就能先过滤掉多余数据更加优化。提示即使走了我们的查询过滤条件可以再让calcite帮我们过滤一次比较灵活。3.3 TranslatableTableadvanced implementation of Table, using TranslatableTable, that translates to relational operators using planner rules.高阶用法有些查询用上面的方式都支持不了或支持的不好比如join、聚合、或对于select的字段筛选等需要用这种方式来支持好处是可以支持更全的功能代价是所有的解析都要自己写“承上启下”上面解析sql的各个部件下面要根据不同的DB(es\mysql\drudi..)来写不同的语法查询。当使用ScannableTable的时候我们只需要实现函数Enumerable scan(DataContext root);该函数返回Enumerable对象通过该对象可以一行行的获取这个Table的全部数据(也就意味着每次的查询都是扫描这个表的数据我们干涉不了任何执行过程)当使用FilterableTable的时候我们需要实现函数Enumerable scan(DataContext root, List filters );参数中多了filters数组这个数据包含了针对这个表的过滤条件这样我们根据过滤条件只返回过滤之后的行减少上层进行其它运算的数据集当使用TranslatableTable的时候我们需要实现RelNode toRel( RelOptTable.ToRelContext context, RelOptTable relOptTable);该函数可以让我们根据上下文自己定义表扫描的物理执行计划至于为什么不在返回一个Enumerable对象了因为上面两种其实使用的是默认的执行计划转换成EnumerableTableAccessRel算子通过TranslatableTable我们可以实现自定义的算子以及执行一些其他的ruleKylin就是使用这个类型的Table实现查询。4 自定义数据源表接入demo如果你的数据源不在官方的支持列表中或者官方的支持不能满足你的需求那么则需要自己实现源接入。4.1 准备工作4.1.1 maven引入org.apache.calcitecalcite-core1.19.0com.alibabafastjson1.2.54com.google.guavaguava16.0.14.1.2 开发流程calcite中引入一个数据库通常是通过注册一个SchemaFactory接口实现类来实现。SchemaFactory中只有一个方法就是生成Schema。Schema最重要的功能是获取所有Table。Table有两个功能一个是获取所有字段的类型另一个是得到Enumerable迭代器用来读取数据。4.1.3 配置信息如果将你的数据源引入calcite一般情况下是使用一个配置文件以下是配置文件的demo。{version: 1.0,defaultSchema: TEST,schemas: [{name: TEST,type: custom,factory: org.apache.calcite.adapter.jdbc.JdbcSchema$Factory,operand: {jdbcUrl: jdbc:mysql://127.0.0.1:3306/test?useUnicodetruecharacterEncodingUTF-8,jdbcDriver:com.mysql.cj.jdbc.Driver,jdbcUser:test,jdbcPassword:test}}]}4.2 CSV表demo这里我们先生成一个CSV文件后边的操作就是通过在calcite中调用SQL访问CSV中的数据。TEST01.csvID:VARCHAR,NAME1:VARCHAR,NAME2:VARCHAR0,first,second1,hello,worldCsvSchemaFactory类package com.calcite.csv;import org.apache.calcite.schema.Schema;import org.apache.calcite.schema.SchemaFactory;import org.apache.calcite.schema.SchemaPlus;import java.util.Map;public class CsvSchemaFactory implements SchemaFactory {/*** parentSchema 他的父节点一般为root* name 数据库的名字它在model中定义的* operand 也是在mode中定义的是Map类型用于传入自定义参数。* */Overridepublic Schema create(SchemaPlus parentSchema, String name, Map operand) {return new CsvSchema(String.valueOf(operand.get(dataFile)));}}CsvSchema类package com.calcite.csv;import com.google.common.collect.ImmutableMap;import com.google.common.io.Resources;import org.apache.calcite.schema.Table;import org.apache.calcite.schema.impl.AbstractSchema;import org.apache.calcite.util.Source;import org.apache.calcite.util.Sources;import java.net.URL;import java.util.Map;public class CsvSchema extends AbstractSchema {private Map tableMap;private String dataFile;public CsvSchema(String dataFile) {this.dataFile dataFile;}Overrideprotected Map getTableMap() {URL url Resources.getResource(dataFile);Source source Sources.of(url);if (tableMap null) {final ImmutableMap.Builder builder ImmutableMap.builder();builder.put(this.dataFile.split(\\.)[0],new CsvTable(source));// 一个数据库有多个表名这里初始化大小写要注意了,TEST01是表名。tableMap builder.build();}return tableMap;}}CsvTable类package com.calcite.csv;import com.google.common.collect.Lists;import org.apache.calcite.DataContext;import org.apache.calcite.adapter.java.JavaTypeFactory;import org.apache.calcite.linq4j.AbstractEnumerable;import org.apache.calcite.linq4j.Enumerable;import org.apache.calcite.linq4j.Enumerator;import org.apache.calcite.rel.type.RelDataType;import org.apache.calcite.rel.type.RelDataTypeFactory;import org.apache.calcite.schema.ScannableTable;import org.apache.calcite.schema.impl.AbstractTable;import org.apache.calcite.sql.type.SqlTypeName;import org.apache.calcite.util.Pair;import org.apache.calcite.util.Source;import java.io.*;import java.util.List;public class CsvTable extends AbstractTable implements ScannableTable {private Source source;public CsvTable(Source source) {this.source source;}/*** 获取字段类型*/Overridepublic RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {JavaTypeFactory typeFactory (JavaTypeFactory)relDataTypeFactory;List names Lists.newLinkedList();List types Lists.newLinkedList();try {BufferedReader reader new BufferedReader(new FileReader(source.file()));String line reader.readLine();List lines Lists.newArrayList(line.split(,));lines.forEach(column - {String name column.split(:)[0];String type column.split(:)[1];names.add(name);types.add(typeFactory.createSqlType(SqlTypeName.get(type)));});} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}return typeFactory.createStructType(Pair.zip(names, types));}Overridepublic Enumerable scan(DataContext dataContext) {return new AbstractEnumerable() {Overridepublic Enumerator enumerator() {return new CsvEnumerator(source);}};}}CsvEnumerator类package com.calcite.csv;import org.apache.calcite.linq4j.Enumerator;import org.apache.calcite.util.Source;import java.io.BufferedReader;import java.io.IOException;public class CsvEnumerator implements Enumerator {private E current;private BufferedReader br;public CsvEnumerator(Source source) {try {this.br new BufferedReader(source.reader());this.br.readLine();} catch (IOException e) {e.printStackTrace();}}Overridepublic E current() {return current;}Overridepublic boolean moveNext() {try {String line br.readLine();if(line null){return false;}current (E)line.split(,); // 如果是多列这里要多个值} catch (IOException e) {e.printStackTrace();return false;}return true;}/*** 出现异常走这里* */Overridepublic void reset() {System.out.println(报错了兄弟不支持此操作);}/*** InputStream流在这里关闭* */Overridepublic void close() {}}model.json{version: 1.0,defaultSchema: TEST_CSV,schemas: [{name: TEST_CSV,type: custom,factory: com.calcite.csv.CsvSchemaFactory,operand: {dataFile: TEST01.csv}}]}Main方法调用package com.calcite;import com.alibaba.fastjson.JSON;import com.calcite.util.ReourceUtil;import com.google.common.collect.Lists;import com.google.common.collect.Maps;import java.sql.*;import java.util.List;import java.util.Map;public class Client {/*** 测试的时候用字符串 defaultSchema 默认数据库 name 数据库名称 type custom factory* 请求接收类该类会实例化Schema也就是数据库类Schema会实例化Table实现类Table会实例化数据类。* operand 动态参数ScheamFactory的create方法会接收到这里的数据*/public static void main(String[] args) {try {// 用文件的方式//URL url Client.class.getResource(/model.json);//String str URLDecoder.decode(url.toString(), UTF-8);//Properties info new Properties();//info.put(model, str.replace(file:, ));//Connection connection DriverManager.getConnection(jdbc:calcite:, info);// 字符串方式String model ReourceUtil.getResourceAsString(model.json);Connection connection DriverManager.getConnection(jdbc:calcite:modelinline: model);Statement statement connection.createStatement();test1(statement);} catch (Exception e) {e.printStackTrace();}}/*** CSV文件读取* param statement* throws Exception*/public static void test1(Statement statement) throws Exception {ResultSet resultSet statement.executeQuery(select * from test_csv.TEST01);System.out.println(JSON.toJSONString(getData(resultSet)));}public static List getData(ResultSet resultSet)throws Exception{List list Lists.newArrayList();ResultSetMetaData metaData resultSet.getMetaData();int columnSize metaData.getColumnCount();while (resultSet.next()) {Map map Maps.newLinkedHashMap();for (int i 1; i columnSize 1; i) {map.put(metaData.getColumnLabel(i), resultSet.getObject(i));}list.add(map);}return list;}}4.3 内存数据源与CSV数据源关联查询demo在4.2的演示中我们能够使用SQL查询CSV文件中的数据。接下来我们再定义一种内存数据源主要作用是演示两种数据源间的关联查询。MemSchemaFactory类package com.calcite.memory;import org.apache.calcite.schema.Schema;import org.apache.calcite.schema.SchemaFactory;import org.apache.calcite.schema.SchemaPlus;import java.util.Map;public class MemSchemaFactory implements SchemaFactory {Overridepublic Schema create(SchemaPlus schemaPlus, String s, Map map) {return new MemSchema(map);}}MemSchema类package com.calcite.memory;import com.google.common.collect.ImmutableMap;import org.apache.calcite.schema.Table;import org.apache.calcite.schema.impl.AbstractSchema;import java.util.Map;public class MemSchema extends AbstractSchema {private Map map;private Map tableMap;public MemSchema(Map map) {this.map map;}Overrideprotected Map getTableMap() {if (tableMap null) {final ImmutableMap.Builder builder ImmutableMap.builder();map.forEach((key, value) - {builder.put(key, new MemTable(value));});tableMap builder.build();}return tableMap;}}MemTable类package com.calcite.memory;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.TypeReference;import com.alibaba.fastjson.parser.Feature;import com.google.common.collect.Lists;import org.apache.calcite.DataContext;import org.apache.calcite.adapter.java.JavaTypeFactory;import org.apache.calcite.linq4j.AbstractEnumerable;import org.apache.calcite.linq4j.Enumerable;import org.apache.calcite.linq4j.Enumerator;import org.apache.calcite.rel.type.RelDataType;import org.apache.calcite.rel.type.RelDataTypeFactory;import org.apache.calcite.schema.ScannableTable;import org.apache.calcite.schema.impl.AbstractTable;import org.apache.calcite.sql.type.SqlTypeName;import org.apache.calcite.util.Pair;import java.io.BufferedReader;import java.io.FileNotFoundException;import java.io.FileReader;import java.io.IOException;import java.util.List;import java.util.Map;public class MemTable extends AbstractTable implements ScannableTable {private List list Lists.newLinkedList();public MemTable(Object list) {if (list instanceof List) {((List)list).forEach(o - {this.list.add(JSON.parseObject(JSON.toJSONString(o),new TypeReference() {},Feature.OrderedField));});}}Overridepublic Enumerable scan(DataContext dataContext) {return new AbstractEnumerable() {Overridepublic Enumerator enumerator() {return new MemEnumerator(list);}};}Overridepublic RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {JavaTypeFactory typeFactory (JavaTypeFactory)relDataTypeFactory;List names Lists.newLinkedList();List types Lists.newLinkedList();if (list.size() ! 0) {list.get(0).forEach((key, value) - {names.add(key);types.add(typeFactory.createSqlType(SqlTypeName.get(VARCHAR)));});}return typeFactory.createStructType(Pair.zip(names, types));}}MemEnumerator类package com.calcite.memory;import com.google.common.collect.Lists;import org.apache.calcite.linq4j.Enumerator;import java.util.List;import java.util.Map;public class MemEnumerator implements Enumerator {private List list Lists.newLinkedList();private int index -1;private E e;public MemEnumerator(List list) {this.list list;}Overridepublic E current() {return e;}Overridepublic boolean moveNext() {if (index1 list.size()){return false;}else {e (E)list.get(index1).values().toArray();index;return true;}}Overridepublic void reset() {index -1;e null;}Overridepublic void close() {}}model.json{version: 1.0,defaultSchema: TEST_CSV,schemas: [{name: TEST_CSV,type: custom,factory: com.calcite.csv.CsvSchemaFactory,operand: {dataFile: TEST01.csv}},{name: TEST_MEM,type: custom,factory: com.calcite.memory.MemSchemaFactory,operand: {MEM_TABLE_1: [{ID: 0,MEM_STR: str0},{ID: 1,MEM_STR: str1},{ID: 2,MEM_STR: str2}]}}]}Main方法调用package com.calcite;import com.alibaba.fastjson.JSON;import com.calcite.util.ReourceUtil;import com.google.common.collect.Lists;import com.google.common.collect.Maps;import java.sql.*;import java.util.List;import java.util.Map;public class Client {/*** 测试的时候用字符串 defaultSchema 默认数据库 name 数据库名称 type custom factory* 请求接收类该类会实例化Schema也就是数据库类Schema会实例化Table实现类Table会实例化数据类。* operand 动态参数ScheamFactory的create方法会接收到这里的数据*/public static void main(String[] args) {try {// 用文件的方式//URL url Client.class.getResource(/model.json);//String str URLDecoder.decode(url.toString(), UTF-8);//Properties info new Properties();//info.put(model, str.replace(file:, ));//Connection connection DriverManager.getConnection(jdbc:calcite:, info);// 字符串方式String model ReourceUtil.getResourceAsString(model.json);Connection connection DriverManager.getConnection(jdbc:calcite:modelinline: model);Statement statement connection.createStatement();test2(statement);} catch (Exception e) {e.printStackTrace();}}/*** CSV文件读取* param statement* throws Exception*/public static void test1(Statement statement) throws Exception {ResultSet resultSet statement.executeQuery(select * from test_csv.TEST01);System.out.println(JSON.toJSONString(getData(resultSet)));}/*** CSV文件与内存文件关联读取* param statement* throws Exception*/public static void test2(Statement statement) throws Exception {ResultSet resultSet1 statement.executeQuery(select csv1.id as cid,csv1.name1 as cname ,mem1.id as mid,mem1.mem_str as mstr from test_csv.TEST01 as csv1 left join test_mem.mem_table_1 as mem1 on csv1.id mem1.id);System.out.println(JSON.toJSONString(getData(resultSet1)));}public static List getData(ResultSet resultSet)throws Exception{List list Lists.newArrayList();ResultSetMetaData metaData resultSet.getMetaData();int columnSize metaData.getColumnCount();while (resultSet.next()) {Map map Maps.newLinkedHashMap();for (int i 1; i columnSize 1; i) {map.put(metaData.getColumnLabel(i), resultSet.getObject(i));}list.add(map);}return list;}}小结calcite对于没有高并发、低延时的多数据源间数据有着天然的优势。但需要注意的是如果一个表中数据量特别大大到读取速度很慢或内存无法容纳那么务必在操作该表数据时加入尽可能多的筛选条件如果自定义实现LogicalTableScan最好也是实现FilterableTable从而减少calcite在内存中操作数据行的量。参考