自学内容网 自学内容网

SpringBoot项目——使用Spark对爬虫爬取下的数据进行清洗

随着互联网信息呈爆炸式增长,爬虫技术被广泛用于从海量网页中抓取有价值的数据。然而,爬取到的数据往往存在格式不规范、重复、噪声等诸多问题,需要高效的数据清洗流程来保障数据质量,Spark 在其中发挥了关键作用。

什么是Spark

Spark 是当今大数据领域最活跃、最热门、最高效的大数据通用计算平台之一

Spark 是为大规模数据处理而设计的分布式计算框架,旨在处理海量数据的存储和分析任务。它可以在集群环境中运行,将计算任务分布到多个节点上,利用集群的并行处理能力来加速数据处理过程。提供了基础的弹性分布式数据集(RDD)抽象,是 Spark 的核心部分,可进行通用的分布式数据处理操作。

Spark的优点

  1. 快:与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上;而基于磁盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效地处理数据流。
  2. 易用:Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同应用。而且Spark支持交互式的Python和Scala的Shell,这意味着可以非常方便的在这些Shell中使用Spark集群来验证解决问题的方法,而不是像以前一样,需要打包、上传集群、验证等。这对于原型开发非常重要。
  3. 通用性:Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(通用Spark SQL)、实时流处理(通过Spark Streaming)、机器学习(通过Spark MLlib)和图计算(通过Spark GraphX)。这些不同类型的处理都可以在同一应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台处理问题,减少开发和维护的人力成本和部署平台的物理成本。当然还有,作为统一的解决方案,Spark并没有以牺牲性能为代价。相反,在性能方面Spark具有巨大优势。
  4. 可融合性:Spark非常方便的与其他开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassanda等。这对于已部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark强大的处理能力。Spark也可以不依赖第三方的资源管理器和调度器,它实现了Standalone作为其内置资源管理器和调度框架,这样进一步降低了Spark的使用门槛,使得所有人可以非常容易地部署和使用Spark。此外Spark还提供了在EC2上部署Standalone的Spark集群的工具。

Spark的使用

Spark大至使用流程

  1. 要先将数据进行存放在一个txt文本文件当中,
  2. 使用Spark进行读取txt中的文本数据,进行数据处理
  3. 将清洗后的数据转存到原来的txt文件当中
  4. 想要存放到数据库当中,则将txt文本文件中的数据再次读出出来存放进去即可

Spark的maven依赖

Spark想要在Springboot项目中使用要引入相应的maven依赖

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.12</artifactId>
   <version>3.4.1</version>
</dependency>
<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_2.12</artifactId>
   <version>3.4.1</version>
</dependency>

爬取出数据的存储

 本项目进行爬取的是求职网站,会将求职岗位的信息放入到一个List集合当中,通过遍历这个List集合,将数据存放到txt文本中

    /**
     * @param jobs 爬取下来的数据集合
     * @param filePath 你要存放数据的地址
     */
    public static void writeDataToTxt(List<Job> jobs, String filePath) {

        //进行检查文件是否存在 若不存在则进行创建一个路径为filePath的文件
        File file = new File(filePath);
        if (!file.exists()) {
            try {
                file.createNewFile();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath))) {
            // 将数据写入文件
            for(Job job : jobs) {
                //这里因为直接进行放入到txt文本文件难以进行格式处理 我们使用手动进行拼接操作
                String j = job.getJobName()+","+job.getCompany()+","+job.getSalary()+","+job.getExperience()+","+job.getEducational()+","+job.getCity()+","+job.getCompanyScale()+","+job.getCompanyStatus();
                writer.write(j);
                // 写入换行符
                writer.newLine();
            }
        } catch (IOException e) {
            // 处理可能出现的异常
            System.err.println("写入文件时出现错误: " + e.getMessage());
        }
    }

Spark的数据清洗

本项目中的Spark进行数据清洗是通过JavaRDD<E>

JavaRDD 是 Spark 为 Java 开发者提供的弹性分布式数据集(Resilient Distributed Dataset,RDD)接口。从本质上讲,它是一个分布式的集合,其中的数据被划分成多个分区(Partitions),这些分区可以分布在集群中的不同节点上进行并行处理。这种分布式的特性使得 JavaRDD 能够高效地处理大规模的数据。

JavaRDD 是 Spark Core 的重要组成部分,是构建其他高级功能(如 Spark SQL、MLlib 和 Spark Streaming)的基础。在 Spark SQL 中,数据通常被封装成 DataFrame 或 Dataset,但这些高级数据结构的底层实现往往也依赖于 RDD 的基本概念和操作。

