要对单个 6.20TB 的超大 csv 文件保持顺序的情况下进行去除重复行,有什么好思路?显然不可能加载进内存

34 天前
 drymonfidelia
9008 次点击
所在节点    程序员
101 条回复
Dream95
33 天前
为啥关注点都在去重上面,就按照 MapReduce 的思路归并排序,再去重不就很好
zhuangzhuang1988
33 天前
直接 sqlite 试试

```python
import sqlite3

with sqlite3.connect("_temp.db") as conn:
c = conn.cursor()
c.execute(
"""
CREATE TABLE kv (
line TEXT UNIQUE
);"""
)
with open("xxx.csv", "r", encoding="utf8") as fin:
with open("xxx1.csv", "w", encoding="utf8") as fout:
for line in fin:
line = line.strip()
if line:
r = c.execute(
"insert into kv(line) select :line where :line not in (select line from kv)",
{"line": line},
)
if r.rowcount > 0:
fout.write(line + "\n")
```
ltux
32 天前
不能全部加载到内存,那没法用哈希表去重。
简单地归并排序,再顺序读取去重就完了。归并排序是稳定排序,可以保持行原来的顺序,也适合用于对超出内存限制的数据排序。
结果就是 gnu sort 就完了-_-,加 --unique 选项一步到位。

sort --stable --unique --output=OUTPUT.csv INPUT.csv
根据你 cpu 核心数量 N 加个 --parallel=N 选项就完
lsk569937453
32 天前
@zhuangzhuang1988 帮你试过了,一秒处理 600 行,202 亿行大概要处理 389 天。
chutianyao
32 天前
203 亿行,逐行 hash, 假设 hash256, 单个值占用内存 32 字节, 203 亿行差不多试用内存 604G

1. 逐行读取并进行 hash
2. 使用 hash 值构建前缀树
3. 对每一行的哈希值,有两种情况:
1) 前缀树中已经存在, 说明哈希值重复, 该行重复了. 操作: 直接忽略本行,读取并处理下一行
2) 前缀树中不存在, 说明行不重复. 操作: 新建文件 result.csv, 将该行追加到 result.csv 中, 再处理下一行

关键点:
1.所有行的哈希值占用空间 604G, 内存才 256G 无法直接存储; 使用硬盘存储后续逐行比对查找的性能太差, 所以这里使用前缀树来存储, 减少相同前缀的哈希值使用的内存空间.(具体能节省多少内存,取决于哈希值/文本行的重复比例, 极端情况 203 亿行都不重复的情况下, 前缀树估计也会把内存耗尽?)
2.发现重复行,不直接从原文件中删除, 而是新建文件保存结果. 目的是使用追加写文件的形式、减少随机读写文件造成的性能磁盘 io 损耗
MoYi123
32 天前
感觉很多人不懂“保持原先顺序是什么意思”

是[1,3,2,3] -> [1,3,2]
而不是[1,3,2,3] -> [1,2,3]
lsk569937453
32 天前
@chutianyao 1.系统的最大内存是 256G ,当所有的行都不相同的时候会占用 604G 啊。你不会假定有多少重复吧。。。。
forty
32 天前
学到了 1 个新知识: 布隆过滤器
感谢大家!

OP 的这个数据量,用哈希表也足够处理了。也可以先布隆一遍,找出一定不存在重复的,再用哈希排查不确定是否重复的。

化整为零,先用哈希进行分类,再在分类内部进行除重(省内存,时间换空间)。

用普通的编程语言,普通的 PC 即可,不依赖其他数据软件。

203 亿 介于 2^34 与 2^35 (2 的 35 次方) 之间,按 2^35 算,因此 35 比特就能表示行号,可以给它 5 个字节。

用哈希进行分类,分多少个类就写多少个文件,只记录 MD5 和行号。
全部分类文件都写完之后,依次载入 1 个分类文件到内存,用哈希表除重,输出哈希重复(应删除的行)的行号,问题就基本解决了。

如果分 65536 个类,则每个分类下约有 50 多万个数据,每个分类文件约 10MB 。

如果分 256 个类,则每个分类下约有 8 千万个数据,每个分类文件约 1.6GB ,老 PC 也能干。

如果分 16 个类,则每个分类下约有 13 亿个数据,每个分类文件约 26GB ,现在的普通 PC 都可以胜任。

如果强迫症觉得可能有哈希冲突,那就可以再加 1 个不同的哈希算法,对这个数量级来说是基本不用考虑 MD5 冲突的。
chutianyao
32 天前
@lsk569937453 所以我说了嘛,看行重复比例. 同时哈希值前缀相同, 也能节省一些内存吧.
这个方案只是存在一定可行性,但不保证
lttzzlll
32 天前
cat input.csv | sort | uniq > out.csv

