这是一个典型的高并发、高频次物联网(IoT)数据上报与处理场景。面对 **10万设备**,**变化即上报 + 每5分钟全量上报** 的需求,是否每条记录都要处理?如何设计架构以兼顾 **可靠性、性能、成本和业务价值**?以下是我的系统性设计方案。
—
## 一、是否每条记录都必须处理?
### 不一定!关键看业务语义:
– **变化上报**:通常包含关键状态变更(如开关、告警、异常),**必须可靠处理**。
– **全量上报**:用于对账、兜底、状态快照,**可容忍部分丢失或延迟**,但需保证最终一致性。
> ✅ **结论**:
> – 变化事件:**强一致性 + 至少一次投递**(At-Least-Once)
> – 全量快照:**最终一致性 + 可降级/采样/压缩**
—
## 二、整体架构设计原则
| 目标 | 策略 |
|——|——|
| 高吞吐 | 异步、批处理、分区并行 |
| 高可用 | 多副本、自动重试、死信队列 |
| 低延迟 | 流式处理(非批处理) |
| 成本可控 | 分层存储、冷热分离、压缩 |
| 可扩展 | 水平扩展、无状态服务 |
—
## 三、技术栈选型建议(现代云原生方案)
| 层级 | 技术选型 |
|——|——–|
| **接入层** | MQTT Broker(如 EMQX / Mosquitto) 或 HTTP API(Nginx + TLS) |
| **消息队列** | Apache Kafka / Pulsar(高吞吐、持久化、分区) |
| **流处理引擎** | Flink / Spark Streaming(推荐 Flink:低延迟+Exactly-Once) |
| **时序数据库** | InfluxDB / TimescaleDB / TDengine(高效写入+时间窗口查询) |
| **关系数据库** | PostgreSQL(存设备元信息、最新状态快照) |
| **缓存** | Redis(实时状态、去重、限流) |
| **监控告警** | Prometheus + Grafana + AlertManager |
| **部署** | Kubernetes + Helm(弹性扩缩容) |
—
## 四、详细流程设计
### 1. 数据上报入口
– 设备通过 **MQTT 协议** 上报(轻量、低功耗),topic 按设备ID分区:
“`
/device/{deviceId}/telemetry
“`
– 若设备不支持 MQTT,可走 HTTPS + JSON,由 API Gateway 转发到 Kafka。
### 2. 消息队列分层
– **Kafka Topic 分两类**:
– `iot.events`:变化事件(高优先级)
– `iot.snapshots`:5分钟全量快照(低优先级)
> 利用 Kafka 的分区机制,按 `deviceId` 哈希分区,保证同一设备数据有序。
### 3. 流处理(Flink Job)
#### 功能模块:
– **去重 & 过滤**:基于 Redis 缓存最近 N 秒相同值(避免抖动重复上报)
– **状态更新**:
– 写入 **时序数据库**(原始数据保留)
– 更新 **PostgreSQL 中的 latest_state 表**(最新状态,供业务快速查询)
– **异常检测**:规则引擎(如 Drools)或 ML 模型(Flink ML)实时告警
– **聚合统计**:每分钟设备在线率、平均温度等指标 → 写入 InfluxDB
#### 容错机制:
– Flink Checkpoint + Kafka offset 提交 → **Exactly-Once 语义**
– 失败数据进入 **DLQ(Dead Letter Queue)**,人工或自动重放
### 4. 存储策略(分层)
| 数据类型 | 存储位置 | 保留策略 |
|——–|——–|——–|
| 原始遥测数据(变化+全量) | 时序数据库(TDengine) | 热数据7天,冷数据转对象存储(S3/OSS) |
| 最新设备状态 | PostgreSQL(带索引) | 永久 |
| 聚合指标 | InfluxDB | 30天 |
| 告警日志 | Elasticsearch | 90天 |
> 💡 TDengine 特别适合 IoT 场景:自动按设备建表、高压缩比、高性能写入(百万点/秒)
### 5. 业务端使用
– **实时查询**:从 PostgreSQL 查最新状态(<10ms) – **历史分析**:查 TDengine(支持时间范围、设备组聚合) – **大屏/报表**:从 InfluxDB 或预计算的物化视图读取 – **告警推送**:Flink 触发 → 写入 RabbitMQ → 推送企业微信/短信 — ## 五、关于“数据丢失”的应对 | 场景 | 应对措施 | |——|——–| | 网络抖动丢包 | MQTT QoS=1(至少一次),Kafka 持久化 | | 后端处理失败 | DLQ + 自动重试(指数退避) | | 全量包丢失 | 下一个全量包会覆盖,不影响最终状态;变化事件才是关键 | | 时序数据丢失 | 业务允许插值/忽略(除非是计费类数据) | > 🔒 **关键点**:只要 **变化事件不丢**,全量包只是“保险”,可接受偶尔丢失。
—
## 六、性能估算(10万设备)
– 假设每设备每小时 10 次变化 → 100万条/小时 ≈ **280 条/秒**
– 全量包:10万设备 × 每5分钟 = 2万条/分钟 ≈ **333 条/秒**
– **总吞吐 ≈ 600 条/秒**(非常轻松,Kafka+Flink 可轻松支撑 10万+/秒)
> 实际瓶颈往往在 **数据库写入**,所以要用 **时序数据库** 而非 MySQL。
—
## 七、如果你是我,我会这样设计
“`text
设备 → MQTT (EMQX)
↓
Kafka (iot.events / iot.snapshots)
↓
Flink Job(去重、状态更新、告警、聚合)
↙️ ↘️
PostgreSQL TDengine + InfluxDB
(最新状态) (历史+指标)
↓
业务系统 / 大屏 / API
“`
– **变化事件**:强保障,Exactly-Once 处理
– **全量包**:异步消费,失败可丢弃(因下次会覆盖)
– **所有原始数据落盘**,但按冷热分层降低成本
– **提供 REST API 查询最新状态**(来自 PostgreSQL)
– **支持按设备/时间段回溯历史**(来自 TDengine)
—
## 八、可选优化
– **边缘计算**:在网关做初步过滤(如只上报变化超过阈值的数据)
– **数据压缩**:Protobuf 替代 JSON,减少带宽
– **动态采样**:非关键设备在高峰期降频上报
✅ docker-compose.yml
version: '3.8'
services:
# ========== 接入层 ==========
emqx:
image: emqx/emqx:5.7
container_name: emqx
restart: always
ports:
- "18083:18083" # EMQX Dashboard
- "1883:1883" # MQTT TCP
- "8083:8083" # MQTT over WebSocket
environment:
EMQX_NAME: emqx
EMQX_HOST: emqx
networks:
- iot-net
# ========== 消息队列 ==========
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
container_name: zookeeper
restart: always
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- iot-net
kafka:
image: confluentinc/cp-kafka:7.4.0
container_name: kafka
restart: always
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_NUM_PARTITIONS: 6
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
networks:
- iot-net
# ========== 流处理引擎 ==========
jobmanager:
image: flink:1.18-scala_2.12-java11
container_name: flink-jobmanager
restart: always
expose:
- "6123"
ports:
- "8081:8081" # Flink Web UI
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 4
parallelism.default: 2
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 2048m
volumes:
- ./flink-jobs:/opt/flink/usrlib # 挂载自定义 Flink 作业 JAR
networks:
- iot-net
taskmanager:
image: flink:1.18-scala_2.12-java11
container_name: flink-taskmanager
restart: always
depends_on:
- jobmanager
expose:
- "6121"
- "6122"
command: taskmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 4
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 2048m
networks:
- iot-net
# ========== 时序数据库 ==========
influxdb:
image: influxdb:2.7
container_name: influxdb
restart: always
ports:
- "8086:8086"
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_PASSWORD: password
DOCKER_INFLUXDB_INIT_ORG: iot-org
DOCKER_INFLUXDB_INIT_BUCKET: telemetry
DOCKER_INFLUXDB_INIT_RETENTION: 7d
volumes:
- influxdb_data:/var/lib/influxdb2
networks:
- iot-net
# ========== 关系数据库 ==========
postgres:
image: postgres:15
container_name: postgres
restart: always
ports:
- "5432:5432"
environment:
POSTGRES_DB: iot_db
POSTGRES_USER: iot_user
POSTGRES_PASSWORD: iot_pass
volumes:
- postgres_data:/var/lib/postgresql/data
networks:
- iot-net
# ========== 缓存 ==========
redis:
image: redis:7-alpine
container_name: redis
restart: always
ports:
- "6379:6379"
volumes:
- redis_data:/data
networks:
- iot-net
# ========== 监控告警 ==========
prometheus:
image: prom/prometheus:v2.47.1
container_name: prometheus
restart: always
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/etc/prometheus/console_libraries'
- '--web.console.templates=/etc/prometheus/consoles'
- '--web.enable-lifecycle'
networks:
- iot-net
alertmanager:
image: prom/alertmanager:v0.26.0
container_name: alertmanager
restart: always
ports:
- "9093:9093"
volumes:
- ./alertmanager.yml:/etc/alertmanager/alertmanager.yml
networks:
- iot-net
grafana:
image: grafana/grafana:10.1.5
container_name: grafana
restart: always
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_PASSWORD: admin
volumes:
- grafana_data:/var/lib/grafana
networks:
- iot-net
volumes:
influxdb_data:
postgres_data:
redis_data:
prometheus_data:
grafana_data:
networks:
iot-net:
driver: bridge
🔧 配套配置文件
. prometheus.yml(放在同目录)
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'emqx'
static_configs:
- targets: ['emqx:18083']
- job_name: 'kafka'
static_configs:
- targets: ['kafka:9092'] # 注意:需额外 exporter,此处仅为示意
- job_name: 'flink'
static_configs:
- targets: ['jobmanager:8081']
- job_name: 'influxdb'
metrics_path: /metrics
static_configs:
- targets: ['influxdb:8086']
- job_name: 'postgres'
static_configs:
- targets: ['postgres:5432'] # 需 pg_exporter
- job_name: 'redis'
static_configs:
- targets: ['redis:6379']
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']
alertmanager.yml(简单配置)
route:
receiver: 'email-notifications'
receivers:
- name: 'email-notifications'
email_configs:
- to: 'admin@example.com'
from: 'alertmanager@example.com'
smarthost: 'smtp.example.com:587'
auth_username: 'user'
auth_password: 'pass'
🚀 启动命令
# 启动所有服务 docker-compose up -d # 查看日志 docker-compose logs -f emqx docker-compose logs -f jobmanager # 访问各服务: # EMQX Dashboard: http://localhost:18083 (默认用户: admin / public) # Flink UI: http://localhost:8081 # Grafana: http://localhost:3000 (admin/admin) # InfluxDB: http://localhost:8086 # PostgreSQL: localhost:5432
📌 注意事项
Kafka 的 advertised.listeners 必须设为容器名(kafka:9092),否则 Flink 无法连接。
Flink 作业需自行开发并挂载到 ./flink-jobs/ 目录(JAR 包)。
生产环境建议:
使用 docker swarm 或 Kubernetes
分离监控组件(Prometheus/Grafana 可独立部署)
配置 TLS、认证、防火墙
资源分配:根据设备规模调整 Flink TaskManager 内存和 slot 数。
