关于一个经典海量数据的问题

231 天前
 ComplexPug

2 个 G 的字符串,找出出现次数最多的 100 个。感觉对搜索到的答案倒不是很理解。 比如

把 2G 的文件进行分割成大小为 512KB 小文件,总共得到 2048 个小文件,避免一次性读入整个文件造成内存不足。 定义一个长度为 2048 的 hash 表数组,用来统计每个小文件中单词出现的频率。 使用多线程并行遍历 2048 个小文件,针对每个单词进行 hash 取模运算分别存储到长度为 2048 的 hash 表数组中 inthash=Math.abs(word.hashCode() %hashTableSize);hashTables[hash].merge(word, 1, Integer::sum); 接着再遍历这 2048 个 hash 表,把频率前 100 的单词存入小顶堆中 最后,小顶堆中最终得到的 100 个单词,就是 top 100 了。 这种解决方案的核心思想是将大文件分割为多个小文件,然后采用分治和堆的算法,来解决这个问题。

如果是多线程使用 hashmap ,内存中仅仅去除了重复字符串吧。

还有一种使用字典树的,但出现频次很低的时候内存占用也不小。

还有一种是按照 hash(word)%2048 来保存小文件,但不适用于单词频次高使得小文件变大。

还有一种是把大文件直接拆分成小文件,然后对每个文件求 topK ,最后求整个的 topK 。这个思路虽然时间没问题,但能保证 topK 的正确性吗。如果按照普通的小顶堆 topk 算法,使用的是出现次数,但这里因为没用 hash 分组,的一个字符的出现次数是分开的,并不准确吧,只能保证大概率正确。

我的想法是要想保证强正确性且不能保证数据安全,按照 hash(word)%2048 保存文件,小文件使用 hashmap ,大文件使用字典树。最后对所有文件的频次使用 topk 算法。

问过别人但交流了半天他没懂,想问一下我对这个问题的理解对吗,或者有什么好的解决方案。

1721 次点击
所在节点    程序员
18 条回复
jorneyr
230 天前
1. 统计每个字符串出现的次数。
2. TopK 算法,就是个小顶堆。
dusu
230 天前
安全方面细分下来应该是 topK 这个 K 取值的问题
misdake
230 天前
我感觉吧,应该是全局总共只有 2048 个 bucket ,依靠 word 的 hash 来 index ,每个 bucket 里是 hashmap 维护这个 hash 的那些单词的词频。
说白了就是类似于一个大的 concurrent hashmap 的实现,分组加锁或分组搞无锁队列来支持多线程更新。每个小文件读完,就遍历小文件的词频累加。
整个 2048bucket 结构对外暴露的其实是单个的 hashmap<word, 频数>。最后一步是这个 hashmap 遍历 kv 搞 topk 。
misdake
230 天前
@misdake 分布式的话就是每台机器只负责某些 bucket ,最后一步每个机器自己先求 topk ,然后汇总。
ComplexPug
230 天前
@misdake 那确实肯定速度快,但可能面试的时候限制你内存大小,或者让你筛选 1000G 的 top100 ,不过分布式肯定还能处理。
wxf666
230 天前
@ComplexPug 有些极端情况,咋办呢?

1. 只有俩 500GB 的不同的字符串,但它们 hash 相同。
2. 所有字符串,都只出现一次。
ComplexPug
229 天前
@wxf666 第一个问题,没有特别长的字符串,问题里串长度不会很长(确实没说清楚),第二个问题的话直接按照 hash 分组就好了,然后每组 map 统计次数之后进行 topk 算法,因为 hash 映射的很均匀,每组基本都一样大
wxf666
229 天前
@ComplexPug #7 关于第二个问题:

1. 直观上说,每个字符串都只出现一次,最后取哪 100 个字符串呢?

2. 假如说,题目保证肯定有 100 个字符串,出现次数最多。

那如果,我这样构造原始文件呢:

你分割的前 2047 个小文件里,所有字符串都只出现一次,
在第 2048 个文件里,100 个字符串出现两次。

你在构造哈希表,甚至每个分组统计 TopK 时,会不会要在内存里,保留几乎整个原始文件的字符串了?

