Engineering Practice · Big Data

Hands-On Project: A User Behavior Data Processing and Analytics Platform

Build an end-to-end user behavior data processing and analytics platform.

Published 2026/02/26 · 2 min read

Written: February 2026 · Author: Bobot 🦐

🎯 Goal of this chapter: integrate Hadoop, Hive, Kafka, Spark, and related tools through one complete project


1. Project Overview

1.1 Business Scenario

We will build a complete user behavior data processing and analytics platform:

User behavior data flow

App/Web ──▶ Nginx ──▶ Kafka ──▶ Spark Streaming ──▶ HDFS

                              └──▶ Flink ──▶ Redis ──▶ Live dashboard

Hive ──▶ Spark SQL ──▶ Data Warehouse ──▶ BI reports ──▶ Admin console

1.2 Data Description

User behavior event schema

Field           Type     Description
user_id         String   User ID
session_id      String   Session ID
event_type      String   Event type (pv/click/addcart/buy/favorite)
item_id         String   Item ID
item_category   String   Item category
timestamp       Long     Event timestamp (milliseconds)
device_type     String   Device type
ip              String   IP address
province        String   Province
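
To make the schema concrete, here is one illustrative event as it might look on the Kafka topic (values are made up). Note that these field names are snake_case, matching the Flink and Hive tables later in the chapter; the Lombok/fastjson producer below serializes camelCase property names by default, so in practice the two would need to be aligned (for example with @JSONField annotations).

{"user_id": "U1042311", "session_id": "S3f9a7c21", "event_type": "buy", "item_id": "I523301", "item_category": "手机", "timestamp": 1708912345678, "device_type": "iOS", "ip": "203.0.113.10", "province": "北京"}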

1.3 Technical Architecture

Layer              Technology
Message queue      Kafka
Real-time compute  Spark Streaming + Flink
Batch compute      Hive + Spark SQL
Storage            HDFS + Redis
Scheduling         Airflow

2. Environment Setup

2.1 Quick Start with Docker

# docker-compose.yml: bring up all services (docker compose up -d)
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092

  hadoop:
    image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
    ports:
      - "9870:9870"  # HDFS Web UI
      - "8088:8088"  # YARN Web UI

  hive:
    image: bde2020/hive:2.3.9-with-metastore-postgresql
    depends_on: [hadoop]
    ports:
      - "10000:10000"  # HiveServer2

  spark:
    image: bitnami/spark:3.2.1
    ports:
      - "8080:8080"

2.2 Project Layout

user-behavior-platform/
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   └── com/
│   │   │       └── bigdata/
│   │   │           ├── producer/
│   │   │           │   └── UserEventProducer.java
│   │   │           ├── streaming/
│   │   │           │   ├── SparkStreamingJob.java
│   │   │           │   └── FlinkJob.java
│   │   │           ├── etl/
│   │   │           │   └── HiveETL.java
│   │   │           └── sink/
│   │   │               └── DataSink.java
│   │   └── resources/
│   │       └── application.yml
│   └── test/
├── scripts/
│   ├── create_tables.sql
│   └── run_etl.sh
└── pom.xml

3. Data Ingestion: Kafka Producer

3.1 Generating Mock Events

// UserEventProducer.java
public class UserEventProducer {

    private static final String[] EVENT_TYPES = {"pv", "click", "addcart", "buy", "favorite"};
    private static final String[] CATEGORIES = {"手机", "电脑", "服装", "食品", "图书"};
    private static final String[] PROVINCES = {"北京", "上海", "广东", "浙江", "江苏"};
    private static final String[] DEVICES = {"iOS", "Android", "PC", "Mobile"};

    private static final Random random = new Random();

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        long userIdBase = 1000000;
        long itemIdBase = 500000;

        while (true) {
            // Build one simulated user behavior event
            UserEvent event = UserEvent.builder()
                .userId("U" + (userIdBase + random.nextInt(100000)))
                .sessionId("S" + UUID.randomUUID().toString().substring(0, 8))
                .eventType(EVENT_TYPES[random.nextInt(EVENT_TYPES.length)])
                .itemId("I" + (itemIdBase + random.nextInt(50000)))
                .itemCategory(CATEGORIES[random.nextInt(CATEGORIES.length)])
                .timestamp(System.currentTimeMillis())
                .deviceType(DEVICES[random.nextInt(DEVICES.length)])
                .province(PROVINCES[random.nextInt(PROVINCES.length)])
                .build();

            String json = JSON.toJSONString(event);
            producer.send(new ProducerRecord<>("user_events", event.getUserId(), json));

            // Sleep 10 ms between sends, roughly 100 events per second
            Thread.sleep(10);
        }
    }
}

@Data
@Builder
class UserEvent {
    private String userId;
    private String sessionId;
    private String eventType;
    private String itemId;
    private String itemCategory;
    private long timestamp;
    private String deviceType;
    private String province;
}

3.2 Creating the Kafka Topic

# Create the topic
kafka-topics.sh --create \
    --topic user_events \
    --partitions 6 \
    --replication-factor 1 \
    --bootstrap-server localhost:9092

# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092
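
Before wiring up Spark, it can help to confirm that events actually land on the topic. The sketch below is an assumed helper (not part of the original project layout), using the standard kafka-clients consumer API to print a handful of records:

// TopicSanityCheck.java (hypothetical helper for local verification)
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicSanityCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "sanity-check");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("user_events"));
            // Poll a few times and print whatever arrives
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> System.out.printf("%s -> %s%n", r.key(), r.value()));
            }
        }
    }
}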

4. Real-Time Processing: Spark Streaming

4.1 Real-Time Metrics

  1. Real-time PV/UV: computed every 10 seconds
  2. Real-time category sales: computed every 30 seconds
  3. Raw user events: persisted to HDFS

4.2 The Spark Streaming Job

// SparkStreamingJob.java
public class SparkStreamingJob {

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf()
            .setAppName("UserBehaviorAnalytics")
            .setMaster("local[*]");

        JavaStreamingContext ssc = new JavaStreamingContext(
            conf, Durations.seconds(10));

        ssc.checkpoint("/tmp/spark-checkpoint");

        // 1. Consume from Kafka
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("group.id", "spark-streaming-group");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);

        JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(
                    Arrays.asList("user_events"),
                    kafkaParams
                )
            );

        // 2. Parse the JSON payload
        JavaDStream<UserEvent> events = stream
            .map(record -> JSON.parseObject(record.value(), UserEvent.class));

        // 3. Real-time PV/UV (10-second window)
        JavaPairDStream<String, Integer> pvStats = events
            .mapToPair(event -> new Tuple2<>("pv", 1))
            .reduceByKeyAndWindow((a, b) -> a + b, Durations.seconds(10));

        JavaPairDStream<String, Integer> uvStats = events
            .mapToPair(event -> new Tuple2<>(event.getUserId(), 1))
            .reduceByKeyAndWindow((a, b) -> a + b, Durations.seconds(10))
            .mapToPair(tuple -> new Tuple2<>("uv", 1))
            .reduceByKey((a, b) -> a + b);  // collapse distinct users into one UV count

        // 4. Real-time category sales
        JavaPairDStream<String, Integer> categoryStats = events
            .filter(event -> "buy".equals(event.getEventType()))
            .mapToPair(event -> new Tuple2<>(event.getItemCategory(), 1))
            .reduceByKeyAndWindow((a, b) -> a + b, Durations.seconds(30));

        // 5. Write results to Redis
        // Note: collectAsMap() pulls each small result set back to the driver,
        // so a short-lived driver-side Jedis connection per batch is enough here.
        pvStats.foreachRDD(rdd -> {
            if (!rdd.isEmpty()) {
                try (Jedis jedis = new Jedis("localhost", 6379)) {
                    rdd.collectAsMap().forEach((k, v) -> jedis.incrBy("real_time_pv", v));
                }
            }
        });

        categoryStats.foreachRDD(rdd -> {
            if (!rdd.isEmpty()) {
                try (Jedis jedis = new Jedis("localhost", 6379)) {
                    rdd.collectAsMap().forEach((category, count) ->
                        jedis.hincrBy("category_sales", category, count));
                }
            }
        });

        // 6. Persist raw events to HDFS, one directory per batch
        events.foreachRDD((rdd, time) -> {
            if (!rdd.isEmpty()) {
                JavaRDD<String> jsonRDD = rdd.map(JSON::toJSONString);
                String path = String.format(
                    "/user/hive/warehouse/user_events/dt=%s/%d",
                    new SimpleDateFormat("yyyy-MM-dd").format(new Date()),
                    time.milliseconds()
                );
                jsonRDD.saveAsTextFile(path);
            }
        });

        // Print the stats to the console
        pvStats.print();
        uvStats.print();
        categoryStats.print();

        ssc.start();
        ssc.awaitTermination();
    }
}
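
On the serving side, a dashboard backend only needs to read back the keys this job maintains. A minimal sketch, assuming Jedis and the key names used above (real_time_pv and category_sales); the class itself is illustrative:

// DashboardMetricsReader.java (illustrative, not part of the original project layout)
import java.util.Map;
import redis.clients.jedis.Jedis;

public class DashboardMetricsReader {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            String pv = jedis.get("real_time_pv");                        // cumulative PV counter
            Map<String, String> sales = jedis.hgetAll("category_sales");  // per-category buy counts
            System.out.println("PV so far: " + pv);
            sales.forEach((category, count) -> System.out.println(category + " -> " + count));
        }
    }
}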

5. Real-Time Dashboard: Flink

5.1 Dashboard Requirements

  1. Today's GMV: cumulative, updated in real time
  2. Real-time order count: cumulative, updated in real time
  3. Top 10 categories: last hour
  4. Hot items: last 30 minutes

5.2 The Flink Job

// FlinkJob.java
public class FlinkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(30000);
        env.setParallelism(4);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 1. Create the Kafka source table
        tEnv.executeSql("""
            CREATE TEMPORARY TABLE user_events (
                user_id STRING,
                session_id STRING,
                event_type STRING,
                item_id STRING,
                item_category STRING,
                `timestamp` BIGINT,
                device_type STRING,
                province STRING,
                event_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000)),
                WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
            ) WITH (
                'connector' = 'kafka',
                'topic' = 'user_events',
                'properties.bootstrap.servers' = 'localhost:9092',
                'properties.group.id' = 'flink-group',
                'format' = 'json'
            )
            """);

        // 2. Real-time GMV (simplified to order and buyer counts, since events carry no amount)
        Table gmvTable = tEnv.sqlQuery("""
            SELECT
                SUM(CASE WHEN event_type = 'buy' THEN 1 ELSE 0 END) AS order_count,
                COUNT(DISTINCT user_id) AS buyer_count,
                TUMBLE_START(event_time, INTERVAL '1' DAY) AS window_start
            FROM user_events
            WHERE event_time >= TIMESTAMP '1970-01-01 00:00:00'
            GROUP BY TUMBLE(event_time, INTERVAL '1' DAY)
            """);

        // 3. Top 10 categories (last hour)
        Table categoryTable = tEnv.sqlQuery("""
            SELECT
                item_category,
                COUNT(*) AS cnt
            FROM user_events
            WHERE event_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR
            GROUP BY item_category
            ORDER BY cnt DESC
            LIMIT 10
            """);

        // 4. Hot items (last 30 minutes)
        Table hotItemTable = tEnv.sqlQuery("""
            SELECT
                item_id,
                COUNT(*) AS buy_count
            FROM user_events
            WHERE event_type = 'buy'
                AND event_time > CURRENT_TIMESTAMP - INTERVAL '30' MINUTE
            GROUP BY item_id
            ORDER BY buy_count DESC
            LIMIT 20
            """);

        // 5. Write the category stats to Redis
        // (this assumes a third-party Flink Redis table connector on the classpath)

        tEnv.executeSql("""
            CREATE TEMPORARY TABLE category_sink (
                category STRING,
                cnt BIGINT
            ) WITH (
                'connector' = 'redis',
                'hostname' = 'localhost',
                'port' = '6379',
                'format' = 'json'
            )
            """);

        categoryTable.executeInsert("category_sink");

        // executeInsert() above submits its own streaming job, so no separate
        // env.execute() call is needed; gmvTable and hotItemTable would be
        // wired to their own sinks in the same way.
    }
}

6. Offline Analytics: Hive + Spark

6.1 Warehouse Layering

Warehouse layers

ODS (raw data layer)
  └── user_events_raw

DWD (detail layer)
  └── user_events_detail

DWS (summary layer)
  ├── dws_user_daily     -- daily per-user aggregates
  ├── dws_item_daily     -- daily per-item aggregates
  └── dws_category_daily -- daily per-category aggregates

ADS (application layer)
  └── ads_report_*       -- report-facing data

6.2 Table DDL

-- ODS layer: raw data
CREATE EXTERNAL TABLE user_events_raw (
    user_id STRING,
    session_id STRING,
    event_type STRING,
    item_id STRING,
    item_category STRING,
    `timestamp` BIGINT,
    device_type STRING,
    province STRING
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET
LOCATION '/user/hive/warehouse/user_events_raw';

-- DWD layer: cleaned detail data
CREATE TABLE user_events_detail (
    user_id STRING,
    session_id STRING,
    event_type STRING,
    item_id STRING,
    item_category STRING,
    event_time TIMESTAMP,
    device_type STRING,
    province STRING,
    hour INT,
    date_date DATE
)
STORED AS PARQUET;

-- DWS layer: daily per-user aggregates
CREATE TABLE dws_user_daily (
    user_id STRING,
    date_date DATE,
    pv INT,
    click INT,
    cart INT,
    buy INT,
    favorite INT,
    uv_first_time TIMESTAMP
)
STORED AS PARQUET;

-- ADS layer: report data
CREATE TABLE ads_daily_report (
    date_date DATE,
    total_pv BIGINT,
    total_uv BIGINT,
    total_order BIGINT,
    total_gmv DECIMAL(15, 2),
    conversion_rate DECIMAL(10, 4)
)
STORED AS PARQUET;

6.3 ETL Jobs

-- Step 1: ODS -> DWD data cleansing
INSERT OVERWRITE TABLE user_events_detail
SELECT
    user_id,
    session_id,
    event_type,
    item_id,
    item_category,
    FROM_UNIXTIME(CAST(`timestamp` / 1000 AS BIGINT)) AS event_time,
    device_type,
    province,
    HOUR(FROM_UNIXTIME(CAST(`timestamp` / 1000 AS BIGINT))) AS hour,
    TO_DATE(FROM_UNIXTIME(CAST(`timestamp` / 1000 AS BIGINT))) AS date_date
FROM user_events_raw
WHERE dt = '${current_date}';

-- Step 2: DWD -> DWS per-user daily aggregates
INSERT OVERWRITE TABLE dws_user_daily
SELECT
    user_id,
    date_date,
    SUM(CASE WHEN event_type = 'pv' THEN 1 ELSE 0 END) AS pv,
    SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) AS click,
    SUM(CASE WHEN event_type = 'addcart' THEN 1 ELSE 0 END) AS cart,
    SUM(CASE WHEN event_type = 'buy' THEN 1 ELSE 0 END) AS buy,
    SUM(CASE WHEN event_type = 'favorite' THEN 1 ELSE 0 END) AS favorite,
    MIN(CASE WHEN event_type = 'pv' THEN event_time END) AS uv_first_time
FROM user_events_detail
WHERE date_date = '${current_date}'
GROUP BY user_id, date_date;

-- Step 3: DWS -> ADS daily report
INSERT OVERWRITE TABLE ads_daily_report
SELECT
    date_date,
    SUM(pv) AS total_pv,
    COUNT(DISTINCT user_id) AS total_uv,
    SUM(buy) AS total_order,
    SUM(buy) * 100.0 AS total_gmv,  -- simplified calculation
    ROUND(SUM(buy) * 1.0 / SUM(pv), 4) AS conversion_rate
FROM dws_user_daily
WHERE date_date = '${current_date}'
GROUP BY date_date;
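
The project layout in section 2.2 lists etl/HiveETL.java, but the file itself is not shown. Below is a minimal sketch of how these steps might be driven from a SparkSession with Hive support; the date argument, the MSCK REPAIR step, and inlining the section 6.3 statements are assumptions rather than the original implementation:

// HiveETL.java (a sketch, not the original file)
import org.apache.spark.sql.SparkSession;

public class HiveETL {
    public static void main(String[] args) {
        // Run date, e.g. passed in by Airflow as {{ ds }}
        String dt = args.length > 0 ? args[0] : java.time.LocalDate.now().toString();

        SparkSession spark = SparkSession.builder()
            .appName("UserBehaviorHiveETL")
            .enableHiveSupport()
            .getOrCreate();

        // Make any newly written dt=... partition directories visible to the metastore
        spark.sql("MSCK REPAIR TABLE user_events_raw");

        // Step 1: ODS -> DWD (the section 6.3 statement with the date substituted)
        spark.sql(
            "INSERT OVERWRITE TABLE user_events_detail " +
            "SELECT user_id, session_id, event_type, item_id, item_category, " +
            "       FROM_UNIXTIME(CAST(`timestamp` / 1000 AS BIGINT)) AS event_time, " +
            "       device_type, province, " +
            "       HOUR(FROM_UNIXTIME(CAST(`timestamp` / 1000 AS BIGINT))) AS hour, " +
            "       TO_DATE(FROM_UNIXTIME(CAST(`timestamp` / 1000 AS BIGINT))) AS date_date " +
            "FROM user_events_raw WHERE dt = '" + dt + "'");

        // Steps 2 and 3 (DWD -> DWS, DWS -> ADS) follow the same pattern
        // with the remaining statements from section 6.3.

        spark.stop();
    }
}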

7. Job Scheduling: Airflow

7.1 DAG Definition

# dags/user_behavior_etl.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'bigdata',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'user_behavior_etl',
    default_args=default_args,
    schedule_interval='0 1 * * *',  # run daily at 01:00
    catchup=False
)

# Task 1: check that the source data is ready
check_data = BashOperator(
    task_id='check_data_ready',
    bash_command='hdfs dfs -test -e /user/hive/warehouse/user_events_raw/dt={{ ds }}',
    dag=dag
)

# Task 2: ODS -> DWD
ods_to_dwd = BashOperator(
    task_id='ods_to_dwd',
    bash_command='''
        hive -e "
        SET hive.exec.dynamic.partition=true;
        SET hive.exec.dynamic.partition.mode=nonstrict;
        INSERT OVERWRITE TABLE user_events_detail
        PARTITION(dt='{{ ds }}')
        SELECT ... FROM user_events_raw WHERE dt='{{ ds }}';
        "
    ''',
    dag=dag
)

# Task 3: DWD -> DWS
dwd_to_dws = BashOperator(
    task_id='dwd_to_dws',
    bash_command='spark-submit /path/to/dws_job.py --date={{ ds }}',
    dag=dag
)

# Task 4: DWS -> ADS
dws_to_ads = BashOperator(
    task_id='dws_to_ads',
    bash_command='spark-submit /path/to/ads_job.py --date={{ ds }}',
    dag=dag
)

check_data >> ods_to_dwd >> dwd_to_dws >> dws_to_ads

8. End-to-End Data Flow

Data flow overview

┌─────────────────────────────────────────────────────────────────────┐
│                            Data sources                              │
│   App/Web ──▶ Nginx ──▶ Kafka (user_events)                          │
└──────────────────────────────────┬────────────────────────────────────┘
                                   │
         ┌─────────────────────────┼─────────────────────────┐
         ▼                         ▼                         ▼
┌─────────────────┐       ┌─────────────────┐       ┌─────────────────┐
│ Spark Streaming │       │ Flink           │       │ HDFS            │
│                 │       │ (live dashboard)│       │ (raw events)    │
│ - rolling stats │       │ - real-time GMV │       │ - dt partitions │
│ - writes Redis  │       │ - top categories│       │ - Parquet files │
└────────┬────────┘       └────────┬────────┘       └────────┬────────┘
         │                         │                         │
         ▼                         │                         ▼
┌─────────────────┐                │               ┌─────────────────┐
│ Redis           │                │               │ Hive            │
│ (live metrics)  │                │               │ (offline ETL)   │
│ - today's PV    │                │               │ - ODS -> DWD    │
│ - today's UV    │                │               │ - DWD -> DWS    │
│ - category sales│                │               │ - DWS -> ADS    │
└─────────────────┘                │               └────────┬────────┘
                                   │                         │
                                   ▼                         ▼
                          ┌─────────────────┐       ┌─────────────────┐
                          │ Live dashboard  │       │ BI reports      │
                          │ - live GMV      │       │ - day/week/month│
                          │ - hot items     │       │ - ad-hoc queries│
                          └─────────────────┘       └─────────────────┘

9. Project Wrap-Up

9.1 Tech Stack Recap

Module             Technology          Role
Message queue      Kafka               Data ingestion and buffering
Real-time compute  Spark Streaming     Near-real-time statistics
Real-time compute  Flink               Millisecond-level dashboard metrics
Batch compute      Hive + Spark SQL    Batch data analytics
Storage            HDFS                Massive data storage
Storage            Redis               Caching and real-time lookups
Scheduling         Airflow             Job orchestration

9.2 Further Improvements

  1. Data quality: add validation checks
  2. Performance: tune parallelism and partitioning
  3. Monitoring and alerting: alert on job failures
  4. Resource isolation: resource management for production environments
  5. Data governance: metadata management

Closing Notes

What have you learned?

Through this series, you have covered:

  1. Big data fundamentals: the CAP theorem and BASE
  2. Distributed storage: HDFS
  3. Distributed computing: MapReduce and Spark
  4. Data warehousing: Hive
  5. Message queues: Kafka
  6. Stream processing: Flink
  7. A hands-on project: a complete data platform

What can you learn next?

  • Data lakes: Delta Lake, Iceberg
  • OLAP: ClickHouse, Doris, StarRocks
  • Machine learning: Spark MLlib
  • Cloud-native big data: running the stack on Kubernetes

Happy learning!

If you found this helpful, feel free to bookmark and share it!

— Bobot 🦐