JavaRDD 可以存储各种类型的数据,包括 Java 基本数据类型(如intdoubleboolean等)和自定义的 Java 对象。

使用JavaRDD filter方法和map方法进行对数据的处理

  1. filter 方法是用来对流中的元素进行筛选的,它的返回值应该是一个 boolean 类型的表达式,用来判断该元素是否应该保留在流中
  2.  map 方法是一种转换操作,它对 JavaRDD 中的每个元素应用一个函数,并返回一个新的 JavaRDD
    /**
     * @param path 要进行清洗的txt文件路径
     */
    public static void DataClening(String path){
        SparkSession spark = SparkSession
                .builder()//将 SparkContext、SQLContext 和 HiveContext 等功能集成在一起
                .appName("Java Spark")//为 Spark 应用程序设置一个名称 名称会显示在 Spark 集群的监控界面上,有助于识别和管理应用程序
                .master("local[*]")//表示在本地运行 Spark
                .getOrCreate();//尝试获取现有的 SparkSession 实例,如果不存在则创建一个新的。这样可以确保在同一个 JVM 中不会创建多个 SparkSession,避免资源浪费

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        //读取数据源到RDD中
        JavaRDD<String> rdd = sc.textFile(path);

        //对RDD中的数据进行清洗处理
        JavaRDD<String> cleanedData = cleanData(rdd);

        // 打印清洗后的数据(可以根据需要保存到文件或进行其他操作) 此处不要省略 不然会报错
        cleanedData.collect().forEach(System.out::println);

        //进行检查文件是否存在
        File file = new File(path);
        if (file.exists()) {
            file.delete();
        }

        try (BufferedWriter writer = new BufferedWriter(new FileWriter(path))) {
            // 将数据写入文件
            for(String job : cleanedData.collect()) {
                writer.write(job);
                // 写入换行符
                writer.newLine();
            }
        } catch (IOException e) {
            // 处理可能出现的异常
            System.err.println("清洗数据后写入文件时出现错误: " + e.getMessage());
        }
        // 停止Spark上下文
        sc.stop();
    }


    /**
     * 对数据进行清洗操作
     * @param rawData 需要进行清洗的JavaRDD对象
     * @return 返回进行清洗的JavaRDD对象
     */
    private static JavaRDD<String> cleanData(JavaRDD<String> rawData) {
        return rawData
                //filter 方法是用来对流中的元素进行筛选的,它的返回值应该是一个 boolean 类型的表达式,用来判断该元素是否应该保留在流中
                // 移除空行
                .filter(line -> !line.trim().isEmpty())
                // 例如,移除含有特定字符(如null)的行
                //.filter(line -> !line.contains("null"))


                // map 是一种转换操作,它对 JavaRDD 中的每个元素应用一个函数,并返回一个新的 JavaRDD
                // 可能需要根据分隔符(例如逗号)拆分字段并重新格式化
                .map(line -> {
                    String[] fields = line.replace("null", "暂无数据信息").split(",");
                    //进行数据检查 确保这工作名称等8项信息完整 不完整直接进行设置为空
                    if (fields.length == 8) {
                        //将fields字符串数组中的信息进行拼接起来 这里仅做简单拼接为清洗后的格式,实际清洗可以根据需求更复杂
                        return String.join(",", fields);
                    } else {
                        return ""; //将不满足8项信息的数据进行设置为空处理
                    }

                })
                // 去除清洗后的空行或不需要的数据
                .filter(line -> !line.trim().isEmpty())
                //去重
                .distinct();
    }

清洗后数据存放

此处会将txt文件中的数据进行取出,放入到一个List集合当中,想要放入到数据库中,只需要进行遍历这个集合即可

    /**
     * 
     * @param path 将指定位置的txt文件中的信息 放入到List集合中 便于插入到数据库中
     * @return 返回文本中的·数据信息
     */
    public static List<Job> getJobbyTxtFile(String path){
        List<Job> jobs = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            String line;
            while ((line = reader.readLine())!= null) {
                jobs.add(toJob(line));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        return jobs;
    }
    
    public static Job toJob(String job){
        String[] jobsContext = job.split(",");
        Job j = new Job();
        j.setJobName(jobsContext[0]);
        j.setCompany(jobsContext[1]);
        j.setSalary(jobsContext[2]);
        j.setExperience(jobsContext[3]);
        j.setEducational(jobsContext[4]);
        j.setCity(jobsContext[5]);
        j.setCompanyScale(jobsContext[6]);
        j.setCompanyStatus(jobsContext[7]);
        return j;
    }


原文地址:https://blog.csdn.net/xxxmine/article/details/145042885

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!