自学内容网 自学内容网

SparkSQL 读写数据攻略:从基础到实战

目录

一、输入Source

1)代码演示最普通的文件读取方式:

2) 通过jdbc读取数据库数据

3) 读取table中的数据【hive】

二、输出Sink

实战一:保存普通格式

实战二:保存到数据库中

实战三:将结果保存在hive表中

三、总结


        在大数据处理领域,SparkSQL 以其强大的数据处理能力和丰富的数据源支持备受青睐。它能够高效地读取和写入多种格式的数据,无论是本地文件、分布式文件系统(如 HDFS)上的数据,还是数据库、Hive 表中的数据,都能轻松驾驭。今天,就让我们深入探究 SparkSQL 读写数据的方式,通过详细的代码示例和原理讲解,助你全面掌握这一关键技能。

 

一、输入Source

 

  • 类型:text / csv【任意固定分隔符】 / json / orc / parquet / jdbc / table【Hive表】
  • 语法:spark.read.format(格式).load(读取的地址)

方式一:给定读取数据源的类型和地址

spark.read.format("json").load(path)
spark.read.format("csv").load(path)
spark.read.format("parquet").load(path)

方式二:直接调用对应数据源类型的方法

spark.read.json(path)
spark.read.csv(path)
spark.read.parquet(path)

特殊参数:option,用于指定读取时的一些配置选项

spark.read.format("csv").option("sep", "\t").load(path)

jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

 

1)代码演示最普通的文件读取方式:

from pyspark.sql import SparkSession
import os

if __name__ == '__main__':
# 构建环境变量
# 配置环境
os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

# 获取sparkSession对象
spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config(
"spark.sql.shuffle.partitions", 2).getOrCreate()

df01 = spark.read.json("../../datas/resources/people.json")
df01.printSchema()
df02 = spark.read.format("json").load("../../datas/resources/people.json")
df02.printSchema()
df03 = spark.read.parquet("../../datas/resources/users.parquet")
df03.printSchema()
#spark.read.orc("")
df04 = spark.read.format("orc").load("../../datas/resources/users.orc")
df04.printSchema()
df05 = spark.read.format("csv").option("sep",";").load("../../datas/resources/people.csv")
df05.printSchema()

df06 = spark.read.load(
path="../../datas/resources/people.csv",
format="csv",
sep=";"
)
df06.printSchema()

spark.stop()

 

2) 通过jdbc读取数据库数据

先在本地数据库或者linux数据库中插入一张表:

CREATE TABLE `emp`  (
  `empno` int(11) NULL DEFAULT NULL,
  `ename` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `job` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `mgr` int(11) NULL DEFAULT NULL,
  `hiredate` date NULL DEFAULT NULL,
  `sal` decimal(7, 2) NULL DEFAULT NULL,
  `comm` decimal(7, 2) NULL DEFAULT NULL,
  `deptno` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of emp
-- ----------------------------
INSERT INTO `emp` VALUES (7369, 'SMITH', 'CLERK', 7902, '1980-12-17', 800.00, NULL, 20);
INSERT INTO `emp` VALUES (7499, 'ALLEN', 'SALESMAN', 7698, '1981-02-20', 1600.00, 300.00, 30);
INSERT INTO `emp` VALUES (7521, 'WARD', 'SALESMAN', 7698, '1981-02-22', 1250.00, 500.00, 30);
INSERT INTO `emp` VALUES (7566, 'JONES', 'MANAGER', 7839, '1981-04-02', 2975.00, NULL, 20);
INSERT INTO `emp` VALUES (7654, 'MARTIN', 'SALESMAN', 7698, '1981-09-28', 1250.00, 1400.00, 30);
INSERT INTO `emp` VALUES (7698, 'BLAKE', 'MANAGER', 7839, '1981-05-01', 2850.00, NULL, 30);
INSERT INTO `emp` VALUES (7782, 'CLARK', 'MANAGER', 7839, '1981-06-09', 2450.00, NULL, 10);
INSERT INTO `emp` VALUES (7788, 'SCOTT', 'ANALYST', 7566, '1987-04-19', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7839, 'KING', 'PRESIDENT', NULL, '1981-11-17', 5000.00, NULL, 10);
INSERT INTO `emp` VALUES (7844, 'TURNER', 'SALESMAN', 7698, '1981-09-08', 1500.00, 0.00, 30);
INSERT INTO `emp` VALUES (7876, 'ADAMS', 'CLERK', 7788, '1987-05-23', 1100.00, NULL, 20);
INSERT INTO `emp` VALUES (7900, 'JAMES', 'CLERK', 7698, '1981-12-03', 950.00, NULL, 30);
INSERT INTO `emp` VALUES (7902, 'FORD', 'ANALYST', 7566, '1981-12-03', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7934, 'MILLER', 'CLERK', 7782, '1982-01-23', 1300.00, NULL, 10);

dept的数据:

CREATE TABLE `dept`  (
  `deptno` int(11) NULL DEFAULT NULL,
  `dname` varchar(14) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `loc` varchar(13) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of dept
-- ----------------------------
INSERT INTO `dept` VALUES (10, 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES (20, 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES (30, 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES (40, 'OPERATIONS', 'BOSTON');

接着放驱动程序:

py4j.protocol.Py4JJavaError: An error occurred while calling o67.load.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:102)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:102)

Python环境放入MySQL连接驱动

  • 找到工程中pyspark库包所在的环境,将驱动包放入环境所在的jars目录中
  • 如果是Linux上:注意集群模式所有节点都要放。

第一种情况:

假如你是windows环境:

最终的路径是在这里:

第二种情况:linux环境下,按照如下方式进行

# 进入目录
cd /opt/installs/anaconda3/lib/python3.8/site-packages/pyspark/jars

# 上传jar包:mysql-connector-java-5.1.32.jar

代码练习:

import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

if __name__ == '__main__':
# 获取sparkSession对象
# 设置 任务的环境变量
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 得到sparkSession对象
spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()
# 处理逻辑
# 读取json 数据
df1 = spark.read.format("json").load("../../datas/sql/person.json")
df1.show()
# 另一种写法,推荐使用这一种
df2 = spark.read.json("../../datas/sql/person.json")
df2.show()
df3 = spark.read.csv("../../datas/dept.csv")
df4 = spark.read.format("csv").load("../../datas/dept.csv")

# 读取分隔符为别的分隔符的文件
user_schema = StructType([
StructField(name="emp_id", dataType=StringType(), nullable=False),
StructField(name="emp_name", dataType=StringType(), nullable=True),
StructField(name="salary", dataType=DoubleType(), nullable=True),
StructField(name="comm", dataType=DoubleType(), nullable=True),
StructField(name="dept_id", dataType=LongType(), nullable=True)
])
# 使用csv 读取了一个 \t 为分隔符的文件,读取的数据字段名很随意,所以可以自定义
df5 = spark.read.format("csv").option("sep","\t").load("../../datas/emp.tsv",schema=user_schema)
df5.show()

# 昨天的作业是否也可以有另一个写法
movie_schema = StructType([
StructField(name="movie_id", dataType=LongType(), nullable=False),
StructField(name="movie_name", dataType=StringType(), nullable=True),
StructField(name="movie_type", dataType=StringType(), nullable=True)
])
movieDF = spark.read.format("csv").option("sep","::").load("../../datas/zuoye/movies.dat",schema=movie_schema)
movieDF.show()

spark.read.load(
path="../../datas/zuoye/movies.dat",
format="csv",
sep="::",
schema=movie_schema
).show()
dict = {"user":"root","password":"root"}
jdbcDf = spark.read.jdbc(url="jdbc:mysql://localhost:3306/spark",table="emp",properties=dict)
jdbcDf.show()
# jdbc的另一种写法
jdbcDf2 = spark.read.format("jdbc") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("url", "jdbc:mysql://localhost:3306/spark") \
.option("dbtable", "spark.dept") \
.option("user", "root") \
.option("password", "root").load()
jdbcDf2.show()

# 读取hive表中的数据

# 关闭
spark.stop()

 

3) 读取table中的数据【hive】

海量数据,如何处理,存储在hdfs上

第一种:

使用spark读取hdfs上的数据(可以使用sparkCore读取,也可以使用sparksql读取),将数据变为表【数据+Schema】,然后编写sql或者sparkCore代码。

rdd --> dataFrame

第二种:推荐

将hdfs上的数据映射成hive的表,然后通过sparkSql连接hive, 编写 sql 处理需求。

  • 场景:Hive底层默认是MR引擎,计算性能特别差,一般用Hive作为数据仓库,使用SparkSQL对Hive中的数据进行计算
    • 存储:数据仓库:Hive:将HDFS文件映射成表
    • 计算:计算引擎:SparkSQL、Impala、Presto:对Hive中的数据表进行处理
  • 问题:SparkSQL怎么能访问到Hive中有哪些表,以及如何知道Hive中表对应的HDFS的地址?

Hive中的表存在哪里?元数据--MySQL , 启动metastore服务即可。

本质上:SparkSQL访问了Metastore服务获取了Hive元数据,基于元数据提供的地址进行计算

先退出base环境:conda deactivate
启动服务:
启动hdfs:  start-dfs.sh  因为hive的数据在那里存储着
启动yarn:  start-yarn.sh 因为spark是根据yarn部署的,假如你的spark是standalone模式,不需要启动yarn.
日志服务也需要启动一下:
mapred --daemon start historyserver
# 启动Spark的HistoryServer:18080
/opt/installs/spark/sbin/start-history-server.sh
启动metastore服务: 因为sparkSQL需要知道表结构,和表数据的位置
hive-server-manager.sh start metastore
启动spark服务: 啥服务也没有了,已经启动完了。
查看metastore服务:
hive-server-manager.sh status metastore

修改配置:

cd /opt/installs/spark/conf
新增:hive-site.xml
vi hive-site.xml

在这个文件中,编写如下配置:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://bigdata01:9083</value>
    </property>
</configuration>

接着将该文件进行分发:
xsync.sh hive-site.xml

操作sparkSQL:

/opt/installs/spark/bin/pyspark --master local[2] --conf spark.sql.shuffle.partitions=2

此处的pyspark更像是一个客户端,里面可以通过python编写spark代码而已。而我们以前安装的pyspark更像是spark的python运行环境。

进入后,通过内置对象spark:

>>> spark.sql("show databases").show()
+---------+
|namespace|
+---------+
|  default|
|     yhdb|
+---------+

>>> spark.sql("select * from yhdb.student").show()
+---+------+                                                                    
|sid| sname|
+---+------+
|  1|laoyan|
|  1|廉德枫|
|  2|  刘浩|
|  3|  王鑫|
|  4|  司翔|
+---+------+

开发环境如何编写代码,操作hive:

Pycharm工具集成Hive开发SparkSQL,必须申明Metastore的地址和启用Hive的支持

spark = SparkSession \
        .builder \
        .appName("HiveAPP") \
        .master("local[2]") \
        .config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \
        .config('hive.metastore.uris', 'thrift://bigdata01:9083') \
        .config("spark.sql.shuffle.partitions", 2) \
        .enableHiveSupport()\
        .getOrCreate()

代码实战:

from pyspark.sql import SparkSession
import os


if __name__ == '__main__':
# 构建环境变量
# 配置环境
os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 防止在本地操作hdfs的时候,出现权限问题
os.environ['HADOOP_USER_NAME'] = 'root'

# 获取sparkSession对象
spark = SparkSession \
.builder \
.appName("HiveAPP") \
.master("local[2]") \
.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \
.config('hive.metastore.uris', 'thrift://bigdata01:9083') \
.config("spark.sql.shuffle.partitions", 2) \
.enableHiveSupport() \
.getOrCreate()

spark.sql("select * from yhdb.student").show()

spark.stop()

代码还可以这样写:

方式二:加载Hive表的数据变成DF,可以调用DSL或者SQL的方式来实现计算

# 读取Hive表构建DataFrame

hiveData = spark.read.table("yhdb.student")

hiveData.printSchema()

hiveData.show()

# 读取hive表中的数据
spark2 = SparkSession \
.builder \
.appName("HiveAPP") \
.master("local[2]") \
.config("spark.sql.warehouse.dir", 'hdfs://192.168.233.128:9820/user/hive/warehouse') \
.config('hive.metastore.uris', 'thrift://192.168.233.128:9083') \
.config("spark.sql.shuffle.partitions", 2) \
.enableHiveSupport() \
.getOrCreate()

#spark2.sql("show databases").show()
#spark2.sql("show  tables").show()

#spark2.sql("select * from yhdb.t_user").show()

spark2.read.table("t_user2").show()

不要在一个python 文件中,创建两个不同的sparkSession对象,否则对于sparksql获取hive的元数据,有影响。另外,记得添加一个权限校验的语句:

# 防止在本地操作hdfs的时候,出现权限问题
os.environ['HADOOP_USER_NAME'] = 'root'

为什么有些平台不支持,不兼容 sqoop flume datax 这些工具呢?

spark 可以读取日志数据

spark 可以读取数据库数据

spark 可以读取 hdfs 数据

spark 可以读取 hive 数据

------------------------------------

spark 可以读取日志数据,形成一个 A 表,读取 mysql 数据,形成一个 B 表

A 表和 B 表还可以相互关联,此时也就不需要 sqoop、flume、datax 去导入导出了。

spark 还可以将统计出来的结果直接放入 mysql 或者直接放入 hive

--------------------

我们后面学习的内容还是沿用 将日志数据,数据库数据等所有数据抽取到 hive ,然后呢,使用 spark 去统计,统计完之后还是放入 hive ,使用 datax 等工具将结果导出 mysql。

 

二、输出Sink

 

sink --> 下沉 --> 落盘 --> 保存起来

如果输出路径或者表已经存在了怎么办

  • 类型:text /csv【所有具有固定分隔符的文件】/ json/ orc/ parquet / jdbc / table【Hive表】
  • 语法:DataFrame.write.format(保存的类型).save(保存到哪)
    • 方法:save-保存到文件save(path)或者数据库表save()中,saveAsTable-用于保存到Hive表

方式一:给定输出数据源的类型和地址

df.write.format("json").save(path)
df.write.format("csv").save(path)
df.write.format("parquet").save(path)

方式二:直接调用对应数据源类型的方法

df.write.json(path)
df.write.csv(path)
df.write.parquet(path)

特殊参数:option,用于指定输出时的一些配置选项

df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()

输出模式:Save Mode

append: 追加模式,当数据存在时,继续追加
overwrite: 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据;
error/errorifexists: 如果目标存在就报错,默认的模式
ignore: 忽略,数据存在时不做任何操作

代码如何编写:

df.write.mode(saveMode="append").format("csv").save(path)

 

实战一:保存普通格式

import os

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    spark = SparkSession.builder.master("local[2]").appName("").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()

    df = spark.read.json("../../datas/person.json")

    # 获取年龄最大的人的名字
    df.createOrReplaceTempView("persons")
    rsDf = spark.sql("""
       select name,age from persons where age = (select max(age) from persons)
    """)
    # 将结果打印到控制台
    #rsDf.write.format("console").save()
    #rsDf.write.json("../../datas/result",mode="overwrite")
    #rsDf.write.mode(saveMode='overwrite').format("json").save("../../datas/result")
    #rsDf.write.mode(saveMode='overwrite').format("csv").save("../../datas/result1")
    #rsDf.write.mode(saveMode='overwrite').format("parquet").save("../../datas/result2")
    #rsDf.write.mode(saveMode='append').format("csv").save("../../datas/result1")
    # text 保存路径为hdfs 直接报错,不支持
    #rsDf.write.mode(saveMode='overwrite').text("hdfs://bigdata01:9820/result")
    #rsDf.write.orc("hdfs://bigdata01:9820/result",mode="overwrite")
    rsDf.write.parquet("hdfs://bigdata01:9820/result", mode="overwrite")

    spark.stop()
假如:
spark.sql("select concat(name,' ',age) from person").write.text("hdfs://bigdata01:9820/spark/result")
直接报错:假如你的输出类型是text类型,直接报错
pyspark.sql.utils.AnalysisException: Text data source does not support bigint data type.
假如修改为parquet等类型,是可以直接保存的:
rsDf.write.parquet("hdfs://bigdata01:9820/result")                                                                

 

实战二:保存到数据库中

import os

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    spark = SparkSession.builder.master("local[2]").appName("").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()

    df = spark.read.format("csv").option("sep","\t").load("../../datas/zuoye/emp.tsv").toDF("id","name","sal","comm","deptno")

    # 获取年龄最大的人的名字
    df.createOrReplaceTempView("emps")
    rsDf = spark.sql("""
       select * from emps where comm is not null
    """)
    # 不需要事先将表创建好,它可以帮助我们创建
    rsDf.write.format("jdbc") \
     .option("driver", "com.mysql.cj.jdbc.Driver") \
     .option("url", "jdbc:mysql://localhost:3306/spark?characterEncoding=UTF-8") \
     .option("user","root") \
     .option("password", "123456") \
     .option("dbtable", "emp1") \
     .save(mode="overwrite")

    spark.stop()

 

实战三:将结果保存在hive表中

import os

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
    os.environ['HADOOP_USER_NAME'] = 'root'


    spark = SparkSession \
        .builder \
        .appName("测试本地连接hive") \
        .master("local[2]") \
        .config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \
        .config('hive.metastore.uris', 'thrift://bigdata01:9083') \
        .config("spark.sql.shuffle.partitions", 2) \
        .enableHiveSupport() \
        .getOrCreate()

    df = spark.read.format("csv").option("sep", "\t").load("../../datas/zuoye/emp.tsv").toDF("id", "name", "sal",
                                                                                             "comm", "deptno")

    # 获取年龄最大的人的名字
    df.createOrReplaceTempView("emps")
    rsDf = spark.sql("""
           select * from emps where comm is not null
        """)

    rsDf.write.saveAsTable("yhdb03.emp")

    spark.stop()

三、总结

        SparkSQL 读写数据功能丰富强大,涵盖多种数据源与格式,理解其原理、语法和操作细节,结合不同业务场景(如数据分析、数据迁移、数据存储优化等)灵活运用,能极大提升大数据处理效率,助力在大数据领域深挖数据价值、攻克业务难题,为数据驱动决策筑牢根基。后续实践中,多尝试不同数据、场景组合,深化掌握程度。


原文地址:https://blog.csdn.net/weixin_64726356/article/details/144330272

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!