
一、需求
监控现在集群hive hbase的各个表的数据量变化情况,将数据量变化写到mysql的数据量表中
二、需求分析与开发
hive大部分都是离线定时任务,所以在离线任务写入到hive之后COUNT一下写入的表的数据量
表设计
设计一张每一张表的数据量表
--数据量表
create table if not exists batch_test_saas.saas_tables_data_size
(
id string COMMENT 'id 表对应时间+"_"+表id',
target_table_id int COMMENT '写入的表id',
write_time timestamp COMMENT '写入时间',
write_data_size int COMMENT '本次写入数据量',
serve_name string COMMENT '写入的任务',
table_data_size int COMMENT '截止当前表总数据量'
)COMMENT '数据量表'
PARTITIonED BY (
year string COMMENT 'year=yyyy',
month string COMMENT 'month=yyyy-MM',
day string COMMENT 'day=yyyy-MM-dd',
hour string COMMENT 'hour=yyyy-MM-dd-HH'
)
row format delimited fields terminated by 't'
stored as parquet
TBLPROPERTIES ('parquet.compresssion'='SNAPPY');
设计一张表信息表,就是用来存id的和所属库、是否是分区表
--表信息表
create table if not exists batch_test_saas.table_infos
(
table_name string COMMENT '表名',
table_id int COMMENT '表id',
database_belong string COMMENT '所属库',
is_partition boolean COMMENT '是否是分区表'
) COMMENT '表信息表';
设计一张单表数据量表
--单表数据量
create table if not exists batch_test_saas.single_table_data_size
(
table_id int COMMENT '表id',
tabl_size int COMMENT '表数据量',
last_update timestamp COMMENT '最后更新时间'
) COMMENT '单标数据量'
PARTITIonED BY (
year string COMMENT 'yyyy',
month string COMMENT 'yyyy-MM',
day string COMMENT 'yyyy-MM-dd'
)
row format delimited fields terminated by ','
stored as parquet
TBLPROPERTIES ('parquet.compresssion' = 'SNAPPY');
创建一个lastupdate表
create table if not exists batch_test_saas.last_update(
table_id int COMMENT '表id',
last_update timestamp COMMENT '最后更新时间'
);
创建一个mysql的做可视化的表
CREATE TABLE `batch_tables_data_size` ( `id` varchar(256) NOT NULL, `target_table_id` int(11) DEFAULT NULL COMMENT '写入的表', `write_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '写入时间', `write_data_size` int(11) DEFAULT NULL COMMENT '本次写入数据量', `serve_name` varchar(255) DEFAULT NULL COMMENT '写入的任务', `table_data_size` int(11) DEFAULT NULL COMMENT '截止当前表总数据量', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3.接下来做的这个叫啥,我也不清楚了,就叫它详细逻辑吧
首先是lastupdate表的详细 *** 作
这个表的数据量是非常小的,例如我们监控的库一共有20张表,那lastupdate表就只有20条数据,所以每次都是insert overwrite *** 作
表里只记录了每张表的最后更新时间,和表的唯一id
sql如下:
--参数1、currentTimeStamp:时间戳linux赋予
--2、table_name:传入
with infos_id as
(SELECT
t.table_id,
--timestamp(1643531339802) as last_update
timestamp(${currentTimeStamp}) as last_update
from batch_test_saas.table_infos t
where t.table_name = '${table_name}')
insert overwrite table batch_test_saas.last_update
SELECT
t2.table_id,
coalesce(t1.last_update,t2.last_update) as last_update
from batch_test_saas.last_update t2 left join infos_id t1
on t1.table_id = t2.table_id
;
接着是计算单标数据量的逻辑
--计算单表数据量,并插入到单表数据量表,带lastupdate字段
--参数1、table_name:传入计算的表名
--2、currentTimeStamp:时间戳、linux获取
insert into table batch_test_saas.single_table_data_size partition(year,month,day)
select t1.table_id,
t.tabl_size,
t.last_update,
t.year,
t.month,
t.day
from (select '${table_name}' as table_name,
COUNT(*) as tabl_size,
timestamp(${currentTimeStamp}) as last_update,
year(timestamp(${currentTimeStamp})) as year,
substr(to_date(timestamp(${currentTimeStamp})), 0, 7) as month,
to_date(timestamp(${currentTimeStamp})) as day
from batch_test_saas.mysql_vehicle_synchronized_RJ) t
left join batch_test_saas.table_infos t1
on t1.table_name = t.table_name;
然后我们继续
--计算表数据量的最终结果表,每次该表 *** 作更新完,会通过sqoop导出到msyql相对应的表中
--根据计算表的类型来做where区间filter
--规则是小时任务分区为day,日度任务分区为month,月度任务分区为year,除此三种情况不选择分区filter
--参数1:当前计算的表
--参数2:分区字段值
--参数3:当前表所属任务
--tablename=
--partitioncolumn=
--belongsever=
with table_ids as
(select o.table_id
from batch_test_saas.table_infos o
where o.table_name = '${tablename}')
insert into table batch_test_saas.saas_tables_data_size partition (year, month, day, hour)
select t1.table_id,
t1.last_update,
t1.table_size,
'${belongsever}' as serve_name,
coalesce(t2.table_data_size, 0) + t1.table_size as table_data_size,
year(timestamp(t1.last_update)) as year,
substr(timestamp(t1.last_update), 0, 7) as month,
to_date(timestamp(t1.last_update)) as day,
from_unixtime(int(timestamp(t1.last_update)), 'yyyy-MM-dd-HH') as hour
from (select row_number() over (partition by t.table_id order by t.last_update desc) as rn,
t.table_id as table_id,
t.last_update as last_update,
t.tabl_size as table_size
from batch_test_saas.single_table_data_size t
where t.day = '${partitioncolumn}'
and t.table_id in (select m.table_id from table_ids m)) t1
left join (select s.target_table_id,
if(s.table_data_size = 0, s.write_data_size, s.table_data_size) as table_data_size,
s.write_time
from batch_test_saas.saas_tables_data_size s
where s.target_table_id in (select s1.table_id from table_ids s1)
and s.day = '2022-01-30'
order by s.write_time desc
limit 1) t2
where t1.rn = 1;
4.导出到mysql
这里导出使用的sqoop导出
#! /bin/bash
#测试参数
hour=`date +“%Y-%m-%d-%H”`
#传参
inserthour=$1
echo "
#! /bin/bash
sqoop export \
--connect jdbc:mysql://192.168.8.157:3307/car_non_road?serverTimezone=Asia/Shanghai --username web-develop --password FengYun2021 \
--table batch_tables_data_size \
--hcatalog-table saas_tables_data_size \
--hcatalog-database batch_test_saas \
--hcatalog-partition-keys hour \
--hcatalog-partition-values '${inserthour}' \
--input-fields-terminated-by 't' \
--input-null-string '\\N' \
--input-null-non-string '\\N' \
--update-key id \
--update-mode allowinsert \
-m 2
" > /opt/test/scripts/noreal/saas_noroad/scripts/sqoop_export_table_size2mysql.sh
每次做导出 *** 作时候生成导出的sqoop脚本,因为得按照时间分区去导出,这里使用了update-mode为allowinsert,其实也可以全表导出这样的话,会浪费时间,大量的数据都是重复的,所以干脆每次都按小时分区去导出,这样即使有重复的数据,也不会浪费很多时间
三、调度
最后写一个脚本调用一下
#! /bin/bash ##执行生成更新update语句的脚本,并执行 sh /opt/test/scripts/noreal/saas_noroad/scripts/update_table_lastUpdate.sh ##执行生成计算单表数据量的sql,并执行sql sh /opt/test/scripts/noreal/saas_noroad/scripts/single_table_data_size.sh ##执行表数据量最终计算 sh /opt/test/scripts/noreal/saas_noroad/hsql/saas_tables_data_size.sh ##执行生成sqoop导出语句 sh /opt/test/scripts/noreal/saas_noroad/scripts/build_sqoop_export_table_size2mysql.sh ##执行sqoop导出语句 sh /opt/test/scripts/noreal/saas_noroad/scripts/sqoop_export_table_size2mysql.sh
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)