目录

pgsql-33 实时数据分析系统

33 - 实时数据分析系统

1. 📖 概述

使用PostgreSQL构建实时数据分析系统,包括用户行为分析、实时报表等。

2. 📊 事件表设计

-- 用户行为事件表
CREATE TABLE events (
    id BIGSERIAL PRIMARY KEY,
    event_type VARCHAR(50) NOT NULL,
    user_id INTEGER,
    session_id UUID,
    properties JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
) PARTITION BY RANGE (created_at);

-- 按月分区
CREATE TABLE events_2024_01 PARTITION OF events
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

CREATE TABLE events_2024_02 PARTITION OF events
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');

-- 索引
CREATE INDEX idx_events_type ON events(event_type);
CREATE INDEX idx_events_user ON events(user_id);
CREATE INDEX idx_events_created ON events(created_at);
CREATE INDEX idx_events_props ON events USING GIN(properties);

3. 📈 实时聚合

3.1 使用物化视图

-- 实时用户统计
CREATE MATERIALIZED VIEW realtime_user_stats AS
SELECT
    DATE_TRUNC('hour', created_at) AS hour,
    event_type,
    COUNT(DISTINCT user_id) AS unique_users,
    COUNT(*) AS event_count
FROM events
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY DATE_TRUNC('hour', created_at), event_type;

CREATE UNIQUE INDEX ON realtime_user_stats(hour, event_type);

-- 定时刷新(每分钟)
SELECT cron.schedule('refresh-stats', '* * * * *',
    'REFRESH MATERIALIZED VIEW CONCURRENTLY realtime_user_stats');

4. 🔥 漏斗分析

-- 转化漏斗
WITH funnel AS (
    SELECT
        COUNT(DISTINCT CASE WHEN event_type = 'page_view' THEN user_id END) AS viewed,
        COUNT(DISTINCT CASE WHEN event_type = 'add_to_cart' THEN user_id END) AS added,
        COUNT(DISTINCT CASE WHEN event_type = 'checkout' THEN user_id END) AS checked_out,
        COUNT(DISTINCT CASE WHEN event_type = 'purchase' THEN user_id END) AS purchased
    FROM events
    WHERE created_at >= NOW() - INTERVAL '7 days'
)
SELECT
    viewed,
    added,
    ROUND(100.0 * added / viewed, 2) AS add_rate,
    checked_out,
    ROUND(100.0 * checked_out / added, 2) AS checkout_rate,
    purchased,
    ROUND(100.0 * purchased / checked_out, 2) AS purchase_rate
FROM funnel;

5. 🎯 用户留存分析

-- 留存率计算
WITH cohorts AS (
    SELECT
        user_id,
        DATE_TRUNC('week', MIN(created_at)) AS cohort_week
    FROM events
    GROUP BY user_id
),
user_activity AS (
    SELECT
        user_id,
        DATE_TRUNC('week', created_at) AS activity_week
    FROM events
    GROUP BY user_id, DATE_TRUNC('week', created_at)
)
SELECT
    c.cohort_week,
    ua.activity_week,
    EXTRACT(WEEK FROM AGE(ua.activity_week, c.cohort_week)) AS weeks_since_signup,
    COUNT(DISTINCT ua.user_id) AS active_users,
    (SELECT COUNT(*) FROM cohorts WHERE cohort_week = c.cohort_week) AS cohort_size,
    ROUND(100.0 * COUNT(DISTINCT ua.user_id) /
          (SELECT COUNT(*) FROM cohorts WHERE cohort_week = c.cohort_week), 2) AS retention_rate
FROM cohorts c
LEFT JOIN user_activity ua ON c.user_id = ua.user_id
GROUP BY c.cohort_week, ua.activity_week
ORDER BY c.cohort_week, ua.activity_week;

6. 📚 下一步

学习Go语言连接PostgreSQL


关键要点:

  • 使用分区表存储海量事件数据
  • 物化视图提供实时报表
  • JSONB灵活存储事件属性
  • 窗口函数分析用户行为