当前位置: 首页 > news >正文

网站初期建设的成本来源广东省建设厅官方网(官网)

网站初期建设的成本来源,广东省建设厅官方网(官网),网站登录页面模板,广西桂林自驾游攻略摘要#xff1a; MaxCompute作为阿里巴巴集团内部绝大多数大数据处理需求的核心计算组件#xff0c;拥有强大的计算能力#xff0c;随着集团内外大数据业务的不断扩展#xff0c;新的数据使用场景也在不断产生。在这样的背景下#xff0c;MaxCompute#xff08;ODPS…摘要 MaxCompute作为阿里巴巴集团内部绝大多数大数据处理需求的核心计算组件拥有强大的计算能力随着集团内外大数据业务的不断扩展新的数据使用场景也在不断产生。在这样的背景下MaxComputeODPS计算框架持续演化而原来主要面对内部特殊格式数据的强大计算能力也正在一步步的通过新增的非结构化数据处理框架开放给不同的外部数据。 0. 前言 MaxCompute作为阿里巴巴集团内部绝大多数大数据处理需求的核心计算组件拥有强大的计算能力随着集团内外大数据业务的不断扩展新的数据使用场景也在不断产生。在这样的背景下MaxComputeODPS计算框架持续演化而原来主要面对内部特殊格式数据的强大计算能力也正在一步步的通过新增的非结构化数据处理框架开放给不同的外部数据。 我们相信阿里巴巴集团的这种需求也代表着业界大数据领域的最前沿实践和走向具有相当的普适性。在之前我们已经对MaxCompute 2.0新增的非结构化框架做过整体介绍描述了在MaxCompute上如何处理存储在OSS上面的非结构化数据侧重点在怎样从OSS读取各种非结构化数据并在MaxCompute上进行计算。 而一个完整数据链路读取和计算处理之后必然也会涉及到非结构化数据的 写出。 在这里我们着重介绍一下从MaxCompute往OSS输出非结构化数据并提供一个具体的在MaxCompute上进行图像处理的实例 来展示从【OSS-MaxCompute-OSS】的整个数据链路闭环的实现。 至于对于KV NoSQL类型数据的输出在对TableStore数据处理介绍 中已经有所介绍这里就不再重复。 1. 使用前提和假设 1.1 MaxCompute 2.0 功能 这里介绍的功能基于MaxCompute新一代的2.0计算框架目前2.0计算框架已经全面上线默认就可使用。 另外本文中使用了MaxCompute 2.0新引进的一个BINARY类型目前在使用BINARY类型时还需要显性设置set odps.sql.type.system.odps2true。 1.2 网络连通性与访问权限 另外因为MaxCompute与OSS是两个分开的云计算与云存储服务所以在不同的部署集群上的网络连通性有可能影响MaxCompute访问OSS的数据的可达性。 关于OSS的节点实例服务地址等概念可以参见OSS相关介绍。 在MaxCompute公共云服务访问OSS存储推荐使用OSS私网地址即以-internal.aliyuncs.com结尾的host地址。 此外需要指出的是MaxCompute计算服务要访问TableStore数据需要有一个安全的授权通道。 在这个问题上MaxCompute结合了阿里云的访问控制服务RAM和令牌服务STS来实现对数据的安全反问 首先需要在RAM中授权MaxCompute访问OSS的权限。登录RAM控制台创建角色AliyunODPSDefaultRole并将策略内容设置为 {Statement: [{Action: sts:AssumeRole,Effect: Allow,Principal: {Service: [odps.aliyuncs.com]}}],Version: 1 } 然后编辑该角色的授权策略将权限AliyunODPSRolePolicy授权给该角色。 如果觉得这些步骤太麻烦还可以登录阿里云账号点击此处完成一键授权。 2. MaxCompute内置的OSS数据输出handler 2.1 创建External Table MaxCompute非结构化数据框架希望从根本上提供MaxCompute与各种数据的联通这里的“各种数据”是两个维度上的 1.各种存储介质比如OSS 2.各种数据格式 比如文本文件视频图像音频基因气象等格式的数据 而数据的这两个维度的特征都是通过EXTERNAL TABLE的概念来引入MaxCompute的计算体系的。 与读取OSS数据的使用方法类似对OSS数据进行写操作在如上打开安全授权通道后也是先通过CREATE EXTERNAL TABLE语句创建出一个外部表再通过标准MaxCompute SQL的INSERT INTO/OVERWRITE等语句来实现的这里先用MaxCompute内置的TsvStorageHandler为例来说明一下用法 DROP TABLE IF EXISTS tpch_lineitem_tsv_external;CREATE EXTERNAL TABLE IF NOT EXISTS tpch_lineitem_tsv_external ( orderkey BIGINT, suppkey BIGINT, discount DOUBLE, tax DOUBLE, shipdate STRING, linestatus STRING, shipmode STRING, comment STRING ) STORED BY com.aliyun.odps.TsvStorageHandler ----------------------------------------- (1) LOCATION oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/tsv_output_folder/ --(2) 这个DDL语句建立了一个外部表tpch_lineitem_tsv_external并将前面提到的两个维度的外部数据信息关联到这个外部表上。 1.数据存储介质 LOCATION 将一个OSS上的地址关联到外部表上对这个外部表的进行读写操作都会反映到这个OSS地址上。 2.数据存储格式 StorageHandler用来表明对这些数据的读写操作方式这里使用了MaxCompute内置的 com.aliyun.odps.TsvStorageHandler 用户可以使用这个由系统自带的实现来读取和写出TSV文件。 同时用户也可以通过MaxCompute的SDK来自定义StorageHandler, 这个将在后面的章节介绍。 其中OSS数据存储的具体地址的URI格式为 LOCATION oss://${endpoint}/${bucket}/${userPath}/ 最后还要提到的是在上面的DDL语句中定义了外部表的Schema, 对于数据输出而言这表示输出的数据格式将由这个Schema描述。 就TSV格式而言这个schema描述比较直观容易理解 而在用户自定义的输出数据格式上这个schema与输出数据的联系则更松散一些有着更大的自由度。 在后面介绍通过自定义StorageHandler/Outputer的时候会详细展开。 2.2 通过对External Table的 INSERT 操作实现TSV文本文件的写出 在将OSS数据通过External Table关联上后对OSS文件的写出可以对External Table做标准的SQL INSERT OVERWRITE/INSERT INTO来操作。 具体输出数据的来源可以有两种 1.数据源为MaxCompute的内部表 也就是说可以通过对外表INSERT操作来实现MaxCompute内部表数据到外部存储介质的写出。 2.数据源为之前通过External Table引入MaxCompute计算体系的外部数据 这可以用来将外部数据引入MaxCompute进行计算然后再存储到不同的外部存储地址或者甚至是不同的外部存储介质比如将TableStore数据经由MaxCompute导出到OSS。 2.2.1 从MaxCompute内部表输出数据到OSS 这里先来看第一种场景假设我们已经有一个名为tpch_lineitem的MaxCompute内部表其schema可以通过 DESCRIBE tpch_lineitem; 得到 ------------------------------------------------------------------------------------ | InternalTable: YES | Size: 241483831680 | ------------------------------------------------------------------------------------ | Native Columns: | ------------------------------------------------------------------------------------ | Field | Type | Label | Comment | ------------------------------------------------------------------------------------ | l_orderkey | bigint | | | | l_partkey | bigint | | | | l_suppkey | bigint | | | | l_linenumber | bigint | | | | l_quantity | double | | | | l_extendedprice | double | | | | l_discount | double | | | | l_tax | double | | | | l_returnflag | string | | | | l_linestatus | string | | | | l_shipdate | string | | | | l_commitdate | string | | | | l_receiptdate | string | | | | l_shipinstruct | string | | | | l_shipmode | string | | | | l_comment | string | | | ------------------------------------------------------------------------------------ 其中有16个columns。 现在我们希望将其中的一部分数据以TSV格式导出到OSS上面。 那么在用上述DDL创建出External Table之后使用如下INSERT OVERWRITE操作就可以实现 INSERT OVERWRITE TABLE tpch_lineitem_tsv_external SELECT l_orderkey, l_suppkey, l_discount, l_tax, l_shipdate, l_linestatus, l_shipmode, l_commentFROM tpch_lineitemWHERE l_discount 0.07 and l_tax 0.01; 这里将从内部的tpch_lineitem表中在符合l_discount 0.07 并 l_tax 0.01的行中选出8个列对应tpch_lineitem_tsv_external这个外部表的schema按照TSV的格式写到OSS上。 在上面这个INSERT OVERWRITE操作成功完成后就可以看到OSS上的对应LOCATION产生了一系列文件 osscmd ls oss://oss-odps-test/tsv_output_folder/2017-01-14 06:48:27 39.00B Standard oss://oss-odps-test/tsv_output_folder/.odps/.meta 2017-01-14 06:48:12 4.80MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_0_0-0.tsv 2017-01-14 06:48:05 4.78MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_1_0-0.tsv 2017-01-14 06:47:48 4.79MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_2_0-0.tsv ... 这里可以看到通过上面LOCATION指定的oss-odps-test这个OSS bucket下的tsv_output_folder文件夹下产生了一个.odps文件夹这其中将有一些.tsv文件以及一个.meta文件。 这样子的文件结构是MaxCompute(ODPS)往OSS上输出所特有的 1.通过MaxCompute对一个OSS地址使用INSERT INTO/OVERWRITE 外部表来做写出操作所有的数据将在指定的LOCATION下的.odps文件夹产生 2.其中.odps文件夹中的.meta文件为MaxCompute额外写出的宏数据文件其中用于记录当前文件夹中有效的数据。 正常情况下如果INSERT操作成功完成的话可以认为当前文件夹的所有数据均是有效数据。 只有在有作业失败的情况下需要对这个宏数据进行解析。 即使是在作业中途失败或被kill的情况下对于INSERT OVERWRITE操作再跑一次成功即可。 如果对于高级用户一定需要解析.meta文件的话可以联系MaxCompute技术团队。 这里迅速看一下这些tsv文件的内容 osscmd cat oss://oss-odps-test/tsv_output_folder/.odps/20170113232648738gam6csz7/M1_0_0-0.tsv 4236000067 9992377 0.07 0.01 1992-11-06 F RAIL across the ideas nag 4236000290 3272628 0.07 0.01 1998-04-28 O RAIL uriously. furiously unusual dinos int 4236000386 8081402 0.07 0.01 1994-02-19 F RAIL its. express, iron 4236000710 3879271 0.07 0.01 1995-03-10 F AIR es are carefully fluffily spe ... 可以看到确实在OSS上产生了对应的TSV数据文件。 最后大家可能也注意到了这个INSERT OVERWRITE操作产生了多个TSV文件对于MaxCompute内置的TSV/CSV处理来说产生的文件数目与对应SQL stage的并发度是相同的在上面这个例子中INSER OVERWITE … SELECT … FROM …; 的操作在源数据表(tpch_lineitem) 上分配了1000个mapper所以最后产生了1000个TSV文件的。 如果需要控制TSV文件的数目可以配合MaxCompute的各种灵活语义和配置来实现。 比如如果需要强制产生一个TSV文件那在这个特定例子中可以在INSER OVERWITE … SELECT … FROM …最后加上一个DISTRIBUTE BY l_discount, 就可以在最后插入仅有一个Reducer的Reduce stage, 也就会只输出一个TSV文件了 osscmd ls oss://oss-odps-test/tsv_output_folder/2017-01-14 08:03:41 39.00B Standard oss://oss-odps-test/tsv_output_folder/.odps/.meta 2017-01-14 08:03:35 4.20GB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113234037735gcm6csz7/R2_1_33_0-0.tsv 可以看到在增加了DISTRIBUTE BY l_discount后现在同样的数据只了一个输出TSV文件当然这个文件的size就大多了。 这方面的调控技巧还有很多都是可以依赖SQL语言的灵活性数据本身的特性以及MaxCompute计算相关设置来实现的这里就不深入展开了。 2.2.2 以MaxCompute为计算介质实现不同存储介质之间的数据转移 External Table作为一个MaxCompute与外部存储介质的一个切入点之前已经介绍过对OSS数据的读取以及TableStore数据的操作结合对外部数据读取和写出的功能就可以实现通过External Table实现各种各样的数据计算/存储链路比如 1.读取External Table A关联的OSS数据在MaxCompute上做复杂计算处理并输出到External Table B关联的OSS地址 2.读取External Table A关联的TableStore数据在MaxCompute上做复杂计算处理并输出到External Table B关联的OSS地址 而这些操作与上面数据源为MaxCompute内部表的场景 唯一的区别只是SELECT的来源变成一个External table而不是MaxCompute内置表。 3. 通过自定义StorageHandler来实现数据输出 除了使用内置的StorageHandler来实现在OSS上输出TSV/CSV等常见文本格式MaxCompute非结构化框架提供了通用的SDK允许用户对外输出自定义数据格式文件包括图像音频视频等等。 这种对于用户自定义的完全非结构化数据格式支持也是MaxCompute从结构化/文本类数据的一个向外扩展在这里我们会以一个图像处理的例子来走通整个【OSS-MaxCompute-OSS】数据链路尤其着重介绍对OSS输出文件的功能。 为了方便大家理解这里先提供一个在使用用户自定义代码的场景下数据在MaxCompute计算平台上的流程从上图可以看出从数据的流动和处理逻辑上理解用户可以简单地把非结构化处理框架理解成在MaxCompute计算平台两端有机耦合的的数据导入(Ingres)以及导出(Egress) 1.外部的OSS数据经过非结构化框架转换会使用Java用户容易理解的InputStream类提供给自定义代码接口。 用户自实现Extract逻辑只需要负责对输入的InputStream做读取/解析/转化/计算最终返回MaxCompute计算平台通用的Record格式 2.这些Record可以自由的参与MaxCompute的SQL逻辑运算这一部分计算是基于MaxCompute内置的强大结构化SQL运算引擎并可能产生新的Record 3.运算过后的Record中再传递给用户自定义的Output逻辑用户在这里可以进行进一步的计算转换并最终将Record里面需要输出的信息通过系统提供OutputStream输出由系统负责写到OSS。 值得指出的是这里面所有的步骤都是可以由用户根据需要来进行自由的选择与拼接的。 比如如果用户的输入就是MaxCompute的内部表那步骤1.就没有必要了事实上在前面的章节2中的例子我们就实现了将内部表直接写成OSS上的TSV文件的流程。 同理 如果用户没有输出的需求步骤3. 就没有必要比如我们之前介绍的OSS数据的读取。 最后步骤2.也是可以省略的比如如果用户的所有计算逻辑都是在自定义的Extract/Output中完成没有进行SQL逻辑运算的需要那步骤1.是可以直接连接到步骤3.的。 理解了上面这个数据变换的流程我们就可以来通过一个图像处理例子来看看怎么具体的通过非结构化框架在MaxCompute SQL上完整的实现非结构化数据的读取计算以及输出了 3.1 范例OSS图像文件 - MaxCompute计算处理 - OSS图像输出 这里我们先提供实现这整个【OSS-MaxCompute-OSS】数据链路需要用到的MaxCompute SQL query并做简单的注解详细的用户代码实现逻辑将在后面的3.2子章节中介绍SDK接口的时候做展开解释。 3.1.1 关联OSS上的原始输入图像到External Table: images_input_external DROP TABLE IF EXISTS images_input_external; CREATE EXTERNAL TABLE IF NOT EXISTS images_input_external ( name STRING, width BIGINT, height BIGINT, image BINARY ) STORED BY com.aliyun.odps.udf.example.image.ImageStorageHandler --- (1) WITH SERDEPROPERTIES (inputImageFormatBMP , transformedImageFormat JPG) --- (2) LOCATION oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/SampleData/test_images/mixed_bmp/ --- (3) USING odps-udf-example.jar; --- (4) 说明 1.用户指明使用的用户代码wrapper class名字是com.aliyun.odps.udf.example.image.ImageStorageHandler这个class及其依赖的三方库用户通过jar提供具体jar名字会通过下面的USING语句见第4点指定。 2.通过SERDEPROPERTIES来实现参数传递格式为’key’’value’, 具体用法可以参见基本功能介绍 以及下面的用户代码说明 3.指定输入图像地址这个地址上存放了一系列不同分辨率的bmp图像文件。 4.指定包含用户JAR包内含自定义的StorageHandler/Extractor/Outputer以及需要的三方库这里用到了Java ImageIO库具体见下面用户代码范例。JAR包通过ADD JAR命令上传可以参见基本功能介绍。 另外要说明的是这里指定的External Table的schema就是用户在进行Extract操作后构造的Record格式具体怎么构造这个Schema用户可以根据需要自己根据能从输入数据中抽取到的信息定义。 在这里我们定义了对于输入图片数据会将图片名称图片的长和宽以及图片的二进制bytes抽取出来放进Record见后面的Extractor代码说明所以就有了上面的【STRINGBIGINTBIGINTBINARY】的schema。 3.1.2 关联OSS输出地址到External Table: images_output_external CREATE EXTERNAL TABLE IF NOT EXISTS images_output_external ( image_name STRING, image_width BIGINT, image_height BIGINT, outimage BINARY ) STORED BY com.aliyun.odps.udf.example.image.ImageStorageHandler LOCATION oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/output/images_output/ ---(1) USING odps-udf-example.jar; 说明 可以看到这里创建关联输出图像文件的External Table使用的DDL语句与前面关联输入图像时使用的DDL语句是非常类似的只是LOCATION不一样表明图像数据处理后将输出到另外一个地址。 另外还有一点就是这里我们没有使用SERDEPROPERTIES来进行传参这个只是在这个场景上没有需求在有需求的时候可以用同样的方法把参数传递给outputer。 当然这里两个DDL语句如此相似有一个原因是因为我们这个例子中用户代码中对于Extract出的Record以及输入给Outputer的Record使用了一样的schema, 同时这一对Extractor和Outputer都被封装在了同一个ImageStorageHandler里放在同一个JAR包里。 在实际应用中这些都是可以根据实际需求自己调整的由用户自己选择组合和打包方式。 3.1.3 从OSS读取原始图片数据到MaxCompute, 计算处理并输出图像到OSS 在上面的3.1.1以及3.1.2子章节中的两个DDL语句分别实现了把输入OSS数据以及计划输出OSS数据分别绑定到两个LOCATION以及指定对应的用户处理代码参数等设置。 然而这两个DDL语句对系统而言只是进行了一些宏数据的记录操作并不会涉及具体的数据计算操作。 在这两个DDL语句运行成功后运行如下SQL语句才会引发真正的运算。 换句话说在Fig.1中描述的整个【OSS-MaxCompute-OSS】数据读取/计算/输出链路实际上都是通过下面一个简单的SQL 语句完成的 INSERT OVERWRITE TABLE images_output_external SELECT * FROM images_input_external WHERE width 1024; 这看起来就是一个标准的MaxCompute SQL语句只不过因为涉及了images_output_external和images_input_external这两个外部表所以真正进行的物理操作与传统的SQL操作会有一些区别在这个过程中涉及了读写OSS以及通过ImageStorageHandler这个wrapper调用自定义的ExtractorOutputer代码来对数据进行操作。 下面就来具体看看在这个例子中的用户自定义代码实现了怎样的功能以及具体是如何实现的。 3.2 ImageStorageHandler实现 如同之前介绍过的MaxCompute非结构化框架通过StorageHandler这个接口来描述对各种数据存储格式的处理。 具体来说StorageHandler作为一个wrapper class, 让用户指定自己定义的Exatractor(用于数据的读入解析处理等) 以及Outputer(用于数据的处理和输出等)。 用户自定义的StorageHandler 应该继承 OdpsStorageHandler实现getExtractorClass以及getOutputerClass 两个接口。 通常作为wrapper class, StorageHandler的实现都很简单比如这里的ImageStorageHandler 就只是通过这两个接口指定了我们将使用ImageExtractor以及ImageOutputer: package com.aliyun.odps.udf.example.image;public class ImageStorageHandler extends OdpsStorageHandler {Overridepublic Class!--? extends Extractor-- getExtractorClass() {return ImageExtractor.class;}Overridepublic Class!--? extends Outputer-- getOutputerClass() {return ImageOutputer.class;} } 另外要说明的是如果确定在使用某个StorageHandler的时候只需要用到Extractor或者只需要用到Outputer功能那不需要的接口则不用实现。 比如如果我们只需要读取OSS数据而不需要做INSERT操作那getOutputerClass()的实现只需要扔个NotImplemented exception就可以了不会被调用到。 3.3 ImageExtractor实现 因为对于SDK中Extractor接口的介绍以及对用户如何写一个自定义的Extractor在之前介绍的OSS数据的读取中已经有所涉及所以这里就不再对这方面做深入的介绍。 Extractor的工作在于读取输入数据并进行用户自定义处理那么我们首先来看看这里由images_input_external这个外表绑定的OSS输入LOCATION上存放的具体数据内容 osscmd ls oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/ 2017-01-09 14:02:01 1875.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/barbara.bmp 2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/cameraman.bmp 2017-01-09 14:02:00 1054.74KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/fishingboat.bmp 2017-01-09 14:01:59 257.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/goldhill.bmp 2017-01-09 14:01:59 468.80KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/house.bmp 2017-01-09 14:01:59 468.80KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/jetplane.bmp 2017-01-09 14:02:01 2.32MB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/lake.bmp 2017-01-09 14:01:59 257.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/lena.bmp 2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/livingroom.bmp 2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/pirate.bmp 2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/walkbridge.bmp 2017-01-09 14:02:00 1054.74KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/woman_blonde.bmp 2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/woman_darkhair.bmp 可以看到这个LOCATION存放了一系列bmp图像数据分辨率从 400 x 400 到 1200 x 1200不等。 具体在这个例子中用到的ImageExtractor的详细代码在github上可以找到, 这里只做一些简单介绍说明该Extractor做了些什么工作 1.从输入的OSS地址上使用非结构化框架提供的InputStream接口读取图像数据并在本地进行如下操作 对于图像宽度小于1024的图片统一放大到1024 x 1024; 对于图像宽度大于1024的图片跳过不进行处理 处理过的图片在内存中转存成由输入参数指定的格式JPG) 2.把处理后在内存中的JPG数据的原始字节存入输出的Record中的BINARY field, 同一个Record中还将存放处理后图像的长和宽都是1024, 以及原始的图像名字这个可以从输入的InputStream上获取 3.填充后的Record从Extract接口返回进入MaxCompute系统 4.在这个过程中用户可以灵活的进行各种操作比如额外的参数验证等。 另外要说明的是目前Record作为MaxCompute结构化数据处理的基本单元有一些额外的限制比如BINARY/STRING类型都有8MB大小的限制但是在大部分场景下这个大小应该是能满足存储需求的。 3.4 ImageOutputer的实现 接下来我们着重讲一下ImageOutputer的实现。 首先所有的用户输出逻辑都必须实现Outputer接口具体来说有如下三个setup, output和close, 这和Extractor的setup, extract和close三个接口基本上是对称的。 // Base outputer class, custom outputer shall extend from this class public abstract class Outputer{public abstract void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes);public abstract void output(Record record) throws IOException;public abstract void close() throws IOException; } 这其中setup()和close()在一个outputer中只会调用一次。 用户可以在setup里面做初始化准备工作另外通常需要把setup()传递进来的这三个参数保存成ouputerd的class variable, 方便之后output()或者close()接口中使用。 而close()这个接口用于方便用户代码的扫尾工作。 通常情况下大部分的数据处理发生在output(Record)这个接口内。 MaxCompute系统会根据当前outputer分配处理的Record数目不断调用也就是对每个输入Record系统会调用一次 output(Record)。 系统假设在一个output(Record) 调用返回的时候用户代码已经消费完这个Record, 因此在当前output(Record)返回后系统可能将这个Record所使用的内存用作它用 所以不推荐一个Record中的信息在跨多个output()函数调用被使用如果一定有这个需求的话用户必须把相关信息通过class variable等方式自行另外保存。 3.4.1 ImageOutputer.setup() setup用于初始化整个outputer, 在这个接口上提供了整个outputer操作过程中可能需要的参数 ExecutionContext: 用于提供一些系统信息和接口比如读取resource等在ImageOutputer这个例子中我们没有用到这个参数OutputStreamSet: 用户可以从这个类的next()接口获取对外输出所需要的OutputStream具体用法我们在下面详细介绍DataAttributes: 用户通过SERDEPROPERTIES设置的key-value参数可以通过这个类获取参数获取这里ImageOutputer例子中没有用到但是Extractor上的setup参数中也有这个类在上面的ImageExtractor用到了改功能可以参考一下。 同时这个类上面还提供了一些helper接口比如方便用户验证schema等。 在我们这个ImageOutputer里setup()的实现比较简单 Overridepublic void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes) {this.outputStreamSet outputStreamSet;this.attributes attributes;this.attributes.verifySchema(new OdpsType[]{ OdpsType.STRING, OdpsType.BIGINT, OdpsType.BIGINT, OdpsType.BINARY });} 只是做了简单的初始化以及对schema的验证。 3.4.2 ImageOutputer.output(Record) 以及 OutputStreamSet的使用 在介绍具体output()接口之前首先我们要来看看 OutputStreamSet, 这个类有两个接口 public interface OutputStreamSet{SinkOutputStream next(); SinkOutputStream next(String fileNamePostfix);} 两个接口都是用来获取一个新的SinkOutputStream(一个Java OutputStream的实现可以按照OutputStream使用)两个接口唯一的区别是next()获取的OutputStream写出的文件名完全由MaxCompute系统决定而next(String fileNamePostfix)则允许用户提供文件名的postfix。 提供这个postfix的意义是在输出文件具体地址和名字格式总体由MaxCompute系统决定的前提下用户依然可以定制一个方便理解的postfix。 比如使用next(“_boat.jpg”) 得到的OutputStream可能对应如下一个输出文文件 oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-0_boat.jpg 这其中尾端的”_boat.jpg”可以帮助用户理解输出文件的涵义。 如果这个 OutputStream是由next()获得的话那对应的输出文件可能就是这样的 oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-0 用户可能就需要具体读取这个文件才能知道这个文件中具体存放了什么内容。 前面提到output(Record)这个接口会由系统不断调用但是应该强调的是并不一定在每一个Record都需要调用一次OutputStreamSet.next()接口来获得一个新的OutputStream。 事实上在大多数情况下我们建议在一个Outputer里面尽可能减少调用next()的次数最好只调用一次。 也就是说理想情况下一个outpuer只应该产生一个输出文件。 比如处理TSV这种文本格式文件假设有5000个Record对应5000行TSV数据那么最理想的情况是应该把这5000行数据全部写到一个TSV文件中。 当然用户可能会有各种各样不同的切分输出文件的需求比如希望每个文件大小控制在一定范围或比如文件的边界有显著的意义等等。 具体到当前这个图像例子从下面的ImageOutputer代码实现中可以看出这个例子中确实是处理每个Record就调用一次next()的因为在当前场景中每一个输入的Record都表示一张图片的信息binary bytes, 图像名字图像长宽所以这里通过多次调用next()来输出多个图片文件。 但是我们还是需要再次强调调用next()的次数过多可能有一些其他弊端比如造成碎片化小数据在OSS上的存储等等。 尤其在MaxCompute这种分布式计算系统上因为系统本身就会调度起多个outputer进行并行计算处理如果每个outpuer都输出过多文件的话最后产生的文件数目会有一个乘性效应。 回头来看我们这个例子中即使在这里多个图像其实也可以通过一个OutputStream按照tar/tar.gz的方式写到单个文件中这些都是在实现具体系统中用户需要根据自己的场景, 以及处理逻辑输出数据类型等信息来进行优化和tradeoff的。 在理解了这些之后现在来具体看看ImageOutputer的实现output接口实现Overridepublic void output(Record record) throws IOException {String name record.getString(0);Long width record.getBigint(1);Long height record.getBigint(2);ByteArrayInputStream input new ByteArrayInputStream(record.getBytes(3));BufferedImage sobelEdgeImage getEdgeImage(input);OutputStream outputStream this.outputStreamSet.next(name _ width x height . outputFormat);ImageIO.write(sobelEdgeImage, this.outputFormat, outputStream);} 可以看到这里主要就做了三件事情 1. 根据之前保存的图像名字长宽信息和编码方式(“.jpg”)拼出一个带扩展名的输出文件名postfix。 2. 读取图像binary bytes并用getEdgeImage()来利用sobel算子对图像做边缘检测。 具体getEdgeImage()的实现这里就不进行深入解释了 使用了标准的sobel模板卷积算法 有兴趣看ImageOutputer源码即可。 3. 对每一个图像产生一个新的OutputStream并将数据写出至此当前Record处理完毕写出一张图片到OSSoutput()函数返回。 3.4.3 ImageOutputer.close() 在这个例子中outputer.close()接口没有包含具体的实现逻辑是个no-op。 至此我们就介绍完了一个output的实现现在可以看看在运行完这个SQL query对应OSS地址的数据 osscmd ls oss://oss-odps-test/dev/output/images_output/ 2017-01-15 14:36:50 215.19KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-0-barbara_1024x1024.jpg 2017-01-15 14:36:50 108.90KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-1-cameraman_1024x1024.jpg 2017-01-15 14:36:50 169.54KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-2-fishingboat_1024x1024.jpg 2017-01-15 14:36:50 214.94KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-3-goldhill_1024x1024.jpg 2017-01-15 14:36:50 71.00KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-4-house_1024x1024.jpg 2017-01-15 14:36:50 126.50KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-5-jetplane_1024x1024.jpg 2017-01-15 14:36:50 169.63KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-6-lake_1024x1024.jpg 2017-01-15 14:36:50 194.18KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-7-lena_1024x1024.jpg ... 可以看到图像数据按照期待格式写到了指定地址这里我们就选一个输入图像(lena.bmp)以及对应的输出图像(M1_0_-1–1-7-lena_1024x1024.jpg)看一下对比这个例子中整个图像处理流程已经通过如上的SQL query完成。 而从上面展示的ImageExtractor以及ImageOutputer 源代码我们可以看出整个过程中用户的逻辑基本与写单机图像处理程序无异用户的代码只需要在Extractor上做InputStream到Record的准换而在Outputer上做反向的Record到OutputSteam的写出处理其他核心的处理逻辑实现基本和单机算法实现相同在用户的层面并不用去操心底层分布式系统的细节以及MaxCompute和OSS的交互。 3.5 数据处理步骤的灵活性 从上面这个例子中我们也可以看出在一个完整的【OSS-MaxCompute-OSS】数据流程中Extractor和Outputer中涉及的具体计算逻辑其实也并不一定会有一个非常明确的边界。 Extractor和Outputer只要各自完成所需的转换Record/Stream的转换具体的额外算法逻辑在两个地方都有机会完成。 比如上面这个例子的整个流程涉及了如下图像处理相关的运算 1.图像的缩放 统一到 1024 x 1024 2.图像格式的转换 (BMP - JPG) 3.图像的Sobel边缘检测 上面的例子实现中把1. 和 2. 放在ImageExtractor中完成而3.则放在ImageOutputer中完成但并不是唯一的选择。 我们完全可以把所有3个步骤都放在ImageExtractor中完成让ImageOutputer只做Record到写出最后图像的操作也可以在ImageExtractor中只做读取原始binary到Recrod, 而把所有3个图像处理步骤都放在ImageOutputer中进行等等。 具体进行怎样的选择用户可以完全根据需要自己实现。 另外一个系统设计的点是如果对于一个数据需要做重复的运算那可以考虑将数据从OSS中通过Extractor读出进MaxCompute然后存储成MaxCompute的内置表格再进行多次的计算。 这个对于MaxCompute和OSS没有进行混布不在一个物理网络上的场景尤其有意义 MaxCompute从内置表中读取数据无疑要比从外部OSS存储服务中读出数据要有效得多。 在上面3.1.3子章节中的图像处理例子这个INSER OVERWITE操作 INSERT OVERWRITE TABLE images_output_external SELECT * FROM images_input_external WHERE width 1024; 就可以改写成两个分开的语句 INSERT OVERWRITE TABLE images_internal SELECT * FROM images_input_external WHERE width 1024;INSERT OVERWRITE TABLE images_output_external SELECT * FROM image_internal; 通过把数据写到一个内部images_internal表中后面如果有多次读取数据的需求的话就可以不再去访问外部OSS了。 这里也可以看到MaxCompute非结构化框架以及SQL语法本身提供了非常高的灵活性和可扩展性用户可以根据实际计算的不同模式/场景/需求来在上面完成各种各样的数据计算工作流。 结语 非结构化数据处理框架随着MaxCompute 2.0一起推出意在丰富MaxCompute平台的数据处理生态来打通阿里云核心计算平台与阿里云各个重要存储服务之间的数据链路。 在之前介绍过的读取OSS以及处理TableStore数据的整体方案后本文侧重介绍数据往OSS的输出方案并依托一个图像处理的处理实例展示了【OSS-MaxCompute-OSS】整个数据链路的实现。 在这些新功能的基础上我们希望实现整个阿里云计算与数据的生态融合 在不同的项目上我们已经看到了在MaxCompute上处理OSS上的海量视频图像等非结构化数据的巨大潜力。 今后随着这个生态的丰富我们期望OSS数据TableStore数据以及MaxCompute内部存储的数据都能在MaxCompute的核心计算引擎上进行融合从而产生更大的价值。 原文链接 干货好文请关注扫描以下二维码
http://www.zqtcl.cn/news/463715/

