一个监控数据量任务设计及开发,很low

一个监控数据量任务设计及开发,很low,第1张

一个监控数据量任务设计及开发,很low

一、需求

监控现在集群hive hbase的各个表的数据量变化情况,将数据量变化写到mysql的数据量表中

二、需求分析与开发

    hive大部分都是离线定时任务,所以在离线任务写入到hive之后COUNT一下写入的表的数据量

    表设计

设计一张每一张表的数据量表

--数据量表
create table if not exists batch_test_saas.saas_tables_data_size
(
    id                     string     COMMENT 'id 表对应时间+"_"+表id',
    target_table_id        int        COMMENT '写入的表id',
    write_time             timestamp  COMMENT '写入时间',
    write_data_size        int        COMMENT '本次写入数据量',
    serve_name             string     COMMENT '写入的任务',
    table_data_size        int        COMMENT '截止当前表总数据量'
)COMMENT '数据量表'
PARTITIonED BY (
    year    string    COMMENT 'year=yyyy',
    month   string    COMMENT 'month=yyyy-MM',
    day     string    COMMENT 'day=yyyy-MM-dd',
    hour    string    COMMENT 'hour=yyyy-MM-dd-HH'
    )
row format delimited fields terminated by 't'
stored as parquet
TBLPROPERTIES ('parquet.compresssion'='SNAPPY');

 设计一张表信息表,就是用来存id的和所属库、是否是分区表

--表信息表
create table if not exists batch_test_saas.table_infos
(
    table_name      string  COMMENT '表名',
    table_id        int     COMMENT '表id',
    database_belong string  COMMENT '所属库',
    is_partition    boolean COMMENT '是否是分区表'
) COMMENT '表信息表';

设计一张单表数据量表

--单表数据量
create table if not exists batch_test_saas.single_table_data_size
(
    table_id    int       COMMENT '表id',
    tabl_size   int       COMMENT '表数据量',
    last_update timestamp COMMENT '最后更新时间'
) COMMENT '单标数据量'
    PARTITIonED BY (
        year string COMMENT 'yyyy',
        month string COMMENT 'yyyy-MM',
        day string COMMENT 'yyyy-MM-dd'
        )
    row format delimited fields terminated by ','
    stored as parquet
    TBLPROPERTIES ('parquet.compresssion' = 'SNAPPY');

创建一个lastupdate表

create table if not exists batch_test_saas.last_update(
    table_id    int  COMMENT '表id',
    last_update timestamp COMMENT '最后更新时间'
);

创建一个mysql的做可视化的表