那内存会不会爆炸了。。
ComplexPug
228 天前
@wxf666 1.因为这个问题本质是按照出现次数排序,出现次数都一样,那肯定还要制定进一步的规则(比如出现次数相同的优先保留字典序小的),如果不特意指定规则,只能随便取 100 个吧。不过这个问题暂且不用考虑。保证出现多的严格在小的前面即可。2.每组 hashmap 统计出现次数保留前 100 大的。保留 KV 对,而不是 10 个字符串保留十个
wxf666
228 天前
@ComplexPug #9

假设你是单线程,计算完第一个文件里,每个字符串次数后,

你要对那 2048 长度的哈希表,做什么呢?


1. 啥也不干,继续算第二个文件?

(若原文件,每个字符串只出现一次,那你不就相当于在内存里,保留整个原文件了吗。。)


2. 遍历哈希表的,2048 个子哈希表,丢弃 100 名外的?

(若原文件,每个字符串出现两次,唯独一个字符串,出现 2048 次,但分散在每个分割后的文件里。岂不是会被你,每次遍历完一个文件后,丢弃掉?)


3. 我理解错了,等待你补充。。
ComplexPug
228 天前
@wxf666 没太懂你说的“2048 长度的哈希表”是什么,2048 应该是小文件个数,2048 长度是什么。还有你这个分组是直接切分的吗。首先你的 1 肯定是不对的,其次你的 2 可能是这个意思,感觉领悟到你的意思了。

你是觉得 2 这个做法是有问题的吗,我知道你这种构造方法不对。我发这个帖子就是询问这个问题。而且你这个做法我在贴里说明了,是有正确率问题的。不知道你懂我意思了吗感觉(感觉确实是我的表达能力的问题🥲)。

我在原文里对这种做法的评价是:并不准确吧,只能保证大概率正确。
wxf666
228 天前
@ComplexPug #11

1. 噢,说错了,是 2048 长度的哈希表《数组》。

2. 感觉你是想说,计算完一个文件后,就分别把 2048 个哈希表里的频率统计,追加到 2048 个文件里?

计算完所有文件后,再挨个频率文件,计算总频率,且始终只保留频率前 100 的字符串?

感觉这方法是准确的,但极端情况下,频率文件 > 内存限制时,会爆内存。

比如,所有字符串只出现一次,原文件大小 / 2048 > 内存限制了。。

或者,被恶意构造字符串了,使得所有 str.hash() % 2048 后,都挤在同一个文件内。。
ComplexPug
228 天前
@wxf666 我回答你比如后面的两个例子吧,第一个例子,按照 hash%2048 之后,一组一组的统计频率(单线程的),当前这组统计完后遍历一下,放到小顶堆(只有一个,不是一组一个)里面,堆大小不超过 100 ,最后答案就是堆里面的 100 个串。一组做完之后释放内存,所以同时只有一个组和小顶堆的内存。对于第二个例子,恶意 hash 吗,我觉得卡 hash 还是比较难的,hash 保证了每组分到的串的种类一样多,是种类不是频率,所以有的组会比较大(就是因为某些个串出现的次数太多了),但是你保存的是文件,文件大一点没关系,内存不是很大就可以了,内存和 map 有关,map 存的是 kv 对,频率很大也是一样的内存。

等我有空下午写个伪代码。


感觉确实我确实对帖子正文理解的有问题。直接 hash%2048 分组按照我这个评论就是对的,内存占用比较小的解法。
ComplexPug
227 天前
@wxf666
伪代码,不是 python
```code
BLOCK_SIZE = 2048
TOP_N = 100
def fun(goal_file):
files f[BLOCK_SIZE] = empty;
for str in goal_file:
f[str.hash()%BLOCK_SIZE].append(str)

heap<(value,count)> q = empty;# (ordered by count,top is smallest)
for i in range(BLOCK_SIZE):
hashmap mp = empty
for str in f[i]:
mp[str] += 1
for value,count in mp:
if q.size() > TOP_N:
if q.top().count > count:
q.pop()
q.push((value,count))
else:
q.push((value,count))
mp.clear()
return q.key()
```
wxf666
224 天前
@ComplexPug #14 我用 Lua 写了个(可限制内存)词频统计,并和 DuckDB 比了下速度,感觉还行。


1. 测试数据

