自学内容网 自学内容网

SparkSQL介绍及使用

SparkSQL介绍及使用

一、什么是SparkSQL(了解)

spark开发时可以使用rdd进行开发,spark还提供saprksql工具,将数据转为结构化数据进行操作

1-1 介绍

官网:https://spark.apache.org/sql/

Spark SQL是 Apache Spark 用于处理结构化数据(DataFrame和Datasets)的模块。

在Spark1.0版本时引入了SparkSQL

数据的结构形式
  • 结构化数据
    • 表,DataFrame,Datasets
    • 构成
      • 元数据 描述数据的数据(描述信息,类型约束)
      • 数据本身
身高 int
179
170
156
132
200
  • 半结构化数据

    • json,xml ,有数据的描述信息,但是对数据内容的类型无法约束

    • {
          "name":"asdea"
      }
      
  • 非结构化数据

    • 文本文件
    • 图片文件
    • 视频文件
    • 音频文件

sparksql可以将非结构化 ,半结构化数据统一转化为结构化数据处理

Spark中使用的结构化数据有 DataFrame ,映射表(离线数仓开发使用)

1-2 特点

  • 易整合
    • 使用sql配合spark一起使用,封装了不同语言的dsl方法
  • 统一数据访问
    • 使用read方法可以读取hdfs数据,mysql数据,不同类型的文件数据(json,csv,orc)
    • 使用write方法可以写入hdfs,mysql不同类型的文件
  • 兼容hive
    • 使用hivesql方法
  • 标准的数据连接
    • 使用jdbc和odbc连接方式连接sparkSQL

1-3 SparkSQL与HiveSQL关系

  • shark
    • 运行的模式是hive on spark
    • 会将hivesqsl转换为spark的rdd
    • shark是基于hive开发的,维护麻烦,2015年停止维护
  • sparkSQL
    • 是spark团建独立开发的工具,2014年发布1.0版本
    • sparkSQL工具对spark的兼容性更好,优化性能得到提升
    • sparkSQL本质也是将sql语句转化为rdd执行,catalyst引擎负责将sql转化为rdd
    • sparkSQL可以连接使用hive的metastore服务,管理表的元数据

二、DataFrame详解(理解)

DataFrame是基于RDD进行封装的结构化数据类型,增加了scheme元数据

其中DataFrame类型在计算时,还是转为rdd计算

DataFrame的结构化数据有Row(行数据)和scheme元数据构成

  • Row 类型 表示一行数据
    • datafram就算是多行构成
# 导入行类Row
from pyspark.sql import Row

# 创建行数据
r1 = Row(1, '张三', 20)

# 行数取取值 按照下标取值
data = r1[0]
print(data)
data1 = r1[1]
print(data1)

# 指定字段创建行数据
r2 = Row(id=2, name='李四', age=22)
# 按照字段取值
data3 = r2['id']
print(data3)
data4 = r2['name']
print(data4)
  • schema表信息
    • 定义dataframe中的表的字段名和字段类型
# 导入数据类型
from pyspark.sql.types import *

# 定义schema信息
# 使用StructType类进行定义
# add()方法是指定字段信息
# 第一参数,字段名
# 第二个参数,字段信息
# 第三个参数是否允许为空值  默认是True,允许为空
schema_type = StructType().\
    add('id',IntegerType()).\
    add('name',StringType()).\
    add('age',IntegerType(),False)

三、DataFrame创建(掌握)

创建datafram数据

需要使用一个sparksession的类创建

SparkSession类是在SparkContext的基础上进行了封装,也就是SparkSession类中包含了SparkContext

3-1 基本创建

# 导入行类Row
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *

# 创建行数据
r1 = Row(id=1, name='张三', age=20)
r2 = Row(id=2, name='李四', age=22)
# 创建元数据
schema = StructType(). \
    add('id', IntegerType()). \
    add('name', StringType()). \
    add('age', IntegerType())

# 创建dataframe
# 生成sparksession对象  按照固定写法创建
ss = SparkSession.builder.getOrCreate()
# 使用sparksession对象方法创建df
# createDataFrame 第一参数是一个列表数据,将每行数据放入列表
# 第二个参数指定表元数据信息
# df是一个dataframe类型的对象
df = ss.createDataFrame([r1, r2], schema=schema)

# dataframe数据的操作
# 查看df数据
df.show()  # 查看所有数据,超过20行时,默认只显示20行
# 查看元信息
df.printSchema()

