
1、在mysql中创建数据库gma
2、创建所有的表
init_mysql_table.sql
3、开启canal动态topic功能
vim /usr/local/soft/canal/conf/example/instance.properties
# 监控gma数据库,不同的表发送到表名的topic上
canal.mq.dynamicTopic=gma\\..*
4、启动zk 和kafka
5、启动canal
cd /usr/local/soft/canal/bin
./startup.sh
6、导入数据
load_data.sql
2、在flink中构建ods层
1、需要先启动hadoop hive的元数据服务, flink on yarn-session
# 启动hadoop
start-all.sh
# 启动hive元数据服务
nohup hive --service metastore >> metastore.log 2>&1 &
# 启动yarn-session
yarn-session.sh -jm 1024m -tm 1096
2、进入sql-client创建ods层的表
sql-client.sh embedded
# 创建库
create database gma_ods;
create database gma_dwd;
create database gma_dws;
create database gma_dim;
create database gma_ads;
3、在ods层中创建所有表
ods_mysql_kafka_base_category1.sql
ods_mysql_kafka_base_category2.sql
ods_mysql_kafka_base_category3.sql
ods_mysql_kafka_base_province.sql
ods_mysql_kafka_base_region.sql
ods_mysql_kafka_base_trademark.sql
ods_mysql_kafka_date_info.sql
ods_mysql_kafka_holiday_info.sql
ods_mysql_kafka_holiday_year.sql
ods_mysql_kafka_order_detailr.sql
ods_mysql_kafka_order_info.sql
ods_mysql_kafka_order_status_log.sql
ods_mysql_kafka_payment_info.sql
ods_mysql_kafka_sku_info.sql
ods_mysql_kafka_user_info.sql
4、构建dim层
4.1、地区维表
1、在mysql中创建gma_dim数据库用于存储维表数据
在mysql中创建gma_dim数据库
1、在flink sql-client中创建jdbc表
-- 在flink中创建sink表 drop table gma_dim.dim_kafka_mysql_region; CREATE TABLE gma_dim.dim_kafka_mysql_region( id bigint, name STRING, area_code STRING, region_name STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://master:3306/gma_dim?useUnicode=true&characterEncoding=UTF-8', 'table-name' = 'dim_kafka_mysql_region', 'username' = 'root', 'password' = '123456' );
2、在mysql中创建地区维表
-- mysql中创建接收表 CREATE TABLE `dim_kafka_mysql_region` ( `id` bigint(20) NOT NULL COMMENT 'id', `name` varchar(20) DEFAULT NULL COMMENT '省名称', `area_code` varchar(20) DEFAULT NULL COMMENT '行政区位码', `region_name` varchar(20) DEFAULT NULL COMMENT '大区名', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3、在到flink中创建mysql cdc表
--- 使用mysqlcdc读取维表 --- mysql cdc 只支持读取,不支持写入 drop table gma_dim.dim_kafka_mysql_region_cdc; CREATE TABLE gma_dim.dim_kafka_mysql_region_cdc( id bigint, name STRING, area_code STRING, region_name STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'master', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'gma_dim', 'table-name' = 'dim_kafka_mysql_region' );
4、将项目打包提交到集群中运行
flink run -c com.shujia.dim.DImKafkaMysqlRegion gma9-1.0.jar
4.2、商品维度表
1、在flink中创建sink表
-- Flink JDBC sink table for the sku dimension; rows are upserted into
-- MySQL table gma_dim.dim_kafka_mysql_sku keyed on id.
-- Fixes: `create_time` was missing the space before its type, and the DROP
-- is now IF EXISTS so the script is re-runnable.
drop table if exists gma_dim.dim_kafka_mysql_sku;
CREATE TABLE gma_dim.dim_kafka_mysql_sku (
    `id`              BIGINT        COMMENT 'skuid(itemID)',
    `spu_id`          BIGINT        COMMENT 'spuid',
    `price`           DECIMAL(10,0) COMMENT '价格',
    `sku_name`        STRING        COMMENT 'sku名称',
    `sku_desc`        STRING        COMMENT '商品规格描述',
    `weight`          DECIMAL(10,2) COMMENT '重量',
    `tm_name`         STRING        COMMENT '品牌名',
    `category3_name`  STRING        COMMENT '三级分类id(冗余)',
    `category2_name`  STRING        COMMENT '二级分类id(冗余)',
    `category1_name`  STRING        COMMENT '一级分类id(冗余)',
    `sku_default_img` STRING        COMMENT '默认显示图片(冗余)',
    `create_time`     TIMESTAMP(3)  COMMENT '创建时间',
    -- upsert key for the JDBC connector; Flink does not validate it
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/gma_dim?useUnicode=true&characterEncoding=UTF-8',
    'table-name' = 'dim_kafka_mysql_sku',
    'username' = 'root',
    'password' = '123456'
);
2、在mysql中创建维表
-- MySQL table that receives the sku dimension rows written by Flink;
-- the column set mirrors gma_dim.dim_kafka_mysql_sku on the Flink side.
CREATE TABLE `dim_kafka_mysql_sku` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)',
`spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid',
`price` decimal(10,0) DEFAULT NULL COMMENT '价格',
`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称',
`sku_desc` varchar(2000) DEFAULT NULL COMMENT '商品规格描述',
`weight` decimal(10,2) DEFAULT NULL COMMENT '重量',
`tm_name` varchar(200) DEFAULT NULL COMMENT '品牌(冗余)',
`category3_name` varchar(200) DEFAULT NULL COMMENT '三级分类id(冗余)',
`category2_name` varchar(200) DEFAULT NULL COMMENT '二级分类id(冗余)',
`category1_name` varchar(200) DEFAULT NULL COMMENT '一级分类id(冗余)',
`sku_default_img` varchar(200) DEFAULT NULL COMMENT '默认显示图片(冗余)',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3、在flink中创建cdc表
-- MySQL-CDC source table over the sku dimension (the mysql-cdc connector is
-- read-only: source only, never a sink).
-- Fixes: `create_time` was missing the space before its type, and the DROP
-- is now IF EXISTS so the script is re-runnable.
drop table if exists gma_dim.dim_kafka_mysql_sku_cdc;
CREATE TABLE gma_dim.dim_kafka_mysql_sku_cdc (
    `id`              BIGINT        COMMENT 'skuid(itemID)',
    `spu_id`          BIGINT        COMMENT 'spuid',
    `price`           DECIMAL(10,0) COMMENT '价格',
    `sku_name`        STRING        COMMENT 'sku名称',
    `sku_desc`        STRING        COMMENT '商品规格描述',
    `weight`          DECIMAL(10,2) COMMENT '重量',
    `tm_name`         STRING        COMMENT '品牌名',
    `category3_name`  STRING        COMMENT '三级分类id(冗余)',
    `category2_name`  STRING        COMMENT '二级分类id(冗余)',
    `category1_name`  STRING        COMMENT '一级分类id(冗余)',
    `sku_default_img` STRING        COMMENT '默认显示图片(冗余)',
    `create_time`     TIMESTAMP(3)  COMMENT '创建时间',
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'master',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'gma_dim',
    'table-name' = 'dim_kafka_mysql_sku'
);
4、将项目打包上传运行
flink run -c com.shujia.dim.DimKafkaMysqlSku gma9-1.0.jar
4.3、用户维度表
1、在flink sql -client 中创建sink表
drop table gma_dim.dim_kafka_mysql_user_info; CREATE TABLE gma_dim.dim_kafka_mysql_user_info( `id` bigint COMMENT '编号', `login_name` STRING COMMENT '用户名称', `nick_name` STRING COMMENT '用户昵称', `passwd` STRING COMMENT '用户密码', `name` STRING COMMENT '用户姓名', `phone_num` STRING COMMENT '手机号', `email` STRING COMMENT '邮箱', `head_img` STRING COMMENT '头像', `user_level` STRING COMMENT '用户级别', `birthday` DATE COMMENT '用户生日', `gender` STRING COMMENT '性别 M男,F女', `create_time` TIMESTAMP(3) COMMENT '创建时间', PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://master:3306/gma_dim?useUnicode=true&characterEncoding=UTF-8', 'table-name' = 'dim_kafka_mysql_user_info', 'username' = 'root', 'password' = '123456' );
2、在mysql中创建维度表
CREATE TABLE `dim_kafka_mysql_user_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号', `login_name` varchar(200) DEFAULT NULL COMMENT '用户名称', `nick_name` varchar(200) DEFAULT NULL COMMENT '用户昵称', `passwd` varchar(200) DEFAULT NULL COMMENT '用户密码', `name` varchar(200) DEFAULT NULL COMMENT '用户姓名', `phone_num` varchar(200) DEFAULT NULL COMMENT '手机号', `email` varchar(200) DEFAULT NULL COMMENT '邮箱', `head_img` varchar(200) DEFAULT NULL COMMENT '头像', `user_level` varchar(200) DEFAULT NULL COMMENT '用户级别', `birthday` date DEFAULT NULL COMMENT '用户生日', `gender` varchar(1) DEFAULT NULL COMMENT '性别 M男,F女', `create_time` datetime DEFAULT NULL COMMENT '创建时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=153 DEFAULT CHARSET=utf8 COMMENT='用户维表';
3、在flink中创建cdc 表
-- 创建flink cdc 维表, 不能作为sink表,只能作为source drop table gma_dim.dim_kafka_mysql_user_info_cdc; CREATE TABLE gma_dim.dim_kafka_mysql_user_info_cdc( `id` bigint COMMENT '编号', `login_name` STRING COMMENT '用户名称', `nick_name` STRING COMMENT '用户昵称', `passwd` STRING COMMENT '用户密码', `name` STRING COMMENT '用户姓名', `phone_num` STRING COMMENT '手机号', `email` STRING COMMENT '邮箱', `head_img` STRING COMMENT '头像', `user_level` STRING COMMENT '用户级别', `birthday` DATE COMMENT '用户生日', `gender` STRING COMMENT '性别 M男,F女', `create_time` TIMESTAMP(3) COMMENT '创建时间', PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'master', 'port' = '3306', 'username' = 'root', 'password' = 'MyNewPass4!', 'database-name' = 'gma_dim', 'table-name' = 'dim_kafka_mysql_user_info' );
4、提交代码运行
flink run -c com.shujia.dim.DimKafkaMysqlUserInfo gma9-1.0.jar
5、dwd层
5.1、支付订单明细表
1、在flink中创建支付订单明细表
-- ---------------------------------
-- DWD layer: paid-order detail table dwd_paid_order_detail, backed by Kafka
-- with the changelog-json format so downstream jobs can consume updates.
-- Fixes: added the missing statement terminator, and 'properties.group.id',
-- which Flink 1.11 requires when this table is later read as a source with
-- the default 'group-offsets' startup mode (harmless for the sink path).
-- ---------------------------------
CREATE TABLE gma_dwd.dwd_paid_order_detail (
    detail_id   BIGINT        COMMENT '编号',
    order_id    BIGINT        COMMENT '订单编号',
    user_id     BIGINT        COMMENT '用户id',
    province_id INT           COMMENT '地区',
    sku_id      BIGINT        COMMENT 'sku_id',
    sku_num     INT           COMMENT '购买个数',
    order_price DECIMAL(10,0) COMMENT '购买价格(下单时sku价格)',
    create_time TIMESTAMP(0)  COMMENT '创建时间',
    payment_way STRING        COMMENT '付款方式',
    pay_time    TIMESTAMP(0)  COMMENT '支付时间'
) WITH (
    'connector' = 'kafka',
    'topic' = 'gma_dwd.dwd_paid_order_detail',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'gma_dwd',
    'format' = 'changelog-json'
);
2、启动程序
flink run -c com.shujia.dwd.DwdPaidOrderDetail gma9-1.0.jar
6、ads层
6.1、省指标
1、在flink中创建表
-- ---------------------------------
-- ADS layer: JDBC sink into MySQL.
-- Metrics: 1. daily order count per province
--          2. daily order amount per province
-- Fix: added the missing statement terminator.
-- ---------------------------------
CREATE TABLE gma_ads.ads_province_index (
    province_id   INT,
    area_code     STRING,
    province_name STRING,
    region_name   STRING,
    order_amount  DECIMAL(10,2),
    order_count   BIGINT,
    dt            STRING,
    -- upsert key: one row per province per day
    PRIMARY KEY (province_id, dt) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
    'table-name' = 'ads_province_index',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123456'
);
2、在mysql中创建表
CREATE TABLE `ads_province_index` ( province_id INT, area_code varchar(200), province_name varchar(200), region_name varchar(200), order_amount DECIMAL(10,2), order_count bigint(20), dt varchar(200), PRIMARY KEY (`province_id`,`dt`) ) COMMENT='地区指标表';
3、启动指标计算程序
flink run -c com.shujia.ads.AdsProvinceIndex gma9-1.0.jar
6.2、商品指标
1、在flink中创建表
-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标 1.每天每个商品对应的订单个数
-- 2.每天每个商品对应的订单金额
-- 3.每天每个商品对应的数量
-- ---------------------------------
-- ---------------------------------
-- ADS layer: JDBC sink into MySQL.
-- Metrics: 1. daily order count per sku
--          2. daily order amount per sku
--          3. daily quantity per sku
-- Fixes: DROP is now IF EXISTS; added the missing statement terminator.
-- NOTE(review): the url points at node1 while every other JDBC url in this
-- script uses master — confirm which host actually serves the gma_ads db.
-- ---------------------------------
drop table if exists gma_ads.ads_sku_index;
CREATE TABLE gma_ads.ads_sku_index (
    sku_id         BIGINT,
    weight         DECIMAL(10,2),
    tm_name        STRING,
    price          DECIMAL(10,2),
    spu_id         BIGINT,
    category3_name STRING,
    category2_name STRING,
    category1_name STRING,
    order_count    BIGINT NOT NULL,
    order_amount   DECIMAL(10,2),
    sku_count      INT,
    dt             STRING,
    -- upsert key: one row per sku per day
    PRIMARY KEY (sku_id, dt) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://node1:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
    'table-name' = 'ads_sku_index',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123456'
);
2、在mysql中创建表
CREATE TABLE `ads_sku_index` ( sku_id bigint(20), weight decimal(10,2), tm_name varchar(200) , price decimal(10,2), spu_id varchar(200) , category3_name varchar(200) , category2_name varchar(200) , category1_name varchar(200) , order_count bigint(20), order_amount decimal(10,2), sku_count bigint(20), dt varchar(200) , PRIMARY KEY (`sku_id`,`dt`) ) COMMENT='商品指标表';
3、启动指标计算任务
flink run -c com.shujia.ads.AdsSkuIndex gma9-1.0.jar
7、提供数据接口
1、修改mysql地址以及用户名密码
com.shujia.gma.controller.ProvinceController
com.shujia.gma.controller.SkuController
2、启动接口服务
com.shujia.gma.GmaApiApplication
3、访问接口
localhost:8080/getSkuIndex?&id=1&day=2019-11-23
localhost:8080/getProvinceIndex?&id=1&day=2019-11-23
8、监控
8.1 部署prometheus (底层TSDB, 时序数据库)
1、上传解压修改配置文件
vim prometheus.yml
# 在最后增加配置
- job_name: 'pushgateway'
scrape_interval: 10s
honor_labels: true
static_configs:
- targets: ['localhost:9091']
labels:
instance: pushgateway
2、启动prometheus
nohup ./prometheus &
8.2 部署pushgateway
1、上传解压启动
nohup ./pushgateway &
8.3 然后在 Flink 的配置文件中添加:
cd /usr/local/soft/flink-1.11.2/conf
vim flink-conf.yaml
# 增加配置
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: master  # promgateway 地址
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: shujia
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
8.4、重启flink yarn session 启动flink任务
8.5 访问页面查看指标
http://master:9091/
8.6 部署Grafana上传解压启动
./grafana-server
访问页面 用户名:admin 密码: admin
http://master:3000/dashboard
增加prometheus 数据源
增加数据表
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)