这张图解释了 Apache Spark DataFrame 写入 API 的流程。它始于对写入数据的 API 调用,支持的格式包括 CSV、JSON 或 Parquet。流程根据选择的保存模式(追加、覆盖、忽略或报错)而分岔。每种模式执行必要的检查和操作,例如分区和数据写入处理。流程以数据的最终写入或错误结束,取决于这些检查和操作的结果。
Apache Spark 是一个开源的分布式计算系统,提供了强大的平台用于处理大规模数据。写入 API 是 Spark 数据处理能力的基本组成部分,允许用户将数据从他们的 Spark 应用程序写入或输出到不同的数据源。
一、理解 Spark 写入 API
1.数据源
Spark 支持将数据写入各种数据源,包括但不限于:
- 分布式文件系统,如 HDFS
- 云存储,如 AWS S3、Azure Blob Storage
- 传统数据库(包括 SQL 和 NoSQL)
- 大数据文件格式(Parquet、Avro、ORC)
2.DataFrameWriter
写入 API 的核心类是DataFrameWriter。它提供配置和执行写入操作的功能。通过在 DataFrame 或 Dataset 上调用.write方法获得DataFrameWriter。
3.写入模式
指定 Spark 在写入数据时应如何处理现有数据的模式。常见的模式包括:
- append:将新数据添加到现有数据中。
- overwrite:用新数据覆盖现有数据。
- ignore:如果数据已存在,则忽略写入操作。
- errorIfExists(默认):如果数据已存在,则抛出错误。
4.格式规范
可以使用.format("formatType")方法指定输出数据的格式,如 JSON、CSV、Parquet 等。
5.分区
为了实现有效的数据存储,可以使用.partitionBy("column")方法根据一个或多个列对输出数据进行分区。
6.配置选项
可以使用.option("key", "value")方法设置特定于数据源的各种选项,如压缩、CSV 文件的自定义分隔符等。
7.保存数据
最后,使用.save("path")方法将 DataFrame 写入指定的路径。其他方法如.saveAsTable("tableName")也可用于不同的写入场景。
from pyspark.sql import SparkSession
from pyspark.sql import Row
import os
# 初始化 SparkSession
spark = SparkSession.builder
.appName("DataFrameWriterSaveModesExample")
.getOrCreate()
# 示例数据
data = [
Row(name="Alice", age=25, country="USA"),
Row(name="Bob", age=30, country="UK")
]
# 附加数据用于追加模式
additional_data = [
Row(name="Carlos", age=35, country="Spain"),
Row(name="Daisy", age=40, country="Australia")
]
# 创建 DataFrames
df = spark.createDataFrame(data)
additional_df = spark.createDataFrame(additional_data)
# 定义输出路径
output_path = "output/csv_save_modes"
# 函数:列出目录中的文件
def list_files_in_directory(path):
files = os.listdir(path)
return files
# 显示初始 DataFrame
print("初始 DataFrame:")
df.show()
# 使用覆盖模式写入 CSV 格式
df.write.csv(output_path, mode="overwrite", header=True)
print("覆盖模式后的文件:", list_files_in_directory(output_path))
# 显示附加 DataFrame
print("附加 DataFrame:")
additional_df.show()
# 使用追加模式写入 CSV 格式
additional_df.write.csv(output_path, mode="append", header=True)
print("追加模式后的文件:", list_files_in_directory(output_path))
# 使用忽略模式写入 CSV 格式
additional_df.write.csv(output_path, mode="ignore", header=True)
print("忽略模式后的文件:", list_files_in_directory(output_path))
# 使用 errorIfExists 模式写入 CSV 格式
try:
additional_df.write.csv(output_path, mode="errorIfExists", header=True)
except Exception as e:
print("errorIfExists 模式中发生错误:", e)
# 停止 SparkSession
spark.stop()
二、Spark 架构概述
在 Apache Spark 中写入 DataFrame 遵循一种顺序流程。Spark 基于用户 DataFrame 操作创建逻辑计划,优化为物理计划,并分成阶段。系统按分区处理数据,对其进行日志记录以确保可靠性,并带有定义的分区和写入模式写入到本地存储。Spark 的架构确保在计算集群中高效管理和扩展数据写入任务。
从 Spark 内部架构的角度来看,Apache Spark 写入 API 涉及了解 Spark 如何在幕后管理数据处理、分发和写入操作。让我们来详细了解:
三、Spark 架构概述
- 驱动程序和执行器:Spark 采用主从架构。驱动节点运行应用程序的 main() 函数并维护有关 Spark 应用程序的信息。执行器节点执行数据处理和写入操作。
- DAG 调度器:当触发写入操作时,Spark 的 DAG(有向无环图)调度器将高级转换转换为一系列可以在集群中并行执行的阶段。
- 任务调度器:任务调度器在每个阶段内启动任务。这些任务分布在执行器之间。
- 执行计划和物理计划:Spark 使用 Catalyst 优化器创建高效的执行计划。这包括将逻辑计划(要做什么)转换为物理计划(如何做),考虑到分区、数据本地性和其他因素。
四、在 Spark 内部写入数据
(1) 数据分布:Spark 中的数据分布在分区中。当启动写入操作时,Spark 首先确定这些分区中的数据布局。
(2) 写入任务执行:每个分区的数据由一个任务处理。这些任务在不同的执行器之间并行执行。
写入模式和一致性:
- 对于overwrite和append模式,Spark 确保一致性,通过管理数据文件的替换或添加来实现。
- 对于基于文件的数据源,Spark 以分阶段的方式写入数据,先写入临时位置再提交到最终位置,有助于确保一致性和处理故障。
(3) 格式处理和序列化:根据指定的格式(例如,Parquet、CSV),Spark 使用相应的序列化器将数据转换为所需的格式。执行器处理此过程。
(4) 分区和文件管理:
- 如果指定了分区,则Spark在写入之前根据这些分区对数据进行排序和组织。这通常涉及在执行器之间移动数据。
- Spark 试图最小化每个分区创建的文件数量,以优化大文件大小,在分布式文件系统中更有效。
(5) 错误处理和容错:在写入操作期间,如果任务失败,Spark 可以重试任务,确保容错。但并非所有写入操作都是完全原子的,特定情况可能需要手动干预以确保数据完整性。
(6) 优化技术:
- Catalyst 优化器:为效率优化写入计划,例如最小化数据移动。
- Tungsten:Spark 的 Tungsten 引擎优化数据序列化和反序列化过程中的内存和 CPU 使用。
(7) 写入提交协议:Spark 使用写入提交协议来协调特定数据源的任务提交和中止过程,确保对写入数据的一致视图。
Spark 的写入 API 旨在实现高效和可靠的数据写入,它以复杂的方式编排任务分发、数据序列化和文件管理。它利用 Spark 的核心组件,如 DAG 调度器、任务调度器和 Catalyst 优化器,有效地执行写入操作。