找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3936

积分

0

好友

540

主题
发表于 前天 06:15 | 查看: 15| 回复: 0

那天晚上我下班前临时起意,说要给测试环境跑个全量报表,结果就被CSV教育了一晚上。

你们有没有遇到过那种 data_2024_01_01.csvdata_2024_01_02.csv 一路排到 data_2024_12_31.csv 的场景?运维同事一脸无辜:“也不大啊,就几十G而已。”我电脑风扇直接起飞,VSCode卡到像远程桌面套远程桌面。

我当时代码还挺老实的,大概这样:

import pandas as pd

df_list = []
for day in range(1, 32):
    file_name = f"data_2024_01_{day:02d}.csv"
    tmp = pd.read_csv(file_name)
    df_list.append(tmp)

df = pd.concat(df_list, ignore_index=True)
print(df.shape)

结果嘛,大家也能想象,内存直接爆掉,笔记本开始“呼——”地狂转。我那一瞬间甚至怀疑人生:我不就想算个UV吗,至于搞成离线大数据项目吗?

后来同事路过,看了眼说:“哥,你这年头还玩CSV呢?用Parquet不香吗?”我当时心里还有点不服,不就一个文件格式嘛,至于吹成这样?然后我真换了一下…就,再见了兄弟CSV,你好我亲爱的Parquet。

直观对比:更小、更快

先说个最直观的感受:同样那批数据,CSV差不多10G,转成Parquet之后不到2G。我当时还有点不信,以为是我少写了几列,来回check schema半天。结果发现不是数据少,是它列式存储+压缩太狠了。

大概就这么一段小脚本,跑完我心情都变好了:

import pandas as pd

csv_path = "data_2024_01_full.csv"
parquet_path = "data_2024_01_full.parquet"

# 读一次痛苦,换来以后一直舒服
df = pd.read_csv(csv_path)

# 注意:engine=“pyarrow” 要装 pyarrow,不要问怎么知道的…
df.to_parquet(parquet_path, engine="pyarrow", compression="snappy")

压完以后我做了个小实验,同一台破电脑上:

  • read_csv 首次加载:十几秒起步,CPU飙满
  • read_parquet:眨眼功夫就出来了,CPU也是动一下就结束那种

那一刻我有种“原来以前一直在用错姿势”的羞耻感。

列式存储:按需读取,效率翻倍

更关键的是,Parquet是列式存储。这意味着你要几列,它只给你读取那几列,不像CSV那种必须先把整行数据都读进来再说。

以前用CSV算个简单的统计,比如只要 user_idevent_time,你也得乖乖把几十列全读进来:

df = pd.read_csv("data.csv")  # 啥都得读

换用 Python 的pandas操作Parquet之后,我的代码变成这样:

import pandas as pd

df = pd.read_parquet(
    "data.parquet",
    columns=["user_id", "event_time"],  # 只要这两列,其他列一个字节都不看
    engine="pyarrow"
)

这个 columns 参数一加,你会发现:以前那个读数据要喝口水等一等的脚本,现在“咔咔”一下就跑完了。最惊喜的是,我那台8G内存的旧本子,终于不用天天被我抱怨了——问题根本不在它,是我当初选了CSV这个低效的方案。

自带Schema:数据更“安全”

说完快和轻,再说个“安全”的事。这里的安全不是指加密,而是“数据类型别给我乱来”的那种安全感。

CSV最大的问题之一,就是什么类型都能写进去——你说它是int,它说自己是string,最后只能靠程序去猜和处理。有一次业务给了我一个CSV,理论上 amount 那列都是数字,我直接:

df = pd.read_csv("order.csv")
print(df["amount"].mean())

结果平均值直接给我算出一堆NaN。一看数据,中间混进来几个 “--”“未知” 这种非数值。我真是想顺着网线去敲业务脑袋。

换Parquet之后呢,文件本身是带schema的,写入时类型就确定了:

from pyarrow import schema, int64, string

order_schema = schema([
    ("order_id", string()),
    ("user_id", string()),
    ("amount", int64()),
    ("status", string()),
])

然后你往里塞数据,如果 amount 列来了个 “未知”,pyarrow会当场报错:类型对不上。虽然一开始增加了一点严格性,但你会发现,这能让脏数据在写入阶段就被挡在门外,比那种跑到线上计算才发现字段类型错误的崩溃体验好太多了。

谓词下推:查询性能的“魔法”

还有个特别爽的点,我当时是在一个埋点系统里用Parquet的。埋点事件字段繁多,什么 page, button, os_version,加着加着就几十列上百列了。我们经常会遇到这种需求:产品突然说:“能不能看一下上个月安卓用户,在订单页点了某个按钮的点击分布?”

如果用CSV,大概操作是:读全量CSV(几十G),再用 df[df['os'] == 'android' & (df['page'] == 'order') & ...] 硬生生用算法给机器凿出一条路来。

换成Parquet,并利用其谓词下推(Predicate Pushdown)特性,操作就高效多了:

import pyarrow.dataset as ds

dataset = ds.dataset("events_parquet_dir", format="parquet")

android_order = dataset.to_table(
    filter=(
        (ds.field("os") == "android") &
        (ds.field("page") == "order")
    ),
    columns=["user_id", "button_id", "event_time"]
)

这里的关键细节是:filter 这个条件,会尽量在“扫描磁盘数据之前”就生效,也就是所谓的“将过滤条件下推到数据源”。翻译成人话就是:以前是“先把整仓库里的箱子都扛出来,再数里面有几个苹果”;现在是“进仓库前就跟管理员说:我只要苹果箱子”。这就是为什么同样是筛选数据,Parquet会比CSV快得多的一个重要原因,这也是现代大数据处理引擎的常见优化手段。

本地调试:从此变得轻松

再说个很接地气的场景:本地调试。以前我为了在本地复现线上Bug,只能让运维帮我切一份“缩小版CSV”下来,什么“抽样1%”啊、“只要最近7天”啊,搞完还得验证这样本是不是有代表性。

现在我直接从线上Hive或对象存储拉Parquet到本地,十几G的数据压缩后可能就几G,然后我在本地随手整两句:

import pandas as pd

df = pd.read_parquet("events_7d.parquet", engine="pyarrow")

sample = (
    df.query("os == 'android' and page == 'order'")
      .sample(n=5000, random_state=42)
)

sample.to_parquet("debug_sample.parquet", index=False)

以后谁再让我用CSV当调试样本,我会非常真诚地建议他试试Parquet,这真是从血泪教训里总结出来的效率提升。

注意事项与适用场景

当然Parquet也不是没有注意事项,简单提两句:

  • 你需要安装 pyarrowfastparquet 库,有时在特定环境下的编译可能会有点折腾。
  • 文本编辑器直接打开“看一眼”的那种爽感没了(毕竟是二进制格式)。
  • 和一些非常古旧的系统联动时,对方可能只认CSV。

但只要你的场景符合:数据量不小 + 以Python等语言处理为主 + 经常需要进行分析/统计/筛选,那我真的强烈建议你试一次。人生苦短,别再和CSV硬刚大数据了。

我的新工作流

我现在新项目的习惯基本定型了:

  1. 线上流水数据落地就存为Parquet。
  2. 临时需要给别人导出报表时,再临时转成一次性的CSV发给他。
  3. 内部所有的分析脚本、离线任务,统统只读取Parquet格式。

那天我清理老项目的时候,看到一堆 data_xxx.csv,手一抖,写了个小脚本全给它们“转正”了:

import pandas as pd
from pathlib import Path

data_dir = Path("legacy_csv")

for csv_file in data_dir.glob("*.csv"):
    df = pd.read_csv(csv_file)
    parquet_file = csv_file.with_suffix(".parquet")
    df.to_parquet(parquet_file, engine="pyarrow", compression="snappy")
    print(f"migrate {csv_file.name} -> {parquet_file.name}")

命令敲完,看着终端里一行行 migrate xxx.csv -> xxx.parquet,那种感觉就像给老旧系统做完一次“换心手术”。性能、稳定性和数据质量,从此可以交给更现代的方案来负责了。如果你也在为数据处理的效率问题头疼,不妨来云栈社区看看其他开发者的实战经验,或许能有新的启发。




上一篇:42岁程序员被裁获40万赔偿,该不该请散伙饭?附贪吃蛇算法避坑指南
下一篇:程序员求职避坑指南:盘点国内大厂面试那些“卷”点
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-3-10 11:55 , Processed in 0.520179 second(s), 42 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表