自学内容网 自学内容网

SparkSQL数据源与数据存储综合实践

1. 打开项目

  • 打开SparkSQLDataSource项目
    在这里插入图片描述

2. 查看数据集

2.1 查看JSON格式数据

  • 查看users.json文件
    在这里插入图片描述
{"name": "李小玲", "gender": "女", "age": 45}
{"name": "童安格", "gender": "男", "age": 26}
{"name": "陈燕文", "gender": "女", "age": 18}
{"name": "王晓明", "gender": "男", "age": 32}
{"name": "张丽华", "gender": "女", "age": 29}
{"name": "刘伟强", "gender": "男", "age": 40}
{"name": "赵静怡", "gender": "女", "age": 22}
{"name": "孙强东", "gender": "男", "age": 35}

2.2 查看CSV格式数据

  • 查看users.csv文件
    在这里插入图片描述
name,gender,age
李小玲,,45
童安格,,26
陈燕文,,18
王晓明,,32
张丽华,,29
刘伟强,,40
赵静怡,,22
孙强东,,35

2.3 查看TXT格式数据

  • 查看users.txt文件
    在这里插入图片描述
李小玲 女 45
童安格 男 26
陈燕文 女 18
王晓明 男 32
张丽华 女 29
刘伟强 男 40
赵静怡 女 22
孙强东 男 35

3. 添加单元测试依赖

  • pom.xml里添加单元测试框架依赖
    在这里插入图片描述
<dependency>                                    
    <groupId>junit</groupId>                    
    <artifactId>junit</artifactId>              
    <version>4.13.2</version>                   
</dependency>                                   
  • 刷新项目依赖
    在这里插入图片描述

4. 创建数据加载与保存对象

  • 创建net.huawei.practice
    在这里插入图片描述
  • practice子包里创建DataLoadAndSave对象
    在这里插入图片描述
  • 创建DataLoadAndSave伴生类
    在这里插入图片描述

4.1 创建Spark会话对象

  • 创建spark常量
    在这里插入图片描述
// 获取或创建Spark会话对象                                  
val spark = SparkSession.builder() // 创建Builder对象  
  .appName("DataLoadAndSave") // 设置应用程序名称          
  .master("local[*]") // 运行模式:本地运行                 
  .getOrCreate() // 获取或创建Spark会话对象                 

4.2 创建加载JSON数据方法

  • 创建loadJSONData()方法
    在这里插入图片描述
// 加载JSON数据方法                                       
def loadJSONData(filePath: String): DataFrame = {   
  spark.read.json(filePath)                         
}                                                   
  • 在伴生类里创建单元测试方法testLoadJSONData()方法
    在这里插入图片描述
@Test                                                      
def testLoadJSONData(): Unit = {                           
  // 加载JSON数据                                              
  val df = DataLoadAndSave.loadJSONData("data/users.json") 
  // 显示数据                                                  
  df.show()                                                
}                                                          
  • 运行testLoadJSONData()测试方法,查看结果
    在这里插入图片描述

4.3 创建加载CSV数据方法

  • 创建loadCSVData()方法
    在这里插入图片描述
// 加载CSV数据方法                                           
def loadCSVData(filePath: String): DataFrame = {       
  spark.read                                           
    .option("header", "true")                          
    .option("inferSchema", "true")                     
    .csv(filePath)                                     
}                                                      
  • 在伴生类里创建单元测试方法testLoadCSVData()方法
    在这里插入图片描述
@Test                                                       
def testLoadCSVData(): Unit = {                             
  // 加载CSV数据                                                
  val df = DataLoadAndSave.loadCSVData("data/users.csv")    
  // 显示数据                                                   
  df.show()                                                 
}                                                           
  • 运行testLoadCSVData()测试方法,查看结果
    在这里插入图片描述

4.4 创建加载Text数据方法

  • 创建loadTextData()方法
    在这里插入图片描述
// 加载TEXT数据方法                                       
def loadTextData(filePath: String): DataFrame = {   
  spark.read.text(filePath)                         
}                                                   
  • 在伴生类里创建单元测试方法testLoadTextData()方法
    在这里插入图片描述
  • 运行testLoadTextData()测试方法,查看结果
    在这里插入图片描述

4.5 创建加载JSON数据扩展方法

  • 创建loadJSONDataExpand()方法
    在这里插入图片描述
// 加载JSON数据扩展方法                                         
def loadJSONDataExpand(filePath: String): DataFrame = { 
  spark.read.format("json").load(filePath)              
}                                                       
  • 在伴生类里创建单元测试方法testLoadJSONDataExpand()方法
    在这里插入图片描述
  • 运行testLoadJSONDataExpand()测试方法,查看结果
    在这里插入图片描述

4.6 创建加载CSV数据扩展方法

  • 创建loadCSVDataExpand()方法
    在这里插入图片描述
