Two ways to upload data to HDFS: upload it directly with Java code, or put it into Kafka and ingest it into HDFS with Flume
Table of Contents
1. Generate a record and write it to HDFS with Java code.
2. Generate a record, write a Kafka producer to put it into Kafka, then Kafka source --> Flume --> HDFS sink.
Scenario:
Use Java code to generate random student records: the student ID starts from 0001, the name is picked at random from a collection, and the age is a random value between 15 and 25. Pause 200 ms after each record is generated, then store the data on HDFS.
1. Generate a record and write it to HDFS with Java code.
package com.bigdata.Day03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.util.Random;

public class randomStudent_02 {
    public static void main(String[] args) throws Exception {
        // Create the configuration for the connection
        Configuration conf = new Configuration();
        // NameNode address (fs.defaultFS)
        conf.set("fs.defaultFS", "hdfs://bigdata01:9820");
        // Get the FileSystem handle
        FileSystem fs = FileSystem.get(conf);

        String[] student = {"zhangsan", "lisi", "wangwu", "zhaoliu"};
        Random random = new Random();
        // Generate a random student id, name and age
        String studentId = String.format("%04d", random.nextInt(10) + 1);
        String studentName = student[random.nextInt(student.length)];
        int age = random.nextInt(11) + 15;
        // Trailing newline keeps records from separate runs on separate lines
        String infor = studentId + "," + studentName + "," + age + "\n";

        // Target path on HDFS
        Path hdfs = new Path("/flume/11-12-02/zuoye.txt");
        // Create the file if it does not exist yet
        if (!fs.exists(hdfs)) {
            fs.createNewFile(hdfs);
        }
        // Open an append stream
        FSDataOutputStream outputStream = fs.append(hdfs);
        // Write the record
        outputStream.write(infor.getBytes());
        // Flush so the data is persisted to the DataNodes
        outputStream.hsync();
        // Close resources
        outputStream.close();
        fs.close();
    }
}
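The code above writes a single record per run, while the scenario asks for records generated continuously, with IDs starting from 0001 and a 200 ms pause between records. A minimal sketch of that looped version might look like the following (the class name RandomStudentLoop and the 100-record limit are illustrative assumptions, not from the original post):

package com.bigdata.Day03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.util.Random;

public class RandomStudentLoop {   // hypothetical class name
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://bigdata01:9820");
        FileSystem fs = FileSystem.get(conf);

        Path hdfs = new Path("/flume/11-12-02/zuoye.txt");
        if (!fs.exists(hdfs)) {
            fs.createNewFile(hdfs);
        }
        // Open the append stream once and reuse it inside the loop
        FSDataOutputStream outputStream = fs.append(hdfs);

        String[] student = {"zhangsan", "lisi", "wangwu", "zhaoliu"};
        Random random = new Random();
        int id = 1;                        // IDs start at 0001 and increase
        for (int i = 0; i < 100; i++) {    // 100 records, purely for illustration
            String studentId = String.format("%04d", id++);
            String studentName = student[random.nextInt(student.length)];
            int age = random.nextInt(11) + 15;                 // 15 ~ 25
            String infor = studentId + "," + studentName + "," + age + "\n";
            outputStream.write(infor.getBytes());
            outputStream.hsync();          // make the record durable/visible
            Thread.sleep(200);             // pause 200 ms between records
        }

        outputStream.close();
        fs.close();
    }
}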
2. Generate a record, write a Kafka producer to put it into Kafka, then Kafka source --> Flume --> HDFS sink
Step 1: generate random data and put it into Kafka.
package com.bigdata.Day03;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class randomStudent_01 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Kafka broker address
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata01:9092");
        // Serializers for the message key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Create a Kafka producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        String[] student = {"zhangsan", "lisi", "wangwu", "zhaoliu"};
        Random random = new Random();
        // Generate a random student id, name and age
        String studentId = String.format("%04d", random.nextInt(20) + 1);
        String studentName = student[random.nextInt(student.length)];
        int age = random.nextInt(11) + 15;
        String infor = studentId + "," + studentName + "," + age;

        ProducerRecord<String, String> record = new ProducerRecord<>("homework1112", infor);
        // Send the message (asynchronous)
        kafkaProducer.send(record);
        // close() flushes any buffered records before exiting
        kafkaProducer.close();
    }
}
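Like the HDFS version, this producer sends a single record per run. A minimal sketch of a looped producer that follows the scenario (sequential IDs from 0001, 200 ms pause between records) could look like this; the class name RandomStudentProducerLoop, the 100-record limit and the delivery callback are illustrative additions:

package com.bigdata.Day03;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class RandomStudentProducerLoop {   // hypothetical class name
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata01:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        String[] student = {"zhangsan", "lisi", "wangwu", "zhaoliu"};
        Random random = new Random();

        int id = 1;                         // IDs start at 0001 and increase
        for (int i = 0; i < 100; i++) {     // 100 records, purely for illustration
            String studentId = String.format("%04d", id++);
            String studentName = student[random.nextInt(student.length)];
            int age = random.nextInt(11) + 15;
            String infor = studentId + "," + studentName + "," + age;
            // send() is asynchronous; the callback reports success or failure per record
            kafkaProducer.send(new ProducerRecord<>("homework1112", infor), (metadata, e) -> {
                if (e != null) {
                    e.printStackTrace();
                }
            });
            Thread.sleep(200);              // pause 200 ms between records
        }

        kafkaProducer.close();              // flushes any buffered records
    }
}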
Step 2: create the Kafka topic that the data will be written to
kafka-topics.sh --bootstrap-server bigdata01:9092 --create --topic homework1112
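To confirm that the topic exists and that the producer's records actually arrive, the standard Kafka scripts can be used, for example:
kafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic homework1112
kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic homework1112 --from-beginning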
Step 3: write the Flume conf file:
vi homework1112.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = homework1112
a1.sources.r1.kafka.consumer.group.id = zuoye01
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/11-12
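With only hdfs.path set, the HDFS sink falls back to its defaults, which write SequenceFiles and roll a new file roughly every 30 seconds / 10 events / 1 KB. If you want plain-text output and different rolling behaviour, properties like the following are commonly appended to the same conf file (the values shown are illustrative, not from the original post):
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.filePrefix = student
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0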
After editing, remember to save the file, then run the following commands:
// first cd into the directory that contains homework1112.conf (the path below is from the original setup and is for reference only)
cd /opt/installs/kafka3/bin
// run the Flume agent with the conf file to pull the data from Kafka into HDFS
flume-ng agent -n a1 -c ../conf -f ./homework1112.conf -Dflume.root.logger=INFO,console
Step 4: if HDFS is not running yet, remember to start it:
// start command
start-dfs.sh
// then open the HDFS web UI in a browser (namenode-ip:9870)
bigdata01:9870
After the ingestion succeeds, the result can be checked in the HDFS web UI (the original post shows a screenshot here).
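Besides the web UI, the ingested files can also be checked from the command line (the path matches the sink configuration above; -cat only shows readable text if the sink writes DataStream/Text output):
hdfs dfs -ls /flume/11-12
hdfs dfs -cat /flume/11-12/*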
Original article: https://blog.csdn.net/weixin_64860388/article/details/143722224