3-2 RDD和DF之间的转化

  • rdd的二维数据转化为dataframe
    • rdd.toDF()
# RDD和DF之间的转换
# 导入SparkSession
from pyspark.sql import SparkSession

# 创建对象
ss = SparkSession.builder.getOrCreate()

# 使用sparksession获取sparkcontext
sc = ss.sparkContext # 不要括号,可以直接获取到sparkcontext对象

# 生成rdd数据
# rdd转df时,要求数据是二维嵌套列表
data = [[1,'张三',20,'男'],[2,'小红',19,'女']]
rdd = sc.parallelize(data)

# rdd转df
df = rdd.toDF(schema='id int,name string,age int,gender string')

# 查看df数据
df.show()

# 查看表结构
df.printSchema()
  • df转为rdd
# 将df转为rdd
rdd2 = df.rdd
# 查看rdd中数据
res = rdd2.collect() # [Row(),Row()]
# 转化后的rdd中每个元素是有个Row类对象
print(res)
print(res[0])
print(res[0]['name'])

3-3 pandas和spark之间转化

pandas和spark之间的df相互转化

  • pandas的df转为spark的df
# Pandas和spark之间的转化
import pandas as pd
from pyspark.sql import SparkSession
# 创建pd的df
pd_df = pd.DataFrame(
    {
        'id':[1,2,3,4],
        'name':['a','b','c','d'],
        'age':[20,21,22,24],
        'gender':['男','女','男','男']
    }
)
# 查看数据
print(pd_df)

# 将pd_df 转为spark的df
ss = SparkSession.builder.getOrCreate()
spark_df = ss.createDataFrame(pd_df)

# 查看数据
spark_df.show()
  • spark的df转为pandas的df
    • toPandas
# 将spark_df转为pd_df
pd_df2 = spark_df.toPandas()
print(pd_df2)

3-4 读取文件数据转为df

通过read方法读取数据转为df

  • ss.read
# 读取数据转化为df
from pyspark.sql import SparkSession

# 创建sparksession
ss = SparkSession.builder.getOrCreate()

# 读取不同数据源
# header=True 是否需要获取表头
# sep 指定数据字段按照什么字符分割
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')
# schema当没有表头时,可以自己指定字段
df2 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=False,sep=',',schema='user_id string,username string,sex string,age string,cls string')

df3 = ss.read.json('hdfs://node1:8020/data/employees.json')
df4 = ss.read.orc('hdfs://node1:8020/data/users.orc')
df5 = ss.read.parquet('hdfs://node1:8020/data/users.parquet')


# 查看
# show中可以指定显示多少行,默认是20行
df.show(100)

df2.show()

四、DataFrame基本使用(掌握)

4-1 SQL语句

使用sparksession提供的sql方法,编写sql语句执行

# 使用sql方式开发
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

# 读取数据得到df数据
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='id string,name string,gender string,age int,cls string')

# 对df数据进行sql操作
# 需要给df指定一个表名
df.createTempView('tb_user')

# 编写sql语句执行
# sql执行后的结果被保存新的df中
new_df =  ss.sql('select gender,avg(age) as avg_data from tb_user group by gender')
new_df.show()

4-2 DSL方法

DSL方法是df提供的数据操作函数


使用方式

df.方法()

可以进行链式调用

df.方法().方法().方法()

方法执行后返回一个新的df保存计算结果

new_df = df.方法()

spark提供DSL方法和sql的关键词一样,使用方式和sql基本类似,在进行数据处理时,要按照sql的执行顺序去思考如何处理数据

from join 知道数据在哪 df本身就是要处理的数据 df.join(df2) from 表

where 过滤需要处理的数据 df.join(df2).where()

group by 聚合 数据的计算 df.join(df2).where().groupby().sum()

having 计算后的数据进行过滤 df.join(df2).where().groupby().sum().where()

select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()

order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()

limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()


DSL方法执行完成后会得到一个处理后的新的df

# 使用DSL方式开发
from pyspark.sql import  SparkSession

ss = SparkSession.builder.getOrCreate()

# 生成df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='id string,name string,gender string,age int,cls string')
# 查看df数据
df.show()

print('---------------select方法----------------------')
# 使用select方法指定输出展示的数据字段
# 方式一指定字段
df_select = df.select('id','name')
# 方式二
df_select2 = df.select(df.age,df.gender)
# 方式三
df_select3 = df.select(df['id'],df['cls'])

