学习大数据DAY59 全量抽取和增量抽取实战
目录
需求流程:
需求分析与规范
作业
#!/bin/python3
import pymysql
import sys
# 自动写 datax 的 json 文件
if len(sys.argv)!=3:
print("使用方法为:python3 full.py 数据库名 表名")
sys.exit()
sys_name=sys.argv[1]
table_name=sys.argv[2]
# datax_json=f"{sys_name}.{table_name}_full.json"
db=pymysql.connect(
host='zhiyun.pub',
port=23306,
user='zhiyun',
password='zhiyun',
database='information_schema'
)
cursor=db.cursor()
cursor.execute(f"select column_name,data_type from
information_schema.columns where table_schema='{sys_name}' and
table_name='{table_name}'")
data=cursor.fetchall()
fileds=[]for field in data:
field_name = field[0]
field_type = field[1]
#转换成 hive 类型
field_hive_type="string"
if field_type=="int" or field_type=="tinyint" or
field_type=="bigint":
field_hive_type="int"
if field_type=="float" or field_type=="double":
field_hive_type="float"
fileds.append([field_name,field_hive_type])
db.close()
print("=============== 配置 datax ===============")
file_path=f"/zhiyun/shihaihong/jobs/{sys_name}_{table_name}_fu
ll.json"
template_path="/zhiyun/shihaihong/jobs/template.json"
with open(template_path,"r",encoding="utf-8") as f:
template_content=f.read()
new_content=template_content.replace("#sys_name#",sys_name)
new_content=new_content.replace("#table_name#",table_name)
#列的替换
lines=[]
for filed in fileds:
line='
{"name":"'+filed[0]+'
","type":"'+filed[1]+'"},'
lines.append(line)
columns="\n".join(lines)
columns=columns.strip(",")
new_content=new_content.replace("\"#columns#\"",columns)
#写入到新的配置
with open(file_path,"w",encoding="utf-8") as ff:
ff.write(new_content)
ff.close()
f.close()
print("datax 文件配置成功")
print("=============== 配置 hive ===============")file_path=f"/zhiyun/shihaihong/sql/{sys_name}_{table_name}_ful
l.sql"
template_path="/zhiyun/shihaihong/sql/template.sql"
with open(template_path,"r",encoding="utf-8") as f:
template_content=f.read()
new_content=template_content.replace("#sys_name#",sys_name)
new_content=new_content.replace("#table_name#",table_name)
#列的替换
lines=[]
for filed in fileds:
line=f"
{filed[0]} {filed[1]},"
lines.append(line)
columns="\n".join(lines)
columns=columns.strip(",")
new_content=new_content.replace("#columns#",columns)
#写入到新的配置
with open(file_path,"w",encoding="utf-8") as ff:
ff.write(new_content)
ff.close()
print("hive 建表文件生成成功")
json 模板:
template.json:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://zhiyun.pub:233
06/crm?useSSL=false"
],
"table": [
"#table_name#"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
"#column#"
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName":
"#sys_name#_#table_name#_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/#sys_name#_#table_name#_full",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";-- 增量表
create external table if not exists
ods_shihaihong.#table_name#_full(
#columns#
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/#sys_name#_#table_name#_full";
#!/bin/bash
echo "生成全量配置文件"mkdir -p /zhiyun/shihaihong/jobs
echo '{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/crm?useSSL=false"
],
"table": [
"user_base_info_his"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"id","type":"int"},
{"name":"user_id","type":"string"},
{"name":"user_type","type":"string"}
,
{"name":"source","type":"string"},
{"name":"erp_code","type":"string"}
,
{"name":"active_time","type":"strin
g"},
{"name":"name","type":"string"},
{"name":"sex","type":"string"},
{"name":"education","type":"string"}
,{"name":"job","type":"string"},
{"name":"email","type":"string"},
{"name":"wechat","type":"string"},
{"name":"webo","type":"string"},
{"name":"birthday","type":"string"}
,
{"name":"age","type":"int"},
{"name":"id_card_no","type":"string
"},
{"name":"social_insurance_no","type
":"string"},
{"name":"address","type":"string"},
{"name":"last_subscribe_time","type
":"int"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName":
"crm_user_base_info_his_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/crm_user_base_info_his_full",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}' > /zhiyun/shihaihong/jobs/crm_user_base_info_his_full.json
echo "开始抽取"
hadoop fs -mkdir -p
/zhiyun/shihaihong/ods/crm_user_base_info_his_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/crm_user_base_info_his_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e 'create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists
ods_shihaihong.user_base_info_his_full(
id int,
user_id string,
user_type string,
source string,
erp_code string,
active_time string,
name string,
sex string,
education string,
job string,
email string,
wechat string,
webo string,
birthday string,
age int,
id_card_no string,
social_insurance_no string,
address string,
last_subscribe_time int
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/crm_user_base_info_his_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath
\"/zhiyun/shihaihong/tmp/crm_user_base_info_his_full/*\"
overwrite into table ods_shihaihong.crm_user_base_info_his_full
partition(createtime='$day');
# "
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.user_base_info_his_full;
"echo "抽取完成"
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"table": [
"u_memcard_reg"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"id","type":"int"},
{"name":"memcardno","type":"string"}
,
{"name":"busno","type":"string"},
{"name":"introducer","type":"string
"},
{"name":"cardtype","type":"int"},
{"name":"cardlevel","type":"int"},{"name":"cardpass","type":"string"}
,
{"name":"cardstatus","type":"int"},
{"name":"saleamount","type":"string
"},
{"name":"realamount","type":"string
"},
{"name":"puramount","type":"string"}
,
{"name":"integral","type":"string"}
,
{"name":"integrala","type":"string"}
,
{"name":"integralflag","type":"int"}
,
{"name":"cardholder","type":"string
"},
{"name":"cardaddress","type":"strin
g"},
{"name":"sex","type":"string"},
{"name":"tel","type":"string"},
{"name":"handset","type":"string"},
{"name":"fax","type":"string"},
{"name":"createuser","type":"string
"},
{"name":"createtime","type":"string
"},
{"name":"tstatus","type":"int"},
{"name":"notes","type":"string"},
{"name":"stamp","type":"string"},
{"name":"idcard","type":"string"},
{"name":"birthday","type":"string"}
,
{"name":"allowintegral","type":"int
"},
{"name":"apptype","type":"string"},
{"name":"applytime","type":"string"}
,
{"name":"invalidate","type":"string
"},
{"name":"lastdate","type":"string"}
,
{"name":"bak1","type":"string"},{"name":"scrm_userid","type":"strin
g"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName":
"erp_u_memcard_reg_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/erp_u_memcard_reg_full",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}' > /zhiyun/shihaihong/jobs/erp_u_memcard_reg_full.json
echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_memcard_reg_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_u_memcard_reg_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists
ods_shihaihong.u_memcard_reg_full(
id int,
memcardno string,
busno string,
introducer string,
cardtype int,
cardlevel int,
cardpass string,
cardstatus int,saleamount string,
realamount string,
puramount string,
integral string,
integrala string,
integralflag int,
cardholder string,
cardaddress string,
sex string,
tel string,
handset string,
fax string,
createuser string,
createtime string,
tstatus int,
notes string,
stamp string,
idcard string,
birthday string,
allowintegral int,
apptype string,
applytime string,
invalidate string,
lastdate string,
bak1 string,
scrm_userid string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_memcard_reg_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath
\"/zhiyun/shihaihong/tmp/erp_u_memcard_reg_full/*\" overwrite
into table ods_shihaihong.erp_u_memcard_reg_full
partition(createtime='$day');
# "
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "select count(1) from ods_shihaihong.u_memcard_reg_full;
"
echo "抽取完成"
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"table": [
"c_memcard_class_group"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"createtime","type":"string"},{"name":"createuser","type":"string
"},
{"name":"groupid","type":"int"},
{"name":"groupname","type":"string"}
,
{"name":"notes","type":"string"},
{"name":"stamp","type":"int"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName":
"erp_c_memcard_class_group_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/erp_c_memcard_class_group_full",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}' > /zhiyun/shihaihong/jobs/erp_c_memcard_class_group_full.json
echo "开始抽取"
hadoop fs -mkdir -p
/zhiyun/shihaihong/ods/erp_c_memcard_class_group_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_c_memcard_class_group_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists
ods_shihaihong.c_memcard_class_group_full(
createtime string,
createuser string,groupid int,
groupname string,
notes string,
stamp int
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location
"/zhiyun/shihaihong/ods/erp_c_memcard_class_group_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath
\"/zhiyun/shihaihong/tmp/erp_c_memcard_class_group_full/*\"
overwrite into table
ods_shihaihong.erp_c_memcard_class_group_full
partition(createtime='$day');
# "
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.c_memcard_class_group_full;
"
echo "抽取完成"
erp.u_memcard_reg_c:
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader","parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"table": [
"u_memcard_reg_c"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"id","type":"int"},
{"name":"memcardno","type":"string"}
,
{"name":"sickness","type":"string"}
,
{"name":"status","type":"string"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName":
"erp_u_memcard_reg_c_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/erp_u_memcard_reg_c_full",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {"channel": "3"
}
}
}
}' > /zhiyun/shihaihong/jobs/erp_u_memcard_reg_c_full.json
echo "开始抽取"
hadoop fs -mkdir -p
/zhiyun/shihaihong/ods/erp_u_memcard_reg_c_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_u_memcard_reg_c_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists
ods_shihaihong.u_memcard_reg_c_full(
id int,
memcardno string,
sickness string,
status string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_memcard_reg_c_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath
\"/zhiyun/shihaihong/tmp/erp_u_memcard_reg_c_full/*\" overwrite
into table ods_shihaihong.erp_u_memcard_reg_c_full
partition(createtime='$day');
# "
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.u_memcard_reg_c_full;
"echo "抽取完成"
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/his?useSSL=false"
],
"table": [
"chronic_patient_info_new"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"id","type":"int"},{"name":"member_id","type":"string"}
,
{"name":"erp_code","type":"string"}
,
{"name":"extend","type":"string"},
{"name":"detect_time","type":"strin
g"},
{"name":"bec_chr_mbr_date","type":"
string"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName":
"his_chronic_patient_info_new_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/his_chronic_patient_info_new_full",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}' >
/zhiyun/shihaihong/jobs/his_chronic_patient_info_new_full.json
echo "开始抽取"
hadoop fs -mkdir -p
/zhiyun/shihaihong/ods/his_chronic_patient_info_new_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/his_chronic_patient_info_new_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表create external table if not exists
ods_shihaihong.chronic_patient_info_new_full(
id int,
member_id string,
erp_code string,
extend string,
detect_time string,
bec_chr_mbr_date string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location
"/zhiyun/shihaihong/ods/his_chronic_patient_info_new_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath
\"/zhiyun/shihaihong/tmp/his_chronic_patient_info_new_full/*\"
overwrite into table
ods_shihaihong.his_chronic_patient_info_new_full
partition(createtime='$day');
# "
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from
ods_shihaihong.chronic_patient_info_new_full;
"
echo "抽取完成"
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobsecho '{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"table": [
"c_org_busi"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"id","type":"int"},
{"name":"busno","type":"string"},
{"name":"orgname","type":"string"},
{"name":"orgsubno","type":"string"}
,
{"name":"orgtype","type":"string"},
{"name":"salegroup","type":"string"}
,
{"name":"org_tran_code","type":"str
ing"},
{"name":"accno","type":"string"},
{"name":"sendtype","type":"string"}
,
{"name":"sendday","type":"string"},{"name":"maxday","type":"string"},
{"name":"minday","type":"string"},
{"name":"notes","type":"string"},
{"name":"stamp","type":"string"},
{"name":"status","type":"string"},
{"name":"customid","type":"string"}
,
{"name":"whl_vendorno","type":"stri
ng"},
{"name":"whlgroup","type":"string"}
,
{"name":"rate","type":"string"},
{"name":"creditamt","type":"string"}
,
{"name":"creditday","type":"string"}
,
{"name":"peoples","type":"string"},
{"name":"area","type":"string"},
{"name":"abc","type":"string"},
{"name":"address","type":"string"},
{"name":"tel","type":"string"},
{"name":"principal","type":"string"}
,
{"name":"identity_card","type":"str
ing"},
{"name":"mobil","type":"string"},
{"name":"corporation","type":"strin
g"},
{"name":"saler","type":"string"},
{"name":"createtime","type":"string
"},
{"name":"bank","type":"string"},
{"name":"bankno","type":"string"},
{"name":"bak1","type":"string"},
{"name":"bak2","type":"string"},
{"name":"a_bak1","type":"string"},
{"name":"aa_bak1","type":"string"},
{"name":"b_bak1","type":"string"},
{"name":"bb_bak1","type":"string"},
{"name":"y_bak1","type":"string"},
{"name":"t_bak1","type":"string"},
{"name":"ym_bak1","type":"string"},
{"name":"tm_bak1","type":"string"},{"name":"supervise_code","type":"st
ring"},
{"name":"monthrent","type":"string"}
,
{"name":"wms_warehid","type":"strin
g"},
{"name":"settlement_cycle","type":"
string"},
{"name":"apply_cycle","type":"strin
g"},
{"name":"applydate","type":"string"}
,
{"name":"accounttype","type":"strin
g"},
{"name":"applydate_last","type":"st
ring"},
{"name":"paymode","type":"string"},
{"name":"yaolian_flag","type":"stri
ng"},
{"name":"org_longitude","type":"str
ing"},
{"name":"org_latitude","type":"stri
ng"},
{"name":"org_province","type":"stri
ng"},
{"name":"org_city","type":"string"}
,
{"name":"org_area","type":"string"}
,
{"name":"business_time","type":"str
ing"},
{"name":"yaolian_group","type":"str
ing"},
{"name":"pacard_storeid","type":"st
ring"},
{"name":"opening_time","type":"stri
ng"},
{"name":"ret_ent_id","type":"string
"},
{"name":"ent_id","type":"string"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t","fileName": "erp_c_org_busi_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/erp_c_org_busi_full",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}' > /zhiyun/shihaihong/jobs/erp_c_org_busi_full.json
echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_c_org_busi_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_c_org_busi_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists
ods_shihaihong.c_org_busi_full(
id int,
busno string,
orgname string,
orgsubno string,
orgtype string,
salegroup string,
org_tran_code string,
accno string,
sendtype string,
sendday string,
maxday string,
minday string,
notes string,
stamp string,status string,
customid string,
whl_vendorno string,
whlgroup string,
rate string,
creditamt string,
creditday string,
peoples string,
area string,
abc string,
address string,
tel string,
principal string,
identity_card string,
mobil string,
corporation string,
saler string,
createtime string,
bank string,
bankno string,
bak1 string,
bak2 string,
a_bak1 string,
aa_bak1 string,
b_bak1 string,
bb_bak1 string,
y_bak1 string,
t_bak1 string,
ym_bak1 string,
tm_bak1 string,
supervise_code string,
monthrent string,
wms_warehid string,
settlement_cycle string,
apply_cycle string,
applydate string,
accounttype string,
applydate_last string,
paymode string,
yaolian_flag string,
org_longitude string,
org_latitude string,
org_province string,org_city string,
org_area string,
business_time string,
yaolian_group string,
pacard_storeid string,
opening_time string,
ret_ent_id string,
ent_id string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_c_org_busi_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath
\"/zhiyun/shihaihong/tmp/erp_c_org_busi_full/*\" overwrite into
table ods_shihaihong.erp_c_org_busi_full
partition(createtime='$day');
# "
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.c_org_busi_full;
"
echo "抽取完成"
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
"job": {"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"table": [
"c_code_value"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"id","type":"int"},
{"name":"cat_name","type":"string"}
,
{"name":"cat_code","type":"string"}
,
{"name":"val_name","type":"string"}
,
{"name":"var_desc","type":"string"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName":
"erp_c_code_value_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/erp_c_code_value_full",
"writeMode": "truncate"}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}' > /zhiyun/shihaihong/jobs/erp_c_code_value_full.json
echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_c_code_value_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_c_code_value_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists
ods_shihaihong.c_code_value_full(
id int,
cat_name string,
cat_code string,
val_name string,
var_desc string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_c_code_value_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath
\"/zhiyun/shihaihong/tmp/erp_c_code_value_full/*\" overwrite
into table ods_shihaihong.erp_c_code_value_full
partition(createtime='$day');
# "echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.c_code_value_full;
"
echo "抽取完成"
#!/bin/python3
import os
import pandas as pd
from openpyxl import load_workbook
ss=''
lst=[]#使用 pandas 的 read_excel 函数读取指定路径的 Excel 文件。
sheet_name=None 表示读取文件中的所有工作表,而 header=2 表示数据的表
头位于第 3 行(索引从 0 开始)
dfs = pd.read_excel('/zhiyun/shihaihong/data/12.码值
表.xlsx',sheet_name=None,header=2)
dir=list(dfs.keys())
#获取 xlsx 文件数据
for i in range(len(dir)):
if i>1:
#获取 A2 行数据
wb = load_workbook(filename='/zhiyun/shihaihong/data/12.
码值表.xlsx')
str_head = wb[dir[i]]['A2'].value
data=dfs[dir[i]]
#获取其它行数据
lst1=[]
for i in data.columns:
for j in range(len(data)):
if data[i][j] != 'NaN':
lst1.append(str(data[i][j]))
n=int(len(lst1)/2)
for i in range(n):
ss=f"{str_head.split('-')[0]}|{str_head.split('-')[
1]}|{lst1[i]}|{lst1[i+n]}"
lst.append(ss)
print("写入数据到 data")
template_path = "/zhiyun/shihaihong/data/code_value.txt"
with open(template_path,"w",encoding="utf-8") as f:
content="\n".join(lst)
f.write(content)
f.close
print("上传 data 文件 到 hdfs")
os.system(f"hdfs dfs -mkdir -p /zhiyun/shihaihong/filetxt/")
os.system(f"hdfs dfs -put {template_path}
/zhiyun/shihaihong/filetxt/")
#!/bin/bash
# 作用: 完成从编写配置文件到验证数据的整个过程
# 需要在任何节点都可以执行
# 创建本人文件夹
mkdir -p /zhiyun/shihaihong/data /zhiyun/shihaihong/jobs
/zhiyun/shihaihong/python /zhiyun/shihaihong/shell
/zhiyun/shihaihong/sql
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e'
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
create external table if not exists
ods_shihaihong.c_code_value_full(
cat_name string,
cat_code string,
val_name string,
var_desc string
)
row format delimited fields terminated by "|"
lines terminated by "\n"
stored as textfile
location "/zhiyun/shihaihong/filetxt";'
echo "hive 建表完成"
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.c_code_value_full;
"
echo "验证完成"
执行后,用 shell 脚本抽取
作业2
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"table": [
"u_sale_m"]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":"id","type":"int"},
{"name":"saleno","type":"string"},
{"name":"busno","type":"string"},
{"name":"posno","type":"string"},
{"name":"extno","type":"string"},
{"name":"extsource","type":"string"}
,
{"name":"o2o_trade_from","type":"st
ring"},
{"name":"channel","type":"int"},
{"name":"starttime","type":"string"}
,
{"name":"finaltime","type":"string"}
,
{"name":"payee","type":"string"},
{"name":"discounter","type":"string
"},
{"name":"crediter","type":"string"}
,
{"name":"returner","type":"string"}
,
{"name":"warranter1","type":"string
"},
{"name":"warranter2","type":"string
"},
{"name":"stdsum","type":"string"},
{"name":"netsum","type":"string"},
{"name":"loss","type":"string"},
{"name":"discount","type":"float"},
{"name":"member","type":"string"},
{"name":"precash","type":"string"},
{"name":"stamp","type":"string"},{"name":"shiftid","type":"string"},
{"name":"shiftdate","type":"string"}
,
{"name":"yb_saleno","type":"string"
}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName": "erp_u_sale_m_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/erp_u_sale_m_full",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}' > /zhiyun/shihaihong/jobs/erp_u_sale_m_full.json
echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_sale_m_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_u_sale_m_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists ods_shihaihong.u_sale_m_full(
id int,
saleno string,
busno string,
posno string,
extno string,
extsource string,
o2o_trade_from string,channel int,
starttime string,
finaltime string,
payee string,
discounter string,
crediter string,
returner string,
warranter1 string,
warranter2 string,
stdsum string,
netsum string,
loss string,
discount float,
member string,
precash string,
stamp string,
shiftid string,
shiftdate string,
yb_saleno string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_m_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath \"/zhiyun/shihaihong/tmp/erp_u_sale_m_full/*\"
overwrite into table ods_shihaihong.erp_u_sale_m_full
partition(createtime='$day');
# "
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.u_sale_m_full;
"
echo "抽取完成"
写入任务调度平台:
#!/bin/bash
day=$(date -d "yesterday" +%Y-%m-%d)
if [ $1 != "" ]; thenday=$(date -d "$1 -1 day" +%Y-%m-%d);
fi;
echo "抽取的日期为 $day"
echo "生成增量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"querySql": [
"select * from u_sale_m where
stamp between '\'"$day" 00:00:00\'' and '\'"$day" 23:59:59\'' and
id>0"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":"id","type":"int"},
{"name":"saleno","type":"string"},
{"name":"busno","type":"string"},
{"name":"posno","type":"string"},
{"name":"extno","type":"string"},{"name":"extsource","type":"string"}
,
{"name":"o2o_trade_from","type":"str
ing"},
{"name":"channel","type":"int"},
{"name":"starttime","type":"string"}
,
{"name":"finaltime","type":"string"}
,
{"name":"payee","type":"string"},
{"name":"discounter","type":"string"}
,
{"name":"crediter","type":"string"},
{"name":"returner","type":"string"},
{"name":"warranter1","type":"string"}
,
{"name":"warranter2","type":"string"}
,
{"name":"stdsum","type":"string"},
{"name":"netsum","type":"string"},
{"name":"loss","type":"string"},
{"name":"discount","type":"float"},
{"name":"member","type":"string"},
{"name":"precash","type":"string"},
{"name":"stamp","type":"string"},
{"name":"shiftid","type":"string"},
{"name":"shiftdate","type":"string"}
,
{"name":"yb_saleno","type":"string"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName": "erp_u_sale_m_inc.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/tmp/erp_u_sale_m_inc",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {"channel": 2
}
}
}
}' > /zhiyun/shihaihong/jobs/erp_u_sale_m_inc.json
echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/tmp/erp_u_sale_m_inc
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_u_sale_m_inc.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
create external table if not exists
ods_shihaihong.erp_u_sale_m_inc(
id int,
saleno string,
busno string,
posno string,
extno string,
extsource string,
o2o_trade_from string,
channel int,
starttime string,
finaltime string,
payee string,
discounter string,
crediter string,
returner string,
warranter1 string,
warranter2 string,
stdsum string,
netsum string,
loss string,
discount float,
member string,
precash string,
shiftid string,
shiftdate string,
yb_saleno string) partitioned by (stamp string)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_m_inc";
'
echo "加载数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
load data inpath '/zhiyun/shihaihong/tmp/erp_u_sale_m_inc/*'
overwrite into table ods_shihaihong.erp_u_sale_m_inc
partition(stamp='"$day"');
"
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
show partitions ods_shihaihong.erp_u_sale_m_inc;
select count(1) from ods_shihaihong.erp_u_sale_m_inc where stamp
= '"$day"';
select * from ods_shihaihong.erp_u_sale_m_inc where stamp =
'"$day"' limit 5;
"
echo "抽取完成"
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobsecho '{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"table": [
"u_sale_pay"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"id","type":"int"},
{"name":"saleno","type":"string"},
{"name":"cardno","type":"string"},
{"name":"netsum","type":"string"},
{"name":"paytype","type":"string"},
{"name":"bak1","type":"string"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName": "erp_u_sale_pay_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/erp_u_sale_pay_full",
"writeMode": "truncate"}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}' > /zhiyun/shihaihong/jobs/erp_u_sale_pay_full.json
echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_sale_pay_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_u_sale_pay_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists
ods_shihaihong.u_sale_pay_full(
id int,
saleno string,
cardno string,
netsum string,
paytype string,
bak1 string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_pay_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath
\"/zhiyun/shihaihong/tmp/erp_u_sale_pay_full/*\" overwrite into
table ods_shihaihong.erp_u_sale_pay_full
partition(createtime='$day');# "
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.u_sale_pay_full;
"
echo "抽取完成"
#!/bin/bash
day=$(date -d "yesterday" +%Y-%m-%d)
if [ $1 != "" ]; then
day=$(date -d "$1 -1 day" +%Y-%m-%d);
fi;
echo "抽取的日期为 $day"
echo "生成增量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"table": [
"select u_sale_pay.*,stamp
from u_sale_pay left join u_sale_m on
u_sale_pay.saleno=u_sale_m.saleno where stamp between '\'"$day"
00:00:00\'' and '\'"$day" 23:59:59\'' and id>0 "
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"id","type":"int"},
{"name":"saleno","type":"string"},
{"name":"cardno","type":"string"},
{"name":"netsum","type":"string"},
{"name":"paytype","type":"string"},
{"name":"bak1","type":"string"},
{"name":"stamp","type":"string"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName": "erp_u_sale_pay_inc.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/tmp/erp_u_sale_pay_inc",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}
' > /zhiyun/shihaihong/jobs/erp_u_sale_play_inc.json
echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/tmp/erp_u_sale_play_inc
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_u_sale_play_inc.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e 'create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
create external table if not exists
ods_shihaihong.erp_u_sale_play_inc(
id int,
saleno string,
cardno string,
netsum string,
paytype string,
bak1 string
) partitioned by (stamp string)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_play_inc";
'
echo "加载数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
load data inpath '/zhiyun/shihaihong/tmp/erp_u_sale_play_inc/*'
overwrite into table ods_shihaihong.erp_u_sale_play_inc
partition(stamp='"$day"');
"
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
show partitions ods_shihaihong.erp_u_sale_play_inc;
select count(1) from ods_shihaihong.erp_u_sale_play_inc where
stamp = '"$day"';
select * from ods_shihaihong.erp_u_sale_play_inc where stamp =
'"$day"' limit 5;
"
echo "抽取完成"
原文地址:https://blog.csdn.net/shh2000424/article/details/142375636
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!