概述
GreptimeDB 提供了一个持续聚合功能,允许你实时聚合数据。这个功能在当你需要实时计算和查询总和、平均值或其他聚合时非常有用。持续聚合功能由 Flow 引擎提供。它不断地基于传入的数据更新聚合数据并将其实现。因此,你可以将其视为一个聪明的物化视图,它知道何时更新结果视图表以及如何以最小的努力更新它。一些常见的用例包括:
- 下采样数据点,使用如平均池化等方法减少存储和分析的数据量
- 提供近实时分析,提供可操作的信息
当你将数据插入 source 表时,数据也会被发送到 Flow 引擎并存储在其中。 Flow 引擎通过时间窗口计算聚合并将结果存储在目标表中。 整个过程如下图所示:
快速开始示例
以下是持续聚合查询的一个完整示例。
这个例子是根据输入表中的数据计算一系列统计数据,包括一分钟时间窗口内的总日志数、最小大小、最大大小、平均大小以及大小大于 550 的数据包数。
首先,创建一个 source 表 ngx_access_log 和一个 sink 表 ngx_statistics,如下所示:
CREATE TABLE `ngx_access_log` (
  `client` STRING NULL,
  `ua_platform` STRING NULL,
  `referer` STRING NULL,
  `method` STRING NULL,
  `endpoint` STRING NULL,
  `trace_id` STRING NULL FULLTEXT,
  `protocol` STRING NULL,
  `status` SMALLINT UNSIGNED NULL,
  `size` DOUBLE NULL,
  `agent` STRING NULL,
  `access_time` TIMESTAMP(3) NOT NULL,
  TIME INDEX (`access_time`)
)
WITH(
  append_mode = 'true'
);
CREATE TABLE `ngx_statistics` (
  `status` SMALLINT UNSIGNED NULL,
  `total_logs` BIGINT NULL,
  `min_size` DOUBLE NULL,
  `max_size` DOUBLE NULL,
  `avg_size` DOUBLE NULL,
  `high_size_count` BIGINT NULL,
  `time_window` TIMESTAMP time index,
  `update_at` TIMESTAMP NULL,
  PRIMARY KEY (`status`)
);
然后创建名为 ngx_aggregation 的 flow 任务,包括 count、min、max、avg size 列的聚合函数,以及大于 550 的所有数据包的大小总和。聚合是在 access_time 列的 1 分钟固定窗口中计算的,并且还按 status 列分组。因此,你可以实时了解有关数据包大小和对其的操作的信息,例如,如果 high_size_count 在某个时间点变得太高,你可以进一步检查是否有任何问题,或者如果 max_size 列在 1 分钟时间窗口内突然激增,你可以尝试定位该数据包并进一步检查。
CREATE FLOW ngx_aggregation
SINK TO ngx_statistics
AS
SELECT
    status,
    count(client) AS total_logs,
    min(size) as min_size,
    max(size) as max_size,
    avg(size) as avg_size,
    sum(case when `size` > 550 then 1 else 0 end) as high_size_count,
    date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM ngx_access_log
GROUP BY
    status,
    time_window;
要检查持续聚合是否正常工作,首先插入一些数据到源表 ngx_access_log 中。
INSERT INTO ngx_access_log 
VALUES
    ("android", "Android", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 1000, "agent", "2021-07-01 00:00:01.000"),
    ("ios", "iOS", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 500, "agent", "2021-07-01 00:00:30.500"),
    ("android", "Android", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 600, "agent", "2021-07-01 00:01:01.000"),
    ("ios", "iOS", "referer", "GET", "/api/v1", "trace_id", "HTTP", 404, 700, "agent", "2021-07-01 00:01:01.500");
则 ngx_access_log 表将被增量更新以包含以下数据:
SELECT * FROM ngx_statistics;
 status | total_logs | min_size | max_size | avg_size | high_size_count |        time_window         |         update_at          
--------+------------+----------+----------+----------+-----------------+----------------------------+----------------------------
    200 |          2 |      500 |     1000 |      750 |               1 | 2021-07-01 00:00:00.000000 | 2024-07-24 08:36:17.439000
    200 |          1 |      600 |      600 |      600 |               1 | 2021-07-01 00:01:00.000000 | 2024-07-24 08:36:17.439000
    404 |          1 |      700 |      700 |      700 |               1 | 2021-07-01 00:01:00.000000 | 2024-07-24 08:36:17.439000
(3 rows)
尝试向 ngx_access_log 表中插入更多数据:
INSERT INTO ngx_access_log
VALUES
    ("android", "Android", "referer", "GET", "/api/v1", "trace_id", "HTTP", 200, 500, "agent", "2021-07-01 00:01:01.000"),
    ("ios", "iOS", "referer", "GET", "/api/v1", "trace_id", "HTTP", 404, 800, "agent", "2021-07-01 00:01:01.500");
结果表 ngx_statistics 将被增量更新,注意 max_size、avg_size 和 high_size_count 是如何更新的:
SELECT * FROM ngx_statistics;
 status | total_logs | min_size | max_size | avg_size | high_size_count |        time_window         |         update_at          
--------+------------+----------+----------+----------+-----------------+----------------------------+----------------------------
    200 |          2 |      500 |     1000 |      750 |               1 | 2021-07-01 00:00:00.000000 | 2024-07-24 08:36:17.439000
    200 |          2 |      500 |      600 |      550 |               1 | 2021-07-01 00:01:00.000000 | 2024-07-24 08:36:46.495000
    404 |          2 |      700 |      800 |      750 |               2 | 2021-07-01 00:01:00.000000 | 2024-07-24 08:36:46.495000
(3 rows)
ngx_statistics 表中的列解释如下:
- status: HTTP 响应的状态码。
- total_logs: 相同状态码的日志总数。
- min_size: 相同状态码的数据包的最小大小。
- max_size: 相同状态码的数据包的最大大小。
- avg_size: 相同状态码的数据包的平均大小。
- high_size_count: 包大小大于 550 的数据包数。
- time_window: 聚合的时间窗口。
- update_at: 聚合结果更新的时间。
下一步
恭喜你已经初步了解了持续聚合功能。 请参考以下章节了解更多: