自学内容网 自学内容网

RDD的介绍、RDD的特点、创建RDD数据

1. RDD介绍

1.1 Spark开发方式

1.1.1 交互式开发

  • 通过不同的命令进入不同的语言交互开发界面。
  • 代码不能持久保存,一般用来测试某个代码的执行是否正确。

在 Spark 中,交互式开发主要通过 Spark Shell 来实现。

  • 对于 Scala 版本的 Spark Shell
    在命令行中输入spark-shell命令即可启动。启动后,可以在命令提示符下输入 Scala 代码来操作 Spark。
    在这里插入图片描述
  • 对于 Python 版本的 Spark Shell(PySpark):
    使用pyspark命令启动。在 PySpark 中,可以使用 Python 语言进行 Spark 开发。
    在这里插入图片描述
    注意:在spark shell中,可以使用:q或者:quit退出。

1.1.2 脚本式开发

  • 将编写代码保存在文件,对代码文件进行运行实现spark的计算。
  • 使用IDE工具进行代码文件编写

1.2 Spark支持的开发语言

Spark支持的开发语言有:

  • java
  • scala
  • python
  • R
  • SQL

1.3 RDD介绍

在 Apache Spark 中,弹性分布式数据集(Resilient Distributed Dataset,简称 RDD)是其核心概念之一。
RDD是spark的一种数据模型(规定数据的存储结构和计算方法),RDD将数据分布式存储在不同服务器内存上,通过RDD共享不同服务器的内存数据,所以Spark是基于内存进行分布式数据计算的框架。
它具有以下主要特点:

  1. 弹性:
    • 可以根据计算的需求将数据进行分区拆分,本质就是将数据分成多份,每份数据称为一个分区。
    • 每个分区都会对应一个线程任务执行计算。
  2. 分布式:数据分布在集群中的多个节点上,可以并行处理。
  3. 不可变:一旦创建,RDD 的内容不能被修改,只能通过转换操作生成新的 RDD。

2. RDD特性

  • 分区
    • 可以将计算的海量数据分成多份,需要分成多少分区可以通过方法指定。
    • 每个分区都可以对应一个task线程执行计算
  • `只读``
    • RDD中的数据不能直接修改,需要通过方法计算后得到一个新的RDD。
    • RDD本身存储的数据只能读取。
  • 依赖
    • RDD之间是有依赖关系的。
    • 新的RDD是通过旧的RDD计算得到的。
  • 缓存
    • 可以将计算中的结果缓存起来,如果后续计算错误时,可以从缓存位置重新计算。
    • 将数据存储在内存或本地磁盘
    • 作用是容错
    • 缓存在执行计算任务程序结束后会释放删除。
  • checkpoint
    • 作用和缓存一样
    • checkpoint可以将数据存储在分布式存储系统中,比如HDFS。

3. 创建RDD数据

将需要计算的数据转为rdd的数据,就可以利用spark的内存计算方法进行分布式计算操作,这些计算方法就是有rdd提供的
rdd数据的转化方法是有sparkcontext提供的,所以需要先生成sparkcontext,sparkcontext中还包含资源申请和任务划分功能
SparkContext称为Spark的入口类

3.1 Python数据转化为RDD

将python数据转为RDD

data = [1,2,3,4]

# 将python的列表数据转为RDD,需要使用pyspark中的sparkcontext类,该类中封装转化方法
from pyspark import SparkContext

# 1.先将类生成对象
sc = SparkContext()
# 2. 使用对象方法将python的列表转为RDD
rdd = sc.parallelize(data)

#3. 使用rdd提供的方法进行分布式聚合计算
res = rdd.reduce(lambda x, y: x + y)
print(res)

结果输出:
在这里插入图片描述

3.2 文件数据HDFS转化为RDD

在这里插入图片描述

#读取hdfs文件数据转为rdd
from pyspark import SparkContext\

# 1.创建对象
sc = SparkContext()

#2.读取hdfs的文件数据
# 指定读取的文件路径
rdd1 = sc.textFile('hdfs://node1:8020/data/stu.txt')
# 指定读取的目录路径
rdd2 = sc.textFile('hdfs://node1:8020/data')

# 3. 查看rdd中的读取数据
res1 = rdd1.collect()
print(res1)

res2 = rdd2.collect()
print(res2)

结果输出:
在这里插入图片描述

3.3 RDD分区

Python数据转发的分区数指定

#  RDD的分区指定
from pyspark import SparkContext
sc = SparkContext()
data = [1,2,3,4,5,6,7,8]

#转化RDD时指定分区数
rdd = sc.parallelize(data,numSlices=8)

#查看分区后的数据形式 glom()查看分区形式
res = rdd.glom().collect()
print(res)

#读取的文件指定分区数
# 文件在进行分区时,有时候会多一个空分区
# 文件大小 % 分区数 = 值 -- 余数
# 余数/值 占比 超过10%额外会创建一个分区
rdd2 = sc.textFile("hdfs://node1:8020/data/stu.txt",minPartitions=2)
res2 = rdd2.glom().collect()
print(res2)

结果输出:
在这里插入图片描述

3.4 小文件数据读取

在一个目录下,有多个文件,如果文件的大小不够一个块的大小,一个文件就对应一个分区,文件超过一个块,那就一个block(128M)块对应一个分区。
目录下都是小文件,那么读取目录下的文件数据,会对应很多个分区。


一个分区对应一个task线程,当小文件过多时,会占用大量的线程,造成资源浪费。
使用wholeTextFiles方法可以解决
该方法会现将读取到的数据合并在一起,然后重新进行分区。

# 小文件读取
from pyspark import SparkContext
# 1-创建对象
sc = SparkContext()

# 2-读取hdfs的文件数据
# 指定读取目录,可以读取目录下的所有文件数据
rdd2 = sc.textFile('hdfs://node1:8020/data')

# 小文件的数据读取 将小文件数据进行合并后按照指定的分区数进行分区
rdd3 = sc.wholeTextFiles('hdfs://node1:8020/data',minPartitions=2)
# 3-查看rdd中的读取的数据
res = rdd2.glom().collect()
print(res)

res = rdd3.glom().collect()
print(res)

结果输出:
在这里插入图片描述


原文地址:https://blog.csdn.net/m0_70882914/article/details/142775638

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!