image-20230218172120285

Spark 与 Hadoop

Spark 与 Hadoop 最主要的差别在于多个作业间的数据通信

Spark 多个作业间的数据通信基于内存,而 Hadoop 基于磁盘

image-20230126224831357

Hadoop:

一次性数据计算

image-20230107223303269

Hadoop 在处理数据时,先从磁盘中读取数据,然后进行逻辑操作,再将处理结果重新存储到磁盘中

通过磁盘 IO 来进行作业,会消耗大量资源,影响性能

Spark:

image-20230107223618591

Spark 优化了计算过程,把作业的计算结果放入内存中,这种方式的存取效率较高

总结:

在绝大多数场景中,Spark 的确比 MapReduce 更有优势

但正是 Spark 基于内存操作,在实际生产环境中,可能会由于内存的限制导致 Job 执行失败,此时 MapReduce 是更好的选择

即 Spark 并不能完全替代 MapReduce

Spark 核心模块

image-20230107224158550

  • Spark Core:

    提供最基础、最核心的功能

  • Spark SQL:

    用于操作结构化数据的组件,通过 Spark SQL,用户可以使用 SQL 语言来查询数据

  • Spark Streaming:

    用于流式计算的组件

  • Spark MLlib:

    用于机器学习的算法库

  • Spark GraphX:

    用于图计算的框架和算法库

Word Count

object WordCount {
    def main(args: Array[String]): Unit = {
        // 建立与 Spark 的连接
        val conf = new SparkConf().setMaster("local").setAppName("WordCount")
        val sparkContext = new SparkContext(conf)

        // 执行业务操作
        // 1 读取文件
        val lines = sparkContext.textFile("data")
        // 2 拆分 "hello world" -> "hello" "world"
        val words = lines.flatMap(_.split(" "))
        // 3 对单词分组 (hello, hello) (world, world)
        val group = words.groupBy(word => word)
        // 4 转换分组数据 (hello, hello) (world, world) -> (hello, 2) (world, 2)
        val count = group.map {
            case (word, list) => {
                (word, list.size)
            }
        }
        // 5 打印结果
        val array = count.collect()
        array.foreach(println)
        
        // 关闭连接
        sparkContext.stop()
    }
}
(hello,4)
(world,2)
(spark,2)

功能实现流程图

image-20230115155109532

Word Count 优化

object WordCount {
    def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local").setAppName("WordCount")
        val sparkContext = new SparkContext(conf)


        val lines = sparkContext.textFile("data")
        

        val words = lines.flatMap(_.split(" "))
    
        val wordToOne = words.map(
            word => (word, 1)
        )
        
        // 涉及 Scala 的元组引用,_1 引用第一个元素,_2 引用第二个元素
        val wordGroup = wordToOne.groupBy(
            t => t._1
        )
        
        val wordToCount = wordGroup.map {
            case (word, list) => {
                list.reduce(
                    (t1, t2) => {
                        (t1._1, t1._2 + t2._2)
                    }
                )
            }
        }
        
        val array = wordToCount.collect()
        array.foreach(println)
        
        sparkContext.stop()
    }
}
(hello,4)
(world,2)
(spark,2)

改进

image-20230125170914280

Spark 实现 Word Count

object WordCount {
    def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setMaster("local").setAppName("WordCount")
        val sparkContext = new SparkContext(conf)


        val lines = sparkContext.textFile("data")
        

        val words = lines.flatMap(_.split(" "))
    
        val wordToOne = words.map(

            word => (word, 1)
        )
        
        // Spark 提供了更多功能,可以将分组和聚合用一个方法实现
        // reduceByKey 就是将相同的 key,对 value 进行聚合
        val wordToCount = wordToOne.reduceByKey(_ + _)
        
        val array = wordToCount.collect()
        array.foreach(println)
        
        sparkContext.stop()
    }
}
(hello,4)
(world,2)
(spark,2)
_ + _ 是由以下两步简化而来(至简原则)
wordToOne.reduceByKey((x, y) => {x + y})
wordToOne.reduceByKey((x, y) => x + y)

Spark 运行环境

Local 模式

在 IDEA 中运行代码的环境称之为开发环境,而 Local 模式与之不同

所谓的 Local 模式,就是不需要其他任何节点资源,就可以在本地执行 Spark 代码的环境,一般用于教学,调试,演示等

Standalone 模式

Local 本地模式只是用来进行练习演示的,真实工作中还是要将应用提交到对应的集群中去执行

只使用 Spark 自身节点运行的集群模式,也就是所谓独立部署(Standalone)模式

Spark 的 Standalone 模式体现了经典的 master-slave 模式

image-20230125173147108

Yarn 模式

独立部署(Standalone)模式由 Spark 自身提供计算资源,无需其他框架提供资源。

这种方式降低了和其他第三方资源框架的耦合性,独立性非常强

但是 Spark 主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些

K8S & Mesos 模式

Mesos 是 Apache 下的开源分布式资源管理框架,它被称为是分布式系统的内核,在 Twitter 得到广泛使用,管理着 Twitter 超过 30,0000 台服务器上的应用部署,但是在国内依然使用着传统的 Hadoop 大数据框架,使用 Mesos 框架的并不多

Windows 模式

在自己学习时,每次都需要启动虚拟机,启动集群,这是一个比较繁琐的过程, 并且会占大量的系统资源,导致系统执行变慢,不仅仅影响学习效果,也影响学习进度, Spark 提供了可以在 Windows 系统下启动本地集群的方式

Spark 运行架构

Spark 框架的核心是一个计算引擎,采用了标准的 master-slave 结构

image-20230126154922451

图中展示了 Spark 执行时的基本结构

Driver 表示 master, 负责管理整个集群中的作业任务调度

Executor 表示 slave,负责实际执行任务

核心组件

Driver

Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作

Driver 在 Spark 作业执行时主要负责:

  • 将用户程序转化为作业(job)
  • 在 Executor 之间调度任务(task)
  • 跟踪 Executor 的执行情况
  • 通过 UI 展示查询运行情况

实际上,我们无法准确地描述 Driver 的定义,因为在整个的编程过程中没有看到任何有关 Driver 的字眼

所以简单理解,所谓的 Driver 就是驱使整个应用运行起来的程序,也称之为 Driver 类

Executor

Spark Executor 是集群中工作节点(Worker)中一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立

Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在

如果有 Executor 节点发生了 故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点 上继续运行

Executor 有两个核心功能:

  • 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程

  • 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储

    RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算

Master & Worker

Spark 集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调度的功能

所以环境中还有两个核心组件:Master 和 Worker

Master 是一个进程,主要负责资源的调度和分配,进行集群的监控

Worker 也是进程,一个 Worker 运行在集群中的一台服务器上,由 Master 分配资源对数据进行并行的处理和计算

ApplicationMaster

Hadoop 用户向 YARN 集群提交应用程序时,提交程序中应该包含 ApplicationMaster,用于向资源调度器申请执行任务的资源容器 Container,运行用户自己的程序任务 job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况

即,ResourceManager(资源)和 Driver(计算)之间的解耦合靠的就是 ApplicationMaster

核心概念

Executor 与 Core

Spark Executor 是集群中运行在工作节点(Worker)中的一个 JVM 进程,是整个集群中的专门用于计算的节点

在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源

这里的资源一般指的是工作节点 Executor 的内存大小和使用的虚拟 CPU 核(Core)数量

image-20230126155734010

并行度(Parallelism)

在分布式计算框架中一般都是多个任务同时执行,由于任务分布在不同的计算节点进行计算,所以能够真正地实现多任务并行执行

注意,这里是并行,而不是并发

这里我们将整个集群并行执行任务的数量称之为并行度

一个作业的并行度取决于框架的默认配置,应用程序也可以在运行过程中动态修改

有向无环图(DAG)

image-20230126155858119

DAG(Directed Acyclic Graph)有向无环图 是由点和线组成的拓扑图形,该图形具有方向,不会闭环

这里所谓的有向无环图,并不是真正意义的图形,而是由 Spark 程序直接映射成的数据流的高级抽象模型

简单理解就是将整个程序计算的执行过程用图形表示出来,这样更直观且更便于理解,可以用于表示程序的拓扑结构

大数据计算引擎框架我们根据使用方式的不同一般会分为四类,其中第一类就是 Hadoop 所承载的 MapReduce,它将计算分为两个阶段,Map 阶段 和 Reduce 阶段

