HDFSImporter 是用于将存放于 HDFS 上的数据批量地导入到 Sensors Analytics 的工具。
默认情况下,HDFSImporter读取的数据是位于 HDFS 上的文本文件,且格式符合 Sensors Analytics 的要求,数据格式相关定义请参考:
https://www.sensorsdata.cn/manual/data_schema.html
若希望使用 HDFSImporter 导入其他格式的数据,就需要开发自己的 InputFormat,处理流程大致如下:
         InputFormat<LongWritable, Text>
    HDFS ==============================> 符合 Sensors Analytics 的数据格式定义的数据 => mapper
一个例子,读取的文件格式 ORCFile,具体表结构和数据如下:
CREATE EXTERNAL TABLE rawdata.event_table_example (
        distinct_id STRING,
        `time` TIMESTAMP,
        event_name STRING,
        ip STRING,
        app_version STRING,
        user_agent STRING
) STORED AS ORC
LOCATION '/sa/tmp/orc/event_table_example/';
INSERT INTO rawdata.event_table_example VALUES('zhangsan', '2017-07-05 13:36:00.000', 'PageView', '10.0.106.25', 'V1.7.2699', 'Mozilla');经过 自定义数据预处理模块 处理,得到 符合 Sensors Analytics 数据格式定义的数据,内容如下:
{
    "distinct_id": "zhangsan",
    "time":1499232960000,
    "type":"track",
    "event":"PageView",
    "properties":{
        "$ip":"10.0.106.25",
        "$user_agent":"Mozilla",
        "$app_version":"V1.7.2699"
    }
}src 代码为一个具体的示例,功能是读取 ORCFile 并转为符合 Sensors Analysis 要求的格式。
参考代码示例,实现自己的 InputFormat,注意必须继承自 FileInputFormat<LongWritable, Text>
类定义示意如下:
public class SaOrcInputFormat extends FileInputFormat<LongWritable, Text> {
  @Override
  public RecordReader<LongWritable, Text>
      createRecordReader(InputSplit inputSplit,
                         TaskAttemptContext taskAttemptContext
                         ) throws IOException, InterruptedException {
  }
}- 
编译 Java 代码并打包。以样例为例子:
mvn clean package
 - 
将编译出来的
custom_inputformat-1.0-SNAPSHOT.jar拷贝到部署了 Sensors Analytics 服务的机器目录下,如:/home/sa_cluster/custom_lib/custom_inputformat-1.0-SNAPSHOT.ja。注意尽量不要放在/home/sa_cluster/sa/子目录下,否则有可能在神策系统升级的时候被覆盖掉。 - 
(可选)指定环境变量和类名参数,并开启
debug模式,然后启动 HDFSImporter 导入任务进行数据校验。# 通过该环境变量设置 jar 包的位置 export CUSTOM_LIB_JARS='/home/sa_cluster/custom_lib/custom_inputformat-1.0-SNAPSHOT.jar' sh /home/sa_cluster/sa/tools/hdfs_importer/bin/hdfs_importer.sh \ --path /data/your_input \ --project your_project \ --input_format com.sensorsdata.analytics.mapreduce.orcfile.SaOrcInputFormat \ # 通过该参数指定 InputFormat 类名 --debug # 通过该参数开启 debug 模式,只会校验数据,不会真正的进行导入
 - 
数据校验符合预期之后,就可以关掉
debug模式,进行正常的数据导入了。# 通过该环境变量设置 jar 包的位置 export CUSTOM_LIB_JARS='/home/sa_cluster/custom_lib/custom_inputformat-1.0-SNAPSHOT.jar' sh /home/sa_cluster/sa/tools/hdfs_importer/bin/hdfs_importer.sh \ --path /data/your_input \ --project your_project \ --input_format com.sensorsdata.analytics.mapreduce.orcfile.SaOrcInputFormat # 通过该参数指定 InputFormat 类名
 
- 自定义的InputFormat抛出任意未被捕获的异常都有可能直接导致导入任务执行失败