Spark 与 Hadoop
Spark 与 Hadoop 最主要的差别在于多个作业间的数据通信
Spark 多个作业间的数据通信基于内存,而 Hadoop 基于磁盘
Hadoop:
一次性数据计算
Hadoop 在处理数据时,先从磁盘中读取数据,然后进行逻辑操作,再将处理结果重新存储到磁盘中
通过磁盘 IO 来进行作业,会消耗大量资源,影响性能
Spark:
Spark 优化了计算过程,把作业的计算结果放入内存中,这种方式的存取效率较高
总结:
在绝大多数场景中,Spark 的确比 MapReduce 更有优势
但正是 Spark 基于内存操作,在实际生产环境中,可能会由于内存的限制导致 Job 执行失败,此时 MapReduce 是更好的选择
即 Spark 并不能完全替代 MapReduce
Spark 核心模块
Spark Core:
提供最基础、最核心的功能
Spark SQL:
用于操作结构化数据的组件,通过 Spark SQL,用户可以使用 SQL 语言来查询数据
Spark Streaming:
用于流式计算的组件
Spark MLlib:
用于机器学习的算法库
Spark GraphX:
用于图计算的框架和算法库
Word Count
object WordCount {
def main(args: Array[String]): Unit = {
// 建立与 Spark 的连接
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(conf)
// 执行业务操作
// 1 读取文件
val lines = sparkContext.textFile("data")
// 2 拆分 "hello world" -> "hello" "world"
val words = lines.flatMap(_.split(" "))
// 3 对单词分组 (hello, hello) (world, world)
val group = words.groupBy(word => word)
// 4 转换分组数据 (hello, hello) (world, world) -> (hello, 2) (world, 2)
val count = group.map {
case (word, list) => {
(word, list.size)
}
}
// 5 打印结果
val array = count.collect()
array.foreach(println)
// 关闭连接
sparkContext.stop()
}
}
(hello,4)
(world,2)
(spark,2)
功能实现流程图
Word Count 优化
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(conf)
val lines = sparkContext.textFile("data")
val words = lines.flatMap(_.split(" "))
val wordToOne = words.map(
word => (word, 1)
)
// 涉及 Scala 的元组引用,_1 引用第一个元素,_2 引用第二个元素
val wordGroup = wordToOne.groupBy(
t => t._1
)
val wordToCount = wordGroup.map {
case (word, list) => {
list.reduce(
(t1, t2) => {
(t1._1, t1._2 + t2._2)
}
)
}
}
val array = wordToCount.collect()
array.foreach(println)
sparkContext.stop()
}
}
(hello,4)
(world,2)
(spark,2)
改进
Spark 实现 Word Count
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(conf)
val lines = sparkContext.textFile("data")
val words = lines.flatMap(_.split(" "))
val wordToOne = words.map(
word => (word, 1)
)
// Spark 提供了更多功能,可以将分组和聚合用一个方法实现
// reduceByKey 就是将相同的 key,对 value 进行聚合
val wordToCount = wordToOne.reduceByKey(_ + _)
val array = wordToCount.collect()
array.foreach(println)
sparkContext.stop()
}
}
(hello,4)
(world,2)
(spark,2)
_ + _ 是由以下两步简化而来(至简原则)
wordToOne.reduceByKey((x, y) => {x + y})
wordToOne.reduceByKey((x, y) => x + y)
Spark 运行环境
Local 模式
在 IDEA 中运行代码的环境称之为开发环境,而 Local 模式与之不同
所谓的 Local 模式,就是不需要其他任何节点资源,就可以在本地执行 Spark 代码的环境,一般用于教学,调试,演示等
Standalone 模式
Local 本地模式只是用来进行练习演示的,真实工作中还是要将应用提交到对应的集群中去执行
只使用 Spark 自身节点运行的集群模式,也就是所谓独立部署(Standalone)模式
Spark 的 Standalone
模式体现了经典的 master-slave
模式
Yarn 模式
独立部署(Standalone)模式由 Spark 自身提供计算资源,无需其他框架提供资源。
这种方式降低了和其他第三方资源框架的耦合性,独立性非常强
但是 Spark 主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些
K8S & Mesos 模式
Mesos 是 Apache 下的开源分布式资源管理框架,它被称为是分布式系统的内核,在 Twitter 得到广泛使用,管理着 Twitter 超过 30,0000 台服务器上的应用部署,但是在国内依然使用着传统的 Hadoop 大数据框架,使用 Mesos 框架的并不多
Windows 模式
在自己学习时,每次都需要启动虚拟机,启动集群,这是一个比较繁琐的过程, 并且会占大量的系统资源,导致系统执行变慢,不仅仅影响学习效果,也影响学习进度, Spark 提供了可以在 Windows 系统下启动本地集群的方式
Spark 运行架构
Spark 框架的核心是一个计算引擎,采用了标准的 master-slave
结构
图中展示了 Spark 执行时的基本结构
Driver 表示 master, 负责管理整个集群中的作业任务调度
Executor 表示 slave,负责实际执行任务
核心组件
Driver
Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作
Driver 在 Spark 作业执行时主要负责:
- 将用户程序转化为作业(job)
- 在 Executor 之间调度任务(task)
- 跟踪 Executor 的执行情况
- 通过 UI 展示查询运行情况
实际上,我们无法准确地描述 Driver 的定义,因为在整个的编程过程中没有看到任何有关 Driver 的字眼
所以简单理解,所谓的 Driver 就是驱使整个应用运行起来的程序,也称之为 Driver 类
Executor
Spark Executor 是集群中工作节点(Worker)中一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立
Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在
如果有 Executor 节点发生了 故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点 上继续运行
Executor 有两个核心功能:
负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程
它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储
RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算
Master & Worker
Spark 集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调度的功能
所以环境中还有两个核心组件:Master 和 Worker
Master 是一个进程,主要负责资源的调度和分配,进行集群的监控
Worker 也是进程,一个 Worker 运行在集群中的一台服务器上,由 Master 分配资源对数据进行并行的处理和计算
ApplicationMaster
Hadoop 用户向 YARN 集群提交应用程序时,提交程序中应该包含 ApplicationMaster
,用于向资源调度器申请执行任务的资源容器 Container,运行用户自己的程序任务 job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况
即,ResourceManager(资源)和 Driver(计算)之间的解耦合靠的就是 ApplicationMaster
核心概念
Executor 与 Core
Spark Executor 是集群中运行在工作节点(Worker)中的一个 JVM 进程,是整个集群中的专门用于计算的节点
在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源
这里的资源一般指的是工作节点 Executor 的内存大小和使用的虚拟 CPU 核(Core)数量
并行度(Parallelism)
在分布式计算框架中一般都是多个任务同时执行,由于任务分布在不同的计算节点进行计算,所以能够真正地实现多任务并行执行
注意,这里是并行,而不是并发
这里我们将整个集群并行执行任务的数量称之为并行度
一个作业的并行度取决于框架的默认配置,应用程序也可以在运行过程中动态修改
有向无环图(DAG)
DAG(Directed Acyclic Graph)有向无环图
是由点和线组成的拓扑图形,该图形具有方向,不会闭环
这里所谓的有向无环图,并不是真正意义的图形,而是由 Spark 程序直接映射成的数据流的高级抽象模型
简单理解就是将整个程序计算的执行过程用图形表示出来,这样更直观且更便于理解,可以用于表示程序的拓扑结构
大数据计算引擎框架我们根据使用方式的不同一般会分为四类,其中第一类就是 Hadoop 所承载的 MapReduce,它将计算分为两个阶段,Map 阶段 和 Reduce 阶段
对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算,由于这样的弊端,催生了支持 DAG 框架的产生
Spark 核心编程
Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。
三大数据结构分别是:
- RDD:弹性分布式数据集
- 累加器:分布式共享只写变量
- 广播变量:分布式共享只读变量
上图模拟了 Spark 中分布式计算的过程,客户端向服务器发送计算任务
任务拆分:(1, 2, 3, 4) => (1, 2) (3, 4)
计算逻辑:n * 2
RDD
RDD(Resilient Distributed Datase)
叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型,源码中是一个抽象类
RDD 的特点
弹性
- 存储的弹性:内存与磁盘的自动切换
- 容错的弹性:数据丢失可以自动恢复
- 计算的弹性:计算出错重试机制
- 分片的弹性:可根据需要重新分片
分布式:数据存储在大数据集群不同节点上
数据集:RDD 封装了计算逻辑,并不保存数据
数据抽象:RDD 是一个抽象类,需要子类具体实现
- 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
- 可分区、并行计算
RDD 与 IO 流的联系
IO
IO 操作体现了装饰者设计模式
字节流
一个一个字节读取,读一个打印一个
字节流 + 缓冲区
一次读取一个 字节缓冲区
大小的字节数,存满后一起打印
字符流
一次读取一个 字节缓冲区
大小的字节,此大小正好组装成一个字符,再暂存到 字符缓冲区
,存满后一起打印
RDD
- RDD 的数据处理方式类似于 IO 流,也体现了装饰者设计模式
- RDD 的数据只有在调用 collect 方法时,才会真正执行业务逻辑操作,之前的封装均为功能扩展
- RDD 不保存数据,但 IO 可以通过缓冲区暂存一部分数据
RDD 创建
从内存中创建 RDD
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 = sparkContext.parallelize(
List(1, 2, 3, 4)
)
val rdd2 = sparkContext.makeRDD(
List(1, 2, 3, 4)
)
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
sparkContext.stop()
Spark 主要提供了两个方法:parallelize
和 makeRDD
底层代码中 makeRDD
方法其实就是 parallelize
方法,且大部分时候使用 makeRDD
方法居多
从外存中创建 RDD
由外部存储系统的数据集创建 RDD 包括:
- 本地的文件系统
- 所有 Hadoop 支持的数据集,比如 HDFS、HBase 等
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()
从其他 RDD 创建
通过一个 RDD 运算完后,再产生新的 RDD
直接创建 RDD(new)
使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用
RDD 分区
默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量称之为并行度,这个数量可以在构建 RDD 时指定
注意,这里是指并行执行的任务数量,并不是指切分任务的数量
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val dataRDD: RDD[Int] =
sparkContext.makeRDD(
List(1,2,3,4),
4)
val fileRDD: RDD[String] =
sparkContext.textFile(
"input",
2)
fileRDD.collect().foreach(println)
sparkContext.stop()
- 读取内存数据时,数据可以按照并行度的设定进行数据的分区操作
- 读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数据读取规则有差异
RDD 算子
RDD 算子就是 RDD 方法,只是名称不同
RDD 方法分为转换方法和行动方法
RDD 转换算子
RDD 根据数据处理方式的不同将算子整体上分为
- Value 类型
- 双 Value 类型
- Key-Value 类型
Value类型
map
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换
mapPartitions
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是进行任意处理,也可以是过滤数据
map 与 mapPartitions 的区别
map 是分区内一个数据一个数据的执行,类似于串行操作
mapPartitions 是以分区为单位进行批处理操作
所以,map 因为类似于串行操作,性能比较低,而 mapPartitions 类似于批处理,性能较高,但是 mapPartitions 会长时间占用内存,可能会导致内存溢出,在内存有限的情况下不推荐使用
mapPartitionsWithIndex
与 mapPartitions 类似,只是在处理时可以获取当前分区索引
flatMap
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
glom
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
groupBy
将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,这样的操作称之为 shuffle
filter
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃
sample
根据指定的规则从数据集中抽取数据
distinct
将数据集中重复的数据去重
coalesce
根据数据量缩减分区,当存在过多小任务时,可以通过 coalesce 方法收缩合并分区,减少分区的个数,减小任务调度成本
sortBy
用于排序数据
双 Value 类型
intersection
对两个 RDD 求交集
union
对两个 RDD 求并集
subtract
对两个 RDD 求差集,以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来
zip
将两个 RDD 中的元素以键值对的形式进行合并,其中键值对中的 Key 为第 1 个 RDD 中的元素,Value 为第 2 个 RDD 中的相同位置的元素
Key - Value 类型
partitionBy
将数据按照指定 Partitioner 重新进行分区,Spark 默认的分区器是 HashPartitioner
reduceByKey
可以将数据按照相同的 key 对 value 进行聚合
groupByKey
将数据源的数据根据 key 对 value 进行分组
aggregateByKey
将数据根据不同的规则进行分区内计算和分区间计算
foldByKey
当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey
combineByKey
最通用的对 key-value 型 RDD 进行聚集操作的聚集函数
sortByKey
在一个 (K,V) 的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的 RDD
join
在类型为 (K,V) 和 (K,W) 的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W)) 的 RDD
RDD 行动算子
reduce
聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
collect
在驱动程序中,以数组 Array 的形式返回数据集的所有元素
count
返回 RDD 中元素的个数
first
返回 RDD 中的第一个元素
take
返回一个由 RDD 的前 n 个元素组成的数组
takeOrdered
返回该 RDD 排序后的前 n 个元素组成的数组
aggregate
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
fold
折叠操作,aggregate 的简化版操作
countByKey
统计每种 key 的个数
save 相关
将数据保存到不同格式的文件中
foreach
分布式遍历 RDD 中的每一个元素,调用指定函数
RDD 序列化
从计算的角度,算子外的代码都是在 Driver 端执行,算子内的代码都是在 Executor 端执行
那么在 Scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,如果使用的算子外的数据无法序列化,就意味着无法通过网络传值给 Executor 端执行,发生错误
所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作称之为闭包检测
Kryo 序列化框架
Java 的序列化能够序列化任何类,但是生成的字节多,序列化后对象比较大
Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制
Kryo 速度是 Serializable 的 10 倍,当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类已经在 Spark 内部使用 Kryo 来序列化
RDD 依赖关系
RDD 是不会保存数据的,为了提供容错性,需要将 RDD 的依赖关系保存下来
一旦发生错误,可根据血缘关系重新读取数据并计算
依赖关系
val rdd1 = rdd.map(_ * 2)
相邻两个 RDD 称为依赖关系
血缘关系
RDD1、RDD2、RDD3、RDD4 称为血缘关系
窄依赖
窄依赖表示每一个父 RDD 的 Partition 最多被子 RDD 的一个 Partition 使用
窄依赖可类比独生子女
宽依赖
宽依赖表示同一个父 RDD 的 Partition 被多个子 RDD 的 Partition 依赖,会引起 Shuffle
宽依赖可类比多生
RDD 持久化
RDD 不存储数据,如果一个 RDD 需要重复使用,那么就需要从头开始再次执行来获取数据
在数据执行任务较长,或数据比较重要的场合可以使用持久化
RDD Cache 缓存
RDD 通过 Cache 或者 Persist 方法将前面的计算结果进行缓存,默认情况下会把数据缓存在 JVM 的堆内存中
缓存有可能丢失或者由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行
// cache 操作会增加血缘关系,不改变原有的血缘关系
println(wordToOneRdd.toDebugString)
// 数据缓存
wordToOneRdd.cache()
// 可以更改存储级别
mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)
RDD CheckPoint 检查点
检查点其实就是将 RDD 中间结果写入磁盘
血缘依赖过长会造成容错成本过高,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销
缓存和检查点区别
Cache 缓存只是将数据保存起来,不切断血缘依赖
Checkpoint 检查点切断血缘依赖
Cache 缓存的数据通常存储在内存,可靠性低
Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高
RDD 分区器
Spark 目前支持 Hash 分区、Range 分区、用户自定义分区,默认为 Hash 分区
只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None
Hash 分区
对于给定的 key,计算其 hashCode,并除以分区个数取余
Range 分区
将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而 且分区间有序
RDD 文件读取与保存
Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统
文件格式分为:text 文件、csv 文件、sequence 文件以及 Object 文件
文件系统分为:本地文件系统、HDFS、HBASE 以及数据库
text 文件
// 读取输入文件
val inputRDD: RDD[String] = sc.textFile("input/1.txt")
// 保存数据
inputRDD.saveAsTextFile("output")
sequence 文件
SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 而设计的一种平面文件 (Flat File)
// 保存数据为 SequenceFile
dataRDD.saveAsSequenceFile("output")
// 读取 SequenceFile 文件
sc.sequenceFile[Int,Int]("output").collect().foreach(println)
object 对象文件
对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制
// 保存数据
dataRDD.saveAsObjectFile("output")
// 读取数据
sc.objectFile[Int]("output").collect().foreach(println)
累加器
累加器用于把 Executor 的变量数据聚合到 Driver
对于累加器,在 Driver 中定义的变量,每个 Executor 的 Task 都会进行复制得到一个副本,Task 更新变量副本后,会返回 Driver 进行合并
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5))
// 声明累加器
var sum = sc.longAccumulator("sum");
rdd.foreach(
num => {
// 使用累加器
sum.add(num)
}
)
// 获取累加器的值
println("sum = " + sum.value)
广播变量
闭包数据都是以 Task 为单位进行发送的,这样可能导致一个 Executor 中含有大量重复数据,浪费大量内存
Executor 本质上是一个 JVM,启动时会自动为其分配内存,因此可以将闭包数据放在该内存中,达到共享目的
val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)
val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) )
// 声明广播变量
val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)
val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
case (key, num) => {
var num2 = 0
// 使用广播变量
for ((k, v) <- broadcast.value) {
if (k == key) {
num2 = v
}
}
(key, (num, num2))
}
}
Spark SQL
Spark SQL 是 Spark 用于处理结构化数据(structured data)的模块
Spark SQL 与 Hive
Hive 是早期唯一运行在 Hadoop 上的 SQL-on-Hadoop 工具,但 MapReduce 计算过程中需要进行大量磁盘 I/O,降低了运行效率
Shark 是 SparkSQL 的前身,它的出现使 SQL-on-Hadoop 的性能比 Hive 有了 10-100 倍的提高,但随着 Spark 的发展,Shark 对 Hive 过于依赖,制约了 Spark 的发展
SparkSQL 抛弃了原有 Shark 的代码,吸取了其中的优点,同时摆脱了对 Hive 的依赖性,在数据兼容、性能优化、组件扩展方面有极大提高
Spark SQL 特点
易整合
便捷地整合了 SQL 查询和 Spark 编程
数据访问方式统一
使用相同的方式连接不同的数据源
标准数据连接
通过 JDBC 或者 ODBC 来连接
DataFrame
Spark 1.x
在早期版本 Spark 1.x 中,DataFrame 类似于传统数据库中的二维表格
左侧的 RDD[Person]虽然以 Person 为类型参数,但 Spark 框架本身不了解 Person 类的内部结构
右侧的 DataFrame 多了数据的结构信息,即 schema,可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么
Spark 2.x
但在新版本 Spark 2.x 中,DataFrame=DataSet[Row],即DataFrame 每一行是 Row 类型,但每一行究竟有哪些字段,各个字段又是什么类型都无从得知,即每一列的值无法直接访问,只能用 getAS
方法得到各字段的具体值
testDataFrame.foreach{
line =>
val col1=line.getAs[String]("col1")
val col2=line.getAs[String]("col2")
}
DataFrame 查询优化
DataFrame 与 RDD 都是懒执行的,但 DataFrame 性能比 RDD 高,因为 Spark SQL 具有查询优化器:
图中构造了两个 DataFrame,先将他们 join 然后进行 filter 操作
如果原封不动地执行这个执行计划,最终的执行效率是不高的,因为 join 是一个代价较大的操作,也可能会产生一个较大的数据集
如果我们能将 filter 延迟到 join 之后执行,先对 DataFrame 进行过滤,再 join 过滤后的较小的结果集,便可以有效缩短执行时间
而 Spark SQL 的查询优化器正是这样做的,简而言之,就是将高成本的操作替换为低成本操作的过程
关于懒执行:
Spark算子主要划分为两类:transformation 和 action ,并且只有action算子触发的时候才会真正执行任务
常用的算子map、flatMap、filter都是 transformation 算子
而collect、count、saveAsTextFile、countByKey、foreach则为action算子
为什么 Spark 任务只有在调用 action 算子的时候,才会真正执行呢?
假如 transformation 算子直接触发 Spark 任务:
- 导致 map 执行完了要立即输出,数据也必然要落地(内存和磁盘)
- 对于 map 任务的生成、调度、执行以及彼此之间的 RPC 通信等,当涉及大数据量时,会很影响性能
MapReduce 正是因为中间结果需要落地,导致性能低下
Spark 只有调用 action 算子时才会真正执行任务,这是相对于 MapReduce 的优化之一
DataSet
这里以新版本 Spark 2.x 为例
DataFrame=DataSet[Row]
DataFrame 也可以称为 DataSet[Row],每一行的类型都是 Row,但每一行有哪些字段,各字段是什么类型都无法得知
DataSet 不同于 DataFrame,在定义了 case class 后可以直接获取每列的值
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val test: Dataset[Coltest]=rdd.map{line=>
Coltest(line._1,line._2)
}.toDS
test.map{
line=>
println(line.col1)
println(line.col2)
}
DataSet -> DataFrame
DataSet 封装成 Row 类型后,可转为 DataFrame
import spark.implicits._
val testDF = testDS.toDF
DataFrame -> DataSet
DataFrame 给出每列的具体类型,使用 as 方法,可转为 DataSet
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = testDF.as[Coltest]
Spark Session
在以往 Spark Core 中,若要执行应用程序,需要先创建上下文对象 SparkContext
而 Spark SQL 对其进行了封装,称为 SparkSession,所以实际上的计算是由内部 SparkContext 完成的
Spark Session 是 Spark SQL 查询的起点
用户自定义函数 UDF
用户可以通过 spark.udf
创建自定义函数,实现自定义功能
spark.udf.register("addName",(x:String)=> "Name:"+x)
Spark Streaming
Spark Streaming 用于流式数据的处理,支持的数据源很多,例如 Kafka、简单 TCP 套接字等
与 Spark 基于 RDD 类似,Spark Streaming 基于 DStream,DStream 是随时间推移收到的数据序列
DStream 内部每个时间间隔收到的数据是作为 RDD 存在,而 DStream 是由这些 RDD 组成的序列,即离散化
总之,DStream 是对 RDD 在实时数据处理场景的封装
流式处理 和 批量处理
从数据处理的方式角度
- 流式处理:接收一条数据,立即进行处理
- 批量处理:接收一条数据,先暂存,收集一批数据后一起处理
实时处理 和 离线处理
从数据处理的延迟长短角度
- 实时处理:毫秒
- 离线处理:小时、天
而 Spark Streaming 是准实时(秒、分钟)的数据处理框架
架构
背压机制
Spark Streaming 处理任务采用生产者-消费者模式,接收器(或采集器,Receiver)接收到数据后,由 Driver 发送给 Executor 工作节点处理数据
此时可能会出现一些问题,如接收器接收太快,工作节点来不及处理数据,会造成数据积压,或工作节点处理数据太快,造成资源浪费
为了协调接收速率和处理速率,Spark Streaming 采用背压机制,可根据作业执行情况动态调整 Receiver 接收速率
- Post link: http://example.com/2023/01/07/Spark/
- Copyright Notice: All articles in this blog are licensed under unless otherwise stated.