力大出奇迹。优化阻碍发展。你先试试再说。大不了机器崩了呗。
sampeng
32 天前
这么点数据就要 spark ,mr 了?
楼上很多说的没错啊。。你倒是试试 sort |uniq 之后看看结果啊。慢是肯定慢,但是试试不比你纠结强。。

=====

rocksdb 是一个解决方案。但如果不想上东西自己算也不是不行。自己构建结构体和硬盘文件内映射关系。hash 一定要在内存里面才能对比?在文件里面就不行么。现在都是 ssd ,随机读取没啥吧。

我猛的一想就是

1.hash 直接建在硬盘上。每次对比用 seed 偏移来查找。这种业务使用最好别用布隆,毕竟不是近似求结果。而是最终求结果。

2.6T 文件。内存里只建一个够 N 条的 hash 。先读 N 条。计算 N 条里的没有重复的。保存到文件 a 。然后一直递归下去。得到 n 个小文件。然后问题就变成了 n 个小文件去重的问题。内存大,就把第一个文件读出来,去其他文件一个一个比。以此递归处理。当然,连小文件都不需要,自己规划好数据结构把 6T 文件看成 n 个小文件也是一个逻辑。这个逻辑下哪怕 1G 内存也能算出来。就看时间了。
sampeng
32 天前
我也是想多了。。哪有这么复杂。

读一条。算 hash 。然后读下一条,算下一条的 hash 。相同就扔掉。。。没有相同的就写到另一个文件里面去。一个递归好像就完事了。这应该也是 sort|uniq 的逻辑。只需要内存 (每行 byte *2 )。这就是纯粹比 ssd 的速度。想加速就是利用 cpu 的并行运算搞搞分块就好了。
lttzzlll
32 天前
@lttzzlll cat input.csv | sort -u | uniq > out.csv 你试试
james122333
32 天前
@lttzzlll

还是看我讲的吧 每行往下查有重複就删除 毕竞 global 搜索删除还是挺好的 一行处理完换下一行 也顺带保证了顺序 内存与硬盘都可以占用少
james122333
32 天前
@lttzzlll

应该算是种高明的手法
LieEar
32 天前
6.20TB 的超大 csv ,难以置信。
我觉得可以试试 DuckDB
kuagura
32 天前
换一个 10T 内存的机器 采购 你都有得赚,项目结束再卖掉
psyer
31 天前
看到这么多讨论,给出方案,有没有人给一个切实可行的代码?实践是检验真理的唯一标准。
wxf666
28 天前
C++ 新人,写个去重练练手。

- 结果:2.50 GB 文本( 900 万行,336 字/行),1GB 内存限制,6 秒保持顺序地去重完毕。
- 硬件:七年前 i5-8250U 轻薄本,读写在内存盘中(读 8G/s ,写 3G/s ,1000 元 2TB 固态都能有的速度,不过分吧?)
- 预计:4 小时能去重完毕 6.20TB ?

新人刚学会写,可能还有诸多不足之处。
写的过程中,还有很多优化点没写。比如:

1. 排序时,子范围太小,转为其他排序方式。
2. 读写文件,用的默认缓冲区大小( 4K ? 16K ?不知道多大,估计很小。。)
3. 分块时,可以去除重复行,减少稍后读写数据量。

继续改进点:

- 转用 hash 去重,大幅减少硬盘读写数据量。
- 只是要承担极小概率重复风险。但 Git 也在用这种方式。。
- 实在不行,发现重复 hash 时,再去读原文件完整比较。

## 截图


## 代码

```c++
// V 站吞空格,缩进改为全角空格了

#include <queue>
#include <vector>
#include <thread>
#include <cstring>
#include <sstream>
#include <fstream>
#include <iostream>
#include <algorithm>
#include <stdexcept>
#include <filesystem>
#include <string_view>
#include <fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/stat.h>

using std::ios;
using std::vector, std::string_view;
using std::to_string, std::ofstream;
namespace fs = std::filesystem;

int max_thread = 8;
size_t max_memory = 1ull << 30;
const auto tmpDir = fs::temp_directory_path();

struct Meta {
   ptrdiff_t offset;
   size_t length;
   friend ofstream& operator<< (ofstream& ofs, const Meta& self) {
     ofs.write(reinterpret_cast<const char*>(&self), sizeof self);
     return ofs;
  }
};

struct Line {
   int chunkIdx{};
   ptrdiff_t offset{};
   string_view str{};
   auto operator> (const Line& other) {
     return std::tie(str, chunkIdx, offset)
      > std::tie(other.str, other.chunkIdx, other.offset);
  }
};

template <class T = char>
class MappedFile {
   int fd = -1;
   const T* ptr{};
public:
   const T* data{};
   size_t size{};

   explicit MappedFile(const fs::path& file) {
     struct stat64 fs{};
     fd = open64(file.c_str(), O_RDONLY);
     if (fd != -1 && fstat64(fd, &fs) != -1) {
       size = static_cast<size_t>(fs.st_size) / sizeof(T);
       data = ptr = static_cast<T*>(mmap64(nullptr, fs.st_size, PROT_READ, MAP_SHARED, fd, 0));
    }
  }

   MappedFile(const MappedFile& other) = delete;

   MappedFile(MappedFile&& old) noexcept:
     fd(old.fd), ptr(old.ptr), data(old.data), size(old.size) {
     old.fd = -1;
     old.ptr = old.data = nullptr;
  }

  ~MappedFile() {
     if (data) munmap(const_cast<T*>(data), size * sizeof(T));
     if (fd != -1) close(fd);
  }

   auto end() const {
     return data + size;
  }

   operator const T*&() {
     return ptr;
  }
};

template <class Iter>
void mergeSort(Iter* src, Iter* dst, size_t len, int max_thread = 1, int id = 1) {
   if (id == 1)
     std::copy_n(src, len, dst);
   if (len > 1) {
     std::thread t;
     size_t half = len / 2;
     if (id < max_thread) // 只在左子树开启新线程
       t = std::thread(mergeSort<Iter>, dst, src, half, max_thread, id * 2);
     else
       mergeSort(dst, src, half, max_thread, id * 2);
     mergeSort(dst + half, src + half, len - half, max_thread, id * 2 + 1);
     if (t.joinable())
       t.join();
     std::merge(src, src + half, src + half, src + len, dst);
  }
}

// 步骤 1:分块,返回块数
int step1_SplitChunks(const fs::path& inFile) {

  // 映射源文件
   MappedFile text {inFile};
   if (!text) throw std::runtime_error("无法打开输入文件");

  // 分块,直到源文件结束
   int chunkCount = 0;
   for (auto chunkBegin = +text; (chunkBegin = text) < text.end();) {

    // 不断记录行,直到(此次遍历过的源文件大小 + 行数据数组大小 * 2 )到达内存限制
     vector<string_view> lines, sortedLines;
     while (text < text.end() && (text - chunkBegin + sizeof(string_view) * lines.size() * 2) < max_memory) {
       auto lineEnd = (char*) std::memchr(text, '\n', text.end() - text);
       auto lineLen = (lineEnd ? lineEnd : text.end()) - text;
       lines.emplace_back(text, lineLen);
       text += lineLen + 1;
    }

    // 准备写入(排序后)分块、行数据。
     ofstream chunkFile (tmpDir / (to_string(chunkCount) + ".txt"), ios::binary | ios::trunc);
     ofstream metaFile (tmpDir / (to_string(chunkCount) + ".meta"), ios::binary | ios::trunc);
     chunkCount++;

    // 多线程排序行数组
     sortedLines.resize(lines.size());
     mergeSort(lines.data(), sortedLines.data(), lines.size(), max_thread);

    // 保存(排序后)每行文本、偏移、长度
     for (auto line: sortedLines) {
       chunkFile << line;
       metaFile << Meta{line.data() - chunkBegin, line.size()};
    }

    // 检查
     if (!chunkFile || !metaFile) {
       std::stringstream buf;
       buf << "写入第 " << chunkCount << " 分块时出错!";
       throw std::runtime_error(buf.str());
    }
  }

   return chunkCount;
}

// 步骤 2:查找重复行
void step2_FindDupLines(int chunkCount) {

   vector<ofstream> chunkDups;
   vector<MappedFile<>> chunkText;
   vector<MappedFile<Meta>> chunkMeta;
   std::priority_queue<Line, vector<Line>, std::greater<>> lines;

  // 映射所有分块的文本、行数据文件,
  // 也准备好记录各分块重复行数据的文件
   for (int idx = 0; idx < chunkCount; idx++) {
     chunkText.emplace_back(tmpDir / (to_string(idx) + ".txt"));
     chunkMeta.emplace_back(tmpDir / (to_string(idx) + ".meta"));
     chunkDups.emplace_back(tmpDir / (to_string(idx) + ".dups"), ios::binary | ios::trunc);
     lines.push({idx});
  }

  // 利用小根堆,按(行内容,分块号,偏移量)顺序,流式多路归并
   string_view last{};
   while (!lines.empty()) {

    // 与上一行相同,则将偏移量写入,对应分块待删除行名单内
     auto line = lines.top(); lines.pop();
     if (last == line.str && !last.empty())
       chunkDups[line.chunkIdx].write((char*)&line.offset, sizeof line.offset);
     last = line.str;

    // 该分块行数据未遍历完,则继续将下一行添加进小根堆中
     auto& text = chunkText[line.chunkIdx];
     auto& meta = chunkMeta[line.chunkIdx];
     if (meta < meta.end()) {
       lines.push({line.chunkIdx, (*meta).offset, {text, (*meta).length}});
       text += (*meta).length;
       meta++;
    }
  }

  // 检查
   for (auto&& file: chunkDups) {
     if (!file) {
       std::stringstream buf;
       buf << "保存第 " << chunkCount << " 分块删除名单时出错!";
       throw std::runtime_error(buf.str());
    }
  }
}

// 步骤 3:合并分块
void step3_MergeChunks(int chunkCount, const fs::path& outFile) {

   ofstream textOut {outFile, ios::binary | ios::trunc};
   if (!textOut) throw std::runtime_error("无法打开输出文件");

   for (int idx = 0; idx < chunkCount; idx++) {

    // 映射分块(排序后)文本、行数据、删除名单
     MappedFile<> text {tmpDir / (to_string(idx) + ".txt")};
     MappedFile<Meta> meta {tmpDir / (to_string(idx) + ".meta")};
     MappedFile<decltype(Meta::offset)> dups {tmpDir / (to_string(idx) + ".dups")};

    // 剔除删除名单中的行
     vector<Line> lines; lines.reserve(meta.size);
     for (; meta < meta.end(); text += (*meta++).length) {
       if (dups < dups.end() && *dups == (*meta).offset)
         dups++;
       else
         lines.push_back({idx, (*meta).offset, {text, (*meta).length}});
    }

    // 再按偏移量顺序排序好
     std::sort(lines.begin(), lines.end(), [](auto&& a, auto&& b) {
       return a.offset < b.offset;
    });

    // 逐行输出
     for (auto&& line: lines)
       textOut << line.str << '\n';
  }

  // 检查
   if (!textOut)
     throw std::runtime_error("写入输出文件时出错!");
}

int main(int argc, const char* argv[]) {

   if (argc < 3) {
     std::stringstream buf;
     buf << "大文本去重并保持顺序工具\n\n"
      << "用法:" << argv[0] << " 输入文件 输出文件 "
      << "[内存限制 MB = " << (max_memory >> 20) << "] "
      << "[线程限制 = " << max_thread << "]";
     std::cerr << buf.str() << std::endl;
     return -1;
  }

   auto inFile = argv[1];
   auto outFile = argv[2];
   if (argc > 3) max_memory = (std::max)(std::stoull(argv[3]), 1ull) << 20ull;
   if (argc > 4) max_thread = (std::max)((std::min)(std::stoi(argv[4]), 256), 1);

   auto chunkCount = step1_SplitChunks(inFile);
   step2_FindDupLines(chunkCount);
   step3_MergeChunks(chunkCount, outFile);

  // 清空临时文件
   for (int i = 0; i < chunkCount; i++)
     for (auto&& suffix: {".txt", ".meta", ".dups"})
       fs::remove(tmpDir / (to_string(i) + suffix));
}
```
Keuin
28 天前
```shell
awk '{print $0","NR}' input.csv | sort | sed -E 's/,[0-9]+$//' | uniq
```

