php网站开发图片,西安好玩的地方排行榜,上海网站维护,深圳网站设计go摘要#xff1a; PyODPS 中使用 Python UDF 包含两方面#xff0c;一个是直接使用#xff0c;也就是在 MaxCompute SQL 中使用#xff1b;一个是间接的方式#xff0c;也就是 PyODPS DataFrame#xff0c;这种方式你不需要直接写 Python UDF#xff0c;而是写普通的 Pyt…摘要 PyODPS 中使用 Python UDF 包含两方面一个是直接使用也就是在 MaxCompute SQL 中使用一个是间接的方式也就是 PyODPS DataFrame这种方式你不需要直接写 Python UDF而是写普通的 Python 函数或者类。
点此查看原文http://click.aliyun.com/m/41092/
PyODPS 中使用 Python UDF 包含两方面一个是直接使用也就是在 MaxCompute SQL 中使用一个是间接的方式也就是 PyODPS DataFrame这种方式你不需要直接写 Python UDF而是写普通的 Python 函数或者类。下面我们分开说明。
作为准备工作我们需要 ODPS 入口可以通过直接初始化或者使用 room 机制 加载。
from odps import ODPSo ODPS(your-access-id, your-access-key, your-project)
MaxCompute SQL 中使用 Python UDF
首先我们需要写一个 Python 文件假设我们就是把某一列按 csv 格式放的一列转成 json 格式。
import jsonfrom odps.udf import annotateannotate(string-string)
class Transform(object):def evaluate(self, x):columns list(abc)d dict(zip(columns, x.split(,)))return json.dumps(d)
假设这个文件叫 my.py接下来我们就需要创建 py 资源。
r o.create_resource(csv_to_json.py, py, fileobjopen(my.py))
fileobj 参数也可以是 str 类型就是表示文件的内容
接着我们就可以创建 Python UDF 了。
o.create_function(csv_to_json, class_typecsv_to_json.Transform, resources[r])
这里我们指定了函数名叫 csv_to_json主类使我们上传的 csv_to_json.py 文件里的 Transform 类。
现在我们就可以在 MaxCompute SQL 中调用这个 UDF 了。
o.execute_sql(select csv_to_json(raw) from pyodps_test_udf)
这样我们就完成了在 PyODPS 中使用 MaxCompute SQL Python UDF 的整个过程。
PyODPS DataFrame
对于 PyODPS DataFrame 来说用户只需要写普通的 Python 函数或者类在函数或者类里甚至可以读取全局变量这样给开发带来了极大的方便。
和上面的例子目标相同我们定义一个 transform 函数即可。然后我们对于 DataFrame 的一列调用 map 方法来应用这个函数。
passed_columns list(abc) # 可以从数据库中读取或者写死def transform(x):import jsond dict(zip(passed_columns, x.split(,)))return json.dumps(d)df.raw.map(transform)
In [30]: dfraw
0 1,2,3
1 4,5,6
2 7,8,9In [31]: df.raw.map(transform)raw
0 {a: 1, c: 3, b: 2}
1 {a: 4, c: 6, b: 5}
2 {a: 7, c: 9, b: 8}
实际上PyODPS DataFrame 在用 MaxCompute 执行的时候也会创建 Python UDF 来实现这个功能但用户不需要去创建文件、资源和函数这些过程一切都是 Python 原生函数和类整个过程相当顺畅。
另外可以看到在上面的 my.py 里我们也是定义了一个 columns 参数的而如果这个参数是通过变量传进去的话在 Python UDF 里非常麻烦可能常常需要用一些 tricky 的方法比如写到某个文件资源然后在 UDF 里读取之类的。而对于 DataFrame 来说完全没有这个问题我们可以自由读取全局变量。
不过要注意的是这个全局变量是被序列化到各个机器上的所以你修改它不会全局生效。
好了还有什么问题可以随时和我们取得联系。
文档http://pyodps.readthedocs.io/zh_CN/latest/ 代码https://github.com/aliyun/aliyun-odps-python-sdk 欢迎提 issue 和 merge request