家装博览会,seo排名点击器曝光行者seo,wordpress 菜单居中,短视频app有哪些前言
pyspark中很多常用的功能#xff0c;过段时间没有使用就容易忘记#xff0c;需要去网上搜索#xff0c;这里总结一下#xff0c;省的以后还去去搜#xff0c;供自己以后参考。
withColumn
def hot_func(info_str):if info_str:eturn 1return 0过段时间没有使用就容易忘记需要去网上搜索这里总结一下省的以后还去去搜供自己以后参考。
withColumn
def hot_func(info_str):if info_str:eturn 1return 0
df df.withColumn(is_hot, F.udf(hot_func, StringType())(F.col(your_col_name)))自定义函数
from pyspark.sql.functions import udf
# 定义并注册函数
udf(returnTypeStringType())
def f_parse_category(info):x json.loads(info)[category]return x if x is not None else
spark.udf.register(f_parse_category, f_parse_category)
# 在sql中使用注册的函数
sql
select *, f_parse_category(info) category,
from your_table
where info is not null df spark.sql(sql).cache()groupby处理
按groupby处理保留goupby字段并对groupby的结果处理。正常情况下使用df.groupBy即可但需要处理多列并逻辑较为复杂时可以使用这种方式。
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import StructField, LongType, StringType, StructType
from collections import Counterpattern re.compile(r\b\w(?: |.join([_size, _sum]) r)\b)group_cols [category]
value_cols [sales_sum, stat_size]schema StructType( [StructField(col, LongType()) if len(re.findall(pattern, col))0 else StructField(col, StringType()) for col in group_colsvalue_cols],)pandas_udf(schema, functionTypePandasUDFType.GROUPED_MAP)
def group_stat(df):# 获取l [df[item].iloc[0] for item in group_cols]df df[[col for col in df.columns if col not in group_cols]]sales_sum df[sales].sum().item()stat_size len(df)# d: {key: value}df[first_attr] df[attr].transform(lambda d: list(json.loads(d).keys())[0])attr_dict json.dumps({k:v for k, v in Counter(df[first_attr].value_counts().to_dict()).most_common()}, ensure_ascii0)counter sum(df[brand_name].apply(lambda x:Counter(json.loads(x))), Counter())ct len(counter)brand_list df[brand].to_list()values [sales_sum, stat_size, attr_dict, ct, infobox_brand_stat, brand_list]return pd.DataFrame([l values])# df 包含字段category sales attr brand_name brand
df df.groupby(group_cols).apply(group_stat).cache()
patition By orderBy
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, dense_rank
# 根据department分区然后按salary排序编号
windowSpec Window.partitionBy(department).orderBy(salary)
df.withColumn(row_number,row_number().over(windowSpec)) \.show(truncateFalse)
# dense_rank: 相同值排序编号一致sql的方式
select name, category, sales, DENSE_RANK() OVER (PARTITION BY category ORDER BY b.sales DESC) as sales_rank
from your_tbdataframe转正rdd处理行
该中情况一般在需要处理过个行的情况下使用如果是少数的行处理可以使用withColumn
def hot_func(info_str):if info_str:eturn 1return 0
df df.withColumn(is_hot, F.udf(hot_func, StringType())(F.col(your_col_name)))转为rdd的处理方式为
def gen_norm(row):# 转为字段处理row_dict row.asDict(recursiveTrue)process_key row_dict[key]row_dict[process_key] process_keyreturn Row(**row_dict)
# sampleRatio0.01 为推断列类型的抽样数据比例
df df.rdd.map(gen_norm).toDF(sampleRatio0.01).cache()
df.show()