对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算,由于这样的弊端,催生了支持 DAG 框架的产生

Spark 核心编程

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。

三大数据结构分别是:

  • RDD:弹性分布式数据集
  • 累加器:分布式共享只写变量
  • 广播变量:分布式共享只读变量

image-20230126162247383

上图模拟了 Spark 中分布式计算的过程,客户端向服务器发送计算任务

任务拆分:(1, 2, 3, 4) => (1, 2) (3, 4)

计算逻辑:n * 2

RDD

image-20230126162838979

RDD(Resilient Distributed Datase)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型,源码中是一个抽象类

RDD 的特点

  • 弹性

    • 存储的弹性:内存与磁盘的自动切换
    • 容错的弹性:数据丢失可以自动恢复
    • 计算的弹性:计算出错重试机制
    • 分片的弹性:可根据需要重新分片
  • 分布式:数据存储在大数据集群不同节点上

  • 数据集:RDD 封装了计算逻辑,并不保存数据

  • 数据抽象:RDD 是一个抽象类,需要子类具体实现

  • 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
  • 可分区、并行计算

RDD 与 IO 流的联系

IO

IO 操作体现了装饰者设计模式

字节流

一个一个字节读取,读一个打印一个

image-20230126171245898

字节流 + 缓冲区

一次读取一个 字节缓冲区 大小的字节数,存满后一起打印

image-20230126171114657

字符流

一次读取一个 字节缓冲区 大小的字节,此大小正好组装成一个字符,再暂存到 字符缓冲区,存满后一起打印

image-20230126171626972

image-20230126165309740

RDD

image-20230126165522532

  • RDD 的数据处理方式类似于 IO 流,也体现了装饰者设计模式
  • RDD 的数据只有在调用 collect 方法时,才会真正执行业务逻辑操作,之前的封装均为功能扩展
  • RDD 不保存数据,但 IO 可以通过缓冲区暂存一部分数据

RDD 创建

从内存中创建 RDD
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 = sparkContext.parallelize(
    List(1, 2, 3, 4)
)
val rdd2 = sparkContext.makeRDD(
    List(1, 2, 3, 4)
)
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
sparkContext.stop()

Spark 主要提供了两个方法:parallelizemakeRDD

底层代码中 makeRDD 方法其实就是 parallelize 方法,且大部分时候使用 makeRDD 方法居多

从外存中创建 RDD

由外部存储系统的数据集创建 RDD 包括:

  • 本地的文件系统
  • 所有 Hadoop 支持的数据集,比如 HDFS、HBase 等
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()
从其他 RDD 创建

通过一个 RDD 运算完后,再产生新的 RDD

直接创建 RDD(new)

使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用

RDD 分区

默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量称之为并行度,这个数量可以在构建 RDD 时指定

注意,这里是指并行执行的任务数量,并不是指切分任务的数量

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val dataRDD: RDD[Int] =
    sparkContext.makeRDD(
        List(1,2,3,4),
        4)
val fileRDD: RDD[String] =
    sparkContext.textFile(
        "input",
        2)
fileRDD.collect().foreach(println)
sparkContext.stop()
  • 读取内存数据时,数据可以按照并行度的设定进行数据的分区操作
  • 读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数据读取规则有差异

RDD 算子

RDD 算子就是 RDD 方法,只是名称不同

RDD 方法分为转换方法和行动方法

image-20230126230055836

RDD 转换算子

RDD 根据数据处理方式的不同将算子整体上分为

  • Value 类型
  • 双 Value 类型
  • Key-Value 类型
Value类型
  • map

    将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换

  • mapPartitions

    将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是进行任意处理,也可以是过滤数据

map 与 mapPartitions 的区别

map 是分区内一个数据一个数据的执行,类似于串行操作

mapPartitions 是以分区为单位进行批处理操作

所以,map 因为类似于串行操作,性能比较低,而 mapPartitions 类似于批处理,性能较高,但是 mapPartitions 会长时间占用内存,可能会导致内存溢出,在内存有限的情况下不推荐使用

  • mapPartitionsWithIndex

    与 mapPartitions 类似,只是在处理时可以获取当前分区索引

  • flatMap

    将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

  • glom

    将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

  • groupBy

    将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,这样的操作称之为 shuffle

  • filter

    将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃

  • sample

    根据指定的规则从数据集中抽取数据

  • distinct

    将数据集中重复的数据去重

  • coalesce

    根据数据量缩减分区,当存在过多小任务时,可以通过 coalesce 方法收缩合并分区,减少分区的个数,减小任务调度成本

  • sortBy

    用于排序数据

双 Value 类型
  • intersection

    对两个 RDD 求交集

  • union

    对两个 RDD 求并集

  • subtract

    对两个 RDD 求差集,以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来

  • zip

    将两个 RDD 中的元素以键值对的形式进行合并,其中键值对中的 Key 为第 1 个 RDD 中的元素,Value 为第 2 个 RDD 中的相同位置的元素

Key - Value 类型
  • partitionBy

    将数据按照指定 Partitioner 重新进行分区,Spark 默认的分区器是 HashPartitioner

  • reduceByKey

    可以将数据按照相同的 key 对 value 进行聚合

  • groupByKey

    将数据源的数据根据 key 对 value 进行分组

  • aggregateByKey

    将数据根据不同的规则进行分区内计算和分区间计算

  • foldByKey

    当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey

  • combineByKey

    最通用的对 key-value 型 RDD 进行聚集操作的聚集函数

  • sortByKey

    在一个 (K,V) 的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的 RDD

  • join

    在类型为 (K,V) 和 (K,W) 的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W)) 的 RDD

RDD 行动算子

  • reduce

    聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

  • collect

    在驱动程序中,以数组 Array 的形式返回数据集的所有元素

  • count

    返回 RDD 中元素的个数

  • first

    返回 RDD 中的第一个元素

  • take

    返回一个由 RDD 的前 n 个元素组成的数组

  • takeOrdered

    返回该 RDD 排序后的前 n 个元素组成的数组

  • aggregate

    分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

  • fold

    折叠操作,aggregate 的简化版操作

  • countByKey

    统计每种 key 的个数

  • save 相关

    将数据保存到不同格式的文件中

  • foreach

    分布式遍历 RDD 中的每一个元素,调用指定函数

RDD 序列化

从计算的角度,算子外的代码都是在 Driver 端执行,算子内的代码都是在 Executor 端执行

那么在 Scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,如果使用的算子外的数据无法序列化,就意味着无法通过网络传值给 Executor 端执行,发生错误

所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作称之为闭包检测

Kryo 序列化框架

Java 的序列化能够序列化任何类,但是生成的字节多,序列化后对象比较大

Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制

Kryo 速度是 Serializable 的 10 倍,当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类已经在 Spark 内部使用 Kryo 来序列化

RDD 依赖关系

image-20230209170526155

image-20230209170536238

image-20230209171241203

RDD 是不会保存数据的,为了提供容错性,需要将 RDD 的依赖关系保存下来

一旦发生错误,可根据血缘关系重新读取数据并计算

依赖关系

val rdd1 = rdd.map(_ * 2)

相邻两个 RDD 称为依赖关系

血缘关系

RDD1、RDD2、RDD3、RDD4 称为血缘关系

窄依赖

image-20230209173155612

窄依赖表示每一个父 RDD 的 Partition 最多被子 RDD 的一个 Partition 使用

窄依赖可类比独生子女

宽依赖

image-20230209173250322

宽依赖表示同一个父 RDD 的 Partition 被多个子 RDD 的 Partition 依赖,会引起 Shuffle

宽依赖可类比多生

RDD 持久化

image-20230209175349601

image-20230209175429945

RDD 不存储数据,如果一个 RDD 需要重复使用,那么就需要从头开始再次执行来获取数据

在数据执行任务较长,或数据比较重要的场合可以使用持久化

RDD Cache 缓存

RDD 通过 Cache 或者 Persist 方法将前面的计算结果进行缓存,默认情况下会把数据缓存在 JVM 的堆内存中

缓存有可能丢失或者由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行

// cache 操作会增加血缘关系,不改变原有的血缘关系
println(wordToOneRdd.toDebugString)
// 数据缓存
wordToOneRdd.cache()
// 可以更改存储级别
mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)

image-20230209174139303

RDD CheckPoint 检查点

检查点其实就是将 RDD 中间结果写入磁盘

血缘依赖过长会造成容错成本过高,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销

