Technologies covered: Snowpipe Streaming V2 high-speed interface, REST, Raspberry Pi, Apache Iceberg, Snowflake, Amazon Web Services (AWS)


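Project repository: https://github.com/tspannhw/RPIWeatherStreaming
For data engineers working in IoT, the real "first lesson" is arguably not blinking an LED but building a data pipeline that moves sensor data to the cloud reliably, in real time, and turns it into something useful. This article walks through the RPIWeatherStreaming project, which turns an ordinary Raspberry Pi into a weather station and writes the environmental data it collects straight into the Snowflake Data Cloud through Snowpipe Streaming, a low-latency ingestion API.
The approach skips the traditional file-based flow: there is no need to save data as CSV files and upload them to S3, and no Kafka cluster to build and maintain. The sections below break down how it works and how to set it up.
System architecture
The project's main strength is a simple architecture. It skips the middleware layer that IoT ingestion usually involves, so the whole flow comes down to three steps:
- Edge (data source): a Raspberry Pi fitted with an environmental sensor such as the BME280 or BME680 collects temperature, humidity, and pressure. The BME680 additionally reports gas resistance.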
- Ingestion (transport layer): a Python script uses the Snowflake Ingest SDK to package sensor readings into micro-batches.
- Destination (storage layer): the data is pushed over an encrypted channel directly into a Snowflake table through the Snowpipe Streaming high-speed V2 REST API.
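Why the Snowpipe Streaming V2 API?
Classic Snowpipe has to watch a staging area such as S3 for new files. That works, but it typically adds a few minutes of latency and brings extra file-management overhead.
Snowpipe Streaming takes a different approach. Its main advantages are:
- Row-based transfer: rows are sent directly, with no need to wrap them in files.
- Very low latency: data is available for query and analysis within seconds of arriving.
- Ordered delivery: an offset-token mechanism supports exactly-once delivery, avoiding duplicated or lost rows.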
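The offset-token idea can be illustrated with a small in-memory sketch. This is not the Snowflake API: `FakeChannel` and `send_from` are hypothetical names, and integer list positions stand in for offset tokens. The point is only that a sender which resumes from the last committed offset neither re-sends rows nor skips them after a retry.

```python
from typing import Dict, List, Optional


class FakeChannel:
    """In-memory stand-in for a streaming channel (hypothetical, not a Snowflake class)."""

    def __init__(self) -> None:
        self.rows: List[Dict] = []
        self.committed_offset: Optional[int] = None

    def append(self, rows: List[Dict], offset_token: int) -> None:
        # Store the rows and record the offset of the last row in this batch.
        self.rows.extend(rows)
        self.committed_offset = offset_token


def send_from(channel: FakeChannel, readings: List[Dict]) -> int:
    """Send only the readings that come after the channel's committed offset.

    `readings` is the full local log; a reading's list index is its offset.
    Returns the number of rows actually sent.
    """
    if channel.committed_offset is None:
        start = 0
    else:
        start = channel.committed_offset + 1
    pending = readings[start:]
    if pending:
        channel.append(pending, offset_token=len(readings) - 1)
    return len(pending)


readings = [{"seq": i, "temperature": 20.0 + i} for i in range(5)]
channel = FakeChannel()

first = send_from(channel, readings[:3])   # first attempt covers seq 0..2
second = send_from(channel, readings)      # resume: only seq 3..4 are new
third = send_from(channel, readings)       # replay of the same log sends nothing
```

After these three calls the channel holds each reading exactly once, in order, even though the same log was offered to it repeatedly.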
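Hardware and software requirements
To reproduce the project you need the following hardware and software:
- A Raspberry Pi (model 4 or 5).
- A weather sensor: BME280 (temperature, humidity, pressure) or BME680 (adds gas resistance).
- Python 3.8 or later installed on the Raspberry Pi.
The core logic lives in weather_main.py, which works as follows:
- Sensor initialization: the script connects to the Raspberry Pi's I2C bus and reads the raw sensor values.
- Authentication: the script connects to Snowflake securely using key-pair authentication with a JWT. This suits unattended automation because no password has to be typed in.
- Buffering and sending: rather than sending records one at a time, which would be too inefficient, the script buffers records up to a configurable batch size and writes the buffer to Snowflake in bulk every few seconds.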
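The buffering step can be sketched as follows. This is a minimal illustration rather than the code in weather_main.py: `BatchBuffer`, its parameters, and the injected `flush` callback are hypothetical. A batch is flushed when it reaches `batch_size` records or when its oldest record is older than `max_age_seconds`.

```python
import time
from typing import Callable, Dict, List


class BatchBuffer:
    """Collect records and hand them to `flush` in batches (illustrative only)."""

    def __init__(
        self,
        batch_size: int,
        max_age_seconds: float,
        flush: Callable[[List[Dict]], None],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._batch_size = batch_size
        self._max_age = max_age_seconds
        self._flush = flush
        self._clock = clock
        self._records: List[Dict] = []
        self._first_at = 0.0

    def add(self, record: Dict) -> bool:
        """Buffer one record; return True if this call triggered a flush."""
        if not self._records:
            self._first_at = self._clock()  # age is measured from the first record
        self._records.append(record)
        full = len(self._records) >= self._batch_size
        stale = self._clock() - self._first_at >= self._max_age
        if full or stale:
            self.flush()
            return True
        return False

    def flush(self) -> None:
        """Send whatever is buffered; a no-op when the buffer is empty."""
        if not self._records:
            return
        batch, self._records = self._records, []
        self._flush(batch)


sent: List[List[Dict]] = []
buffer = BatchBuffer(batch_size=3, max_age_seconds=60.0, flush=sent.append)
for i in range(7):
    buffer.add({"seq": i})
buffer.flush()  # push the final partial batch, e.g. on shutdown
```

The age check here only runs when a record is added, so a real script would also call `flush()` on a timer or at shutdown, as the last line does.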
Configuration and running
The command to run the project is short, and its arguments control ingestion throughput:
python3 weather_main.py --batch-size 100 --interval 10.0 --fast
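--batch-size: the number of weather records to accumulate before sending one batch.
--interval: the interval between sensor reads, in seconds.
--fast: a performance flag that speeds up sensor reads when enabled.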
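For reference, these three flags could be declared with `argparse` roughly as below. The flag names come from the command above; the default values, help strings, and the `build_parser` helper are assumptions for illustration and may not match weather_main.py.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Declare the three documented flags (defaults here are assumed values)."""
    parser = argparse.ArgumentParser(
        description="Stream weather sensor readings to Snowflake."
    )
    parser.add_argument(
        "--batch-size", type=int, default=100,
        help="records to accumulate before one batch is sent",
    )
    parser.add_argument(
        "--interval", type=float, default=5.0,
        help="seconds between sensor reads",
    )
    parser.add_argument(
        "--fast", action="store_true",
        help="enable the faster sensor-read path",
    )
    return parser


# Parse the same arguments as the example command, without touching sys.argv.
args = build_parser().parse_args(
    ["--batch-size", "100", "--interval", "10.0", "--fast"]
)
```

Note that `argparse` exposes `--batch-size` as `args.batch_size`, with the hyphen converted to an underscore.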
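Snowflake setup
Before running the Python script, complete the basic setup on the Snowflake side. The build.sql file in the repository contains the full schema definition; the key steps are:
Create the user and role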
CREATE ROLE IF NOT EXISTS WEATHER_ROLE;
CREATE USER IF NOT EXISTS RPI_BOT DEFAULT_ROLE = WEATHER_ROLE;
Configure public-key authentication
Generate an RSA key pair on the Raspberry Pi and attach the public key to the user you created:
ALTER USER RPI_BOT SET RSA_PUBLIC_KEY='<your_public_key>';
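Once the public key is registered, the client authenticates with a JWT whose claims identify the account, the user, and the key. The sketch below builds those claims with the standard library only, assuming the claim layout described in Snowflake's key-pair authentication documentation (issuer of the form ACCOUNT.USER.SHA256:fingerprint). The helper names are hypothetical, and signing the token with RS256 is left out because it requires a third-party library such as PyJWT or cryptography.

```python
import base64
import hashlib
import time
from typing import Dict, Optional


def pem_to_der(pem_text: str) -> bytes:
    """Drop the -----BEGIN/END----- lines and decode the base64 body."""
    body = "".join(
        line.strip()
        for line in pem_text.splitlines()
        if line.strip() and not line.strip().startswith("-----")
    )
    return base64.b64decode(body)


def public_key_fingerprint(public_key_der: bytes) -> str:
    """Return 'SHA256:' plus the base64 SHA-256 digest of the DER public key."""
    digest = hashlib.sha256(public_key_der).digest()
    return "SHA256:" + base64.b64encode(digest).decode("ascii")


def build_jwt_claims(
    account: str,
    user: str,
    fingerprint: str,
    now: Optional[float] = None,
    lifetime_seconds: int = 3540,  # assumed value, kept under one hour
) -> Dict[str, object]:
    """Assemble the claim set; the caller still has to sign it with RS256."""
    issued_at = int(time.time() if now is None else now)
    qualified_user = f"{account.upper()}.{user.upper()}"
    return {
        "iss": f"{qualified_user}.{fingerprint}",
        "sub": qualified_user,
        "iat": issued_at,
        "exp": issued_at + lifetime_seconds,
    }


# Placeholder bytes stand in for a real DER-encoded public key.
fingerprint = public_key_fingerprint(b"not-a-real-key")
claims = build_jwt_claims("myaccount", "RPI_BOT", fingerprint, now=1_700_000_000)
```

The account name "myaccount" is a placeholder. The value passed to RSA_PUBLIC_KEY in the ALTER USER statement is normally the base64 body of the PEM file without its header and footer lines, which is the same text `pem_to_der` decodes.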
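Create the table, views, and pipe
Create the main table that stores the data, the views that simplify queries, and the pipe that ingests the data: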
create or replace TABLE DEMO.DEMO.WEATHER_DATA (
UUID VARCHAR(16777216),
ROWID VARCHAR(16777216),
HOSTNAME VARCHAR(16777216),
HOST VARCHAR(16777216),
IPADDRESS VARCHAR(16777216),
MACADDRESS VARCHAR(16777216),
SYSTEMTIME VARCHAR(16777216),
STARTTIME VARCHAR(16777216),
ENDTIME VARCHAR(16777216),
TS NUMBER(38,0),
TE VARCHAR(16777216),
RUNTIME NUMBER(38,0),
CPUTEMPF NUMBER(38,0),
CPU FLOAT,
MEMORY FLOAT,
DISKUSAGE VARCHAR(16777216),
TEMPERATURE FLOAT,
HUMIDITY FLOAT,
PRESSURE FLOAT,
DEVICETEMPERATURE FLOAT,
DEWPOINT FLOAT,
LUX FLOAT,
DATETIMESTAMP TIMESTAMP_NTZ(9)
);
CREATE OR REPLACE VIEW DEMO.DEMO.WEATHER_LATEST AS
SELECT
hostname,
temperature,
humidity,
pressure,
lux,
dewpoint,
devicetemperature,
systemtime,
ts
FROM DEMO.DEMO.WEATHER_DATA
QUALIFY ROW_NUMBER() OVER (PARTITION BY hostname ORDER BY ts DESC) = 1;
CREATE OR REPLACE VIEW DEMO.DEMO.WEATHER_RECENT AS
SELECT *
FROM DEMO.DEMO.WEATHER_DATA
WHERE ts >= DATE_PART(epoch_second, DATEADD(hour, -1, CURRENT_TIMESTAMP()));
SHOW TABLES LIKE 'WEATHER_DATA' IN SCHEMA DEMO.DEMO;
DESC TABLE DEMO.DEMO.WEATHER_DATA;
-- Check grants
SHOW GRANTS ON TABLE DEMO.DEMO.WEATHER_DATA;
CREATE OR REPLACE PIPE WEATHER_SENSOR_PIPE
COMMENT = 'Snowpipe Streaming v2 pipe for Raspberry Pi Weather HAT sensor data'
AS
COPY INTO WEATHER_DATA (
uuid,
rowid,
hostname,
host,
ipaddress,
macaddress,
systemtime,
starttime,
endtime,
ts,
te,
runtime,
cputempf,
cpu,
memory,
diskusage,
temperature,
humidity,
pressure,
lux,
devicetemperature,
dewpoint
)
FROM (
SELECT
$1:uuid::VARCHAR as uuid,
$1:rowid::VARCHAR as rowid,
$1:hostname::VARCHAR as hostname,
$1:host::VARCHAR as host,
$1:ipaddress::VARCHAR as ipaddress,
$1:macaddress::VARCHAR as macaddress,
$1:systemtime::VARCHAR as systemtime,
$1:starttime::VARCHAR as starttime,
$1:endtime::VARCHAR as endtime,
$1:ts::NUMBER as ts,
$1:te::VARCHAR as te,
$1:runtime::NUMBER as runtime,
$1:cputempf::NUMBER as cputempf,
$1:cpu::FLOAT as cpu,
$1:memory::FLOAT as memory,
$1:diskusage::VARCHAR as diskusage,
$1:temperature::FLOAT as temperature,
$1:humidity::FLOAT as humidity,
$1:pressure::FLOAT as pressure,
$1:lux::FLOAT as lux,
$1:devicetemperature::FLOAT as devicetemperature,
$1:dewpoint::FLOAT as dewpoint
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
describe pipe WEATHER_SENSOR_PIPE;
GRANT OPERATE ON PIPE WEATHER_SENSOR_PIPE TO ROLE ACCOUNTADMIN;
GRANT MONITOR ON PIPE WEATHER_SENSOR_PIPE TO ROLE ACCOUNTADMIN;
-- ============================================================================
-- Sample Queries
-- ============================================================================
select * from weather_data order by systemtime desc;
select count(*) from weather_data;
-- View latest reading from each device
SELECT * FROM DEMO.DEMO.WEATHER_LATEST;
-- View recent weather data
SELECT
hostname,
temperature,
humidity,
pressure,
lux,
systemtime
FROM DEMO.DEMO.WEATHER_RECENT
ORDER BY ts DESC
LIMIT 100;
-- Average readings per minute
SELECT
DATE_TRUNC('minute', TO_TIMESTAMP(ts)) as minute,
hostname,
AVG(temperature) as avg_temp_f,
AVG(humidity) as avg_humidity_pct,
AVG(pressure) as avg_pressure_hpa,
AVG(lux) as avg_lux,
COUNT(*) as reading_count
FROM DEMO.DEMO.WEATHER_DATA
WHERE ts >= DATE_PART(epoch_second, DATEADD(hour, -24, CURRENT_TIMESTAMP()))
GROUP BY 1, 2
ORDER BY 1 DESC, 2;
-- Temperature trend (hourly)
SELECT
DATE_TRUNC('hour', TO_TIMESTAMP(ts)) as hour,
hostname,
MIN(temperature) as min_temp,
AVG(temperature) as avg_temp,
MAX(temperature) as max_temp,
AVG(humidity) as avg_humidity,
AVG(pressure) as avg_pressure
FROM DEMO.DEMO.WEATHER_DATA
WHERE ts >= DATE_PART(epoch_second, DATEADD(day, -7, CURRENT_TIMESTAMP()))
GROUP BY 1, 2
ORDER BY 1 DESC, 2;
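The pipe definition above reads every column from a lowercase JSON path such as $1:temperature, so each row the client sends needs keys that match those paths. The sketch below builds such a record and serializes a batch as newline-delimited JSON. The field list is copied from the pipe; `make_record`, `to_ndjson`, and the sample values are hypothetical, and the assumption that the ingestion endpoint accepts newline-delimited JSON should be checked against the Snowpipe Streaming documentation.

```python
import json
import socket
import time
import uuid
from typing import Dict, Iterable, Optional

# Keys the pipe's SELECT reads from each incoming row.
PIPE_FIELDS = (
    "uuid", "rowid", "hostname", "host", "ipaddress", "macaddress",
    "systemtime", "starttime", "endtime", "ts", "te", "runtime",
    "cputempf", "cpu", "memory", "diskusage", "temperature", "humidity",
    "pressure", "lux", "devicetemperature", "dewpoint",
)


def make_record(readings: Dict[str, object], now: Optional[float] = None) -> Dict[str, object]:
    """Combine sensor readings with identifying metadata for one row."""
    unknown = set(readings) - set(PIPE_FIELDS)
    if unknown:
        # A misspelled key would not match any path in the pipe's SELECT.
        raise KeyError(f"fields not read by the pipe: {sorted(unknown)}")
    now = time.time() if now is None else now
    record: Dict[str, object] = {
        "uuid": str(uuid.uuid4()),
        "hostname": socket.gethostname(),
        "systemtime": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now)),
        "ts": int(now),  # epoch seconds, matching the NUMBER column
    }
    record.update(readings)
    return record


def to_ndjson(records: Iterable[Dict[str, object]]) -> str:
    """Serialize records as one JSON object per line."""
    return "".join(json.dumps(record) + "\n" for record in records)


sample = make_record(
    {"temperature": 71.6, "humidity": 40.2, "pressure": 1012.3},
    now=1_765_291_421,
)
payload = to_ndjson([sample])
```

Keys that are left out, such as lux in this sample, have no matching path in the row and would be expected to load as NULL.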
Running the project
- Clone the repository
git clone https://github.com/tspannhw/RPIWeatherStreaming.git
cd RPIWeatherStreaming
- Install the dependencies
You need to install the Snowflake SDK and the sensor library:
pip3 install snowflake-ingest bme680
- Configure authentication
Update snowflake_config.json with your Snowflake account ID, user name, and the path to your private key file (for example rsa_key.p8).
- Start the project
python3 weather_main.py --batch-size 100 --interval 5.0
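- Check the logs
Once the script starts successfully, the log shows a "Channel Opened" message, and the "Offset Commits" entries that keep appearing afterwards indicate that data is flowing normally. The excerpt below covers the startup sequence up to client initialization: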
2025-12-09 14:43:41,725 [INFO] __main__ - PRODUCTION MODE: Real Weather HAT + Snowpipe Streaming REST API only
2025-12-09 14:43:41,725 [INFO] __main__ - ======================================================================
2025-12-09 14:43:41,725 [INFO] __main__ - Weather HAT Streaming Application - PRODUCTION MODE
2025-12-09 14:43:41,725 [INFO] __main__ - Raspberry Pi Weather HAT -> Snowflake via Snowpipe Streaming v2
2025-12-09 14:43:41,725 [INFO] __main__ - ======================================================================
2025-12-09 14:43:41,726 [INFO] __main__ - PRODUCTION CONFIGURATION:
2025-12-09 14:43:41,726 [INFO] __main__ - - Real Weather HAT sensor data ONLY
2025-12-09 14:43:41,726 [INFO] __main__ - - Snowpipe Streaming high-speed REST API ONLY
2025-12-09 14:43:41,726 [INFO] __main__ - - Asynchronous sensor reading for maximum performance
2025-12-09 14:43:41,726 [INFO] __main__ - ======================================================================
2025-12-09 14:43:41,726 [INFO] __main__ - Initializing REAL Weather HAT...
2025-12-09 14:43:41,726 [INFO] weather_sensor - PRODUCTION MODE: Real sensors required and enforced
2025-12-09 14:43:41,819 [INFO] weather_sensor - Weather HAT sensor initialized
2025-12-09 14:43:41,819 [INFO] weather_sensor - Weather HAT initialized successfully
2025-12-09 14:43:41,819 [INFO] weather_sensor - [OK] Sensor verification passed - Weather HAT available
2025-12-09 14:43:41,820 [INFO] weather_sensor - Weather sensor update loop started
2025-12-09 14:43:41,820 [INFO] weather_sensor - Background weather sensor update thread started (updates every 5 seconds)
2025-12-09 14:43:41,821 [INFO] __main__ - Initializing Snowpipe Streaming REST API client...
Data analysis
With the script running, you can sign in to Snowsight at any time and query the weather data as it streams in. A basic query looks like this:
SELECT
systemtime,
temperature,
humidity
FROM weather_data
ORDER BY ts DESC
LIMIT 10;
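With this continuously updated dataset you can go further: build a real-time dashboard with Streamlit, run machine-learning-based anomaly detection with Snowflake Cortex, or configure alert rules directly in Snowflake. It gives later analytics and cloud applications a solid base to build on.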
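Summary
RPIWeatherStreaming shows how far the barrier to modern data streaming has dropped. You no longer need an enterprise-grade Kafka cluster or heavyweight ETL servers. A Raspberry Pi costing a few tens of dollars, together with the Snowpipe Streaming SDK, is enough to build a stable, efficient real-time pipeline in a short time. For developers who want to get into IoT and streaming data with Python, it is a very good place to start.
Full source code: https://github.com/tspannhw/RPIWeatherStreaming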