
Two ways to upload data to HDFS: upload it directly with Java code, or put the data into Kafka and have Flume extract it.

 

Contents

1. Generate one record and upload it to HDFS with Java code.

2. Generate one record and write a Kafka producer to put it into Kafka: Kafka source --> Flume --> HDFS sink.


Scenario:

Use Java code to randomly generate student information: student IDs start from 0001, names are picked at random from a collection, and ages are random values between 15 and 25. Pause 200 ms after each record is generated, and store the data on the HDFS platform.

1. Generate one record and upload it to HDFS with Java code.

package com.bigdata.Day03;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.util.Random;

public class randomStudent_02 {
    public static void main(String[] args) throws Exception {
        // Create the Hadoop configuration
        Configuration conf = new Configuration();
        // NameNode address
        conf.set("fs.defaultFS", "hdfs://bigdata01:9820");
        // Get the FileSystem handle
        FileSystem fs = FileSystem.get(conf);

        String[] student = {"zhangsan","lisi","wangwu","zhaoliu"};
        Random random = new Random();
        // Generate a random student id, name, and age
        String studentId = String.format("%04d", random.nextInt(10) + 1);
        String studentName = student[random.nextInt(student.length)];
        int age = random.nextInt(11) + 15;   // 15 ~ 25
        String infor = studentId + "," + studentName + "," + age + "\n";
        // HDFS path the data will be written to
        Path hdfs = new Path("/flume/11-12-02/zuoye.txt");
        // Create the file if it does not exist
        if (!fs.exists(hdfs)) {
            fs.createNewFile(hdfs);
        }
        // Open an output stream in append mode
        FSDataOutputStream outputStream = fs.append(hdfs);
        // Write the record
        outputStream.write(infor.getBytes());
        // Flush so the data is persisted to HDFS
        outputStream.hsync();
        // Close resources
        outputStream.close();
        fs.close();
    }
}
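The code above writes a single record, while the scenario asks for one record every 200 ms. A minimal sketch of a looping variant is shown below; the class name, the loop count of 100, and the reuse of the same output path are illustrative assumptions, not part of the original code.

package com.bigdata.Day03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.util.Random;

public class randomStudentLoop {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://bigdata01:9820");
        FileSystem fs = FileSystem.get(conf);

        String[] student = {"zhangsan", "lisi", "wangwu", "zhaoliu"};
        Random random = new Random();
        Path hdfs = new Path("/flume/11-12-02/zuoye.txt");
        // Open the file once: append if it already exists, otherwise create it
        FSDataOutputStream out = fs.exists(hdfs) ? fs.append(hdfs) : fs.create(hdfs);

        // Illustrative: write 100 records, with student ids counting up from 0001
        for (int i = 1; i <= 100; i++) {
            String id = String.format("%04d", i);
            String name = student[random.nextInt(student.length)];
            int age = random.nextInt(11) + 15;               // 15 ~ 25
            out.write((id + "," + name + "," + age + "\n").getBytes());
            out.hsync();                                     // persist each record
            Thread.sleep(200);                               // wait 200 ms between records
        }
        out.close();
        fs.close();
    }
}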

2. Generate one record and write a Kafka producer to put it into Kafka: Kafka source --> Flume --> HDFS sink.

Step 1: Generate random data and put it into Kafka.

package com.bigdata.Day03;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;


public class randomStudent_01 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to Kafka
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata01:9092");
        // Serializers for key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Create a Kafka producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        String[] student = {"zhangsan","lisi","wangwu","zhaoliu"};

        Random random = new Random();
        // Generate a random student id, name, and age
        String studentId = String.format("%04d", random.nextInt(20) + 1);
        String studentName = student[random.nextInt(student.length)];
        int age = random.nextInt(11) + 15;   // 15 ~ 25

        String infor = studentId + "," + studentName + "," + age;

        ProducerRecord<String, String> record = new ProducerRecord<>("homework1112", infor);
        // Call send to publish the message
        kafkaProducer.send(record);

        kafkaProducer.close();

    }
}
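As with the HDFS example, this producer sends a single record. A minimal looping sketch that sends one record every 200 ms follows; the class name and the loop count of 100 are illustrative assumptions.

package com.bigdata.Day03;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class randomStudentLoopProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata01:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        String[] student = {"zhangsan", "lisi", "wangwu", "zhaoliu"};
        Random random = new Random();

        // Illustrative: send 100 records, one every 200 ms, ids counting up from 0001
        for (int i = 1; i <= 100; i++) {
            String id = String.format("%04d", i);
            String name = student[random.nextInt(student.length)];
            int age = random.nextInt(11) + 15;               // 15 ~ 25
            kafkaProducer.send(new ProducerRecord<>("homework1112", id + "," + name + "," + age));
            Thread.sleep(200);                               // wait 200 ms between records
        }
        kafkaProducer.close();
    }
}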

Step 2: Create the Kafka topic that the data will be written to

 kafka-topics.sh --bootstrap-server bigdata01:9092 --create --topic homework1112
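
To check that the producer's messages actually land in the topic, you can optionally consume it from the console (same broker address as above):

kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic homework1112 --from-beginning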

Step 3: Write the Flume conf file:

vi homework1112.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = homework1112
a1.sources.r1.kafka.consumer.group.id = zuoye01

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/11-12
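
By default the HDFS sink writes SequenceFile output and rolls files very frequently (small size and event-count thresholds). If plain-text files and calmer roll behavior are wanted, the following optional settings can be appended to the same conf file; the values here are only illustrative:

# Optional: write plain text instead of SequenceFile
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# Optional: roll a new file every 60 s or every 128 MB, never by event count
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0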

Remember to save after editing, then run the following commands:

# First cd into the directory where the conf file was saved, e.g. under the Flume install (path below is for reference only)
cd /opt/installs/flume/conf
# Run the Flume agent with the conf file to pull data from Kafka into HDFS
flume-ng agent -n a1 -c ../conf -f ./homework1112.conf -Dflume.root.logger=INFO,console

Step 4: If HDFS is not running yet, remember to start it:

# Start HDFS
start-dfs.sh
# Then open the HDFS web UI in a browser (NameNode host, port 9870)
bigdata01:9870
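
Besides the web UI, the extracted files can also be checked from the command line (assuming the HDFS sink's default FlumeData file prefix):

hdfs dfs -ls /flume/11-12
hdfs dfs -cat /flume/11-12/FlumeData.*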

Result of a successful extraction (screenshots omitted here).