缓存和检查点区别

  • Cache 缓存只是将数据保存起来,不切断血缘依赖

    Checkpoint 检查点切断血缘依赖

  • Cache 缓存的数据通常存储在内存,可靠性低

    Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高

RDD 分区器

Spark 目前支持 Hash 分区、Range 分区、用户自定义分区,默认为 Hash 分区

只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None

Hash 分区

对于给定的 key,计算其 hashCode,并除以分区个数取余

Range 分区

将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而 且分区间有序

RDD 文件读取与保存

Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统

文件格式分为:text 文件、csv 文件、sequence 文件以及 Object 文件

文件系统分为:本地文件系统、HDFS、HBASE 以及数据库

text 文件

// 读取输入文件
val inputRDD: RDD[String] = sc.textFile("input/1.txt")
// 保存数据
inputRDD.saveAsTextFile("output")

sequence 文件

SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 而设计的一种平面文件 (Flat File)

// 保存数据为 SequenceFile
dataRDD.saveAsSequenceFile("output")
// 读取 SequenceFile 文件
sc.sequenceFile[Int,Int]("output").collect().foreach(println)

object 对象文件

对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制

// 保存数据
dataRDD.saveAsObjectFile("output")
// 读取数据
sc.objectFile[Int]("output").collect().foreach(println)

累加器

累加器用于把 Executor 的变量数据聚合到 Driver

对于累加器,在 Driver 中定义的变量,每个 Executor 的 Task 都会进行复制得到一个副本,Task 更新变量副本后,会返回 Driver 进行合并

image-20230210172002330

val rdd = sc.makeRDD(List(1, 2, 3, 4, 5))
// 声明累加器
var sum = sc.longAccumulator("sum");
rdd.foreach(
    num => {
        // 使用累加器
        sum.add(num)
    }
)
// 获取累加器的值
println("sum = " + sum.value)

广播变量

闭包数据都是以 Task 为单位进行发送的,这样可能导致一个 Executor 中含有大量重复数据,浪费大量内存

Executor 本质上是一个 JVM,启动时会自动为其分配内存,因此可以将闭包数据放在该内存中,达到共享目的

image-20230210172135194

val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)
val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) )
// 声明广播变量
val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)
val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
    case (key, num) => {
        var num2 = 0
        // 使用广播变量
        for ((k, v) <- broadcast.value) {
            if (k == key) {
                num2 = v
            }
        }
        (key, (num, num2))
    }
}

Spark SQL

Spark SQL 是 Spark 用于处理结构化数据(structured data)的模块

Spark SQL 与 Hive

Hive 是早期唯一运行在 Hadoop 上的 SQL-on-Hadoop 工具,但 MapReduce 计算过程中需要进行大量磁盘 I/O,降低了运行效率

Shark 是 SparkSQL 的前身,它的出现使 SQL-on-Hadoop 的性能比 Hive 有了 10-100 倍的提高,但随着 Spark 的发展,Shark 对 Hive 过于依赖,制约了 Spark 的发展

SparkSQL 抛弃了原有 Shark 的代码,吸取了其中的优点,同时摆脱了对 Hive 的依赖性,在数据兼容、性能优化、组件扩展方面有极大提高

Spark SQL 特点

易整合

便捷地整合了 SQL 查询和 Spark 编程

image-20230213214353200

数据访问方式统一

使用相同的方式连接不同的数据源

image-20230213214426269

标准数据连接

通过 JDBC 或者 ODBC 来连接

image-20230213214456341

DataFrame

Spark 1.x

在早期版本 Spark 1.x 中,DataFrame 类似于传统数据库中的二维表格

image-20230213221449968

左侧的 RDD[Person]虽然以 Person 为类型参数,但 Spark 框架本身不了解 Person 类的内部结构

右侧的 DataFrame 多了数据的结构信息,即 schema,可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么

Spark 2.x

但在新版本 Spark 2.x 中,DataFrame=DataSet[Row],即DataFrame 每一行是 Row 类型,但每一行究竟有哪些字段,各个字段又是什么类型都无从得知,即每一列的值无法直接访问,只能用 getAS 方法得到各字段的具体值

testDataFrame.foreach{
  line =>
    val col1=line.getAs[String]("col1")
    val col2=line.getAs[String]("col2")
}

