pgsql-33 实时数据分析系统
目录
33 - 实时数据分析系统
1. 📖 概述
使用PostgreSQL构建实时数据分析系统,包括用户行为分析、实时报表等。
2. 📊 事件表设计
-- User behavior event table, range-partitioned by event time.
CREATE TABLE events (
    id BIGSERIAL,
    event_type VARCHAR(50) NOT NULL,
    user_id INTEGER,                 -- nullable: NULL rows are events with no user id
    session_id UUID,
    properties JSONB,                -- free-form event attributes
    -- The partition key must never be NULL so every row can be routed.
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- PostgreSQL requires a primary key on a partitioned table to include
    -- every partition-key column, so the key is (id, created_at) rather
    -- than id alone.
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Monthly partitions (half-open ranges: lower bound inclusive, upper exclusive).
CREATE TABLE events_2024_01 PARTITION OF events
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE events_2024_02 PARTITION OF events
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');

-- Catch-all partition: rows whose created_at falls outside every monthly
-- range land here instead of failing the INSERT. Keep creating monthly
-- partitions ahead of time; this is only a safety net.
CREATE TABLE events_default PARTITION OF events DEFAULT;

-- Indexes declared on the parent are created on every partition.
CREATE INDEX idx_events_type ON events (event_type);
CREATE INDEX idx_events_user ON events (user_id);
CREATE INDEX idx_events_created ON events (created_at);
-- GIN index over the JSONB event attributes.
CREATE INDEX idx_events_props ON events USING GIN (properties);
3. 📈 实时聚合
3.1 使用物化视图
-- Hourly activity per event type over the 24 hours preceding the last
-- refresh: distinct users and total event count for each (hour, event_type).
CREATE MATERIALIZED VIEW realtime_user_stats AS
WITH recent_events AS (
    -- Bucket each recent event into the hour it occurred in.
    SELECT
        DATE_TRUNC('hour', created_at) AS bucket_start,
        event_type,
        user_id
    FROM events
    WHERE created_at >= NOW() - INTERVAL '24 hours'
)
SELECT
    bucket_start AS hour,
    event_type,
    COUNT(DISTINCT user_id) AS unique_users,
    COUNT(*) AS event_count
FROM recent_events
GROUP BY
    bucket_start,
    event_type;

-- REFRESH ... CONCURRENTLY needs a unique index on the materialized view;
-- one row per (hour, event_type) is guaranteed by the GROUP BY above.
CREATE UNIQUE INDEX ON realtime_user_stats (hour, event_type);

-- Scheduled refresh, once per minute (cron.schedule is provided by pg_cron).
SELECT cron.schedule(
    'refresh-stats',
    '* * * * *',
    'REFRESH MATERIALIZED VIEW CONCURRENTLY realtime_user_stats'
);
4. 🔥 漏斗分析
-- Conversion funnel over the trailing 7 days.
-- Each stage counts the distinct users who produced that event type at least
-- once in the window; stages are counted independently (no ordering between
-- a user's events is enforced). Rows with a NULL user_id are not counted.
WITH funnel AS (
    SELECT
        COUNT(DISTINCT user_id) FILTER (WHERE event_type = 'page_view') AS viewed,
        COUNT(DISTINCT user_id) FILTER (WHERE event_type = 'add_to_cart') AS added,
        COUNT(DISTINCT user_id) FILTER (WHERE event_type = 'checkout') AS checked_out,
        COUNT(DISTINCT user_id) FILTER (WHERE event_type = 'purchase') AS purchased
    FROM events
    WHERE created_at >= NOW() - INTERVAL '7 days'
      -- Only the four funnel stages contribute to any count.
      AND event_type IN ('page_view', 'add_to_cart', 'checkout', 'purchase')
)
SELECT
    viewed,
    added,
    -- Step-to-step conversion in percent. NULLIF turns a zero denominator
    -- into NULL, so an empty stage yields a NULL rate instead of a
    -- division-by-zero error.
    ROUND(100.0 * added / NULLIF(viewed, 0), 2) AS add_rate,
    checked_out,
    ROUND(100.0 * checked_out / NULLIF(added, 0), 2) AS checkout_rate,
    purchased,
    ROUND(100.0 * purchased / NULLIF(checked_out, 0), 2) AS purchase_rate
FROM funnel;
5. 🎯 用户留存分析
-- Weekly cohort retention.
-- A user's cohort is the week of their first event; for every later week in
-- which cohort members were active, report how many were active and what
-- share of the cohort that is.
WITH cohorts AS (
    -- One row per user: the week of the user's first recorded event.
    SELECT
        user_id,
        DATE_TRUNC('week', MIN(created_at)) AS cohort_week
    FROM events
    WHERE user_id IS NOT NULL   -- events without a user cannot belong to a cohort
    GROUP BY user_id
),
cohort_sizes AS (
    -- Number of users per cohort, computed once instead of per output row.
    SELECT
        cohort_week,
        COUNT(*) AS cohort_size
    FROM cohorts
    GROUP BY cohort_week
),
user_activity AS (
    -- One row per (user, week) in which the user had at least one event.
    SELECT
        user_id,
        DATE_TRUNC('week', created_at) AS activity_week
    FROM events
    WHERE user_id IS NOT NULL
    GROUP BY user_id, DATE_TRUNC('week', created_at)
)
SELECT
    c.cohort_week,
    ua.activity_week,
    -- Both values are week starts, so the date difference is a whole
    -- multiple of 7 days. EXTRACT(WEEK ...) is not defined for intervals,
    -- and AGE() reports symbolic months/years rather than elapsed weeks.
    (CAST(ua.activity_week AS date) - CAST(c.cohort_week AS date)) / 7 AS weeks_since_signup,
    COUNT(DISTINCT ua.user_id) AS active_users,
    cs.cohort_size,
    ROUND(100.0 * COUNT(DISTINCT ua.user_id) / cs.cohort_size, 2) AS retention_rate
FROM cohorts AS c
INNER JOIN cohort_sizes AS cs
    ON cs.cohort_week = c.cohort_week
-- Every cohort member has at least one activity week (both CTEs read the
-- same events), so an inner join loses no rows.
INNER JOIN user_activity AS ua
    ON ua.user_id = c.user_id
GROUP BY c.cohort_week, ua.activity_week, cs.cohort_size
ORDER BY c.cohort_week, ua.activity_week;
6. 📚 下一步
关键要点:
- 使用分区表存储海量事件数据
- 物化视图配合定时刷新提供准实时报表
- JSONB灵活存储事件属性
- 使用CTE与条件聚合分析用户行为(漏斗、留存)
xingliuhua