翻译自:https://spark.apache.org/docs/latest/quick-start.html 当前版本spark 3.5.1.
本教程简要介绍了如何使用 Spark。我们将首先通过 Spark 的交互式 shell(使用 Python 或 Scala)介绍 API,然后展示如何用 Java、Scala 和 Python 编写应用程序。
要遵循本指南,首先从 Spark 网站下载 Spark 的打包版本。由于我们不会使用 HDFS,因此您可以下载任何版本 Hadoop 的软件包。
请注意,在 Spark 2.0 之前,Spark 的主要编程接口是弹性分布式数据集 (RDD)。在 Spark 2.0 之后,RDD 被 Dataset 取代,它像 RDD 一样是强类型的,但在底层进行了更丰富的优化。RDD 接口仍然受支持,您可以在 RDD 编程指南中获得更详细的参考。但是,我们强烈建议您切换到使用 Dataset,它比 RDD 具有更好的性能。请参阅 SQL 编程指南 以获取有关 Dataset 的更多信息。
使用 Spark Shell 进行交互分析
基础
Spark 的 shell 提供了一种学习 API 的简单方法,以及一个强大的交互式数据分析工具。它既可以在 Scala(在 Java VM 上运行,因此是使用现有 Java 库的好方法)中使用,也可以在 Python 中使用。通过在 Spark 目录中运行以下命令来启动它:
./bin/pyspark
如果您当前环境中使用 pip 安装了 PySpark:
pyspark
Spark 的主要抽象是一个分布式项目集合,称为 Dataset。可以从 Hadoop InputFormats(例如 HDFS 文件)或通过转换其他 Dataset 创建 Dataset。由于 Python 的动态特性,我们不需要 Dataset 在 Python 中是强类型的。因此,Python 中的所有 Dataset 都是 Dataset[Row],我们称之为 DataFrame,以与 Pandas 和 R 中的数据框概念保持一致。让我们从 Spark 源目录中的 README 文件文本创建一个新的 DataFrame:
>>> textFile = spark.read.text("README.md")
您可以通过调用某些操作直接从 DataFrame 获取值,或者转换 DataFrame 以获取新值。有关更多详细信息,请阅读 API 文档。
>>> textFile.count() # Number of rows in this DataFrame
126
>>> textFile.first() # First row in this DataFrame
Row(value=u'# Apache Spark')
现在让我们将这个 DataFrame 转换为一个新的。我们调用 filter 来返回一个包含文件中行子集的新 DataFrame。
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
我们可以将transform和action链接在一起:
>>> textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"?
15
更多Dataset操作
数据集操作和转换可用于更复杂的计算。假设我们想找到包含最多单词的行:
>>> from pyspark.sql import functions as sf
>>> textFile.select(sf.size(sf.split(textFile.value, "\s+")).name("numWords")).agg(sf.max(sf.col("numWords"))).collect()
[Row(max(numWords)=15)]
这首先将一行映射到一个整数值,并将其别名为“numWords”,从而创建一个新的 DataFrame。在该 DataFrame 上调用 agg 来查找最大字数。select 和 agg 的参数都是 Column,我们可以使用 df.colName 从 DataFrame 中获取一列。我们还可以导入 pyspark.sql.functions,它提供了许多方便的函数来从旧列构建新列。
一种常见的数据流模式是 MapReduce,由 Hadoop 推广。Spark 可以轻松实现 MapReduce 流:
>>> wordCounts = textFile.select(sf.explode(sf.split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
在这里,我们在 select 中使用explode函数,将行数据集转换为单词数据集,然后结合groupBy和count来计算文件中每个单词的计数,作为两列的DataFrame:word和count。要在shell中收集单词计数,我们可以调用collect:
>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
缓存(cache)
Spark 还支持将数据集拉入集群范围的内存缓存中。当数据被重复访问时,这非常有用,例如查询小型“热门”数据集或运行 PageRank 等迭代算法时。举一个简单的例子,让我们将 linesWithSpark 数据集标记为要缓存:
>>> linesWithSpark.cache()
>>> linesWithSpark.count()
15
>>> linesWithSpark.count()
15
使用 Spark 探索和缓存 100 行文本文件似乎有些愚蠢。有趣的是,这些相同的函数可以用于非常大的数据集,即使它们分布在数十或数百个节点上。您还可以通过将 bin/pyspark 连接到集群以交互方式执行此操作,如 RDD 编程指南中所述。
独立的应用程序
假设我们希望使用 Spark API 编写一个独立的应用程序。我们将介绍使用 Scala(使用 sbt)、Java(使用 Maven)和 Python(pip)编写的简单应用程序。
现在我们将展示如何使用 Python API(PySpark)编写应用程序。
如果您正在构建打包的 PySpark 应用程序或库,则可以将其添加到 setup.py 文件中,如下所示:
install_requires=[
'pyspark==3.5.1'
]
作为示例,我们将创建一个简单的 Spark 应用程序 SimpleApp.py:
"""SimpleApp.py"""
from pyspark.sql import SparkSession
logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
spark.stop()
该程序仅计算文本文件中包含“a”的行数和包含“b”的数字。请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 的安装位置。与 Scala 和 Java 示例一样,我们使用 SparkSession 来创建数据集。对于使用自定义类或第三方库的应用程序,我们还可以通过 spark-submit 的 –py-files 参数将其打包成 .zip 文件(有关详细信息,请参阅 spark-submit –help)。SimpleApp 足够简单,我们不需要指定任何代码依赖项。
我们可以使用 bin/spark-submit 脚本运行该应用程序:
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
SimpleApp.py
...
Lines with a: 46, Lines with b: 23
如果您的环境中安装了 PySpark pip(例如,pip install pyspark),您可以使用常规 Python 解释器运行您的应用程序,或者根据需要使用提供的“spark-submit”。
# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23
其他依赖管理工具(如 Conda 和 pip)也可用于自定义类或第三方库。另请参阅 Python 包管理。
继续学习
恭喜您运行您的第一个 Spark 应用程序!
- 要深入了解 API,请从 RDD 编程指南和 SQL 编程指南开始,或参阅其他组件的“编程指南”菜单。
- 要在集群上运行应用程序,请转到部署概述。
- 最后,Spark 在 examples 目录中包含几个示例(Scala、Java、Python、R)。您可以按如下方式运行它们:
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py
# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R
很有帮助👍🏻