DataFrame 查询优化

DataFrame 与 RDD 都是懒执行的,但 DataFrame 性能比 RDD 高,因为 Spark SQL 具有查询优化器

image-20230213215625359

图中构造了两个 DataFrame,先将他们 join 然后进行 filter 操作

如果原封不动地执行这个执行计划,最终的执行效率是不高的,因为 join 是一个代价较大的操作,也可能会产生一个较大的数据集

如果我们能将 filter 延迟到 join 之后执行,先对 DataFrame 进行过滤,再 join 过滤后的较小的结果集,便可以有效缩短执行时间

而 Spark SQL 的查询优化器正是这样做的,简而言之,就是将高成本的操作替换为低成本操作的过程

关于懒执行:

Spark算子主要划分为两类:transformation 和 action ,并且只有action算子触发的时候才会真正执行任务

常用的算子map、flatMap、filter都是 transformation 算子

而collect、count、saveAsTextFile、countByKey、foreach则为action算子

为什么 Spark 任务只有在调用 action 算子的时候,才会真正执行呢?

假如 transformation 算子直接触发 Spark 任务:

  • 导致 map 执行完了要立即输出,数据也必然要落地(内存和磁盘)
  • 对于 map 任务的生成、调度、执行以及彼此之间的 RPC 通信等,当涉及大数据量时,会很影响性能

MapReduce 正是因为中间结果需要落地,导致性能低下

Spark 只有调用 action 算子时才会真正执行任务,这是相对于 MapReduce 的优化之一

DataSet

这里以新版本 Spark 2.x 为例

DataFrame=DataSet[Row]

DataFrame 也可以称为 DataSet[Row],每一行的类型都是 Row,但每一行有哪些字段,各字段是什么类型都无法得知

DataSet 不同于 DataFrame,在定义了 case class 后可以直接获取每列的值

case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型

val test: Dataset[Coltest]=rdd.map{line=>
      Coltest(line._1,line._2)
    }.toDS

test.map{
      line=>
        println(line.col1)
        println(line.col2)
    }

DataSet -> DataFrame

DataSet 封装成 Row 类型后,可转为 DataFrame

import spark.implicits._

val testDF = testDS.toDF

DataFrame -> DataSet

DataFrame 给出每列的具体类型,使用 as 方法,可转为 DataSet

import spark.implicits._

case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = testDF.as[Coltest]

Spark Session

在以往 Spark Core 中,若要执行应用程序,需要先创建上下文对象 SparkContext

而 Spark SQL 对其进行了封装,称为 SparkSession,所以实际上的计算是由内部 SparkContext 完成的

Spark Session 是 Spark SQL 查询的起点

用户自定义函数 UDF

用户可以通过 spark.udf 创建自定义函数,实现自定义功能

spark.udf.register("addName",(x:String)=> "Name:"+x)

Spark Streaming

Spark Streaming 用于流式数据的处理,支持的数据源很多,例如 Kafka、简单 TCP 套接字等

与 Spark 基于 RDD 类似,Spark Streaming 基于 DStream,DStream 是随时间推移收到的数据序列

DStream 内部每个时间间隔收到的数据是作为 RDD 存在,而 DStream 是由这些 RDD 组成的序列,即离散化

总之,DStream 是对 RDD 在实时数据处理场景的封装

流式处理 和 批量处理

从数据处理的方式角度

  • 流式处理:接收一条数据,立即进行处理
  • 批量处理:接收一条数据,先暂存,收集一批数据后一起处理

实时处理 和 离线处理

从数据处理的延迟长短角度

  • 实时处理:毫秒
  • 离线处理:小时、天

而 Spark Streaming 是准实时(秒、分钟)的数据处理框架

架构

image-20230219154000410

image-20230219154017818

背压机制

image-20230219154236720

Spark Streaming 处理任务采用生产者-消费者模式,接收器(或采集器,Receiver)接收到数据后,由 Driver 发送给 Executor 工作节点处理数据

此时可能会出现一些问题,如接收器接收太快,工作节点来不及处理数据,会造成数据积压,或工作节点处理数据太快,造成资源浪费

为了协调接收速率和处理速率,Spark Streaming 采用背压机制,可根据作业执行情况动态调整 Receiver 接收速率