网站静态生成目录 名称 建议,2023互联网公司排行,上海闵行官网,河北交通建设投资集团公司网站当写入数据到外部数据库时#xff0c;Flink 会使用 DDL 中定义的主键。如果定义了主键#xff0c;则连接器将以 upsert 模式工作#xff0c;否则连接器将以 append 模式工作
package cn.edu.tju.demo2;import org.apache.flink.streaming.api.environment.StreamExecutionE…当写入数据到外部数据库时Flink 会使用 DDL 中定义的主键。如果定义了主键则连接器将以 upsert 模式工作否则连接器将以 append 模式工作
package cn.edu.tju.demo2;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.types.Row;public class Test41 {//demo 是MySQL中已经创建好的表//create table demo (userId varchar(50) not null,total bigint,avgVal double);private static String FILE_PATH info.txt;public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);tableEnv.connect(new FileSystem().path(FILE_PATH)).withFormat(new Csv()).withSchema(new Schema().field(userId, DataTypes.VARCHAR(50)).field(ts, DataTypes.INT()).field(val, DataTypes.DOUBLE())).createTemporaryTable(input);Table dataTable tableEnv.from(input);Table aggregateTable dataTable.groupBy(userId).select(userId, userId.count as total, val.avg as avgVal);String sqlcreate table jdbcOutputTable ( userId varchar(50) not null,total bigint,avgVal double ) with ( connector.type jdbc, connector.url jdbc:mysql://xx.xx.xx.xx:3306/test, connector.table demo, connector.driver com.mysql.jdbc.Driver, connector.username root, connector.password 123456 );tableEnv.sqlUpdate(sql);aggregateTable.insertInto(jdbcOutputTable);tableEnv.execute(my job);}
}
文件info.txt
user1,1680000890,31.6
user2,1681111900,38.3
user1,1680000890,34.9