[知乎回答]( https://www.zhihu.com/answer/1906560411 ) 里,分享的《英文维基百科(仅文本版)》,并简单用脚本预处理了下(去标点、每词一行、转小写)。

解压后:13.33 GB
处理后:12.84 GB ,23 亿词(不重复的有 854W )


2. 测试结果

- DuckDB (不限制内存):4 分半( 1.0X )
- Lua(最多存100M文本):7 分半( 1.6X ),01 个共 103MB 临时文件
- Lua(最多存 10M文本):8 分钟( 1.7X ),23 个共 306MB 临时文件


3. 测试硬件:i5-8250U 低压 CPU 轻薄本。。


4. 运行截图




5. 预处理脚本

```shell
unzip -p enwiki_text.zip | tr -cs "[:alnum:]'-" '\n' | tr [:upper:] [:lower:]
```


6. Lua 脚本( V 站吞空格,所以将行首缩进,都转为全角空格了)

```lua
#!luajit
local TOP_NUM = 1000
local MAX_SIZE = tonumber(arg[1] or 16) * 2 ^ 20

local dict = {}
local dict_size = 0
local files = {}
local words = {}
local word = ''
local freq = 0

function heap_swap(heap, comp, lhs, rhs, func, next)
  if comp(heap[lhs], heap[rhs]) then
   heap[lhs], heap[rhs] = heap[rhs], heap[lhs]
   func(heap, comp, next)
  end
end

function heap_up(heap, comp, idx)
  if idx > 1 then
   local pa = math.floor(idx / 2)
   heap_swap(heap, comp, idx, pa, heap_up, pa)
  end
end

function heap_down(heap, comp, idx)
  if idx <= #heap / 2 then
   local left, right = idx * 2, idx * 2 + 1
   local son = right > #heap and left or
   (comp(heap[left], heap[right]) and left or right)
   heap_swap(heap, comp, son, idx, heap_down, son)
  end
end

function heap_push(heap, comp, val)
  table.insert(heap, val)
  heap_up(heap, comp, #heap)
end

function heap_pop(heap, comp)
  heap[1] = heap[#heap]
  table.remove(heap)
  heap_down(heap, comp, 1)
end

function heap_adjust(heap, comp, idx)
  heap_down(heap, comp, idx)
  heap_up(heap, comp, idx)
end

function comp(lhs, rhs)
  return lhs[1] < rhs[1]
end

function sorted_keys(dict)
  local keys = {}
  for k in pairs(dict) do
   table.insert(keys, k)
  end
  table.sort(keys)
  return keys
end

function push_word()
  if #word > 0 then
   if #words < TOP_NUM then
    heap_push(words, comp, {freq, word})
   elseif freq > words[1][1] then
    words[1] = {freq, word}
    heap_adjust(words, comp, 1)
   end
  end
end

function try_save_dict(new_key)
  dict_size = dict_size + (new_key and #new_key or 0)
  if not new_key or dict_size > MAX_SIZE then
   local file = io.tmpfile()
   for idx, key in ipairs(sorted_keys(dict)) do
    file:write(dict[key], ' ', key, '\n')
   end
   dict = {}
   file:seek('set')
   dict_size = new_key and #new_key
   table.insert(files, {'', 0, file})
  end
end

for word in io.lines() do
  dict[word] = (dict[word] or try_save_dict(word) or 0) + 1
end
try_save_dict()

while #files > 0 do
  local file, read_ok = files[1]
  if word == file[1] then
   freq = freq + file[2]
  else
   push_word()
   word, freq = file[1], file[2]
  end
  file[2], read_ok, file[1] = file[3]:read('n', 1, 'l');
 (read_ok and heap_adjust or heap_pop)(files, comp, 1)
end
push_word()

table.sort(words, comp)
for i = #words, 1, -1 do
  io.write(string.format('%d %s\n', words[i][1], words[i][2]))
end
```
ComplexPug
222 天前
明天看看
ComplexPug
222 天前
看不懂 lua😵,尤其是那个 while 。不过看着像是没按照 hash 分,是直接读取,hash 里面的<K,V>太多就保存起来。那你怎么合并的不同组的相同的 K 的,比如一组里面有<K,2>,另外一组有<K,3>,怎么合并到一起。
ComplexPug
222 天前
不过看你评测,实际跑起来这个算法还是很不错的嘛。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1031275

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX