1
NXzCH8fP20468ML5 207 天前 via Android
duckdb 值得拥有
|
2
dcsuibian 207 天前
扔数据库不行吗?
|
3
opengps 207 天前
能想到的只有数据库
|
4
buaasoftdavid 207 天前
内存里搞个哈希表,一行一行读 csv ,哈希表碰撞了就扔掉该行,没碰撞就插入哈希表再写到磁盘
|
5
52boobs 207 天前 via Android
表的结构是怎样的,有天然的主键吗
|
6
kneo 207 天前 via Android 1
行数是多少?平均行长是多少?
去重是应该基于整行文本还是列内容?比如 1.0 和 1 是否应该算做重复? 每行前缀重复度是否够高?是否有某列( XXID )可以用于快速去重? 机器性能如何?内存有多大? |
7
securityCoding 207 天前 via Android
spark 干的活?
|
8
drymonfidelia OP |
9
phrack 207 天前 via iPhone
光就这点信息说个屁呢,一行 8 个字符,是几千亿行,一行 1M 字符,是几百万行,这能一样吗?
内存也不说,4KB 内存和 4GB 内存能一样吗? |
10
drymonfidelia OP @phrack 8 楼补充了
|
11
rrfeng 207 天前
去重后大概会有多少行知道吗
|
12
lifanxi 207 天前 2
遍历整个文件,每行都 hash 一下,把 hash 存到一个高性能 KV 里,不重复就输出当前行,重复就跳过当前行。
|
13
drymonfidelia OP @rrfeng 不知道
|
14
cndenis 207 天前
如果不是要求严格不能丢数据的话, 可以用布隆过滤器去重, 误判率有公式可以算的, 有几十 GB 级别内存的话, 误判率应该比较低的
|
15
phrack 207 天前 via iPhone
开启 zram ,256g 可以当作 512g 没问题。
sha1sum 一个占用 20 字节,200 亿差不多占用 372g ,没问题。 极低概率去掉非重复行,几乎可以忽略。 |
16
rrfeng 207 天前
那就 bloomfilter 先过一遍看看情况。
要硬算的话,就分块排序,排完序就好处理了。排序前记录下序号,最后还原一下顺序。 |
17
hbcolorful 207 天前
redis 的布隆过滤器可以考虑下
|
18
NXzCH8fP20468ML5 207 天前 via Android
@drymonfidelia 看错了,还以为是 6GB 的 csv 文件在线处理呢,那确实不适合 duckdb 。
还是上 spark 吧,硬盘配大点就行。 203 亿行 csv 有那么大吗,我们每天备份全量的 17 亿行信息,保留几十天,用 orc 存储,也就几百 G 。 |
19
kneo 207 天前 via Android
感觉可以试试 clickhouse 。
|
20
yinmin 207 天前 via iPhone
使用 apache spark ,用 python 的 PySpark 库试试,具体可以问 gpt-4
|
21
NXzCH8fP20468ML5 207 天前 via Android 1
1. hash
2. 加序号 3. 按照 hash 分区 4. 逐个处理分区 5. 分区内排序 6. 分区外归并排序 只有单机的话,可以考虑用 duckdb ,多机就用 spark 吧。 |
22
yangxin0 207 天前
分治:
1 、用空间换时间(计算) 2 、用时间(计算)换空间 针对( 1 )有 spark 集群很快的,如果预算有限那么方法( 2 ): 1 、把数据分成 N 块,并针对 N 块内进行去重 2 、从 n 块中取一块,和剩下的 n-1 块去重,取这一块建立 hash or map 都可以,n-1 按照顺序读取 3 、从剩下的 n-1 块中又进行步骤( 2 ), 直到 n=0 4 、经过上述思路处理的 csv 就包含重复 |
23
caola 207 天前
直接存入 kvrocks (硬盘版 redis)
|
24
dacapoday 207 天前
单文件这么大,文件系统压力也不小吧。多数文件系统单文件也不支持这么大吧
|
25
james122333 207 天前 via Android
sed 有往下查找一样内容行并删除的工具都可以 其它的都要内存或硬盘空间 vim 就差在它开启文件要暂存 不然也可以
|
26
YTMartian 207 天前
磁盘够用的话,先外部排序,然后直接读取,忽略与上一条相同的数据就行了吧,随机读取文件指定位置,也不用加载进内存
|
27
dode 207 天前
按顺序处理,依据一个合适长度的前缀做分区,逐行文本进行处理,写入到对应分区下面。
检索特定行文本,是否在对应分区内存在,不存在则写入,存在就返回已存在。 |
28
chen7897499 207 天前 via Android
emeditor
|
29
chen7897499 207 天前 1
|
30
NotLongNil 207 天前
上面有人提到的 Bloom Filter 应该是相对最优的解法了,实现简单,占用内存低,速度也快。唯一的问题就是要选择合适的长度,将错误率降低,这需要一定的算法知识,不过现在可以问 AI 了,让 AI 给出公式
|
31
ClericPy 207 天前
有点像 map reduce 场景,归并加快排
问 AI 是好办法 |
32
msg7086 207 天前 2
我能想到的两种不同的做法。
第一种,在内存不足的情况下,放弃掉内存,直接用 SSD 读写。 在 SSD 上开一个数据库(比如 MySQL 或者 Postgres ),把已经存在的 hash 写到数据库里。 然后流式扫描每一行,取 hash 比对数据库,如果存在 hash 就跳过,不存在就写到结果集里并添加到数据库。 要快速稳妥可以用两种不同的 hash ,比如 xxHash 做一次过滤,SHA1 做二次检验。 第二种,在内存不足的情况下,分批处理。 多次流式扫描每一行,取 hash ,每次只处理 hash 第一个 hex 字符相同的那些数据。 第一次只索引和处理 sha1hash[0] == '0',第二次只索引和处理'1',这样可以把内存需求降到 1/16 ,缺点是 hash 计算也会是 16 倍。 稍微优化一下的话,可以在第一次遍历的时候在数据上追加 sha1hash[0]作为分区标记,这样后面 15 次就不会重复计算,缺点是会每行多一两个字节,而且要多写入一次磁盘。 |
33
esee 207 天前 via Android
什么叫保级顺序?比如在一百万的位置和 200 万的位置有重复项,则删除后面重复的那个是么。然后能提供多大的内存和硬盘,
|
34
wxf666 207 天前
想到个方法,预计耗时:10 小时,准确率:100% 剔除重复行。
## 简单流程 1. 分块排序。 2. 同时循环每块,删掉非首次出现的重复行。 3. 分别循环每块,按行号顺序,输出未被删掉的行。 ## 详细流程 1. 分块 240GB 文件,每块排序后,写入固态。同时保存每行长度+原始偏移量(约 (240 << 30) / 335 * 8 / 1024 ^ 3 = 5.7 GB )。 2. 利用小根堆,流式排序(按 <string, offset> 排)所有分块每一行。非首次出现行,保存该行偏移量,到相应块的删除名单里。 3. 循环每块,排序原始偏移量、删除名单,按序输出(未被删除的)相应行,至最终文件。 ## 耗时计算 1. 顺序读写:9 小时( 3 次顺序读,2 次顺序写,假设都为 1GB/s )。 2. 内存字符串排序:< 1 小时(实测轻薄本 i5-8250U ,每秒归并排序 200W 次 335 长度的随机字符串,约 6900W 次比较)。 - 多线程快排/归并:`(每块行数 = (240 << 30) / 335) * log2(每块行数) * 块数 = 6017 亿` 次比较,我的轻薄本 8 线程需 0.3 小时。 - 单线程小根堆:`202e8 * log2(块数 = 6.2 * 1024 / 240 = 26.5) * 2 = 1910 亿` 次比较,需 0.7 小时。 |
35
wxf666 207 天前 1
34 楼纠正下数据,实测轻薄本 i5-8250U ,1.5 秒归并排序 320W 个 336 长度的随机字符串,约 6500W 次比较。
- 多线程快排/归并:总计 6017 亿次比较,我的轻薄本 8 线程需 0.5 小时。 - 单线程小根堆:总计 1910 亿次比较,单线程需 1.2 小时。 差不太远。。 |
36
wxf666 207 天前
|
37
wxf666 207 天前
@cndenis #14 ,@hbcolorful #17 ,@NotLongNil #30:
用布隆过滤,几十 GB 好像不够。 在线算了下,50 GB + 15 函数,都会有 1 / 25000 概率出错。。 250 GB + 11 函数,算完 203 亿行,才能有 83.8% 的概率,一个不出错? @phrack #15: 压缩内存,来存 hash ?好像真的可行。。 平均而言,写入 (372 << 30) / 4096 = 1 亿次,就会占满 372 GB 内存页。即,几乎一开始就会启用 zram ? 我在别处看了看,lz4 每秒能有 200W 次 IO ,算下来要 2.8 小时即可? 话说,这个和 Bloom Filter 相比,哪个出错概率小呢? |
39
hobochen 207 天前
大概想了一下,一定深度的前缀树,叶子节点是哈希表或者平衡树存原来的行号应该是一个可行的方案。
|
40
hobochen 207 天前
更正:叶子节点应当是一个哈希表/平衡树; k 是哈希值,v 是行号
|
41
noqwerty 207 天前 via iPhone
可以试试 polars streaming + file sink ? https://www.rhosignal.com/posts/streaming-in-polars/
|
44
tonywangcn 207 天前
@wxf666
你的计算好像不对吧 n = 20300000000 p = 0.000000001 (1 in 999925223) m = 875595082773 (101.93GiB) k = 30 https://hur.st/bloomfilter/?n=20300000000&p=1.0E-9&m=&k= |
45
acapla 207 天前
不限时间的话:
for (i = 1; i < N; i++) for (j = 0; j < i; j++) if line[i] == line[j]: skip line[i] |
46
HanashirodotETH 207 天前
分块 - 用 96 位哈希编号,去重,排序 - 多路归并
但是也要大概 240G 内存。 |
47
wxf666 207 天前
@dingwen07 #38 是的。37 楼有提及到,用多少哈希函数。
@dode #42 《写入到对应分区下面》这个是缓存尽可能多的文本(如 1GB ),再写入,是吗? 《检索特定行文本,是否在对应分区内存在》这个是如何做到,顺序读的呢? @tonywangcn #44 平均每十亿条,就误认为一次,某行是重复行,导致丢失该行? 那你要问 @drymonfidelia 愿不愿意丢失几十行数据了。。 |
49
wxf666 207 天前
36 37 楼,好像没 @ 成功。。再试一下。。
@opengps #3 @msg7086 #32 如果数据库,每秒写入 10W 条,总计要 203e8 / 1e5 / 3600 = 56 小时? @hbcolorful #17 @NotLongNil #30 用布隆过滤,几十 GB 好像不够。 在线算了下,50 GiB + 15 函数,都会有 1 / 26000 概率出错,大约丢失 80W 行数据? 250 GiB + 11 函数,算完 203 亿行,才能有 83.8% 的概率,不丢失任何数据,也不保留任何重复行? |
50
hguangzhen 207 天前
惊了~ 竟然没人提到 RocksDB 吗? 本地的文件型 KV 存储库,内置 bloomfilter ,磁盘空间够用,应该很简单的
|
51
mayli 207 天前
如果你只是想最简单的解法(不考虑最高效率或者多机并发)可以试试 sort+uniq
sort 是可以排序比内存大的文件的: https://vkundeti.blogspot.com/2008/03/tech-algorithmic-details-of-unix-sort.html 然后排序后的 uniq 是不怎么吃内存 不过我看有个需求是要保持文件顺序的话,你可以用 uniq --repeated 来找到重复行,如果你重复行不多,那搞个脚本直接过滤一遍源文件就好,也是线性的。 |
52
Kaiv2 207 天前
1. 先计原始文件 a.txt 算每一行 hash 保存到 hash.txt 文件
2. 复制一份 hash.txt -> hash-2.txt 用于去重计算 3. 取 hash-2.txt 文件中 10000(这个数根据内存大小预估) 个 hash 前 8 位不重复 hash_array_8 4. 重复的的写入 hash-4.txt, 剩于的写入 hash-2.1.txt -> hash-2.txt , 循环处理直到 hash-2.txt 没有记录 ```txt let limit = 10000; // 控制内存使用 let hash_array_8 = []; let cache_line = [] for(let h_line: read_line(hash_2.txt)) { if(hash_array_8.size < limit) { if(!hash_array_8.has(h_line.sub(8))) { hash_array_8.add(h_line.sub(8)) } } if(hash_array_8.has(h_line.sub(8))) { if(cache_line.has(h_line)) { write(hash-4.txt); } else { cache_line.add(h_line); } } else { write(hash-2.1.txt); } } mv(hash-2.1.txt, hash-2.txt) ``` 5. 得到 hash.txt 跟文件一一对应,hash-4.txt 是重复的记录 6. hash-4.txt (如果重复的不多)直接读取到内存,对应读取 a.txt, hash.txt 每一行,比较 hash 重复跳过,不重复写入 b.txt 没有考虑过计算量,内存不够可以考虑试试这个办法 |
53
Kaiv2 207 天前
@Kaiv2 写着写着写成了单机的,这么做多此一举,太蠢了。。。应该是 分 hash-3.1 .. n.txt 多个机器同时处理,然后合并重复数据 hash-4.1..n.txt
|
54
drymonfidelia OP |
55
msg7086 207 天前
@wxf666 #35 上 TB 的数据怎么处理都是会很慢的。(一秒 10w 条数据可能到不了)
我建议用第三方数据库纯粹是因为这样对实现的要求最低,不需要你搞大内存服务器,不需要自己开发复杂的算法,全部用已知的成熟的方案,你只要插上一堆 SSD 然后干别的事就行了,等个几天数据就都跑完了。算法简单所以要根据需求修改起来也简单,可维护性也好。(用人话说就是,工程师不需要加班,让服务器加班就行。) 现实当中从 SSD 读取数据到内存也是要花时间的,这么大的量级还要跑前后依赖的操作,我是觉得快不起来。 (如果能并行 map reduce 倒是能快不少,但这里不太行。) |
56
lmshl 207 天前 1
版本答案:RocksDB (与其他 leveldb family 产品)
解析:203 亿个 bloomfilter 在 p=0.01 下所需的内存空间约为 23.75GB 。实际上,去重所需的空间会少于 203 亿,所以在这个内存空间下,实际 p 值将进一步降低。 大部分人可能对 bloomfilter 的使用存在误解,他们只考虑在只有 bloomfilter 单一算法存在的前提下来解决需求,这显然是错误的。现代数据库对 bloomfilter 的应用主要是用来降低 miss key 对磁盘 IO 的影响。如果 bloomfilter 认为这个 key 没有出现过,那么这个 key 确实没有出现过。当 bloomfilter 认为它可能出现过,那么出现的概率为 1-p ,此时需要回表二次确认(磁盘 IO )。 假设一个典型的重复度为 10 倍的 200 亿数据表文件,在这个空间下,p 值会低至 1e-20 。 那么对这个文件去重,总共会发生 200 亿次内存 bloomfilter 读取,20 亿次 bloomfilter 写入+磁盘顺序写入,以及 180 亿次磁盘随机读取。(考虑到数据库对磁盘的批量写入优化,sstable/memtable 这个数值将会被巨幅降低) 假设一个重复度为 0.1 倍的 200 亿数据表文件,在这个空间下,p 值变化不大。 那么对这个文件去重,总共发生 200 亿次内存 bloomfilter 读取,180 亿次 bloomfilter 写入+磁盘顺序写入,以及 20 亿次磁盘随机读取。(同上) 根据网上其他人做的吞吐量测试,rocksdb 在现代硬件条件下可以稳定达到 10k*rows/s 以上的写入性能,或>1GB/s 的写入吞吐量。乐观地估计,6.2TB 的文件应该能在 2 小时到 2 天左右完成去重。 |
57
cabbage 207 天前 via Android
@wxf666 布隆过滤器单独用确实不行,免不了假阳性,即便事后检查那也逃不了随机读,不是稳定的方法。但如果纯粹作为首次顺序读的过滤器来用,应该还不错,可以降低一些输入数据量。
to 34 楼:其实没看懂第二步的堆排序怎么回事,是说在一次排序中对所有行进行排序并去重吗?如果这样的话,那比较的时候是不是还需要在内存里保留所有原始 string ?来讨教下,乐意的话可否点明一二? |
58
harmless 207 天前 via iPhone
203 亿行全部计算成 hash ,再加上行号,大概 700 多 G
1. 遍历每行,计算出 hash ,按 hash 第一位将 hash 和行号写入不同的文件,例如 hash 第一位为 0 ,则写入 hash_0.txt ,这样一共会有 16 个文件,每个文件大概 40 多 G 2. 分别对每个文件按 hash 排序,找出重复的需要删除的行号,记录到文件中 3. 遍历原始文件,删除需要删除的行 |
59
lrjia 207 天前
先 hash ,按照 hash 前缀分块成多个文件,使分块后单块的大小可以放入内存。再对每块使用 hash 表去重。最后合并多个文件,用归并排序的做法。这中间应该都是文件的顺序读写。
|
60
cloudzhou 207 天前 1
布隆过滤器,申请硬盘 db 空间作为布隆过滤器存储,按位标记,只要空间足够大,冲突就很小
在布隆过滤器冲突情况下,将冲突部分,存入到其他人提到的某种 kv db ,然后排除重复处理 |
61
wxf666 207 天前 1
@phrack #15
突然想起来,zram 可能行不通。。 sha1 的结果,是不是相当于随机数据?随机数据,压缩不了啥吧。。 即,256 GB 内存,当不了 512 GB 用?全都用来存压缩比 100% 后的数据了? @cabbage #57 需要保留 27 行原始 string ,在小根堆里。 第二步目的:检查出各块中,偏移量非最小的重复行,记录进删除名单中。 第二步时,已经有 26.5 个 240GB 的、排序好的块。 参考多路归并,可以流式构造出有序的 (string, offset, chunk_index)。 当 string 与上一个不同时,说明碰到偏移量最小的新行了(即,全文首次出现)。 当 string 与上一个相同时,说明重复行了,此时往 "to_del_${chunk_index}.txt" 里记录 offset 。 (可以攒多点再写,反正只用了 27 个字符串 + 27 * 1GB 缓冲区,还剩 200+GB 内存呢。。) 以前写过类似的,10+ MB 内存去重 13 GB 文件,里面也有用到多路归并:/t/1031275#reply15 |
62
conan257 207 天前
学习下
|
63
haha1903 207 天前
hyperloglog
|
64
peterxulove 206 天前
对每一行进行哈希,哈希后的哈希值每个位置的值为一个叶子结点建立二叉树,哈希值的位数就是二叉树的层级,判断的时候遍历二叉树即可。
|
65
XuYijie 206 天前 via Android
把数据读到数据库,然后分批处理,或者使用 easyExcel 分批读取,处理后把处理后的数据导出到一个新的 csv
|
66
winiex 206 天前
以行顺序读内容,记录一个每行的 hash 值,发生碰撞抛弃当前行
|
67
IwfWcf 206 天前
如果只是要去重那分块排序就好了,如果要保留原有顺序的话那前面有人提到的 bloom filter 命中的情况下再去数据库查询确认应该是最优方案了
|
68
qdwang 206 天前
203 亿行,每一行做 BLAKE3 ,按照 bit 为 1 的和,来进行分类,共分 256 类。
平均每类大小 20300000000 * 256 / 8 / 1024 / 1024 / 1024 / 256 = 2.36G 存成 256 个分类文件,根据你可用内存大小,载入对应数量的分类在内存里作为 cache 。未命中,就随机内存里去除一个,替换成新的。也很容易做成分布式,每台机器管理一定数量的分类。 由于 BLAKE3 目标是 128bits 安全,所以你可以缩小到 128bit 使用,256g 内存电脑可以装得下 210 个分类在 cache 里。 |
69
qdwang 206 天前
说错了,BLAKE3 使用 128bit 后,每个分类仍然是 2.36g ,但是分类数量减小到 128 个。
|
70
vivisidea 206 天前
1. 计算每一行的 hash , 保存到文件里 hashes.txt
2. 对 hashes.txt 进行外部排序,外部排序对内存要求低,压力给到磁盘 3. 遍历 hashes.txt 找出重复,因为已经有序,所以简单对比当前行和上一行即可知道是否重复 |
71
Hawthorne 206 天前
不能用内存映射吗?
|
73
psyer 206 天前 via Android
pyspark 效果如何?😁
|
74
qinrui 206 天前
emeditor 直接打开处理
|
75
cocalrush 206 天前
直接用阿里云的 odps 生成个离线表跑下是不是就行了...
|
76
Keuin 206 天前 1
awk 每行尾追加逗号和行号,整个文件每个行都追加一下,占 6.2T
unix sort 工具外排序,直接按字母表排序,占 6.2T 。重复行会变成相邻的,编号不一样。输出另占 6.2T 用 awk 配合 uniq ,去重,全内存 O(1)空间算法,输出占 6.2T ,即为最终结果 中间文件可以在不用的时候删掉,最大同时出现 2 份,也就是需要额外 2*6.2T 磁盘空间,由于都是流式算法,内存用量为很小的常数 |
78
qbmiller 206 天前
再来一个文件 csvA. 然后这 2 文件 ,双指针遍历. A 放不重复的,
基于顺序的话,是可以的 |
79
blankmiss 206 天前
或者你给个脱敏德样本数据出来看
|
80
cndenis 206 天前
@wxf666 如果需要严格不能丢数据的话, 不能单用布隆过滤器.
假设重复率比较低的的话,, 可以做两轮读取 第一轮边读边构造布隆过滤器, 把发现的冲突的行记录到数据库 第二轮先把数据库中值导入新的布隆过滤器, 然后用它来过滤原表, 对有冲突的行查用数据库确证没重复再输出 |
81
Dream95 206 天前
为啥关注点都在去重上面,就按照 MapReduce 的思路归并排序,再去重不就很好
|
82
zhuangzhuang1988 206 天前
直接 sqlite 试试
```python import sqlite3 with sqlite3.connect("_temp.db") as conn: c = conn.cursor() c.execute( """ CREATE TABLE kv ( line TEXT UNIQUE );""" ) with open("xxx.csv", "r", encoding="utf8") as fin: with open("xxx1.csv", "w", encoding="utf8") as fout: for line in fin: line = line.strip() if line: r = c.execute( "insert into kv(line) select :line where :line not in (select line from kv)", {"line": line}, ) if r.rowcount > 0: fout.write(line + "\n") ``` |
83
ltux 206 天前
不能全部加载到内存,那没法用哈希表去重。
简单地归并排序,再顺序读取去重就完了。归并排序是稳定排序,可以保持行原来的顺序,也适合用于对超出内存限制的数据排序。 结果就是 gnu sort 就完了-_-,加 --unique 选项一步到位。 sort --stable --unique --output=OUTPUT.csv INPUT.csv 根据你 cpu 核心数量 N 加个 --parallel=N 选项就完 |
84
lsk569937453 206 天前
@zhuangzhuang1988 帮你试过了,一秒处理 600 行,202 亿行大概要处理 389 天。
|
85
chutianyao 206 天前
203 亿行,逐行 hash, 假设 hash256, 单个值占用内存 32 字节, 203 亿行差不多试用内存 604G
1. 逐行读取并进行 hash 2. 使用 hash 值构建前缀树 3. 对每一行的哈希值,有两种情况: 1) 前缀树中已经存在, 说明哈希值重复, 该行重复了. 操作: 直接忽略本行,读取并处理下一行 2) 前缀树中不存在, 说明行不重复. 操作: 新建文件 result.csv, 将该行追加到 result.csv 中, 再处理下一行 关键点: 1.所有行的哈希值占用空间 604G, 内存才 256G 无法直接存储; 使用硬盘存储后续逐行比对查找的性能太差, 所以这里使用前缀树来存储, 减少相同前缀的哈希值使用的内存空间.(具体能节省多少内存,取决于哈希值/文本行的重复比例, 极端情况 203 亿行都不重复的情况下, 前缀树估计也会把内存耗尽?) 2.发现重复行,不直接从原文件中删除, 而是新建文件保存结果. 目的是使用追加写文件的形式、减少随机读写文件造成的性能磁盘 io 损耗 |
86
MoYi123 206 天前
感觉很多人不懂“保持原先顺序是什么意思”
是[1,3,2,3] -> [1,3,2] 而不是[1,3,2,3] -> [1,2,3] |
87
lsk569937453 206 天前
@chutianyao 1.系统的最大内存是 256G ,当所有的行都不相同的时候会占用 604G 啊。你不会假定有多少重复吧。。。。
|
88
forty 206 天前
学到了 1 个新知识: 布隆过滤器
感谢大家! OP 的这个数据量,用哈希表也足够处理了。也可以先布隆一遍,找出一定不存在重复的,再用哈希排查不确定是否重复的。 化整为零,先用哈希进行分类,再在分类内部进行除重(省内存,时间换空间)。 用普通的编程语言,普通的 PC 即可,不依赖其他数据软件。 203 亿 介于 2^34 与 2^35 (2 的 35 次方) 之间,按 2^35 算,因此 35 比特就能表示行号,可以给它 5 个字节。 用哈希进行分类,分多少个类就写多少个文件,只记录 MD5 和行号。 全部分类文件都写完之后,依次载入 1 个分类文件到内存,用哈希表除重,输出哈希重复(应删除的行)的行号,问题就基本解决了。 如果分 65536 个类,则每个分类下约有 50 多万个数据,每个分类文件约 10MB 。 如果分 256 个类,则每个分类下约有 8 千万个数据,每个分类文件约 1.6GB ,老 PC 也能干。 如果分 16 个类,则每个分类下约有 13 亿个数据,每个分类文件约 26GB ,现在的普通 PC 都可以胜任。 如果强迫症觉得可能有哈希冲突,那就可以再加 1 个不同的哈希算法,对这个数量级来说是基本不用考虑 MD5 冲突的。 |
89
chutianyao 206 天前
@lsk569937453 所以我说了嘛,看行重复比例. 同时哈希值前缀相同, 也能节省一些内存吧.
这个方案只是存在一定可行性,但不保证 |
90
lttzzlll 206 天前
cat input.csv | sort | uniq > out.csv
力大出奇迹。优化阻碍发展。你先试试再说。大不了机器崩了呗。 |
91
sampeng 206 天前
这么点数据就要 spark ,mr 了?
楼上很多说的没错啊。。你倒是试试 sort |uniq 之后看看结果啊。慢是肯定慢,但是试试不比你纠结强。。 ===== rocksdb 是一个解决方案。但如果不想上东西自己算也不是不行。自己构建结构体和硬盘文件内映射关系。hash 一定要在内存里面才能对比?在文件里面就不行么。现在都是 ssd ,随机读取没啥吧。 我猛的一想就是 1.hash 直接建在硬盘上。每次对比用 seed 偏移来查找。这种业务使用最好别用布隆,毕竟不是近似求结果。而是最终求结果。 2.6T 文件。内存里只建一个够 N 条的 hash 。先读 N 条。计算 N 条里的没有重复的。保存到文件 a 。然后一直递归下去。得到 n 个小文件。然后问题就变成了 n 个小文件去重的问题。内存大,就把第一个文件读出来,去其他文件一个一个比。以此递归处理。当然,连小文件都不需要,自己规划好数据结构把 6T 文件看成 n 个小文件也是一个逻辑。这个逻辑下哪怕 1G 内存也能算出来。就看时间了。 |
92
sampeng 206 天前
我也是想多了。。哪有这么复杂。
读一条。算 hash 。然后读下一条,算下一条的 hash 。相同就扔掉。。。没有相同的就写到另一个文件里面去。一个递归好像就完事了。这应该也是 sort|uniq 的逻辑。只需要内存 (每行 byte *2 )。这就是纯粹比 ssd 的速度。想加速就是利用 cpu 的并行运算搞搞分块就好了。 |
94
james122333 206 天前 via Android
|
95
james122333 206 天前 via Android
|
96
LieEar 205 天前
6.20TB 的超大 csv ,难以置信。
我觉得可以试试 DuckDB |
97
kuagura 205 天前 via Android
换一个 10T 内存的机器 采购 你都有得赚,项目结束再卖掉
|
98
psyer 205 天前 via Android
看到这么多讨论,给出方案,有没有人给一个切实可行的代码?实践是检验真理的唯一标准。
|
99
wxf666 202 天前
C++ 新人,写个去重练练手。
- 结果:2.50 GB 文本( 900 万行,336 字/行),1GB 内存限制,6 秒保持顺序地去重完毕。 - 硬件:七年前 i5-8250U 轻薄本,读写在内存盘中(读 8G/s ,写 3G/s ,1000 元 2TB 固态都能有的速度,不过分吧?) - 预计:4 小时能去重完毕 6.20TB ? 新人刚学会写,可能还有诸多不足之处。 写的过程中,还有很多优化点没写。比如: 1. 排序时,子范围太小,转为其他排序方式。 2. 读写文件,用的默认缓冲区大小( 4K ? 16K ?不知道多大,估计很小。。) 3. 分块时,可以去除重复行,减少稍后读写数据量。 继续改进点: - 转用 hash 去重,大幅减少硬盘读写数据量。 - 只是要承担极小概率重复风险。但 Git 也在用这种方式。。 - 实在不行,发现重复 hash 时,再去读原文件完整比较。 ## 截图 ## 代码 ```c++ // V 站吞空格,缩进改为全角空格了 #include <queue> #include <vector> #include <thread> #include <cstring> #include <sstream> #include <fstream> #include <iostream> #include <algorithm> #include <stdexcept> #include <filesystem> #include <string_view> #include <fcntl.h> #include <unistd.h> #include <sys/mman.h> #include <sys/stat.h> using std::ios; using std::vector, std::string_view; using std::to_string, std::ofstream; namespace fs = std::filesystem; int max_thread = 8; size_t max_memory = 1ull << 30; const auto tmpDir = fs::temp_directory_path(); struct Meta { ptrdiff_t offset; size_t length; friend ofstream& operator<< (ofstream& ofs, const Meta& self) { ofs.write(reinterpret_cast<const char*>(&self), sizeof self); return ofs; } }; struct Line { int chunkIdx{}; ptrdiff_t offset{}; string_view str{}; auto operator> (const Line& other) { return std::tie(str, chunkIdx, offset) > std::tie(other.str, other.chunkIdx, other.offset); } }; template <class T = char> class MappedFile { int fd = -1; const T* ptr{}; public: const T* data{}; size_t size{}; explicit MappedFile(const fs::path& file) { struct stat64 fs{}; fd = open64(file.c_str(), O_RDONLY); if (fd != -1 && fstat64(fd, &fs) != -1) { size = static_cast<size_t>(fs.st_size) / sizeof(T); data = ptr = static_cast<T*>(mmap64(nullptr, fs.st_size, PROT_READ, MAP_SHARED, fd, 0)); } } MappedFile(const MappedFile& other) = delete; MappedFile(MappedFile&& old) noexcept: fd(old.fd), ptr(old.ptr), data(old.data), size(old.size) { old.fd = -1; old.ptr = old.data = nullptr; } ~MappedFile() { if (data) munmap(const_cast<T*>(data), size * sizeof(T)); if (fd != -1) close(fd); } auto end() const { return data + size; } operator const T*&() { return ptr; } }; template <class Iter> void mergeSort(Iter* src, Iter* dst, size_t len, int max_thread = 1, int id = 1) { if (id == 1) std::copy_n(src, len, dst); if (len > 1) { std::thread t; size_t half = len / 2; if (id < max_thread) // 只在左子树开启新线程 t = std::thread(mergeSort<Iter>, dst, src, half, max_thread, id * 2); else mergeSort(dst, src, half, max_thread, id * 2); mergeSort(dst + half, src + half, len - half, max_thread, id * 2 + 1); if (t.joinable()) t.join(); std::merge(src, src + half, src + half, src + len, dst); } } // 步骤 1:分块,返回块数 int step1_SplitChunks(const fs::path& inFile) { // 映射源文件 MappedFile text {inFile}; if (!text) throw std::runtime_error("无法打开输入文件"); // 分块,直到源文件结束 int chunkCount = 0; for (auto chunkBegin = +text; (chunkBegin = text) < text.end();) { // 不断记录行,直到(此次遍历过的源文件大小 + 行数据数组大小 * 2 )到达内存限制 vector<string_view> lines, sortedLines; while (text < text.end() && (text - chunkBegin + sizeof(string_view) * lines.size() * 2) < max_memory) { auto lineEnd = (char*) std::memchr(text, '\n', text.end() - text); auto lineLen = (lineEnd ? lineEnd : text.end()) - text; lines.emplace_back(text, lineLen); text += lineLen + 1; } // 准备写入(排序后)分块、行数据。 ofstream chunkFile (tmpDir / (to_string(chunkCount) + ".txt"), ios::binary | ios::trunc); ofstream metaFile (tmpDir / (to_string(chunkCount) + ".meta"), ios::binary | ios::trunc); chunkCount++; // 多线程排序行数组 sortedLines.resize(lines.size()); mergeSort(lines.data(), sortedLines.data(), lines.size(), max_thread); // 保存(排序后)每行文本、偏移、长度 for (auto line: sortedLines) { chunkFile << line; metaFile << Meta{line.data() - chunkBegin, line.size()}; } // 检查 if (!chunkFile || !metaFile) { std::stringstream buf; buf << "写入第 " << chunkCount << " 分块时出错!"; throw std::runtime_error(buf.str()); } } return chunkCount; } // 步骤 2:查找重复行 void step2_FindDupLines(int chunkCount) { vector<ofstream> chunkDups; vector<MappedFile<>> chunkText; vector<MappedFile<Meta>> chunkMeta; std::priority_queue<Line, vector<Line>, std::greater<>> lines; // 映射所有分块的文本、行数据文件, // 也准备好记录各分块重复行数据的文件 for (int idx = 0; idx < chunkCount; idx++) { chunkText.emplace_back(tmpDir / (to_string(idx) + ".txt")); chunkMeta.emplace_back(tmpDir / (to_string(idx) + ".meta")); chunkDups.emplace_back(tmpDir / (to_string(idx) + ".dups"), ios::binary | ios::trunc); lines.push({idx}); } // 利用小根堆,按(行内容,分块号,偏移量)顺序,流式多路归并 string_view last{}; while (!lines.empty()) { // 与上一行相同,则将偏移量写入,对应分块待删除行名单内 auto line = lines.top(); lines.pop(); if (last == line.str && !last.empty()) chunkDups[line.chunkIdx].write((char*)&line.offset, sizeof line.offset); last = line.str; // 该分块行数据未遍历完,则继续将下一行添加进小根堆中 auto& text = chunkText[line.chunkIdx]; auto& meta = chunkMeta[line.chunkIdx]; if (meta < meta.end()) { lines.push({line.chunkIdx, (*meta).offset, {text, (*meta).length}}); text += (*meta).length; meta++; } } // 检查 for (auto&& file: chunkDups) { if (!file) { std::stringstream buf; buf << "保存第 " << chunkCount << " 分块删除名单时出错!"; throw std::runtime_error(buf.str()); } } } // 步骤 3:合并分块 void step3_MergeChunks(int chunkCount, const fs::path& outFile) { ofstream textOut {outFile, ios::binary | ios::trunc}; if (!textOut) throw std::runtime_error("无法打开输出文件"); for (int idx = 0; idx < chunkCount; idx++) { // 映射分块(排序后)文本、行数据、删除名单 MappedFile<> text {tmpDir / (to_string(idx) + ".txt")}; MappedFile<Meta> meta {tmpDir / (to_string(idx) + ".meta")}; MappedFile<decltype(Meta::offset)> dups {tmpDir / (to_string(idx) + ".dups")}; // 剔除删除名单中的行 vector<Line> lines; lines.reserve(meta.size); for (; meta < meta.end(); text += (*meta++).length) { if (dups < dups.end() && *dups == (*meta).offset) dups++; else lines.push_back({idx, (*meta).offset, {text, (*meta).length}}); } // 再按偏移量顺序排序好 std::sort(lines.begin(), lines.end(), [](auto&& a, auto&& b) { return a.offset < b.offset; }); // 逐行输出 for (auto&& line: lines) textOut << line.str << '\n'; } // 检查 if (!textOut) throw std::runtime_error("写入输出文件时出错!"); } int main(int argc, const char* argv[]) { if (argc < 3) { std::stringstream buf; buf << "大文本去重并保持顺序工具\n\n" << "用法:" << argv[0] << " 输入文件 输出文件 " << "[内存限制 MB = " << (max_memory >> 20) << "] " << "[线程限制 = " << max_thread << "]"; std::cerr << buf.str() << std::endl; return -1; } auto inFile = argv[1]; auto outFile = argv[2]; if (argc > 3) max_memory = (std::max)(std::stoull(argv[3]), 1ull) << 20ull; if (argc > 4) max_thread = (std::max)((std::min)(std::stoi(argv[4]), 256), 1); auto chunkCount = step1_SplitChunks(inFile); step2_FindDupLines(chunkCount); step3_MergeChunks(chunkCount, outFile); // 清空临时文件 for (int i = 0; i < chunkCount; i++) for (auto&& suffix: {".txt", ".meta", ".dups"}) fs::remove(tmpDir / (to_string(i) + suffix)); } ``` |
100
Keuin 201 天前
```shell
awk '{print $0","NR}' input.csv | sort | sed -E 's/,[0-9]+$//' | uniq ``` Example usage: ``` $ cat input 1,2,3,4 2,3,4,5 3,4,5,6 4,5,6,7 2,3,4,5 1,2,3,4 5,6,7,8 $ awk '{print $0","NR}' input 1,2,3,4,1 2,3,4,5,2 3,4,5,6,3 4,5,6,7,4 2,3,4,5,5 1,2,3,4,6 5,6,7,8,7 $ awk '{print $0","NR}' input | sort 1,2,3,4,1 1,2,3,4,6 2,3,4,5,2 2,3,4,5,5 3,4,5,6,3 4,5,6,7,4 5,6,7,8,7 $ awk '{print $0","NR}' input | sort | sed -E 's/,[0-9]+$//' 1,2,3,4 1,2,3,4 2,3,4,5 2,3,4,5 3,4,5,6 4,5,6,7 5,6,7,8 $ awk '{print $0","NR}' input | sort | sed -E 's/,[0-9]+$//' | uniq 1,2,3,4 2,3,4,5 3,4,5,6 4,5,6,7 5,6,7,8 ``` 不管你的电脑内存是 1T 还是 1G ,都可以正确运行并得到相同输出,因为 sort 命令用的是归并排序,是外存算法。如果你要限制用到的内存大小,把 sort 改成 sort --buffer-size=100M ,即可限制只用 100M 内存,其他命令都是行缓存算法,只会保存当前行在内存里,也就是说,最大内存用量是 max(100M, max_line_size_bytes) |