Elkeid 原始数据接入指南

适用版本

Elkeid 社区版v1.9.1

综述

Elkeid为了优化数据流压力和追求数据流灵活性,选用PB格式来传递数据。PB在kafka中存储为 byte[] 形式的消息,因此在消费数据后需要按照protobuf的形式进行解压,否则看到将是乱码字段。因此对于一般情况下使用来说,转移成JSON 数据流会更符合大数据套件的数据接入情况。这里提供接受Elkeid数据流并转换成JSON数据流的两种样例,供使用者参考。

PB Schema

如下即为 HIDS的 PB的protobuf schema,需要注意的是Item类为了灵活的支持多种版本的数据更新,Map中通过K-V的方式包含了基本全部类型的数据格式。因此单独PB后需要针对Map遍历提取K-V对放入全体数据后才能较好的支持进入大数据套件。

syntax = "proto3";
option go_package = "hids_pb";
package hids_pb;

//server -> bmq
message MQData{
  int32 data_type = 1;
  int64 timestamp = 2;
  Item body = 3;

  string agent_id = 4;
  string in_ipv4_list = 5;
  string ex_ipv4_list = 6;
  string in_ipv6_list = 7;
  string ex_ipv6_list = 8;
  string hostname = 9;
  string version = 10;
  string product = 11;

  int64  time_pkg = 12;
  string psm_name = 13;
  string psm_path = 14;
  string tags = 15;
}

message Item{
  map<string,string> fields = 1;
}

MQData.proto

Map内的数据字段请参考 Elkeid数据说明

简便方式(Elkeid-HUB)

Elkeid原生自带基于golang的PB数据解析,因此可以通过利用Elkeid-HUB进行数据的解压和ETL操作。

配置输出端

首先找到对应的输出端,Elkeid原生支持将PB的数据流转换成JSON并以ES/Kafka等方式进行输出,此处以kafka为例,填入所需的Kafka Bootstrap Server 地址(含端口)和topic名即可。此处建议打开AddTimestamp追加输出HUB的时间(将Unix系统时间转换成时间戳格式)

配置Project

在project中新增project,并按如下新增到Kafka的输出。注意OUTPUT后的值要和上面配置输出端的OutputID 保持一致。配之后点击Confirm 保存

INPUT.hids --> OUTPUT.your_kafka

启动

点击 Rule Publish 提交更改。等待任务完成后点击Project Operation 将任务启动即可。之后即可在所配制的Kafka topic中看到JSON格式的数据字段

注意如果用户希望直接将HIDS数据写入ES,请参考上述 Elkeid HUB 配置

如果用户利用Logstash进行日志采集,可以参考Logstash Protobuf Plugin进行配置。

Consumer 样例

这里以Python Kafka 套件消费数据为主进行展示

需要 kafka-python 和 protobuf 两个 python3 lib

安装protoc

首先需要安装protobuf的 compiler,主要用于生成 MQData_pb2.py 这个文件。用户可以使用我们直接提供的文件,或者自行编译。这里以 linux 为例,执行下面语句即可安装3.14的 protoc

PROTOC_ZIP=protoc-3.14.0-linux-x86_64.zip
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v3.14.0/$PROTOC_ZIP
sudo unzip -o $PROTOC_ZIP -d /usr/local bin/protoc
sudo unzip -o $PROTOC_ZIP -d /usr/local 'include/*'
rm -f $PROTOC_ZIP

将上面的PB Schema进行编译

protoc -I=. --python_out=. ./MQData.proto

会生成 MQData_pb2.py 文件,将这个文件放到您的项目中。

MQData_pb2.py

编写consumer

因为Elkeid PB 将主要数据放入 body.field 这个map中,因此需要对数据进行拍平,生成原生的一级数据结构。这里提供一个 decoder 供Kafka Serializer使用

#!/usr/bin/python3

