网站seo诊断书,如何进入公众号,企业网站开发报价,做啥网站赚钱文章目录 Spark和PySpark概述1.1 Spark简介1.2 PySpark简介 二 基础准备2.1 PySpark库的安装2.2 构建SparkContext对象2.3 SparkContext和SparkSession2.4 构建SparkSession对象2.5 PySpark的编程模型 三 数据输入3.1 RDD对象3.2 Python数据容器转RDD对象3.3 读取文件转RDD对象… 文章目录 Spark和PySpark概述1.1 Spark简介1.2 PySpark简介 二 基础准备2.1 PySpark库的安装2.2 构建SparkContext对象2.3 SparkContext和SparkSession2.4 构建SparkSession对象2.5 PySpark的编程模型 三 数据输入3.1 RDD对象3.2 Python数据容器转RDD对象3.3 读取文件转RDD对象 四 数据计算4.1 map算子4.2 flatMap算子4.3 reduceByKey算子4.4 使用SparkSession可能出现的问题4.5 filter算子4.6 distinct算子4.7 sortBy算子4.8 综合案例 五 数据输出5.1 输出为python对象5.1.1 collect算子5.1.2 reduce算子5.1.3 take算子5.1.4 count算子 5.2 输出到文件5.2.1 环境准备5.2.2 saveAsTextFile算子5.2.3 coalesce/repartition调整分区 六 综合案例搜索引擎日志分析6.1 需求6.2 参考代码6.3 输出结果 Spark和PySpark概述
1.1 Spark简介 PySpark是Apache Spark的Python库Apache Spark是一个开源的大数据处理框架。Apache Spark旨在进行大规模数据处理和分析并提供统一的API用于批处理、实时流处理、机器学习和图处理。 PySpark允许开发人员使用Python编程语言与Spark交互使更多习惯于Python而不是传统的Scala或Java的数据工程师、数据科学家和分析师能够使用Spark。 简单来说Spark是一款分布式的计算框架用于调度成百上千的服务器集群计算TB、PB乃至EB级别的海量数据
1.2 PySpark简介
PySpark是由Spark官方开发的Python语言第三方库。 Python开发者可以使用pip程序快速的安装PySpark并像其它三方库那样直接使用。
二 基础准备
2.1 PySpark库的安装 PySpark同样可以使用pip程序进行安装。 在”CMD”命令提示符程序内输入
pip install pyspark或使用国内代理镜像网站清华大学源
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark2.2 构建SparkContext对象
想要使用PySpark库完成数据处理首先需要构建一个执行环境入口对象。PySpark的执行环境入口对象是类SparkContext 的类对象使用PySpark库来创建和管理一个Spark应用程序的基本示例
# 导入必要的模块
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象
conf SparkConf().setMaster(local[*]).setAppName(test_spark_app)
# 基于SparkConf类对象创建SparkContext对象
sc SparkContext(confconf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行停止PySpark程序
sc.stop()导入必要的模块
from pyspark import SparkConf, SparkContextpyspark是PySpark库的主要模块SparkConf和SparkContext是两个重要的类用于配置和管理Spark应用程序
创建SparkConf对象
conf SparkConf().setMaster(local[*]).setAppName(test_spark_app)SparkConf对象用于配置Spark应用程序的各种属性.setMaster(local[*])设置了Spark的运行模式为本地模式其中local[*]表示使用所有可用的本地处理核心.setAppName(test_spark_app)设置了应用程序的名称为test_spark_app
创建SparkContext对象
sc SparkContext(confconf)基于前面创建的SparkConf对象创建SparkContext对象它是与Spark集群通信的主要入口点。通过此对象您可以在集群上创建RDD弹性分布式数据集并执行各种操作。
当前SparkContext对象所使用的Spark版本
print(sc.version)停止SparkContext对象
sc.stop()2.3 SparkContext和SparkSession
SparkContext 和 SparkSession 都是 Apache Spark 中用于创建和管理 Spark 应用程序的关键组件但它们在 Spark 版本 2.0 之后的引入中扮演了不同的角色。
SparkContext: SparkContext简称为 sc是 Spark 的早期上下文对象它是连接 Spark 集群的主要入口点。在 Spark 2.0 之前的版本中开发人员使用 SparkContext 来创建 RDD、广播变量和累加器等以及执行各种操作。它充当了应用程序与 Spark 集群之间的中间人负责管理与集群的通信任务调度数据分布等。 在 Spark 2.0 之后随着 SparkSession 的引入SparkContext 仍然可用但更多的功能被集成到了 SparkSession 中因此在新的应用程序中更常用 SparkSession。
SparkSession: SparkSession 是在 Spark 2.0 中引入的新概念它是创建 Spark 应用程序的主要入口点。SparkSession 继承自 SparkContext并将许多原来与 SparkContext 相关的功能整合到一个对象中包括创建 RDD、DataFrame 和 Dataset以及执行 SQL 查询和 Spark 应用程序的配置。它还提供了一个更统一的 API使得在 Spark 中处理不同类型的数据更加便捷。 通过 SparkSession可以使用 DataFrame API 来处理结构化数据进行 SQL 查询还可以使用 RDD API 进行更底层的操作。此外SparkSession 也提供了与 Hive 集成的功能可以在 Spark 中执行 Hive 查询。
总结
SparkContext 是早期的 Spark 上下文负责连接 Spark 集群和管理任务。它在 Spark 2.0 后仍然可用但在新应用程序中更常用 SparkSession。SparkSession 是 Spark 2.0 引入的主要上下文继承了 SparkContext 的功能并在其基础上整合了更多功能用于创建和管理 Spark 应用程序处理不同类型的数据和执行查询。
2.4 构建SparkSession对象
以下是创建SparkSession的简单示例
from pyspark.sql import SparkSession
# 设置本地python解释器 请注意修改为自己的python解释器目录
import os
os.environ[PYSPARK_PYTHON] C:\environment\Python3.11.4\python.exe# 创建一个SparkSession
spark SparkSession.builder \.appName(PySparkExample) \.getOrCreate()# 将数据加载到DataFrame中
data [(Alice, 25), (Bob, 30), (Charlie, 22)]
columns [Name, Age]
df spark.createDataFrame(data, columns)# 执行DataFrame操作
df.show()2.5 PySpark的编程模型
SparkContext类对象是PySpark编程中一切功能的入口。 PySpark的编程主要分为如下三大步骤
数据输入通过SparkContext完成数据读取数据计算读取到的数据转换为RDD对象调用RDD的成员方法完成计算数据输出调用RDD的数据输出相关成员方法将结果输出到list、元组、字典、文本文件、数据库等 三 数据输入
3.1 RDD对象
PySpark支持多种数据的输入在输入完成后都会得到一个RDD类的对象RDD全称为弹性分布式数据集Resilient Distributed DatasetsPySpark针对数据的处理都是以RDD对象作为载体即 数据存储在RDD内各类数据的计算方法也都是RDD的成员方法RDD的数据计算方法返回值依旧是RDD对象
3.2 Python数据容器转RDD对象
PySpark支持通过SparkContext对象的parallelize成员方法将list、tuple、set、dict、str转换为PySpark的RDD对象
from pyspark import SparkConf, SparkContextconf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)# 通过parallelize方法将Python对象加载到Spark内成为RDD对象
rdd1 sc.parallelize([1, 2, 3, 4, 5]) #list
rdd2 sc.parallelize((1, 2, 3, 4, 5)) #元组
rdd3 sc.parallelize(abcdefg) #字符串
rdd4 sc.parallelize({1, 2, 3, 4, 5}) #集合
rdd5 sc.parallelize({key1: value1, key2: value2}) # 字典# 如果要查看RDD里面有什么内容需要用collect()方法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
sc.stop()3.3 读取文件转RDD对象
PySpark也支持通过SparkContext入口对象来读取文件来构建出RDD对象。 演示通过PySpark代码加载数据即数据输入from pyspark import SparkConf, SparkContextconf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)#用过textFile方法读取文件数据加载到Spark内成为RDD对象
rdd sc.textFile(C:/hello.txt)
print(rdd.collect())
rdd.map()
sc.stop()四 数据计算
PySpark的数据计算都是基于RDD对象来进行依赖RDD对象内置丰富的成员方法算子进行实现
4.1 map算子 功能map算子是将RDD的数据一条条处理处理的逻辑基于map算子中接收的处理函数返回新的RDD map 是 PySpark 中的一个转换操作用于对 RDD 中的每个元素应用一个函数并返回一个新的 RDD其中包含应用函数后的结果。 链式调用对于返回值是新RDD的算子可以通过链式调用的方式多次调用算子 其基本语法如下
new_rdd old_rdd.map(lambda x: function(x))old_rdd: 原始的 RDD即要进行转换的数据集。 new_rdd: 经过 map 转换后生成的新 RDD。 lambda x: function(x): 这是一个匿名函数lambda 函数它定义了应用于每个元素 x 的转换操作。你可以根据需要定义任何适当的操作和函数。 x: 表示 RDD 中的每个元素function(x) 就是将该函数应用于元素 x 的结果 使用 map 算子将 RDD 中的每个元素进行平方处理
from pyspark.sql import SparkSession
# 设置本地python解释器 请注意修改为自己的python解释器目录
import os
os.environ[PYSPARK_PYTHON] C:\environment\Python3.11.4\python.exe
# 创建 SparkSession
spark SparkSession.builder.appName(RDDMapExample).getOrCreate()# 创建一个 RDD
data [1, 2, 3, 4, 5]
rdd spark.sparkContext.parallelize(data)# 使用 map 算子对 RDD 中的每个元素进行平方
squared_rdd rdd.map(lambda x: x**2)
# 支持链式调用
# squared_rdd rdd.map(lambda x: x**2).map(lambda x: x 5)
# 收集结果并打印
result squared_rdd.collect()
print(result)# 停止 SparkSession
spark.stop()
使用 map 转换操作对一个包含字符串的 RDD 进行转换将每个字符串转换为大写形式
from pyspark.sql import SparkSession
# 设置本地python解释器 请注意修改为自己的python解释器目录
import os
os.environ[PYSPARK_PYTHON] C:\environment\Python3.11.4\python.exe# 创建 SparkSession
spark SparkSession.builder.appName(MapExample).getOrCreate()# 创建一个包含字符串的 RDD
data [hello, world, pyspark, map]
rdd spark.sparkContext.parallelize(data)# 使用 map 转换操作将每个字符串转换为大写形式
upper_case_rdd rdd.map(lambda x: x.upper())# 收集结果并打印
result upper_case_rdd.collect()
print(result)# 停止 SparkSession
spark.stop()
4.2 flatMap算子 flatMap 是 PySpark 中的另一个转换操作类似于 map但是它会将每个输入元素映射为零个或多个输出元素然后将所有输出元素组合成一个扁平的 RDD。 flatMap 可以用于在一个 RDD 中对每个元素应用一个函数并生成多个结果元素最终将这些结果组合成一个新的 RDD。简单说对rdd执行map操作然后进行解除嵌套操作 flatMap 的语法如下 new_rdd old_rdd.flatMap(lambda x: function(x))old_rdd 是要进行转换的原始 RDDfunction 是一个应用于每个元素的函数x 是 RDD 中的每个元素。flatMap 算子会将 function(x) 应用于每个元素并返回一个或多个结果元素然后将所有结果元素扁平化为一个新的 RDD。 使用 flatMap 转换操作对一个包含单词列表的 RDD 进行转换将每个单词拆分为字符并生成一个扁平的字符列表
from pyspark.sql import SparkSession
# 设置本地python解释器 请注意修改为自己的python解释器目录
import os
os.environ[PYSPARK_PYTHON] C:\environment\Python3.11.4\python.exe# 创建 SparkSession
spark SparkSession.builder.appName(FlatMapExample).getOrCreate()# 创建一个包含单词列表的 RDD
data [hello world, pyspark flatMap, example]
rdd spark.sparkContext.parallelize(data)# 使用 flatMap 转换操作将每个单词拆分为字符
character_list_rdd rdd.flatMap(lambda line: line.split())# 收集结果并打印
result character_list_rdd.collect()
print(result)
# [hello, world, pyspark, flatMap, example]# 停止 SparkSession
spark.stop()4.3 reduceByKey算子
reduceByKey 是 PySpark 中的一个转换操作用于对键值对形式的 RDD 进行聚合计算。它按照键对 RDD 中的值键值对中的键进行分组并应用一个指定的聚合函数来合并每个键对应的值。 这个操作在执行分布式计算时特别有用因为它能够将具有相同键的数据分布在同一个分区上从而减少数据传输和处理开销。 reduceByKey 的基本语法如下 new_rdd old_rdd.reduceByKey(lambda x, y: function(x, y))old_rdd 是原始的键值对形式的 RDDfunction 是一个应用于每对值的聚合函数x 和 y 是每对值的两个元素。reduceByKey 算子将具有相同键的值进行聚合将聚合函数应用于每对值并返回一个新的键值对形式的 RDD。 使用 reduceByKey 转换操作对一个包含单词计数的 RDD 进行聚合计算 from pyspark.sql import SparkSession
# 设置本地python解释器 请注意修改为自己的python解释器目录
import os
os.environ[PYSPARK_PYTHON] C:\environment\Python3.11.4\python.exe# 创建 SparkSession
spark SparkSession.builder.appName(ReduceByKeyExample).getOrCreate()# 创建一个包含单词的 RDD
data [hello, world, hello, pyspark, world]
rdd spark.sparkContext.parallelize(data)# 将单词映射为键值对并使用 reduceByKey 进行单词计数
word_counts_rdd rdd.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x y)# 收集结果并打印
result word_counts_rdd.collect()
print(result)# 停止 SparkSession
spark.stop()输出结果 [(hello, 2), (world, 2), (pyspark, 1)]注意SparkContext 是 Spark 的基础上下文而 SparkSession 是在 Spark 2.0 之后引入的高级概念用于在 Spark 中创建和管理 DataFrame 和 Dataset
4.4 使用SparkSession可能出现的问题
Please install psutil to have better support with spilling需要安装 psutil 库以便在 PySpark 中更好地监控和管理系统资源使用包括内存、磁盘和网络资源。要安装 psutil在Python 环境中使用以下命令pip install psutil4.5 filter算子 filter 用于从 RDD 中筛选出满足指定条件的元素并返回一个包含筛选结果的新 RDD。 filter 方法的基本语法如下 new_rdd old_rdd.filter(lambda x: condition(x))old_rdd 是要进行筛选的原始 RDDcondition 是一个用于筛选元素的函数x 是 RDD 中的每个元素。filter 方法将 condition(x) 应用于每个元素并将满足条件的元素组成一个新的 RDD。 使用 filter 方法从包含数字的 RDD 中筛选出偶数
from pyspark.sql import SparkSession
# 设置本地python解释器 请注意修改为自己的python解释器目录
import os
os.environ[PYSPARK_PYTHON] C:\environment\Python3.11.4\python.exe
# 创建 SparkSession
spark SparkSession.builder.appName(FilterExample).getOrCreate()# 创建一个包含数字的 RDD
data [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd spark.sparkContext.parallelize(data)# 使用 filter 方法筛选出偶数
even_rdd rdd.filter(lambda x: x % 2 0)# 收集结果并打印
result even_rdd.collect()
print(result) # [2, 4, 6, 8, 10]# 停止 SparkSession
spark.stop()4.6 distinct算子 distinct 对RDD数据进行去重操作并返回一个包含这些不重复元素的新 RDD。 distinct 方法的基本语法如下 new_rdd old_rdd.distinct()old_rdd 是要进行操作的原始 RDDdistinct 方法将会返回一个新的 RDD其中包含了所有不重复的元素。 使用 distinct 方法从包含重复元素的 RDD 中获取不重复的元素
from pyspark.sql import SparkSession
# 设置本地python解释器 请注意修改为自己的python解释器目录
import os
os.environ[PYSPARK_PYTHON] C:\environment\Python3.11.4\python.exe# 创建 SparkSession
spark SparkSession.builder.appName(DistinctExample).getOrCreate()# 创建一个包含重复元素的 RDD
data [1, 2, 3, 2, 4, 3, 5, 6, 1]
rdd spark.sparkContext.parallelize(data)# 使用 distinct 方法获取不重复的元素
distinct_rdd rdd.distinct()# 收集结果并打印
result distinct_rdd.collect()
print(result) # [1, 2, 3, 4, 5, 6]# 停止 SparkSession
spark.stop()4.7 sortBy算子 sortBy 用于对 RDD 中的元素根据指定的排序依据进行排序并返回一个新的包含排序结果的 RDD。 sortBy 方法的基本语法如下 new_rdd old_rdd.sortBy(lambda x: key_function(x), ascendingTrue,numPartitions1)old_rdd 是要进行排序的原始 RDDkey_function 是一个用于从每个元素中提取排序关键字的函数x 是 RDD 中的每个元素。ascending 参数指定排序顺序为 True 表示升序为 False 表示降序。numPartitions 可选参数用于指定输出 RDD 的分区数量全局排序需要设置分区数为1 使用 sortBy 方法对包含数字的 RDD 进行升序排序
from pyspark.sql import SparkSession# 创建 SparkSession
spark SparkSession.builder.appName(SortByExample).getOrCreate()# 创建一个包含数字的 RDD
data [5, 2, 8, 1, 3, 7, 4, 6]
rdd spark.sparkContext.parallelize(data)# 使用 sortBy 方法对 RDD 进行升序排序
sorted_rdd rdd.sortBy(lambda x: x, ascendingTrue)# 收集结果并打印
result sorted_rdd.collect()
print(result) # [1, 2, 3, 4, 5, 6, 7, 8]# 停止 SparkSession
spark.stop()4.8 综合案例
{id:1,timestamp:2019-05-08T01:03.00Z,category:平板电脑,areaName:北京,money:1450}|{id:2,timestamp:2019-05-08T01:01.00Z,category:手机,areaName:北京,money:1450}|{id:3,timestamp:2019-05-08T01:03.00Z,category:手机,areaName:北京,money:8412}
{id:4,timestamp:2019-05-08T05:01.00Z,category:电脑,areaName:上海,money:1513}|{id:5,timestamp:2019-05-08T01:03.00Z,category:家电,areaName:北京,money:1550}|{id:6,timestamp:2019-05-08T01:01.00Z,category:电脑,areaName:杭州,money:1550}
{id:7,timestamp:2019-05-08T01:03.00Z,category:电脑,areaName:北京,money:5611}|{id:8,timestamp:2019-05-08T03:01.00Z,category:家电,areaName:北京,money:4410}|{id:9,timestamp:2019-05-08T01:03.00Z,category:家具,areaName:郑州,money:1120}
{id:10,timestamp:2019-05-08T01:01.00Z,category:家具,areaName:北京,money:6661}|{id:11,timestamp:2019-05-08T05:03.00Z,category:家具,areaName:杭州,money:1230}|{id:12,timestamp:2019-05-08T01:01.00Z,category:书籍,areaName:北京,money:5550}
{id:13,timestamp:2019-05-08T01:03.00Z,category:书籍,areaName:北京,money:5550}|{id:14,timestamp:2019-05-08T01:01.00Z,category:电脑,areaName:北京,money:1261}|{id:15,timestamp:2019-05-08T03:03.00Z,category:电脑,areaName:杭州,money:6660}
{id:16,timestamp:2019-05-08T01:01.00Z,category:电脑,areaName:天津,money:6660}|{id:17,timestamp:2019-05-08T01:03.00Z,category:书籍,areaName:北京,money:9000}|{id:18,timestamp:2019-05-08T05:01.00Z,category:书籍,areaName:北京,money:1230}
{id:19,timestamp:2019-05-08T01:03.00Z,category:电脑,areaName:杭州,money:5551}|{id:20,timestamp:2019-05-08T01:01.00Z,category:电脑,areaName:北京,money:2450}
{id:21,timestamp:2019-05-08T01:03.00Z,category:食品,areaName:北京,money:5520}|{id:22,timestamp:2019-05-08T01:01.00Z,category:食品,areaName:北京,money:6650}
{id:23,timestamp:2019-05-08T01:03.00Z,category:服饰,areaName:杭州,money:1240}|{id:24,timestamp:2019-05-08T01:01.00Z,category:食品,areaName:天津,money:5600}
{id:25,timestamp:2019-05-08T01:03.00Z,category:食品,areaName:北京,money:7801}|{id:26,timestamp:2019-05-08T01:01.00Z,category:服饰,areaName:北京,money:9000}
{id:27,timestamp:2019-05-08T01:03.00Z,category:服饰,areaName:杭州,money:5600}|{id:28,timestamp:2019-05-08T01:01.00Z,category:食品,areaName:北京,money:8000}|{id:29,timestamp:2019-05-08T02:03.00Z,category:服饰,areaName:杭州,money:7000}需求复制以上内容到文件中使用Spark读取文件进行计算 各个城市销售额排名从大到小全部城市有哪些商品类别在售卖北京市有哪些商品类别在售卖 参考代码 完成练习案例JSON商品统计
需求
1. 各个城市销售额排名从大到小
2. 全部城市有哪些商品类别在售卖
3. 北京市有哪些商品类别在售卖from pyspark import SparkConf, SparkContext
import os
import json
os.environ[PYSPARK_PYTHON] D:/dev/python/python310/python.exe
conf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)# TODO 需求1 城市销售额排名
# 1.1 读取文件得到RDD
file_rdd sc.textFile(D:/orders.txt)
# 1.2 取出一个个JSON字符串
json_str_rdd file_rdd.flatMap(lambda x: x.split(|))
# 1.3 将一个个JSON字符串转换为字典
dict_rdd json_str_rdd.map(lambda x: json.loads(x))
# 1.4 取出城市和销售额数据
# (城市销售额)
city_with_money_rdd dict_rdd.map(lambda x: (x[areaName], int(x[money])))
# 1.5 按城市分组按销售额聚合
city_result_rdd city_with_money_rdd.reduceByKey(lambda a, b: a b)
# 1.6 按销售额聚合结果进行排序
result1_rdd city_result_rdd.sortBy(lambda x: x[1], ascendingFalse, numPartitions1)
print(需求1的结果, result1_rdd.collect())
# TODO 需求2 全部城市有哪些商品类别在售卖
# 2.1 取出全部的商品类别
category_rdd dict_rdd.map(lambda x: x[category]).distinct()
print(需求2的结果, category_rdd.collect())
# 2.2 对全部商品类别进行去重
# TODO 需求3 北京市有哪些商品类别在售卖
# 3.1 过滤北京市的数据
beijing_data_rdd dict_rdd.filter(lambda x: x[areaName] 北京)
# 3.2 取出全部商品类别并进行商品类别去重
result3_rdd beijing_data_rdd.map(lambda x: x[category]).distinct()
print(需求3的结果, result3_rdd.collect())
五 数据输出
5.1 输出为python对象
5.1.1 collect算子 功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象 collect 操作会触发实际的计算操作因此在处理大量数据时需要谨慎使用以避免资源耗尽或内存溢出的问题。 collect 操作的基本语法 result rdd.collect()rdd 是要执行 collect 操作的 RDDresult 是一个包含所有元素的本地 Python 列表。 使用 collect 操作将 RDD 中的元素收集到本地列表并打印 from pyspark.sql import SparkSession# 创建 SparkSession
spark SparkSession.builder.appName(CollectExample).getOrCreate()# 创建一个包含数字的 RDD
data [1, 2, 3, 4, 5]
rdd spark.sparkContext.parallelize(data)# 使用 collect 操作将 RDD 中的元素收集到本地列表
result rdd.collect()# 打印结果
print(result)# 停止 SparkSession
spark.stop()请注意collect 操作将所有数据从分布式计算节点收集到驱动程序节点因此适用于数据量较小的情况。对于大规模数据集使用 collect 可能会导致驱动程序节点的内存溢出。如果需要对数据进行分析、汇总或展示建议使用其他适当的操作如 take、first、reduce 等以避免在内存方面的问题。
5.1.2 reduce算子 reduce 是一种动作action操作用于对 RDD 中的元素进行累积计算。它可以将一个二元操作应用于 RDD 中的所有元素从而将元素逐个合并以得到一个单一的结果。 reduce 操作是一个逐步的、迭代的过程它从左到右依次将每个元素与累积值进行二元操作直到得到最终结果。 reduce 操作的基本语法如下 result rdd.reduce(lambda x, y: binary_function(x, y))rdd 是要执行 reduce 操作的 RDDbinary_function 是一个二元操作的函数x 和 y 分别是 RDD 中的两个元素reduce 操作将会从左到右依次将每对元素应用二元操作得到一个最终的结果。 使用 reduce 操作计算 RDD 中所有元素的累加和
from pyspark.sql import SparkSession# 创建 SparkSession
spark SparkSession.builder.appName(ReduceExample).getOrCreate()# 创建一个包含数字的 RDD
data [1, 2, 3, 4, 5]
rdd spark.sparkContext.parallelize(data)# 使用 reduce 操作计算 RDD 中所有元素的累加和
total_sum rdd.reduce(lambda x, y: x y)# 打印结果
print(Total sum:, total_sum)# 停止 SparkSession
spark.stop()5.1.3 take算子 take 是一种动作action操作用于从 RDD 中获取指定数量的元素并返回一个包含这些元素的本地 Python 列表。与 collect 不同take 操作仅获取指定数量的元素而不是整个 RDD 的所有元素。 take 操作的基本语法如下 result rdd.take(num_elements)rdd 是要执行 take 操作的 RDDnum_elements 是要获取的元素数量result 是一个包含获取的元素的本地 Python 列表。
使用 take 操作从 RDD 中获取指定数量的元素
from pyspark.sql import SparkSession# 创建 SparkSession
spark SparkSession.builder.appName(TakeExample).getOrCreate()# 创建一个包含数字的 RDD
data [1, 2, 3, 4, 5]
rdd spark.sparkContext.parallelize(data)# 使用 take 操作从 RDD 中获取前三个元素
selected_elements rdd.take(3)# 打印结果
print(selected_elements) # [1, 2, 3]# 停止 SparkSession
spark.stop()注意take 操作仅在集群中获取指定数量的元素而不会将整个 RDD 的所有元素都收集到驱动程序节点上。这使得 take 在需要获取一小部分数据样本时非常有用而不会耗尽内存。
5.1.4 count算子 count 是一种动作action操作用于计算 RDD 中元素的数量。它返回一个整数表示 RDD 中的元素个数。 count 操作的基本语法如下 num_elements rdd.count()rdd 是要执行 count 操作的 RDDnum_elements 是包含 RDD 中元素数量的整数。 使用 count 操作计算 RDD 中元素的数量
from pyspark.sql import SparkSession# 创建 SparkSession
spark SparkSession.builder.appName(CountExample).getOrCreate()# 创建一个包含数字的 RDD
data [1, 2, 3, 4, 5]
rdd spark.sparkContext.parallelize(data)# 使用 count 操作计算 RDD 中元素的数量
element_count rdd.count()# 打印结果
print(Number of elements:, element_count) # Number of elements: 5# 停止 SparkSession
spark.stop()注意count 操作会触发对 RDD 的完整扫描以确定元素的数量。对于大型数据集可能需要一些时间来计算数量。
5.2 输出到文件
5.2.1 环境准备
这里使用的是Hadoop3.3.5并使用兼容版本版本的winutils调用保存文件的算子需要配置Hadoop依赖
下载Hadoop安装包解压到电脑任意位置 可进入Hadoop按照下面的步骤进行操作也可直接点击连接进行跳转 在Python代码中使用os模块配置os.environ[‘HADOOP_HOME’] ‘HADOOP解压文件夹路径’ 感谢cdarlint的贡献下载winutils.exe并放入Hadoop解压文件夹的bin目录内 下载hadoop.dll并放入:C:/Windows/System32 文件夹内 下载方法点击文件跳转页面点击右侧下载按钮即可
5.2.2 saveAsTextFile算子 saveAsTextFile 是一种动作action操作用于将 RDD 中的内容保存到文本文件中。 saveAsTextFile 操作的基本语法如下 rdd.saveAsTextFile(output_path)rdd 是要保存的 RDDoutput_path 是指定的输出目录路径用于存储 RDD 的内容。当执行 saveAsTextFile 操作时PySpark 将会将 RDD 中的每个元素转换为字符串并将这些字符串逐行写入到文本文件中。 修改rdd分区为1个 方式1SparkConf对象设置属性全局并行度为1 conf SparkConf().setMaster(local[*]).setAppName(test_spark)
conf.set(spark.default.parallelism,1)
sc SparkContext(confconf)方式2创建RDD的时候设置parallelize方法传入numSlices参数为1
rdd1 sc.parallelize([1, 2, 3, 4, 5], numSlices1)
rdd2 sc.parallelize([(Hello, 3), (Spark, 5), (Hi, 7)], 1)
rdd3 sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]], 1)使用 saveAsTextFile 操作将 RDD 中的元素保存到文本文件 注意saveAsTextFile 操作会自动分区数据并将数据保存到多个文件中每个分区一个文件。一般默认分区数和cpu核心数相同 from pyspark import SparkConf, SparkContext
import os
import json
os.environ[PYSPARK_PYTHON] C:/environment/Python3.11.4/python.exe
os.environ[HADOOP_HOME] C:/environment/hadoop-3.3.5
conf SparkConf().setMaster(local[*]).setAppName(test_spark)
# conf.set(spark.default.parallelism,1)
sc SparkContext(confconf)# 准备RDD1 numSlices设置分区为1
rdd1 sc.parallelize([1, 2, 3, 4, 5], numSlices1)# 准备RDD2 numSlices设置分区为1
rdd2 sc.parallelize([(Hello, 3), (Spark, 5), (Hi, 7)], 1)# 准备RDD3 numSlices设置分区为1
rdd3 sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]], 1)# 输出到文件中
rdd1.saveAsTextFile(C:/output1)
rdd2.saveAsTextFile(C:/output2)
rdd3.saveAsTextFile(C:/output3)5.2.3 coalesce/repartition调整分区 saveAsTextFile 操作默认会将数据分区并保存到多个文件中每个分区一个文件以便更好地利用分布式存储。如果希望将数据保存为单个文本文件可以使用 coalesce 或 repartition 操作来调整 RDD 的分区数量将所有数据合并到一个分区中然后再使用 saveAsTextFile 操作。 演示使用 coalesce 或 repartition 的示例以将数据保存为单个文本文件 使用 coalesce from pyspark.sql import SparkSession# 创建 SparkSession
spark SparkSession.builder.appName(SaveSingleTextFileExample).getOrCreate()# 创建一个包含数字的 RDD
data [1, 2, 3, 4, 5]
rdd spark.sparkContext.parallelize(data, numSlices4) # 指定初始分区数为 4# 将数据合并到一个分区
single_partition_rdd rdd.coalesce(1)# 将合并后的数据保存为单个文本文件
output_path single_text_file
single_partition_rdd.saveAsTextFile(output_path)# 停止 SparkSession
spark.stop()使用 repartition from pyspark.sql import SparkSession# 创建 SparkSession
spark SparkSession.builder.appName(SaveSingleTextFileExample).getOrCreate()# 创建一个包含数字的 RDD
data [1, 2, 3, 4, 5]
rdd spark.sparkContext.parallelize(data, numSlices4) # 指定初始分区数为 4# 将数据重新分区为一个分区
single_partition_rdd rdd.repartition(1)# 将重新分区后的数据保存为单个文本文件
output_path single_text_file
single_partition_rdd.saveAsTextFile(output_path)# 停止 SparkSession
spark.stop()六 综合案例搜索引擎日志分析
6.1 需求 读取文件转换成RDD并完成
打印输出热门搜索时间段小时精度Top3打印输出热门搜索词Top3打印输出统计黑马程序员关键字在哪个时段被搜索最多将数据转换为JSON格式写出为文件
6.2 参考代码
from pyspark import SparkConf, SparkContext
import os
import json
os.environ[PYSPARK_PYTHON] C:/environment/Python3.11.4/python.exe
os.environ[HADOOP_HOME] C:/environment/hadoop-3.3.5
conf SparkConf().setMaster(local[*]).setAppName(test_spark)
conf.set(spark.default.parallelism, 1)
sc SparkContext(confconf)# 读取文件转换成RDD
file_rdd sc.textFile(C:/search_log.txt)
# TODO 需求1 热门搜索时间段Top3小时精度
# 1.1 取出全部的时间并转换为小时
# 1.2 转换为(小时, 1) 的二元元组
# 1.3 Key分组聚合Value
# 1.4 排序降序
# 1.5 取前3
result1 file_rdd.map(lambda x: x.split(\t)[0][:2])\.map(lambda x:(x, 1))\.reduceByKey(lambda a, b: a b)\.sortBy(lambda x: x[1], ascendingFalse, numPartitions1)\.take(3)
print(需求1的结果, result1)# TODO 需求2 热门搜索词Top3
# 2.1 取出全部的搜索词
# 2.2 (词, 1) 二元元组
# 2.3 分组聚合
# 2.4 排序
# 2.5 Top3
result2 file_rdd.map(lambda x: (x.split(\t)[2], 1)).\reduceByKey(lambda a, b: a b).\sortBy(lambda x: x[1], ascendingFalse, numPartitions1).\take(3)
print(需求2的结果, result2)# TODO 需求3 统计黑马程序员关键字在什么时段被搜索的最多
# 3.1 过滤内容只保留黑马程序员关键词
# 3.2 转换为(小时, 1) 的二元元组
# 3.3 Key分组聚合Value
# 3.4 排序降序
# 3.5 取前1
result3 file_rdd.map(lambda x: x.split(\t)).\filter(lambda x: x[2] 黑马程序员).\map(lambda x: (x[0][:2], 1)).\reduceByKey(lambda a, b: a b).\sortBy(lambda x: x[1], ascendingFalse, numPartitions1).\take(1)
print(需求3的结果, result3)# TODO 需求4 将数据转换为JSON格式写出到文件中
# 4.1 转换为JSON格式的RDD
# 4.2 写出为文件
file_rdd.map(lambda x: x.split(\t)).\map(lambda x: {time: x[0], user_id: x[1], key_word: x[2], rank1: x[3], rank2: x[4], url: x[5]}).\saveAsTextFile(C:/output_json)
6.3 输出结果
Setting default log level to WARN.
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/14 13:18:50 WARN Utils: Service SparkUI could not bind on port 4040. Attempting port 4041.
需求1的结果 [(20, 3479), (23, 3087), (21, 2989)]
需求2的结果 [(scala, 2310), (hadoop, 2268), (博学谷, 2002)]
需求3的结果 [(22, 245)]