Spark官方快速入门教程-中文版

翻译自:https://spark.apache.org/docs/latest/quick-start.html 当前版本spark 3.5.1.

本教程简要介绍了如何使用 Spark。我们将首先通过 Spark 的交互式 shell(使用 Python 或 Scala)介绍 API,然后展示如何用 Java、Scala 和 Python 编写应用程序。

要遵循本指南,首先从 Spark 网站下载 Spark 的打包版本。由于我们不会使用 HDFS,因此您可以下载任何版本 Hadoop 的软件包。

请注意,在 Spark 2.0 之前,Spark 的主要编程接口是弹性分布式数据集 (RDD)。在 Spark 2.0 之后,RDD 被 Dataset 取代,它像 RDD 一样是强类型的,但在底层进行了更丰富的优化。RDD 接口仍然受支持,您可以在 RDD 编程指南中获得更详细的参考。但是,我们强烈建议您切换到使用 Dataset,它比 RDD 具有更好的性能。请参阅 SQL 编程指南 以获取有关 Dataset 的更多信息。

使用 Spark Shell 进行交互分析

基础

Spark 的 shell 提供了一种学习 API 的简单方法,以及一个强大的交互式数据分析工具。它既可以在 Scala(在 Java VM 上运行,因此是使用现有 Java 库的好方法)中使用,也可以在 Python 中使用。通过在 Spark 目录中运行以下命令来启动它:

./bin/pyspark

如果您当前环境中使用 pip 安装了 PySpark:

pyspark

Spark 的主要抽象是一个分布式项目集合,称为 Dataset。可以从 Hadoop InputFormats(例如 HDFS 文件)或通过转换其他 Dataset 创建 Dataset。由于 Python 的动态特性,我们不需要 Dataset 在 Python 中是强类型的。因此,Python 中的所有 Dataset 都是 Dataset[Row],我们称之为 DataFrame,以与 Pandas 和 R 中的数据框概念保持一致。让我们从 Spark 源目录中的 README 文件文本创建一个新的 DataFrame:

>>> textFile = spark.read.text("README.md")

您可以通过调用某些操作直接从 DataFrame 获取值,或者转换 DataFrame 以获取新值。有关更多详细信息,请阅读 API 文档

>>> textFile.count()  # Number of rows in this DataFrame
126

>>> textFile.first()  # First row in this DataFrame
Row(value=u'# Apache Spark')

现在让我们将这个 DataFrame 转换为一个新的。我们调用 filter 来返回一个包含文件中行子集的新 DataFrame。

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

我们可以将transform和action链接在一起:

>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
15

更多Dataset操作

数据集操作和转换可用于更复杂的计算。假设我们想找到包含最多单词的行:

>>> from pyspark.sql import functions as sf
>>> textFile.select(sf.size(sf.split(textFile.value, "\s+")).name("numWords")).agg(sf.max(sf.col("numWords"))).collect()
[Row(max(numWords)=15)]

这首先将一行映射到一个整数值,并将其别名为“numWords”,从而创建一个新的 DataFrame。在该 DataFrame 上调用 agg 来查找最大字数。select 和 agg 的参数都是 Column,我们可以使用 df.colName 从 DataFrame 中获取一列。我们还可以导入 pyspark.sql.functions,它提供了许多方便的函数来从旧列构建新列。

一种常见的数据流模式是 MapReduce,由 Hadoop 推广。Spark 可以轻松实现 MapReduce 流:

>>> wordCounts = textFile.select(sf.explode(sf.split(textFile.value, "\s+")).alias("word")).groupBy("word").count()

在这里,我们在 select 中使用explode函数,将行数据集转换为单词数据集,然后结合groupBy和count来计算文件中每个单词的计数,作为两列的DataFrame:word和count。要在shell中收集单词计数,我们可以调用collect:

>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]

缓存(cache)

Spark 还支持将数据集拉入集群范围的内存缓存中。当数据被重复访问时,这非常有用,例如查询小型“热门”数据集或运行 PageRank 等迭代算法时。举一个简单的例子,让我们将 linesWithSpark 数据集标记为要缓存:

>>> linesWithSpark.cache()

>>> linesWithSpark.count()
15

>>> linesWithSpark.count()
15

使用 Spark 探索和缓存 100 行文本文件似乎有些愚蠢。有趣的是,这些相同的函数可以用于非常大的数据集,即使它们分布在数十或数百个节点上。您还可以通过将 bin/pyspark 连接到集群以交互方式执行此操作,如 RDD 编程指南中所述。

独立的应用程序

假设我们希望使用 Spark API 编写一个独立的应用程序。我们将介绍使用 Scala(使用 sbt)、Java(使用 Maven)和 Python(pip)编写的简单应用程序。

现在我们将展示如何使用 Python API(PySpark)编写应用程序。

如果您正在构建打包的 PySpark 应用程序或库,则可以将其添加到 setup.py 文件中,如下所示:

    install_requires=[
        'pyspark==3.5.1'
    ]

作为示例,我们将创建一个简单的 Spark 应用程序 SimpleApp.py:

"""SimpleApp.py"""
from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

该程序仅计算文本文件中包含“a”的行数和包含“b”的数字。请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 的安装位置。与 Scala 和 Java 示例一样,我们使用 SparkSession 来创建数据集。对于使用自定义类或第三方库的应用程序,我们还可以通过 spark-submit 的 –py-files 参数将其打包成 .zip 文件(有关详细信息,请参阅 spark-submit –help)。SimpleApp 足够简单,我们不需要指定任何代码依赖项。

我们可以使用 bin/spark-submit 脚本运行该应用程序:

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23

如果您的环境中安装了 PySpark pip(例如,pip install pyspark),您可以使用常规 Python 解释器运行您的应用程序,或者根据需要使用提供的“spark-submit”。

# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23

其他依赖管理工具(如 Conda 和 pip)也可用于自定义类或第三方库。另请参阅 Python 包管理

继续学习

恭喜您运行您的第一个 Spark 应用程序!

  • 要深入了解 API,请从 RDD 编程指南和 SQL 编程指南开始,或参阅其他组件的“编程指南”菜单。
  • 要在集群上运行应用程序,请转到部署概述。
  • 最后,Spark 在 examples 目录中包含几个示例(Scala、Java、Python、R)。您可以按如下方式运行它们:
# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R
5 1 投票
文章评分
订阅评论
提醒
guest
1 评论
最新
最旧 最多投票
内联反馈
查看所有评论
1
0
希望看到您的想法,请您发表评论x