df_select.show()
df_select2.show()
df_select3.show()

print('---------------alias方法----------------------')
# 字段名称修改,需要配合select中使用
df_alias = df.select(df.id.alias('user_id'),df.name.alias('username'))
df_alias.show()

print('---------------cast方法----------------------')
# 修改字段的数据类型
df.printSchema()
df_cast = df.select(df.id.cast('int'),df.name,df.age)
df_cast.printSchema()

print('---------------where方法----------------------')
# 数据过滤,where方法内部是调用了filter方法
# 方式1
df_where = df.where('age > 20')
df_where.show()
#方式2
df_where2 = df.where(df.age > 20)
df_where2.show()

# 与或非多条件 只能使用方式1  条件的书写和在sql中的where书写内容一样
df_where3 = df.where('age > 20 and gender = "男" ')
df_where3.show()

print('---------------groupby方法----------------------')
# 分组计算,可以配和聚合方法一起使用  使用该方式聚合一次只能计算一个聚合数据 ,可以使用内置函数配合agg方法
# groupby指定分组字段,可以指定多个
# avg 聚合方法  指定聚合字段  sum  count  avg  max  min
df_groupby = df.groupby('gender').avg('age')
df_groupby.show()

# groupby指定分组字段,可以指定多个
df_groupby2 = df.groupby('gender','cls').avg('age')
df_groupby2.show()

# 分组后的数据过滤
df_groupby3 = df.groupby('gender','cls').avg('age').where(' avg(age) > 19')
df_groupby3.show()

print('---------------orderby方法----------------------')
# 数据排序 内部调用sort方法
df_orderby = df.orderBy('age')
df_orderby.show()
# ascending=False 降序
df_orderby2 = df.orderBy('age',ascending=False)
df_orderby2.show()

print('---------------limit方法----------------------')
# 指定获取多条数据
df_limit = df.orderBy('age',ascending=False).limit(5)

df_limit.show()



五、数据的关联与和并[掌握]

5-1 join关联

  • 内关联
  • 左关联
  • 右关联
from pyspark.sql import SparkSession

ss =SparkSession.builder.getOrCreate()

# 读取文件数据
df1 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='user_id string,username string,sex string,age string,cls string')
df2 = ss.read.csv('hdfs://node1:8020/data/students2.csv',header=False,sep=',',schema='user_id string,username string,sex string,age string,cls string')

# 两表进行关联
# 内关联 第一个参数,关联的df  第二参数 关联字段  第三个参数  关联方式 默认inner
df_join = df1.join(df2,df1.user_id == df2.user_id)
df_join2 = df1.join(df2,'user_id')
# 左关联
df_left= df1.join(df2,'user_id','left')
# 右关联
df_right = df1.join(df2,'user_id','right')

# show查看数据
df1.show()
df2.show()
df_join.show()
df_join2.show()
print('---------------------------------')
df_left.show()
df_right.show()

5-2 Union合并

from pyspark.sql import SparkSession

ss =SparkSession.builder.getOrCreate()

# 读取文件数据
df1 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='user_id string,username string,sex string,age string,cls string')
df2 = ss.read.csv('hdfs://node1:8020/data/students2.csv',header=False,sep=',',schema='user_id string,username string,sex string,age string,cls string')

# 两表进行关联  合并后不会去重
df_union = df1.union(df2)

df_unionAll =  df1.unionAll(df2)

# 合并后的数据去重
df_distinct =  df_union.distinct().orderBy('user_id')

# 查看数据
df_union.show(100)
print('****************************************************************')
df_unionAll.show(100)
print('****************************************************************')
df_distinct.show(100)

六、缓存和checkpoint[了解]

# df的数据持久化
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

# 指定checkpoint的位置
sc = ss.sparkContext # 或sparkcontext对象
sc.setCheckpointDir('hdfs://node1:8020/df_checkpoint')

# 读取文件数据
df1 = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='user_id string,username string,sex string,age int,cls string')
# 缓存 通过缓存级别指定缓存位置  默认是内存和磁盘上
# df1.persist()
# 使用checkpoint
df1.checkpoint()

df1_sum = df1.groupby('sex').sum('age')

#查看计算结果
df1_sum.show()

原文地址:https://blog.csdn.net/weixin_58305115/article/details/142894229

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!