相关文章:

  • 网站建设策划书1万字深圳公司网站设计企业
  • 建设企业网站小微asp iis设置网站路径
  • 分类信息网站营销小程序appid是什么
  • 营销软文是什么意思网络seo培训
  • 效果好的手机网站建设成都网站制作报价
  • 江门网站建设推广平台注册公司费用要多少
  • 淄博哪家公司做网站最好新手做地方门户网站
  • 做一个交易平台网站的成本深圳南山做网站的公司
  • 网站建设的开发的主要方法aspcms分类信息网站
  • 中国免费图片素材网站烟台电商网站开发
  • 网站框架图浅谈网站的主色调设计
  • asp.net网站iis与目录权限设置做网站前端用什么软件好
  • 网站后台图片模板前端作业做一个网站
  • 做兼职的翻译网站吗教育直播网站开发
  • pxhere素材网站电子商务的网站开发的工作内容
  • 邮件网站怎么做wordpress如何代码高亮
  • 电脑做视频的网站吗中小学 网站建设 通知
  • 给企业做网站赚钱吗吉 360 网站建设
  • 网站建设多少价格东莞网站推广团队
  • 做课件的软件下载带有蓝色的网站html网页制作代码实例
  • 建设银行鄂州分行官方网站健身网站开发方式
  • 大连免费建站模板花坛设计平面图
  • 建设网站对企业有什么好处wordpress教程视频下载
  • 郑州网站提升排名上海 企业 网站建设
  • 南昌好的做网站的公司营销型网站 案例
  • 南宁经典网站建设网络运维工程师是干什么的
  • 网站开发算法建网站难不难
  • 茂名模板建站定制网站开发 ide
  • 做网站现在用什么语言网站估价
  • wap开头的网站外贸网站建设官网