// 加载CSV数据扩展方法                                            
def loadCSVDataExpand(filePath: String): DataFrame = {    
  spark.read.format("csv")                                
    .option("header", "true")                             
    .option("inferSchema", "true")                        
    .load(filePath)                                       
}                                                         
  • 在伴生类里创建单元测试方法testLoadCSVDataExpand()方法
    在这里插入图片描述
  • 运行testLoadCSVDataExpand()测试方法,查看结果
    在这里插入图片描述

4.7 创建加载Text数据扩展方法

  • 创建loadTextDataExpand()方法
    在这里插入图片描述
//  加载TEXT数据扩展方法                                          
def loadTextDataExpand(filePath: String): DataFrame = {   
  spark.read.format("text").load(filePath)                
}                                                         
  • 在伴生类里创建单元测试方法testLoadTextDataExpand()方法
    在这里插入图片描述
  • 运行testLoadTextDataExpand()测试方法,查看结果
    在这里插入图片描述

4.8 创建保存文本文件方法

  • 创建saveTextFile()方法
    在这里插入图片描述
// 保存数据到文本文件方法                                                   
def saveTextFile(inputPath: String, outputPath: String): Unit = {
  // 加载文本数据                                                      
  val df = spark.read.format("text").load(inputPath)             
  // 保存文本数据                                                      
  df.write.mode("overwrite").format("text").save(outputPath)     
}                                                                
  • 在伴生类里创建单元测试方法testSaveTextFile()方法
    在这里插入图片描述
  • 运行testSaveTextFile()测试方法,查看结果
    在这里插入图片描述

4.9 查看程序完整代码

package net.huawei.practice

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.junit.Test

/**
 * 功能:数据加载与保存
 * 作者:华卫
 * 日期:2025年01月18日
 */
object DataLoadAndSave {
  // 获取或创建Spark会话对象
  val spark = SparkSession.builder() // 创建Builder对象
    .appName("DataLoadAndSave") // 设置应用程序名称
    .master("local[*]") // 运行模式:本地运行
    .getOrCreate() // 获取或创建Spark会话对象

  // 加载JSON数据方法
  def loadJSONData(filePath: String): DataFrame = {
    spark.read.json(filePath)
  }

  // 加载CSV数据方法
  def loadCSVData(filePath: String): DataFrame = {
    spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(filePath)
  }

  // 加载TEXT数据方法
  def loadTextData(filePath: String): DataFrame = {
    spark.read.text(filePath)
  }

  // 加载JSON数据扩展方法
  def loadJSONDataExpand(filePath: String): DataFrame = {
    spark.read.format("json").load(filePath)
  }

  // 加载CSV数据扩展方法
  def loadCSVDataExpand(filePath: String): DataFrame = {
    spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(filePath)
  }

  //  加载TEXT数据扩展方法
  def loadTextDataExpand(filePath: String): DataFrame = {
    spark.read.format("text").load(filePath)
  }

  // 保存数据到文本文件方法
  def saveTextFile(inputPath: String, outputPath: String): Unit = {
    // 加载文本数据
    val df = spark.read.format("text").load(inputPath)
    // 保存文本数据
    df.write.mode("overwrite").format("text").save(outputPath)
  }
}

// 伴生类
class DataLoadAndSave {
  @Test
  def testLoadJSONData(): Unit = {
    // 加载JSON数据
    val df = DataLoadAndSave.loadJSONData("data/users.json")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadCSVData(): Unit = {
    // 加载CSV数据
    val df = DataLoadAndSave.loadCSVData("data/users.csv")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadTextData(): Unit = {
    // 加载TEXT数据
    val df = DataLoadAndSave.loadTextData("data/users.txt")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadJSONDataExpand(): Unit = {
    // 加载JSON数据
    val df = DataLoadAndSave.loadJSONDataExpand("data/users.json")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadCSVDataExpand(): Unit = {
    // 加载CSV数据
    val df = DataLoadAndSave.loadCSVDataExpand("data/users.csv")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadTextDataExpand(): Unit = {
    // 加载TEXT数据
    val df = DataLoadAndSave.loadTextDataExpand("data/users.txt")
    // 显示数据
    df.show()
  }

  @Test
  def testSaveTextFile(): Unit = {
    // 保存数据到文本文件
    DataLoadAndSave.saveTextFile("data/users.txt", "result/users")
  }
}

5. 实战小结

  • 在本次实战中,我们通过SparkSQLDataSource项目深入学习了如何使用Spark SQL加载和保存不同格式的数据。首先,我们查看了JSON、CSV和TXT格式的数据集,并通过DataLoadAndSave对象实现了数据的加载与保存功能。我们创建了多个方法,如loadJSONData()loadCSVData()loadTextData(),分别用于加载不同格式的数据,并通过单元测试验证了这些方法的正确性。此外,我们还扩展了数据加载方法,使用format()方法灵活加载数据,并实现了数据保存功能,如saveTextFile()方法,将数据保存为文本文件。通过本次实战,我们掌握了Spark SQL处理多种数据格式的基本操作,为后续的数据处理和分析打下了坚实基础。

原文地址:https://blog.csdn.net/howard2005/article/details/145220289

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!