• 欢迎访问开心洋葱网站,在线教程,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站,欢迎加入开心洋葱 QQ群
  • 为方便开心洋葱网用户,开心洋葱官网已经开启复制功能!
  • 欢迎访问开心洋葱网站,手机也能访问哦~欢迎加入开心洋葱多维思维学习平台 QQ群
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏开心洋葱吧~~~~~~~~~~~~~!
  • 由于近期流量激增,小站的ECS没能经的起亲们的访问,本站依然没有盈利,如果各位看如果觉着文字不错,还请看官给小站打个赏~~~~~~~~~~~~~!

高并发、高频次物联网(IoT)数据上报与处理场景。面对 10万设备,变化即上报 + 每5分钟全量上报 的需求,是否每条记录都要处理?如何设计架构以兼顾 可靠性、性能、成本和业务价值?以下是我的系统性设计方案

数据库 水墨上仙 2647次浏览 0个评论

这是一个典型的高并发、高频次物联网(IoT)数据上报与处理场景。面对 **10万设备**,**变化即上报 + 每5分钟全量上报** 的需求,是否每条记录都要处理?如何设计架构以兼顾 **可靠性、性能、成本和业务价值**?以下是我的系统性设计方案。

## 一、是否每条记录都必须处理?

### 不一定!关键看业务语义:

– **变化上报**:通常包含关键状态变更(如开关、告警、异常),**必须可靠处理**。
– **全量上报**:用于对账、兜底、状态快照,**可容忍部分丢失或延迟**,但需保证最终一致性。

> ✅ **结论**:
> – 变化事件:**强一致性 + 至少一次投递**(At-Least-Once)
> – 全量快照:**最终一致性 + 可降级/采样/压缩**

## 二、整体架构设计原则

| 目标 | 策略 |
|——|——|
| 高吞吐 | 异步、批处理、分区并行 |
| 高可用 | 多副本、自动重试、死信队列 |
| 低延迟 | 流式处理(非批处理) |
| 成本可控 | 分层存储、冷热分离、压缩 |
| 可扩展 | 水平扩展、无状态服务 |

## 三、技术栈选型建议(现代云原生方案)

| 层级 | 技术选型 |
|——|——–|
| **接入层** | MQTT Broker(如 EMQX / Mosquitto) 或 HTTP API(Nginx + TLS) |
| **消息队列** | Apache Kafka / Pulsar(高吞吐、持久化、分区) |
| **流处理引擎** | Flink / Spark Streaming(推荐 Flink:低延迟+Exactly-Once) |
| **时序数据库** | InfluxDB / TimescaleDB / TDengine(高效写入+时间窗口查询) |
| **关系数据库** | PostgreSQL(存设备元信息、最新状态快照) |
| **缓存** | Redis(实时状态、去重、限流) |
| **监控告警** | Prometheus + Grafana + AlertManager |
| **部署** | Kubernetes + Helm(弹性扩缩容) |

## 四、详细流程设计

### 1. 数据上报入口
– 设备通过 **MQTT 协议** 上报(轻量、低功耗),topic 按设备ID分区:
“`
/device/{deviceId}/telemetry
“`
– 若设备不支持 MQTT,可走 HTTPS + JSON,由 API Gateway 转发到 Kafka。

### 2. 消息队列分层
– **Kafka Topic 分两类**:
– `iot.events`:变化事件(高优先级)
– `iot.snapshots`:5分钟全量快照(低优先级)

> 利用 Kafka 的分区机制,按 `deviceId` 哈希分区,保证同一设备数据有序。

### 3. 流处理(Flink Job)
#### 功能模块:
– **去重 & 过滤**:基于 Redis 缓存最近 N 秒相同值(避免抖动重复上报)
– **状态更新**:
– 写入 **时序数据库**(原始数据保留)
– 更新 **PostgreSQL 中的 latest_state 表**(最新状态,供业务快速查询)
– **异常检测**:规则引擎(如 Drools)或 ML 模型(Flink ML)实时告警
– **聚合统计**:每分钟设备在线率、平均温度等指标 → 写入 InfluxDB

#### 容错机制:
– Flink Checkpoint + Kafka offset 提交 → **Exactly-Once 语义**
– 失败数据进入 **DLQ(Dead Letter Queue)**,人工或自动重放

### 4. 存储策略(分层)

| 数据类型 | 存储位置 | 保留策略 |
|——–|——–|——–|
| 原始遥测数据(变化+全量) | 时序数据库(TDengine) | 热数据7天,冷数据转对象存储(S3/OSS) |
| 最新设备状态 | PostgreSQL(带索引) | 永久 |
| 聚合指标 | InfluxDB | 30天 |
| 告警日志 | Elasticsearch | 90天 |

> 💡 TDengine 特别适合 IoT 场景:自动按设备建表、高压缩比、高性能写入(百万点/秒)

### 5. 业务端使用
– **实时查询**:从 PostgreSQL 查最新状态(<10ms) – **历史分析**:查 TDengine(支持时间范围、设备组聚合) – **大屏/报表**:从 InfluxDB 或预计算的物化视图读取 – **告警推送**:Flink 触发 → 写入 RabbitMQ → 推送企业微信/短信 — ## 五、关于“数据丢失”的应对 | 场景 | 应对措施 | |——|——–| | 网络抖动丢包 | MQTT QoS=1(至少一次),Kafka 持久化 | | 后端处理失败 | DLQ + 自动重试(指数退避) | | 全量包丢失 | 下一个全量包会覆盖,不影响最终状态;变化事件才是关键 | | 时序数据丢失 | 业务允许插值/忽略(除非是计费类数据) | > 🔒 **关键点**:只要 **变化事件不丢**,全量包只是“保险”,可接受偶尔丢失。

## 六、性能估算(10万设备)

– 假设每设备每小时 10 次变化 → 100万条/小时 ≈ **280 条/秒**
– 全量包:10万设备 × 每5分钟 = 2万条/分钟 ≈ **333 条/秒**
– **总吞吐 ≈ 600 条/秒**(非常轻松,Kafka+Flink 可轻松支撑 10万+/秒)

> 实际瓶颈往往在 **数据库写入**,所以要用 **时序数据库** 而非 MySQL。

## 七、如果你是我,我会这样设计

“`text
设备 → MQTT (EMQX)

Kafka (iot.events / iot.snapshots)

Flink Job(去重、状态更新、告警、聚合)
↙️ ↘️
PostgreSQL TDengine + InfluxDB
(最新状态) (历史+指标)

业务系统 / 大屏 / API
“`

– **变化事件**:强保障,Exactly-Once 处理
– **全量包**:异步消费,失败可丢弃(因下次会覆盖)
– **所有原始数据落盘**,但按冷热分层降低成本
– **提供 REST API 查询最新状态**(来自 PostgreSQL)
– **支持按设备/时间段回溯历史**(来自 TDengine)

## 八、可选优化

– **边缘计算**:在网关做初步过滤(如只上报变化超过阈值的数据)
– **数据压缩**:Protobuf 替代 JSON,减少带宽
– **动态采样**:非关键设备在高峰期降频上报


✅ docker-compose.yml

version: '3.8'

services:
  # ========== 接入层 ==========
  emqx:
    image: emqx/emqx:5.7
    container_name: emqx
    restart: always
    ports:
      - "18083:18083"  # EMQX Dashboard
      - "1883:1883"    # MQTT TCP
      - "8083:8083"    # MQTT over WebSocket
    environment:
      EMQX_NAME: emqx
      EMQX_HOST: emqx
    networks:
      - iot-net

  # ========== 消息队列 ==========
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: zookeeper
    restart: always
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - iot-net

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    container_name: kafka
    restart: always
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
    networks:
      - iot-net

  # ========== 流处理引擎 ==========
  jobmanager:
    image: flink:1.18-scala_2.12-java11
    container_name: flink-jobmanager
    restart: always
    expose:
      - "6123"
    ports:
      - "8081:8081"  # Flink Web UI
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 4
        parallelism.default: 2
        jobmanager.memory.process.size: 1600m
        taskmanager.memory.process.size: 2048m
    volumes:
      - ./flink-jobs:/opt/flink/usrlib  # 挂载自定义 Flink 作业 JAR
    networks:
      - iot-net

  taskmanager:
    image: flink:1.18-scala_2.12-java11
    container_name: flink-taskmanager
    restart: always
    depends_on:
      - jobmanager
    expose:
      - "6121"
      - "6122"
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 4
        jobmanager.memory.process.size: 1600m
        taskmanager.memory.process.size: 2048m
    networks:
      - iot-net

  # ========== 时序数据库 ==========
  influxdb:
    image: influxdb:2.7
    container_name: influxdb
    restart: always
    ports:
      - "8086:8086"
    environment:
      DOCKER_INFLUXDB_INIT_MODE: setup
      DOCKER_INFLUXDB_INIT_USERNAME: admin
      DOCKER_INFLUXDB_INIT_PASSWORD: password
      DOCKER_INFLUXDB_INIT_ORG: iot-org
      DOCKER_INFLUXDB_INIT_BUCKET: telemetry
      DOCKER_INFLUXDB_INIT_RETENTION: 7d
    volumes:
      - influxdb_data:/var/lib/influxdb2
    networks:
      - iot-net

  # ========== 关系数据库 ==========
  postgres:
    image: postgres:15
    container_name: postgres
    restart: always
    ports:
      - "5432:5432"
    environment:
      POSTGRES_DB: iot_db
      POSTGRES_USER: iot_user
      POSTGRES_PASSWORD: iot_pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - iot-net

  # ========== 缓存 ==========
  redis:
    image: redis:7-alpine
    container_name: redis
    restart: always
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    networks:
      - iot-net

  # ========== 监控告警 ==========
  prometheus:
    image: prom/prometheus:v2.47.1
    container_name: prometheus
    restart: always
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
      - '--web.enable-lifecycle'
    networks:
      - iot-net

  alertmanager:
    image: prom/alertmanager:v0.26.0
    container_name: alertmanager
    restart: always
    ports:
      - "9093:9093"
    volumes:
      - ./alertmanager.yml:/etc/alertmanager/alertmanager.yml
    networks:
      - iot-net

  grafana:
    image: grafana/grafana:10.1.5
    container_name: grafana
    restart: always
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
    volumes:
      - grafana_data:/var/lib/grafana
    networks:
      - iot-net

volumes:
  influxdb_data:
  postgres_data:
  redis_data:
  prometheus_data:
  grafana_data:

networks:
  iot-net:
    driver: bridge

🔧 配套配置文件
. prometheus.yml(放在同目录)

 

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'emqx'
    static_configs:
      - targets: ['emqx:18083']
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka:9092']  # 注意:需额外 exporter,此处仅为示意
  - job_name: 'flink'
    static_configs:
      - targets: ['jobmanager:8081']
  - job_name: 'influxdb'
    metrics_path: /metrics
    static_configs:
      - targets: ['influxdb:8086']
  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres:5432']  # 需 pg_exporter
  - job_name: 'redis'
    static_configs:
      - targets: ['redis:6379']
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

alertmanager.yml(简单配置)

route:
  receiver: 'email-notifications'

receivers:
  - name: 'email-notifications'
    email_configs:
      - to: 'admin@example.com'
        from: 'alertmanager@example.com'
        smarthost: 'smtp.example.com:587'
        auth_username: 'user'
        auth_password: 'pass'

🚀 启动命令

# 启动所有服务
docker-compose up -d

# 查看日志
docker-compose logs -f emqx
docker-compose logs -f jobmanager

# 访问各服务:
# EMQX Dashboard: http://localhost:18083 (默认用户: admin / public)
# Flink UI: http://localhost:8081
# Grafana: http://localhost:3000 (admin/admin)
# InfluxDB: http://localhost:8086
# PostgreSQL: localhost:5432

📌 注意事项
Kafka 的 advertised.listeners 必须设为容器名(kafka:9092),否则 Flink 无法连接。
Flink 作业需自行开发并挂载到 ./flink-jobs/ 目录(JAR 包)。
生产环境建议:
使用 docker swarm 或 Kubernetes
分离监控组件(Prometheus/Grafana 可独立部署)
配置 TLS、认证、防火墙
资源分配:根据设备规模调整 Flink TaskManager 内存和 slot 数。


喜欢 (0)

您必须 登录 才能发表评论!

加载中……