CREATE TABLE `batch_tables_data_size` (
  `id` varchar(256) NOT NULL,
  `target_table_id` int(11) DEFAULT NULL COMMENT '写入的表',
  `write_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '写入时间',
  `write_data_size` int(11) DEFAULT NULL COMMENT '本次写入数据量',
  `serve_name` varchar(255) DEFAULT NULL COMMENT '写入的任务',
  `table_data_size` int(11) DEFAULT NULL COMMENT '截止当前表总数据量',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.接下来做的这个叫啥,我也不清楚了,就叫它详细逻辑吧

首先是lastupdate表的详细 *** 作

这个表的数据量是非常小的,例如我们监控的库一共有20张表,那lastupdate表就只有20条数据,所以每次都是insert overwrite *** 作

表里只记录了每张表的最后更新时间,和表的唯一id

sql如下:

--参数1、currentTimeStamp:时间戳linux赋予
--2、table_name:传入
with infos_id as
         (SELECT
              t.table_id,
              --timestamp(1643531339802) as last_update
              timestamp(${currentTimeStamp}) as last_update
          from batch_test_saas.table_infos t
          where t.table_name = '${table_name}')
insert overwrite table batch_test_saas.last_update
SELECT
    t2.table_id,
    coalesce(t1.last_update,t2.last_update) as last_update
from batch_test_saas.last_update t2 left join infos_id t1
                                              on t1.table_id = t2.table_id
;

接着是计算单标数据量的逻辑

--计算单表数据量,并插入到单表数据量表,带lastupdate字段
--参数1、table_name:传入计算的表名
--2、currentTimeStamp:时间戳、linux获取

insert into table batch_test_saas.single_table_data_size partition(year,month,day)
select t1.table_id,
       t.tabl_size,
       t.last_update,
       t.year,
       t.month,
       t.day
from (select '${table_name}'                                       as table_name,
             COUNT(*)                                              as tabl_size,
             timestamp(${currentTimeStamp})                        as last_update,
             year(timestamp(${currentTimeStamp}))                  as year,
             substr(to_date(timestamp(${currentTimeStamp})), 0, 7) as month,
             to_date(timestamp(${currentTimeStamp}))               as day
      from batch_test_saas.mysql_vehicle_synchronized_RJ) t
         left join batch_test_saas.table_infos t1
                   on t1.table_name = t.table_name;

然后我们继续

--计算表数据量的最终结果表,每次该表 *** 作更新完,会通过sqoop导出到msyql相对应的表中
--根据计算表的类型来做where区间filter
--规则是小时任务分区为day,日度任务分区为month,月度任务分区为year,除此三种情况不选择分区filter
--参数1:当前计算的表
--参数2:分区字段值
--参数3:当前表所属任务
--tablename=
--partitioncolumn=
--belongsever=
with table_ids as
         (select o.table_id
          from batch_test_saas.table_infos o
          where o.table_name = '${tablename}')
insert into table batch_test_saas.saas_tables_data_size partition (year, month, day, hour)
select t1.table_id,
       t1.last_update,
       t1.table_size,
       '${belongsever}'                                               as serve_name,
       coalesce(t2.table_data_size, 0) + t1.table_size                as table_data_size,
       year(timestamp(t1.last_update))                                as year,
       substr(timestamp(t1.last_update), 0, 7)                        as month,
       to_date(timestamp(t1.last_update))                             as day,
       from_unixtime(int(timestamp(t1.last_update)), 'yyyy-MM-dd-HH') as hour
from (select row_number() over (partition by t.table_id order by t.last_update desc) as rn,
             t.table_id                                                              as table_id,
             t.last_update                                                           as last_update,
             t.tabl_size                                                             as table_size
      from batch_test_saas.single_table_data_size t
      where t.day = '${partitioncolumn}'
        and t.table_id in (select m.table_id from table_ids m)) t1
         left join (select s.target_table_id,
                           if(s.table_data_size = 0, s.write_data_size, s.table_data_size) as table_data_size,
                           s.write_time
                    from batch_test_saas.saas_tables_data_size s
                    where s.target_table_id in (select s1.table_id from table_ids s1)
                      and s.day = '2022-01-30'
                    order by s.write_time desc
                    limit 1) t2
where t1.rn = 1;

4.导出到mysql

这里导出使用的sqoop导出

#! /bin/bash

#测试参数
hour=`date +“%Y-%m-%d-%H”`
#传参
inserthour=$1

echo "
#! /bin/bash
sqoop export \
--connect jdbc:mysql://192.168.8.157:3307/car_non_road?serverTimezone=Asia/Shanghai --username web-develop --password FengYun2021 \
--table batch_tables_data_size \
--hcatalog-table saas_tables_data_size \
--hcatalog-database batch_test_saas \
--hcatalog-partition-keys hour  \
--hcatalog-partition-values '${inserthour}' \
--input-fields-terminated-by 't' \
--input-null-string '\\N' \
--input-null-non-string '\\N' \
--update-key id \
--update-mode allowinsert \
-m 2
" > /opt/test/scripts/noreal/saas_noroad/scripts/sqoop_export_table_size2mysql.sh

每次做导出 *** 作时候生成导出的sqoop脚本,因为得按照时间分区去导出,这里使用了update-mode为allowinsert,其实也可以全表导出这样的话,会浪费时间,大量的数据都是重复的,所以干脆每次都按小时分区去导出,这样即使有重复的数据,也不会浪费很多时间

三、调度

最后写一个脚本调用一下

#! /bin/bash
##执行生成更新update语句的脚本,并执行
sh /opt/test/scripts/noreal/saas_noroad/scripts/update_table_lastUpdate.sh
##执行生成计算单表数据量的sql,并执行sql
sh /opt/test/scripts/noreal/saas_noroad/scripts/single_table_data_size.sh           ##执行表数据量最终计算
sh /opt/test/scripts/noreal/saas_noroad/hsql/saas_tables_data_size.sh               ##执行生成sqoop导出语句
sh /opt/test/scripts/noreal/saas_noroad/scripts/build_sqoop_export_table_size2mysql.sh ##执行sqoop导出语句
sh /opt/test/scripts/noreal/saas_noroad/scripts/sqoop_export_table_size2mysql.sh 

欢迎分享,转载请注明来源:内存溢出

原文地址:https://www.54852.com/zaji/5716184.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-17
下一篇2022-12-17

发表评论

登录后才能评论

评论列表(0条)

    保存