Written: February 2026 · Author: Bobot 🦐
🎯 Goal of this chapter: build one complete project that ties together Hadoop, Hive, Kafka, Spark, and related technologies
1. Project Overview
1.1 Business Scenario
We will build a complete platform for processing and analyzing user behavior data:
User behavior data flow
App/Web ──▶ Nginx ──▶ Kafka ──▶ Spark Streaming ──▶ HDFS
                        │                             │
                        └──▶ Flink ──▶ Redis ──▶ Real-time dashboard
                                                      │
                                                      ▼
                                           Hive ──▶ Spark SQL ──▶ Data warehouse ──▶ BI reports ──▶ Admin console
1.2 Data Description
User behavior events:
| Field | Type | Description |
|---|---|---|
| user_id | String | User ID |
| session_id | String | Session ID |
| event_type | String | Event type (pv / click / addcart / buy / favorite) |
| item_id | String | Item ID |
| item_category | String | Item category |
| timestamp | Long | Event timestamp (epoch milliseconds) |
| device_type | String | Device type |
| ip | String | IP address |
| province | String | Province |
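For reference, one event serialized as JSON (the field values are illustrative; this is the shape the producer in section 3 emits and the Flink/Hive schemas below expect):
{"user_id": "U1042311", "session_id": "S3f8a2c1", "event_type": "buy", "item_id": "I523817", "item_category": "手机", "timestamp": 1767225600000, "device_type": "Android", "ip": "10.12.33.7", "province": "北京"}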
1.3 Technology Stack
| Layer | Technology |
|---|---|
| Message queue | Kafka |
| Real-time computing | Spark Streaming + Flink |
| Offline computing | Hive + Spark SQL |
| Storage | HDFS + Redis |
| Scheduling | Airflow |
2. Environment Setup
2.1 Quick Start with Docker
# Start all services (docker-compose.yml)
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
  hadoop:
    image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
    ports:
      - "9870:9870"   # HDFS Web UI
      - "8088:8088"   # YARN Web UI
  hive:
    image: bde2020/hive:2.3.9-with-metastore-postgresql
    depends_on: [hadoop]
    ports:
      - "10000:10000" # HiveServer2
  spark:
    image: bitnami/spark:3.2.1
    ports:
      - "8080:8080"
2.2 Directory Structure
user-behavior-platform/
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   └── com/
│   │   │       └── bigdata/
│   │   │           ├── producer/
│   │   │           │   └── UserEventProducer.java
│   │   │           ├── streaming/
│   │   │           │   ├── SparkStreamingJob.java
│   │   │           │   └── FlinkJob.java
│   │   │           ├── etl/
│   │   │           │   └── HiveETL.java
│   │   │           └── sink/
│   │   │               └── DataSink.java
│   │   └── resources/
│   │       └── application.yml
│   └── test/
├── scripts/
│   ├── create_tables.sql
│   └── run_etl.sh
└── pom.xml
3. Data Ingestion: Kafka Producer
3.1 Simulated Data Generation
// UserEventProducer.java
// Assumes fastjson and Lombok on the classpath (declared in pom.xml)
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.PropertyNamingStrategy;
import com.alibaba.fastjson.serializer.SerializeConfig;
import lombok.Builder;
import lombok.Data;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
public class UserEventProducer {
private static final String[] EVENT_TYPES = {"pv", "click", "addcart", "buy", "favorite"};
private static final String[] CATEGORIES = {"手机", "电脑", "服装", "食品", "图书"};
private static final String[] PROVINCES = {"北京", "上海", "广东", "浙江", "江苏"};
private static final String[] DEVICES = {"iOS", "Android", "PC", "Mobile"};
private static final Random random = new Random();
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
long userIdBase = 1000000;
long itemIdBase = 500000;
// Downstream Hive/Flink schemas use snake_case field names (user_id, item_id, ...),
// so serialize with snake_case instead of fastjson's default camelCase.
SerializeConfig snakeCase = new SerializeConfig();
snakeCase.propertyNamingStrategy = PropertyNamingStrategy.SnakeCase;
while (true) {
// Build one simulated user event
UserEvent event = UserEvent.builder()
.userId("U" + (userIdBase + random.nextInt(100000)))
.sessionId("S" + UUID.randomUUID().toString().substring(0, 8))
.eventType(EVENT_TYPES[random.nextInt(EVENT_TYPES.length)])
.itemId("I" + (itemIdBase + random.nextInt(50000)))
.itemCategory(CATEGORIES[random.nextInt(CATEGORIES.length)])
.timestamp(System.currentTimeMillis())
.deviceType(DEVICES[random.nextInt(DEVICES.length)])
.province(PROVINCES[random.nextInt(PROVINCES.length)])
.build();
String json = JSON.toJSONString(event, snakeCase);
producer.send(new ProducerRecord<>("user_events", event.getUserId(), json));
// Roughly 100 events per second
Thread.sleep(10);
}
}
}
@Data
@Builder
class UserEvent {
private String userId;
private String sessionId;
private String eventType;
private String itemId;
private String itemCategory;
private long timestamp;
private String deviceType;
private String province;
}
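The send above is fire-and-forget. While testing, a delivery callback is a small optional addition that surfaces broker-side failures; a sketch reusing the same producer and record:
// Optional: log delivery failures instead of dropping them silently
producer.send(new ProducerRecord<>("user_events", event.getUserId(), json),
        (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Failed to send event: " + exception.getMessage());
            }
        });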
3.2 Creating the Kafka Topic
# Create the topic
kafka-topics.sh --create \
--topic user_events \
--partitions 6 \
--replication-factor 1 \
--bootstrap-server localhost:9092
# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092
4. Real-time Processing: Spark Streaming
4.1 Real-time Metrics Requirements
- Real-time PV/UV: computed every 10 seconds
- Real-time category sales: computed every 30 seconds
- Raw user events: persisted to HDFS
4.2 The Spark Streaming Job
// SparkStreamingJob.java
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import redis.clients.jedis.Jedis;
import scala.Tuple2;
import java.text.SimpleDateFormat;
import java.util.*;
public class SparkStreamingJob {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf()
.setAppName("UserBehaviorAnalytics")
.setMaster("local[*]");
JavaStreamingContext ssc = new JavaStreamingContext(
conf, Durations.seconds(10));
ssc.checkpoint("/tmp/spark-checkpoint");
// 1. Consume from Kafka
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("group.id", "spark-streaming-group");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(
Arrays.asList("user_events"),
kafkaParams
)
);
// 2. Parse JSON into UserEvent objects
JavaDStream<UserEvent> events = stream
.map(record -> JSON.parseObject(record.value(), UserEvent.class));
// 3. Real-time PV/UV (10-second window)
JavaPairDStream<String, Integer> pvStats = events
.mapToPair(event -> new Tuple2<>("pv", 1))
.reduceByKeyAndWindow((a, b) -> a + b, Durations.seconds(10));
JavaPairDStream<String, Integer> uvStats = events
.mapToPair(event -> new Tuple2<>(event.getUserId(), 1))
.reduceByKeyAndWindow((a, b) -> a + b, Durations.seconds(10))
.mapToPair(tuple -> new Tuple2<>("uv", 1))
.reduceByKey((a, b) -> a + b); // collapse the distinct users into a single UV count
// 4. Real-time category sales (30-second window)
JavaPairDStream<String, Integer> categoryStats = events
.filter(event -> "buy".equals(event.getEventType()))
.mapToPair(event -> new Tuple2<>(event.getItemCategory(), 1))
.reduceByKeyAndWindow((a, b) -> a + b, Durations.seconds(30));
// 5. Write metrics to Redis (collectAsMap() brings the results to the driver,
// so a driver-side Jedis connection is sufficient here)
pvStats.foreachRDD(rdd -> {
if (!rdd.isEmpty()) {
try (Jedis jedis = new Jedis("localhost", 6379)) {
rdd.collectAsMap().forEach((k, v) -> jedis.incrBy("real_time_pv", v));
}
}
});
categoryStats.foreachRDD(rdd -> {
if (!rdd.isEmpty()) {
try (Jedis jedis = new Jedis("localhost", 6379)) {
rdd.collectAsMap().forEach((category, count) -> jedis.hincrBy("category_sales", category, count));
}
}
});
// 6. Persist raw events to HDFS per batch (landing zone for the ODS layer)
events.foreachRDD((rdd, time) -> {
if (!rdd.isEmpty()) {
JavaRDD<String> jsonRDD = rdd.map(JSON::toJSONString);
// NOTE: this lands JSON text files; the ODS table in section 6.2 is declared as Parquet,
// so either define the landing table as TEXTFILE/JSON or convert to Parquet during ODS -> DWD.
String path = String.format(
"/user/hive/warehouse/user_events_raw/dt=%s/%d",
new SimpleDateFormat("yyyy-MM-dd").format(new Date()),
time.milliseconds()
);
jsonRDD.saveAsTextFile(path);
}
});
// Print the window aggregates to the driver log
pvStats.print();
uvStats.print();
categoryStats.print();
ssc.start();
ssc.awaitTermination();
}
}
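The Redis keys written above are exactly what a dashboard backend would read. A minimal sketch of such a reader with Jedis (the key layout follows the job above; the class name and output format are only illustrative):
// DashboardMetricsReader.java (illustrative): reads the metrics written by SparkStreamingJob
import java.util.Map;
import redis.clients.jedis.Jedis;

public class DashboardMetricsReader {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // Counter maintained via incrBy("real_time_pv", ...)
            String pv = jedis.get("real_time_pv");
            // Hash maintained via hincrBy("category_sales", category, count)
            Map<String, String> categorySales = jedis.hgetAll("category_sales");
            System.out.println("PV today: " + pv);
            categorySales.forEach((category, count) ->
                    System.out.println(category + " -> " + count));
        }
    }
}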
5. Real-time Dashboard: Flink
5.1 Dashboard Requirements
- Today's GMV: running total
- Real-time order count: running total
- Top 10 categories: over the last hour
- Hot-selling items: over the last 30 minutes
5.2 The Flink Job
// FlinkJob.java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000);
env.setParallelism(4);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 1. Kafka source table (must be a TABLE, not a VIEW, since it declares a connector)
tEnv.executeSql("""
CREATE TEMPORARY TABLE user_events (
user_id STRING,
session_id STRING,
event_type STRING,
item_id STRING,
item_category STRING,
`timestamp` BIGINT,
device_type STRING,
province STRING,
event_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000)),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user_events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-group',
'format' = 'json'
)
""");
// 2. Real-time GMV (approximated by order and buyer counts, since events carry no price)
Table gmvTable = tEnv.sqlQuery("""
SELECT
SUM(CASE WHEN event_type = 'buy' THEN 1 ELSE 0 END) AS order_count,
COUNT(DISTINCT user_id) AS buyer_count,
TUMBLE_START(event_time, INTERVAL '1' DAY) AS window_start
FROM user_events
WHERE event_time >= TIMESTAMP '1970-01-01 00:00:00'
GROUP BY TUMBLE(event_time, INTERVAL '1' DAY)
""");
// 3. Top 10 categories over the last hour
// NOTE: written for readability; in streaming mode Flink generally requires the
// ROW_NUMBER()-based Top-N pattern over a windowed aggregate instead of a plain ORDER BY ... LIMIT.
Table categoryTable = tEnv.sqlQuery("""
SELECT
item_category,
COUNT(*) AS cnt
FROM user_events
WHERE event_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR
GROUP BY item_category
ORDER BY cnt DESC
LIMIT 10
""");
// 4. Hot-selling items over the last 30 minutes (same caveat as above)
Table hotItemTable = tEnv.sqlQuery("""
SELECT
item_id,
COUNT(*) AS buy_count
FROM user_events
WHERE event_type = 'buy'
AND event_time > CURRENT_TIMESTAMP - INTERVAL '30' MINUTE
GROUP BY item_id
ORDER BY buy_count DESC
LIMIT 20
""");
// 5. Sink the category ranking to Redis
// NOTE: Flink does not ship a Redis SQL connector; this assumes a third-party
// Redis table connector is on the classpath.
tEnv.executeSql("""
CREATE TEMPORARY TABLE category_sink (
category STRING,
cnt BIGINT
) WITH (
'connector' = 'redis',
'hostname' = 'localhost',
'port' = '6379',
'format' = 'json'
)
""");
categoryTable.executeInsert("category_sink");
// executeInsert() submits the job by itself; env.execute() is only needed
// once DataStream operators are added (see the sketch below).
}
}
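Note that gmvTable and hotItemTable above are built but not yet written anywhere. If no Redis table connector is available, one workable pattern is to convert a result Table into a changelog DataStream and write it with Jedis in a custom sink. A minimal sketch for the GMV result (the "realtime_gmv" key and field handling are assumptions, not part of the original project):
// Extra imports needed for this sketch:
//   org.apache.flink.streaming.api.datastream.DataStream
//   org.apache.flink.streaming.api.functions.sink.RichSinkFunction
//   org.apache.flink.configuration.Configuration
//   org.apache.flink.types.Row, org.apache.flink.types.RowKind
//   redis.clients.jedis.Jedis
DataStream<Row> gmvStream = tEnv.toChangelogStream(gmvTable);
gmvStream.addSink(new RichSinkFunction<Row>() {
    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        jedis = new Jedis("localhost", 6379);
    }

    @Override
    public void invoke(Row row, Context context) {
        // Skip retraction rows; only keep inserts and updated values
        if (row.getKind() == RowKind.UPDATE_BEFORE || row.getKind() == RowKind.DELETE) {
            return;
        }
        // Field order follows the gmvTable SELECT: order_count, buyer_count, window_start
        jedis.hset("realtime_gmv", "order_count", String.valueOf(row.getField(0)));
        jedis.hset("realtime_gmv", "buyer_count", String.valueOf(row.getField(1)));
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
});
// With a DataStream sink in the graph, env.execute(...) is required again.
The same approach works for hotItemTable; in production a pooled connection (JedisPool) per subtask would be preferable to one Jedis instance per sink.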
6. Offline Analysis: Hive + Spark
6.1 Data Warehouse Layering
Warehouse layers
ODS (raw data layer)
└── user_events_raw
DWD (detail layer)
└── user_events_detail
DWS (summary layer)
├── dws_user_daily        -- daily per-user summary
├── dws_item_daily        -- daily per-item summary
└── dws_category_daily    -- daily per-category summary
ADS (application layer)
└── ads_report_*          -- report tables
6.2 Table DDL
-- ODS layer: raw data
CREATE EXTERNAL TABLE user_events_raw (
user_id STRING,
session_id STRING,
event_type STRING,
item_id STRING,
item_category STRING,
`timestamp` BIGINT,
device_type STRING,
province STRING
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET
LOCATION '/user/hive/warehouse/user_events_raw';
-- DWD layer: cleaned detail data
CREATE TABLE user_events_detail (
user_id STRING,
session_id STRING,
event_type STRING,
item_id STRING,
item_category STRING,
event_time TIMESTAMP,
device_type STRING,
province STRING,
hour INT,
date_date DATE
)
PARTITIONED BY (dt STRING)   -- daily partitions, written by the Airflow ODS -> DWD task
STORED AS PARQUET;
-- DWS layer: daily per-user summary
CREATE TABLE dws_user_daily (
user_id STRING,
date_date DATE,
pv INT,
click INT,
cart INT,
buy INT,
favorite INT,
uv_first_time TIMESTAMP
)
STORED AS PARQUET;
-- ADS layer: report data
CREATE TABLE ads_daily_report (
date_date DATE,
total_pv BIGINT,
total_uv BIGINT,
total_order BIGINT,
total_gmv DECIMAL(15, 2),
conversion_rate DECIMAL(10, 4)
)
STORED AS PARQUET;
6.3 ETL Jobs
-- Step 1: ODS -> DWD data cleansing
INSERT OVERWRITE TABLE user_events_detail PARTITION (dt = '${current_date}')
SELECT
user_id,
session_id,
event_type,
item_id,
item_category,
FROM_UNIXTIME(`timestamp` / 1000) AS event_time,
device_type,
province,
HOUR(FROM_UNIXTIME(`timestamp` / 1000)) AS hour,
TO_DATE(FROM_UNIXTIME(`timestamp` / 1000)) AS date_date
FROM user_events_raw
WHERE dt = '${current_date}';
-- Step 2: DWD -> DWS daily per-user summary
INSERT OVERWRITE TABLE dws_user_daily
SELECT
user_id,
date_date,
SUM(CASE WHEN event_type = 'pv' THEN 1 ELSE 0 END) AS pv,
SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) AS click,
SUM(CASE WHEN event_type = 'addcart' THEN 1 ELSE 0 END) AS cart,
SUM(CASE WHEN event_type = 'buy' THEN 1 ELSE 0 END) AS buy,
SUM(CASE WHEN event_type = 'favorite' THEN 1 ELSE 0 END) AS favorite,
MIN(CASE WHEN event_type = 'pv' THEN event_time END) AS uv_first_time
FROM user_events_detail
WHERE date_date = '${current_date}'
GROUP BY user_id, date_date;
-- Step 3: DWS -> ADS daily report
INSERT OVERWRITE TABLE ads_daily_report
SELECT
date_date,
SUM(pv) AS total_pv,
COUNT(DISTINCT user_id) AS total_uv,
SUM(buy) AS total_order,
SUM(buy) * 100.0 AS total_gmv, -- simplified: events carry no price, so assume an average order value of 100
ROUND(SUM(buy) * 1.0 / SUM(pv), 4) AS conversion_rate
FROM dws_user_daily
WHERE date_date = '${current_date}'
GROUP BY date_date;
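The project layout in section 2.2 lists etl/HiveETL.java but its body has not been shown. A minimal sketch of what such a driver could look like, submitting the section 6.3 statements through a Hive-enabled SparkSession (the argument handling and packaging details are assumptions):
// HiveETL.java (sketch): run the section 6.3 SQL with Spark SQL against the Hive metastore
package com.bigdata.etl;

import org.apache.spark.sql.SparkSession;

public class HiveETL {
    public static void main(String[] args) {
        // e.g. spark-submit --class com.bigdata.etl.HiveETL user-behavior-platform.jar 2026-02-01
        String dt = args[0];
        SparkSession spark = SparkSession.builder()
                .appName("UserBehaviorHiveETL")
                .enableHiveSupport()
                .getOrCreate();

        // Step 2 from section 6.3 shown here; steps 1 and 3 are submitted the same way,
        // with the '${current_date}' placeholder replaced by the dt argument.
        spark.sql(
            "INSERT OVERWRITE TABLE dws_user_daily " +
            "SELECT user_id, date_date, " +
            "  SUM(CASE WHEN event_type = 'pv' THEN 1 ELSE 0 END), " +
            "  SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END), " +
            "  SUM(CASE WHEN event_type = 'addcart' THEN 1 ELSE 0 END), " +
            "  SUM(CASE WHEN event_type = 'buy' THEN 1 ELSE 0 END), " +
            "  SUM(CASE WHEN event_type = 'favorite' THEN 1 ELSE 0 END), " +
            "  MIN(CASE WHEN event_type = 'pv' THEN event_time END) " +
            "FROM user_events_detail WHERE date_date = '" + dt + "' " +
            "GROUP BY user_id, date_date");

        spark.stop();
    }
}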
7. Scheduling: Airflow
7.1 DAG Definition
# dags/user_behavior_etl.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'bigdata',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'user_behavior_etl',
default_args=default_args,
schedule_interval='0 1 * * *',  # run daily at 01:00
catchup=False
)
# Task 1: check that the day's raw partition has landed on HDFS
check_data = BashOperator(
task_id='check_data_ready',
bash_command='hdfs dfs -test -e /user/hive/warehouse/user_events_raw/dt={{ ds }}',
dag=dag
)
# Task 2: ODS -> DWD
ods_to_dwd = BashOperator(
task_id='ods_to_dwd',
bash_command='''
hive -e "
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE user_events_detail
PARTITION(dt='{{ ds }}')
SELECT ... FROM user_events_raw WHERE dt='{{ ds }}';
"
''',
dag=dag
)
# Task 3: DWD -> DWS
dwd_to_dws = BashOperator(
task_id='dwd_to_dws',
bash_command='spark-submit /path/to/dws_job.py --date={{ ds }}',
dag=dag
)
# Task 4: DWS -> ADS
dws_to_ads = BashOperator(
task_id='dws_to_ads',
bash_command='spark-submit /path/to/ads_job.py --date={{ ds }}',
dag=dag
)
check_data >> ods_to_dwd >> dwd_to_dws >> dws_to_ads
8. End-to-End Data Flow
Data flow overview
┌─────────────────────────────────────────────────────────────┐
│                        Data sources                         │
│          App/Web ──▶ Nginx ──▶ Kafka (user_events)          │
└───────────────────────────────┬─────────────────────────────┘
                                │
            ┌───────────────────┼───────────────────┐
            ▼                   ▼                   ▼
   ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
   │ Spark Streaming │ │      Flink      │ │      HDFS       │
   │                 │ │   (dashboard)   │ │   (raw data)    │
   │ - live metrics  │ │ - live GMV      │ │ - dt partitions │
   │ - write to Redis│ │ - top categories│ │ - Parquet files │
   └────────┬────────┘ └────────┬────────┘ └────────┬────────┘
            │                   │                   │
            ▼                   │                   ▼
   ┌─────────────────┐          │          ┌─────────────────┐
   │      Redis      │          │          │      Hive       │
   │ (live metrics)  │          │          │ (offline ETL)   │
   │ - today's PV    │          │          │ - ODS -> DWD    │
   │ - today's UV    │          │          │ - DWD -> DWS    │
   │ - category sales│          │          │ - DWS -> ADS    │
   └─────────────────┘          │          └────────┬────────┘
                                │                   │
                                ▼                   ▼
                       ┌─────────────────┐ ┌─────────────────┐
                       │ Live dashboard  │ │   BI reports    │
                       │ - live GMV      │ │ - daily/weekly/ │
                       │ - hot items     │ │   monthly       │
                       │                 │ │ - ad-hoc queries│
                       └─────────────────┘ └─────────────────┘
9. Project Summary
9.1 Technology Stack Recap
| Module | Technology | Role |
|---|---|---|
| Message queue | Kafka | Data ingestion and buffering |
| Real-time computing | Spark Streaming | Near-real-time aggregation |
| Real-time computing | Flink | Low-latency dashboard metrics |
| Offline computing | Hive + Spark SQL | Batch analytics |
| Storage | HDFS | Large-scale data storage |
| Storage | Redis | Caching and real-time lookups |
| Scheduling | Airflow | Job orchestration |
9.2 Further Improvements
- Data quality: add validation and consistency checks
- Performance: tune parallelism and partitioning
- Monitoring and alerting: alert on job failures
- Resource isolation: proper resource management in production
- Data governance: metadata management
Closing Notes
What have you learned?
Through this series, you have covered:
- Big data fundamentals: the CAP theorem and BASE
- Distributed storage: HDFS
- Distributed computing: MapReduce and Spark
- Data warehousing: Hive
- Message queues: Kafka
- Stream processing: Flink
- A hands-on project: a complete data platform
What can you learn next?
- Data lakes: Delta Lake, Iceberg
- OLAP: ClickHouse, Doris, StarRocks
- Machine learning: Spark MLlib
- Cloud-native big data: big data components on Kubernetes
Happy learning!
If this series helped you, feel free to bookmark and share it!
— Bobot 🦐