# decoder of Elkeid PB, input string and will dump json for you.
def pbDecoder(value):
    ret = {}
    aMQData = MQData.MQData();
    aMQData.ParseFromString(value)

    # common part of message
    ret["data_type"] = str(aMQData.data_type)
    ret["timestamp"] = str(aMQData.timestamp)
    ret["agent_id"] = aMQData.agent_id
    ret["in_ipv4_list"] = aMQData.in_ipv4_list
    ret["ex_ipv4_list"] = aMQData.ex_ipv4_list
    ret["in_ipv6_list"] = aMQData.in_ipv6_list
    ret["ex_ipv6_list"] = aMQData.ex_ipv6_list
    ret["hostname"] = aMQData.hostname
    ret["version"] = aMQData.version
    ret["product"] = aMQData.product
    ret["time_pkg"] = str(aMQData.time_pkg)
    ret["psm_name"] = aMQData.psm_name
    ret["psm_path"] = aMQData.psm_path
    ret["tags"] = aMQData.tags

    # major data part of message
    for key in aMQData.body.fields:
        ret[key] =  aMQData.body.fields[key]
    
    return json.dumps(ret)

然后创建 Kafka consumer,将上述的decoder作为 kafkavalue_deserializer 传递进去。Elkeid的默认topic为 hids_svr

#!/usr/bin/python3
from kafka import KafkaConsumer
import MQData_pb2 as MQData # 刚刚编译的那个class文件
import json

# decoder of Elkeid PB, input string and will dump json for you.
def pbDecoder(value):
    ret = {}
    aMQData = MQData.MQData();
    aMQData.ParseFromString(value)

    # common part of message
    ret["data_type"] = str(aMQData.data_type)
    ret["timestamp"] = str(aMQData.timestamp)
    ret["agent_id"] = aMQData.agent_id
    ret["in_ipv4_list"] = aMQData.in_ipv4_list
    ret["ex_ipv4_list"] = aMQData.ex_ipv4_list
    ret["in_ipv6_list"] = aMQData.in_ipv6_list
    ret["ex_ipv6_list"] = aMQData.ex_ipv6_list
    ret["hostname"] = aMQData.hostname
    ret["version"] = aMQData.version
    ret["product"] = aMQData.product
    ret["time_pkg"] = str(aMQData.time_pkg)
    ret["psm_name"] = aMQData.psm_name
    ret["psm_path"] = aMQData.psm_path
    ret["tags"] = aMQData.tags

    # major data part of message
    for key in aMQData.body.fields:
        ret[key] =  aMQData.body.fields[key]
    
    return json.dumps(ret)


# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('hids_svr',
                         group_id='test',
                         auto_offset_reset='latest',
                         bootstrap_servers=['10.2.0.67:9092', '10.2.0.233:9092', '10.2.0.92:9092'],
                         value_deserializer = lambda m: pbDecoder(m))

# Print all message in JSON format
# this is the part that you need to code to your job
for message in consumer:
    # do something with message.value
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

数据持久化建议

整体建议

数据在获取到Json格式后即可根据Elkeid 数据说明 的字段进行相应的存储工作。一般情况对于HIDS数据持久化存储需求应该符合使用方公司自身的数据中台规范需求。这里仅仅提供建议,而没有标准化方法可以直接使用。

大宽表

对于机器数量少于1w台的情况,我们建议用户将Elkeid数据以大宽表的形式存储。也就是说将所有数据用共同的表schema进行存储。我们建议用户将Elkeid数据按照时间进行分区,方便后续进行过期(TTL)设置。

建表DDL请参考如下:

CREATE TABLE `YOUR_DBNAME`.`hids_rawdata`(
    `agent_id` STRING COMMENT 'HIDS 的 agent_id',
    `argv` STRING COMMENT '进程命令行',
    `comm` STRING COMMENT '进程命令行首项',
    `data_type` STRING COMMENT '数据类型',
    `dip` STRING COMMENT '链接目的IP',
    `dport` STRING COMMENT '链接目的端口',
    `exe` STRING COMMENT '命令首项的文件路径',
    `exe_hash` STRING COMMENT '命令首项的文件哈希',
    `hostname` STRING COMMENT '主机名',
    `ld_preload` STRING COMMENT '执行时加载的额外so文件',
    `nodename` STRING COMMENT '节点名',
    `pgid` STRING COMMENT '进程组ID',
    `pid` STRING COMMENT '进程ID',
    `pid_tree` STRING COMMENT '进程链条',
    `ppid` STRING COMMENT '父进程ID',
    `run_path` STRING COMMENT '命令执行时所在的位置',
    `res` STRING COMMENT '返回值',
    `sa_family` STRING COMMENT '链接的协议组',
    `sessionid` STRING COMMENT 'linux进程会话ID',
    `sip` STRING COMMENT '链接源IP',
    `socket_pid` STRING COMMENT '进程链上存在socket的进程ID',
    `sport` STRING COMMENT '链接源端口',
    `ssh` STRING COMMENT '进程创建时的SSH环境变量信息',
    `stdin` STRING COMMENT '标准输入',
    `stdout` STRING COMMENT '标准输出',
    `tgid` STRING COMMENT '线程组ID',
    `time` STRING COMMENT '进程发生时间',
    `tty` STRING COMMENT 'tty字符终端',
    `uid` STRING COMMENT 'linux上用户ID',
    `username` STRING COMMENT 'linux上用户名',
    `query` STRING COMMENT 'DNS 查询语句',
    `qr` STRING COMMENT 'DNS 查询qr',
    `opcode` STRING COMMENT 'DNS 查询opcode',
    `rcode` STRING COMMENT 'DNS 查询rcode',
    `file_path` STRING COMMENT '创建文件路径',
    `ptrace_request` STRING COMMENT 'ptrace时的请求ID',
    `target_pid` STRING COMMENT 'ptrace时被调试进程ID',
    `lkm_file` STRING COMMENT '内核模块名(老)',
    `old_uid` STRING COMMENT '提权前linux用户ID',
    `module_name` STRING COMMENT '内核模块名',
    `hidden` STRING COMMENT '是否为隐藏内核模块',
    `syscall_number` STRING COMMENT '被Hook的syscall ID',
    `interrupt_number` STRING COMMENT '被Hook的中断 ID',
    `data` STRING COMMENT '数据(某些场景使用)',
    `socket_argv` STRING COMMENT '进程链上存在socket的进程命令行',
    `ppid_argv` STRING COMMENT '父进程命令行',
    `pgid_argv` STRING COMMENT '进程组长命令行',
    `connect_type` STRING COMMENT '链接的类型',
    `oldname` STRING COMMENT '原用户名',
    `newname` STRING COMMENT '新用户名',
    `mprotect_prot` STRING COMMENT 'mprotect_prot',
    `owner_pid` STRING COMMENT 'owner_pid',
    `owner_file` STRING COMMENT 'owner_file',
    `vm_file` STRING COMMENT 'vm_file',
    `status` STRING COMMENT 'ssh登陆状态',
    `types` STRING COMMENT 'ssh登陆类型',
    `invalid` STRING COMMENT 'ssh用户是否存在',
    `rawlog` STRING COMMENT 'ssh登陆日志',
    `authorized` STRING COMMENT 'kerberos登陆的linux用户名',
    `principal` STRING COMMENT 'kerberos登陆的kerb用户名',
    `user` STRING COMMENT 'ssh登陆的linux用户名',
    `ex_ipv4_list` STRING COMMENT '外网IPv4列表',
    `ex_ipv6_list` STRING COMMENT '外网IPv6列表',
    `in_ipv4_list` STRING COMMENT '内网IPv4列表',
    `in_ipv6_list` STRING COMMENT '内网IPv6列表',
    `version` STRING COMMENT 'HIDS版本号',
    `sid` STRING COMMENT '进程会话sid',
    `addr` STRING COMMENT 'addr',
    `target_argv` STRING COMMENT 'ptrace被调试进程的argv',
    `old_username` STRING COMMENT '提权前用户名',
    `lkm_hash` STRING COMMENT '内核模块哈希 ',
    `timestamp` STRING COMMENT '时间戳',
    `time_pkg` STRING COMMENT '上传时间',
    `tags` STRING COMMENT '机器标签',
    `ko_file` STRING COMMENT '新增内核模块名',
    `flags` STRING COMMENT 'memfd - flags',
    `fdname` STRING COMMENT 'memfd - flags',
    `new_name` STRING COMMENT 'mv/link/prctl 新文件名',
    `old_name` STRING COMMENT 'mv/link 旧文件名',
    `option` STRING COMMENT 'prctl',
    `pns` STRING COMMENT 'pid_namespace',
    `root_pns` STRING COMMENT 'root fs pid_namespace',
    `psm` STRING COMMENT 'pod psm',
    `pod_name` STRING COMMENT 'pod name',
    `sandbox_task_id` STRING COMMENT '沙箱任务id(含蜜罐)',
    `sandbox_task_type` STRING COMMENT '沙箱类型(含蜜罐)',
    `cpu` STRING COMMENT 'CPU Heartbeat',
    `io` STRING COMMENT 'IO Heartbeat',
    `kernel_version` STRING COMMENT 'kernel_version -- heartbeat',
    `memory` STRING COMMENT 'memory -- heartbeat',
    `net_type` STRING COMMENT 'net_type -- heartbeat',
    `platform` STRING COMMENT 'platform  -- heartbeat',
    `platform_version` STRING COMMENT 'platform_version -- heartbeat',
    `plugins` STRING COMMENT 'plugins  -- heartbeat',
    `slab` STRING COMMENT 'slab  -- heartbeat',
    `exe_size` STRING COMMENT 'exe 大小,静态检测',
    `psm_name` STRING COMMENT 'psm_name',
    `psm_path` STRING COMMENT 'psm_path',
    `product` STRING COMMENT '产品类型',
    `mode` STRING COMMENT 'mode ',
    `file` STRING COMMENT 'file ',
    `sec` STRING COMMENT 'sec ',
    `nsec` STRING COMMENT 'nsec ',
    `sig` STRING COMMENT 'sig',
    `dev` STRING COMMENT 'dev',
    `fstype` STRING COMMENT 'fstype',
    `wait` STRING COMMENT 'wait',
    `sb_id` STRING COMMENT 'sb_id',
    `manufacturer` STRING COMMENT 'manufacturer',
    `serial` STRING COMMENT 'serial',
    `action` STRING COMMENT 'action',
    `dpid` STRING COMMENT 'dpid',
    `dpid_argv` STRING COMMENT 'dpid_argv',
    `cred` STRING COMMENT 'cred',
    `dcred` STRING COMMENT 'dcred',
    `name` STRING COMMENT 'name',
    `idc` STRING COMMENT 'idc',
    `platform_family` STRING COMMENT 'platform_family',
    `net_mode` STRING COMMENT 'net mode ',
    `boot_at` STRING COMMENT 'boot_at',
    `started_at` STRING COMMENT 'started_at'
) COMMENT '全量数据' PARTITIONED BY (
    `date` STRING COMMENT 'date', 
    `hour` STRING COMMENT 'hour'
) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' WITH SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' TBLPROPERTIES (
    'transient_lastDdlTime' = '1649647413',
    'primary_key' = ''
)

这里需要注意的是,Elkeid数据中并不存在直接的date / hour字段 因此可以先对Elkeid数据所在HDFS分区进行外表建立 然后从外表将数据导入内表分区时手动设置partition

  1. ALTER TABLE hids_rawdata ADD PARTITION (date='2008-08-08', hour='08')
  2. LOAD DATA INPATH '/user/hids_data' INTO TABLE hids_rawdata PARTITION(dt='2008-08- 08', hour='08');

持久化存储预估资源

1 台健康Elkeid Agent在普遍业务场景下每小时产出数据 20MB左右

对于需要日志长期持久化的场景,这里给出存储需求列表供参考

机器存储天数1副本2副本3副本
100台30天2TB4TB6TB
100台180天9TB18TB27TB
100台365天18TB36TB54TB
10000台30天144TB288TB432TB
10000台180天864TB1728TB2592TB
10000台365天1752TB3504TB5256TB