clickhouse日志检索平台

大多数情况下,日志检索平台都是使用ELK,除了ELK外,还可以使用clickhouse,架构如下:

  • filebeat:负责采集日志
  • kafa:filebeat采集后发送到kafka中
  • clickhouse:通过自带的kafka引擎从kafka中获取数据
  • clickvisual:一个类kibana的展示平台,用于日志展示和查看,告警等

部署

filebeat、kafka部署可参考链接filebeat和kafka部署,除了使用filebeat,也可以使用categraf等工具

clickhouse部署可参考链接:clickhouse部署

关于如何配置filebeat采集日志并发送到kafka,可参考elk章节,本例子中只演示clickhouse如何消费kafka的数据,并通过clickvisual来展示

1、首先查看kafka中的消息格式,如下:

{"@timestamp":"2024-01-11T01:15:58.938Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.15.2"},"log":{"offset":295369,"file":{"path":"/var/log/messages"}},"message":"Jan 11 09:15:54 localhost systemd: Started OpenSSH server daemon.","tags":["message-access"],"fields":{"kafka_topic":"message_access_log"}}

上述例子采集的是/var/log/messages中的数据,从json数据看出,如果我们只是查看日志,主要用到的就是timestamp、message、path三个字段

2、clickhouse如何要消费kakfa中的数据并持久化存储,主要涉及三个步骤:

  • 创建一个持久化存储的表
  • 获取kafka中数据的引擎表,此表select一次后数据就不存在了
  • 数据转换视图,用于将引擎表中的数据持久化到存储表中

注:下面创建的表默认都是在default库中创建的,如果要在其他库创建,需要指定库名.表名

(1)、创建持久化存储表event2(表名可自定义的),命令如下:

CREATE TABLE IF NOT EXISTS event2
(
    id UUID DEFAULT generateUUIDv4(),
    timestamp  String Codec(ZSTD),
    message    LowCardinality(String),
    path       String Codec(ZSTD),
    datetime       DateTime Default now(),
    PRIMARY KEY (id)
) Engine = MergeTree
ORDER BY (id);
  • id:定义一个id字段,每次获取数据后此id都是不同的(不定义id也可以,自己随意)
  • timestamp:此字段用于存储kafka中的timestamp数据
  • message:此字段用于存储kafka中的message 数据
  • path:此字段用户存储kafka中的path数据
  • datatime:定时时间字段,默认为当前时间,也可不定义,随意

在clickhouse-client命令行中执行即可,如图:

注意:这个表名event2一般可定义为日志名字,比如nginx_access.log,因为这名字后续需要在clickvisual中显示的,如果不清楚的显示名字,会导致很难知道是哪个日志

(2)、创建引擎表json_queue2,内容如下:

CREATE TABLE IF NOT EXISTS json_queue2 (
      all String
) ENGINE = Kafka
SETTINGS kafka_broker_list = '192.168.49.82:9092',
       kafka_topic_list = 'message_access_log',
       kafka_group_name = 'message_access_log',
       kafka_format = 'JSONAsString',
       kafka_skip_broken_messages = 10000,
       kafka_max_block_size = 1048576;
  • all String:自定义列名为all(与视图中的all要一致),类型为String
  • kafka_broker_list:指定kafka地址,多个地址之间使用逗号分开
  • kafka_topic_list: 指定kafka的topic,要与filebeat中定义的一样
  • kafka_group_name:指定kafka组名,可自定义
  • kafka_format:指定从kafka中读取的数据格式,本例是JSON字符串,除此之外还有CSV格式
  • kafka_skip_broken_messages:遇到无法解析的消息是,跳过10000条消息,这个可自定义配置,配置100也行
  • kafka_max_block_size:从kafka中读取的块大小(以字节为单位),这里是1M

在clickhouse-client的命令行终端执行,如图:

(3)、创建数据转换视图json_mv2,执行命令如下:

CREATE MATERIALIZED VIEW json_mv2 TO event2 AS
SELECT
    JSONExtract(all, '@timestamp', 'String') AS timestamp,
    JSONExtract(all, 'message', 'String') AS message,
    JSONExtractString(all,'log','file','path') AS path
FROM json_queue2;
  • TO event2:指定物化视图的目标表为event2
  • JSONExtract(all, ‘@timestamp’, ‘String’) AS timestamp:从所有的json数据(all)中提取字段@timestamp,并解析为String类型的timestamp(这个timestamp对应的就是event2中的timestamp)
  • JSONExtract(all, ‘message’, ‘String’) AS message:与上面用法一样,解析为message
  • JSONExtractString(all,’log’,’file’,’path’) AS path:从所有的json数据(all)中提取log字段下的file字段下的path字段,并解析为path

注:引擎表中的字段如果为all,那么视图中的也是all,如果变都要变,要保持一尺

JSONExtract 和 JSONExtractString 有什么区别?

JSONExtract 函数用于提取 JSON 字段的值,并将其转换为指定的数据类型,语法如下:

JSONExtract(json, field, type) #从json中提取字段field并解析为type类型

JSONExtractString 函数用于提取 JSON 字段的值,并将其作为字符串返回。语法如下:

JSONExtractString(json, field)    #从json中提取字段field作为字符串返回
JSONExtractString(json, field,field1,field2) #提取{"field":{"field1":{"field2":"aaa"}}}中的field2

在clickhouse-client的命令行执行即可,如图:

3、验证是否正常消费到了kafka中的数据

在机器上重启sshd,messages中会产生日志并被filebeat收集然后发送到kafka,我们查看表event2中,看是否获取到,如图:

从上图可以看到,数据已经存储到了clickhouse的event2表中,message就是日志信息

部署ClickVisual用于展示日志

本例子中使用容器方式来运行的,直接docker pull clickvisual/clickvisual拉取即可,官方地址如下:

https://clickvisual.net/zh/clickvisual/02install/quick-start.html

先运行起来一个临时容器,先把里面的配置文件拷贝出来,三个文件如下:

  • docker.toml #docker cp clickvisual:/clickvisual/config .
  • rbac.conf
  • resource.yaml

1、修改配置文件docker.toml,配置MySQL地址,如图:

2、登录MySQL机器49.83,创建数据库,如下:

create database clickvisual

3、启动clickvisual , 再把配置文件挂载进去,执行命令如下:

docker run -d --name clickvisual -e EGO_CONFIG_PATH=/clickvisual/config/docker.toml -e EGO_LOG_WRITER=stderr -p 19001:19001   -v /data/clickvisual/config:/clickvisual/config clickvisual/clickvisual

4、通过IP地址和端口19001访问,可以看到初始化界面,如图:

5、点击初始化并成功后即可跳转到登录界面,如图:

默认的登录账号和密码都是clickvisual

6、登录后点击系统管理–实例管理-新增实例,如图:

注:上图中9000后没有指定任何数据库,也是可以测试成功并连接的

7、在实例上右键–接入已有日志库,日志库选择event2,如图:

  • 数据是存储在event2中,因此这里数据源只选择event2即可
  • 时间解析字段为必选,因此要求event2中要有这个字段datetime

8、重启chronyd,刷新界面,即可看到日志信息,如图:

9、快速检索所需字段,可在查询语句中输入如下语句,如图:

`message` like '%sshd%'      #模糊匹配,查询结果以橙色显示

到此已经完成了从数据采集到最后的展示

注:如果采集多个日志,并且kafka中有多个topic,那么就需要在clickhouse中创建多个引擎表和存储表,最后在clickvisual中添加日志源即可,参照上述方法即可

常见错误:

如果clickhouse中获取不到kafka中数据,并且日志中报错如下:

<Error> void DB::StorageKafka::threadFunc(size_t): Code: 117. DB::Exception: JSON object must begin with '{'.: (Input format doesn't allow to skip errors): (at row 1)

此时说明topic中的数据可能有异常,先将topic删除,然后等待filebeat重新创建并写入即可

标签