
Date: December 17, 2021
1. Background

In our data warehouse work, Parquet is the main storage format, and some scenarios require exporting data out of the warehouse with DataX. However, the HdfsReader plugin of Alibaba DataX currently does not support the Parquet format. After searching through blogs and other material without finding an open-source plugin for this, I decided to develop one myself.
2. Implementation Steps

2.1 Code development

Pull the code from the Alibaba DataX repository, create a new branch, and adjust the hdfsreader module to add the code blocks for reading Parquet files. The main changes are in the two classes described below.
Starting from the existing ORC reading code and combining it with the Parquet file format and the Parquet API, the main change is adding a parquetFileStartRead method to DFSUtil.java. The code is as follows, for reference:
public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig,
                                 RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
    LOG.info(String.format("Start Read parquetfile [%s].", sourceParquetFilePath));
    // Column configuration and null format from the reader slice config
    List<ColumnEntry> column = UnstructuredStorageReaderUtil
            .getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
    String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);

    StringBuilder allColumns = new StringBuilder();
    StringBuilder allColumnTypes = new StringBuilder();
    boolean isReadAllColumns = false;
    int columnIndexMax = -1;
    // Decide whether to read all columns
    if (null == column || column.size() == 0) {
        int allColumnsCount = getParquetAllColumnsCount(sourceParquetFilePath);
        columnIndexMax = allColumnsCount - 1;
        isReadAllColumns = true;
    } else {
        columnIndexMax = getMaxIndex(column);
    }
    // Build the Hive-style "columns" (comma-separated) and "columns.types" (colon-separated) properties
    for (int i = 0; i <= columnIndexMax; i++) {
        allColumns.append("col");
        allColumnTypes.append("string");
        if (i != columnIndexMax) {
            allColumns.append(",");
            allColumnTypes.append(":");
        }
    }
    if (columnIndexMax >= 0) {
        JobConf conf = new JobConf(hadoopConf);
        Path parquetFilePath = new Path(sourceParquetFilePath);
        Properties p = new Properties();
        p.setProperty("columns", allColumns.toString());
        p.setProperty("columns.types", allColumnTypes.toString());
        try {
            // Approach 1: not adopted. Reason: with ParquetHiveSerDe, String fields in the
            // Parquet file are read back as BytesWritable, which is hard to convert to String.
            // ParquetHiveSerDe serde = new ParquetHiveSerDe();
            // serde.initialize(conf, p);
            // StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();
            // InputFormat<?, ?> in = new MapredParquetInputFormat();
            // FileInputFormat.setInputPaths(conf, parquetFilePath.toString());
            //
            // //If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
            // //Each file as a split
            // //TODO multy threads
            // InputSplit[] splits = in.getSplits(conf, 1);
            //
            // RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
            // Object key = reader.createKey();
            // Object value = reader.createValue();
            // // Get column information
            // List<? extends StructField> fields = inspector.getAllStructFieldRefs();
            //
            // List
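The original listing breaks off at this point, before the approach that was actually adopted. To make the idea concrete, here is a minimal, self-contained sketch of that adopted direction: reading the Parquet file row by row with parquet-hadoop's ParquetReader and GroupReadSupport, then forwarding each row to DataX through the RecordSender. This is my own illustration of the technique, not the author's exact code: the class name ParquetReadSketch, the helper valueAt, and the simplification of emitting every value as a StringColumn (instead of converting by the configured column type) are all assumptions.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.plugin.unstructuredstorage.reader.ColumnEntry;

public class ParquetReadSketch {

    /**
     * Reads the given Parquet file row by row and sends each row to the writer.
     * All values are emitted as strings here for brevity; a full implementation
     * would convert them according to the configured column types.
     */
    public static void readParquetFile(String sourceParquetFilePath,
                                       Configuration hadoopConf,
                                       List<ColumnEntry> column,   // null or empty => read all columns
                                       String nullFormat,
                                       RecordSender recordSender,
                                       TaskPluginCollector taskPluginCollector) throws IOException {
        boolean readAllColumns = (column == null || column.isEmpty());
        ParquetReader<Group> reader = ParquetReader
                .builder(new GroupReadSupport(), new Path(sourceParquetFilePath))
                .withConf(hadoopConf)
                .build();
        try {
            Group row;
            while ((row = reader.read()) != null) {
                Record record = recordSender.createRecord();
                try {
                    int fieldCount = row.getType().getFieldCount();
                    if (readAllColumns) {
                        // No column config: emit every field as a string column
                        for (int i = 0; i < fieldCount; i++) {
                            record.addColumn(new StringColumn(valueAt(row, i)));
                        }
                    } else {
                        for (ColumnEntry entry : column) {
                            // A configured constant value takes precedence over a file field
                            if (entry.getValue() != null) {
                                record.addColumn(new StringColumn(entry.getValue()));
                                continue;
                            }
                            String raw = valueAt(row, entry.getIndex());
                            // Map the configured nullFormat (e.g. "\\N") to a real null
                            if (raw == null || raw.equals(nullFormat)) {
                                record.addColumn(new StringColumn((String) null));
                            } else {
                                record.addColumn(new StringColumn(raw));
                            }
                        }
                    }
                    recordSender.sendToWriter(record);
                } catch (Exception e) {
                    // Hand dirty rows to the framework instead of failing the whole task
                    taskPluginCollector.collectDirtyRecord(record, e);
                }
            }
        } finally {
            reader.close();
        }
    }

    // Returns the i-th field of the row as a string, or null when the field is absent.
    private static String valueAt(Group row, int i) {
        return row.getFieldRepetitionCount(i) > 0 ? row.getValueToString(i, 0) : null;
    }
}

In the actual plugin this logic lives inside DFSUtil.parquetFileStartRead, and the second modified class mentioned in section 2.1 presumably only needs to accept "parquet" as a valid fileType and dispatch to this method, analogous to the existing ORC branch.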
My Gitee repository is here: https://gitee.com/jackielee4cn/bigdata-history-query.git
Corrections and Stars are welcome.
3. Usage Example

3.1 Build and install

Download the source code, build and package it, and locate the module's target/datax/plugin/reader/hdfsreader.zip file. Unzip it into ${DATAX_HOME}/plugin/reader/ under the DataX installation directory. Be sure to back up the original default hdfsreader plugin beforehand, so you can roll back if anything goes wrong.
3.2 Configure the DataX job

The configuration is the same as the official hdfsreader configuration for the ORC format, except that fileType can now be set to parquet in addition to text, orc, csv, and so on. A concrete example follows:
test_hdfsreader_parquet.job
{
    "job": {
        "setting": {
            "speed": {
                "channel": 3,
                "byte": 10485760
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "/user/hive/warehouse/test.db/test_datax_parquet/date_id=20211201",
                        "defaultFS": "hdfs://test01:8020",
                        "fileType": "parquet",
                        "skipHeader": false,
                        "column": [
                            {
                                "index": "0",
                                "type": "long"
                            },
                            {
                                "index": "1",
                                "type": "string"
                            },
                            {
                                "index": "2",
                                "type": "long"
                            },
                            {
                                "index": "3",
                                "type": "double"
                            },
                            {
                                "index": "4",
                                "type": "string"
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "replace",
                        "username": "write_user",
                        "password": "Writeuser@123",
                        "column": [
                            "`f_id`",
                            "`f_order_id`",
                            "`f_is_refund`",
                            "`f_amount`",
                            "`f_time`"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "test_datax_parquet"
                                ],
                                "jdbcUrl": "jdbc:mysql://test02:3306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&dontTrackOpenResources=true"
                            }
                        ]
                    }
                }
            }
        ]
    }
}
3.3 Run the job
python ${DATAX_HOME}/bin/datax.py test_hdfsreader_parquet.job
Check the console log; the job runs normally.