Example usage:

```
$ cat input
1,2,3,4
2,3,4,5
3,4,5,6
4,5,6,7
2,3,4,5
1,2,3,4
5,6,7,8
$ awk '{print $0","NR}' input
1,2,3,4,1
2,3,4,5,2
3,4,5,6,3
4,5,6,7,4
2,3,4,5,5
1,2,3,4,6
5,6,7,8,7
$ awk '{print $0","NR}' input | sort
1,2,3,4,1
1,2,3,4,6
2,3,4,5,2
2,3,4,5,5
3,4,5,6,3
4,5,6,7,4
5,6,7,8,7
$ awk '{print $0","NR}' input | sort | sed -E 's/,[0-9]+$//'
1,2,3,4
1,2,3,4
2,3,4,5
2,3,4,5
3,4,5,6
4,5,6,7
5,6,7,8
$ awk '{print $0","NR}' input | sort | sed -E 's/,[0-9]+$//' | uniq
1,2,3,4
2,3,4,5
3,4,5,6
4,5,6,7
5,6,7,8
```

不管你的电脑内存是 1T 还是 1G ,都可以正确运行并得到相同输出,因为 sort 命令用的是归并排序,是外存算法。如果你要限制用到的内存大小,把 sort 改成 sort --buffer-size=100M ,即可限制只用 100M 内存,其他命令都是行缓存算法,只会保存当前行在内存里,也就是说,最大内存用量是 max(100M, max_line_size_bytes)

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1046023

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX