超大数据量排序算法

2019-05-27 14:03:12 +08:00
 allenwuli

比如我有 50G 的数据量,我要对 50G 数据进行排序。但是我的内存只有 16G,这就需要进行分段排序。是否有这样的算法? python 实现最好,请教各位大佬。

2798 次点击
所在节点    算法
4 条回复
jmc891205
2019-05-27 14:17:29 +08:00
请以「外部排序」为关键字自行 Google
allenwuli
2019-05-27 14:29:43 +08:00
@jmc891205 谢了老板
cxtrinityy
2019-05-27 14:33:23 +08:00
虽然想到了和外部排序类似的归并思路,不过卡在了最后的归并那一步,没想起归并是稳定排序,最后可以采用多路归并:分别从已排序的不同数据段中各取一部分,逐步归并
allenwuli
2019-05-31 13:46:04 +08:00
import os
import argparse


class FileSplitter(object):
    """Split a text file of integers into individually sorted block files.

    The input file is expected to hold one integer per line. Each block is
    written to the current working directory using BLOCK_FILENAME_FORMAT.
    """

    BLOCK_FILENAME_FORMAT = 'block_{0}.dat'

    def __init__(self, filename):
        """Remember the input file name; no I/O happens here.

        Args:
            filename: path of the file to split.
        """
        self.filename = filename
        self.block_filenames = []

    def write_block(self, data, block_number):
        """Write one block to disk and record its file name.

        Args:
            data: the complete text of the block.
            block_number: index substituted into BLOCK_FILENAME_FORMAT.
        """
        filename = self.BLOCK_FILENAME_FORMAT.format(block_number)
        with open(filename, 'w') as file:
            file.write(data)
        self.block_filenames.append(filename)

    def get_block_filenames(self):
        """Return the list of block files written so far."""
        return self.block_filenames

    def split(self, block_size, sort_key=None):
        """Read the input in chunks, sort each chunk and write it as a block.

        Args:
            block_size: size hint passed to ``readlines``; a chunk ends at
                the first line boundary after roughly this many characters.
            sort_key: optional key function applied to the parsed integers
                when sorting a chunk. ``None`` sorts by integer value.

        Raises:
            ValueError: if a non-blank line is not a valid integer.
        """
        block_number = 0
        with open(self.filename) as file:
            while True:
                lines = file.readlines(block_size)
                if not lines:
                    break

                # Blank lines carry no value; skipping them avoids int('')
                # aborting the whole sort.
                values = [int(line) for line in lines if line.strip()]
                if not values:
                    continue

                # list.sort accepts key=None, so one code path covers both
                # the default and the custom-key case.
                values.sort(key=sort_key)
                data = ''.join('{0}\n'.format(value) for value in values)
                self.write_block(data, block_number)
                block_number += 1

    def cleanup(self):
        """Delete every block file written by this splitter."""
        # An explicit loop is required: map() is lazy in Python 3, so
        # map(os.remove, ...) without consuming it removes nothing.
        for block_filename in self.block_filenames:
            os.remove(block_filename)
        self.block_filenames = []


class NWayMerge(object):
    """Merge strategy that picks the input holding the smallest head value."""

    def select(self, choices):
        """Return the key whose value is smallest.

        Args:
            choices: mapping of input index to the current head value of
                that input.

        Returns:
            The key of the minimum value; on ties the first key in
            iteration order wins. Returns -1 when ``choices`` is empty.
        """
        if not choices:
            return -1
        # No debug printing here: this runs once per merged line, so any
        # output would dominate the run time on large inputs.
        return min(choices, key=choices.get)


class FilesArray(object):
    """Hold one pending line per input file during a multi-way merge."""

    def __init__(self, files):
        """Set up an empty one-line buffer for each file.

        Args:
            files: container of open text files indexable by
                ``0 .. len(files) - 1`` (a list or an int-keyed dict).
        """
        self.files = files
        self.empty = set()
        self.num_buffers = len(files)
        self.buffers = {i: None for i in range(self.num_buffers)}

    def get_dict(self):
        """Return ``{index: int value}`` for every input not yet exhausted.

        Must be called after ``refresh`` so that each live buffer holds a
        line.
        """
        return {
            i: int(self.buffers[i])
            for i in range(self.num_buffers)
            if i not in self.empty
        }

    def refresh(self):
        """Refill consumed buffers and report whether any input remains.

        Files that reach end-of-file are closed and recorded in
        ``self.empty``.

        Returns:
            False once every file is exhausted, True otherwise.
        """
        for i in range(self.num_buffers):
            if self.buffers[i] is None and i not in self.empty:
                line = self.files[i].readline()
                if line == '':
                    self.empty.add(i)
                    self.files[i].close()
                elif not line.endswith('\n'):
                    # The last line of a file may lack a newline; add it so
                    # it does not fuse with the next value in the output.
                    line += '\n'
                self.buffers[i] = line

        return len(self.empty) != self.num_buffers

    def unshift(self, index):
        """Take the buffered line of input ``index`` and mark it consumed.

        Returns:
            The line, including its trailing newline.
        """
        value = self.buffers[index]
        self.buffers[index] = None

        return value


class FileMerger(object):
    """Merge several sorted block files into one output file."""

    def __init__(self, merge_strategy):
        """Store the strategy used to choose the next line.

        Args:
            merge_strategy: object with a ``select(choices)`` method that
                returns the index of the input to take the next line from.
        """
        self.merge_strategy = merge_strategy

    @staticmethod
    def _buffering(buffer_size):
        """Map a requested buffer size to a value text-mode open() accepts.

        In text mode 0 is rejected and 1 selects line buffering, so any
        value that small falls back to the interpreter default (-1).
        """
        return buffer_size if buffer_size > 1 else -1

    def merge(self, filenames, outfilename, buffer_size):
        """Merge the sorted ``filenames`` into ``outfilename``.

        Args:
            filenames: paths of the sorted input files.
            outfilename: path of the merged output file (overwritten).
            buffer_size: requested I/O buffer size for each file.
        """
        handles = self.get_file_handles(filenames, buffer_size)
        try:
            buffers = FilesArray(handles)
            with open(outfilename, 'w', self._buffering(buffer_size)) as outfile:
                while buffers.refresh():
                    min_index = self.merge_strategy.select(buffers.get_dict())
                    outfile.write(buffers.unshift(min_index))
        finally:
            # Exhausted inputs are already closed; closing again is harmless
            # and guarantees nothing leaks if the merge fails midway.
            for handle in handles.values():
                handle.close()

    def get_file_handles(self, filenames, buffer_size):
        """Open every input file for reading.

        Args:
            filenames: paths of the files to open.
            buffer_size: requested buffer size; values of 1 or less use the
                default buffering.

        Returns:
            A dict mapping the position of each name in ``filenames`` to
            its open file object.
        """
        buffering = self._buffering(buffer_size)
        files = {}

        try:
            for i, name in enumerate(filenames):
                files[i] = open(name, 'r', buffering)
        except Exception:
            # Do not leave earlier files open when a later one fails.
            for handle in files.values():
                handle.close()
            raise

        return files


class ExternalSort(object):
    """Sort a file of integers that does not fit in memory.

    The file is split into sorted blocks which are then merged into
    ``<filename>.out``.
    """

    def __init__(self, block_size):
        """Store the block size.

        Args:
            block_size: approximate number of characters per block.
        """
        self.block_size = block_size

    def sort(self, filename, sort_key=None):
        """Sort ``filename`` and write the result to ``filename + '.out'``.

        Args:
            filename: path of the file to sort, one integer per line.
            sort_key: optional key function used when sorting each block.
                NOTE: the merge step compares plain integer values, so the
                final output is only fully ordered when ``sort_key``
                preserves integer order.
        """
        num_blocks = self.get_number_blocks(filename, self.block_size)
        splitter = FileSplitter(filename)
        try:
            splitter.split(self.block_size, sort_key)

            merger = FileMerger(NWayMerge())
            # Share the block budget between all inputs and the output.
            buffer_size = self.block_size // (num_blocks + 1)
            merger.merge(splitter.get_block_filenames(), filename + '.out', buffer_size)
        finally:
            # Remove the temporary block files even if split or merge fails.
            splitter.cleanup()

    def get_number_blocks(self, filename, block_size):
        """Estimate how many blocks ``filename`` will be split into.

        Returns:
            An int: the file size floor-divided by ``block_size``, plus one.
        """
        # Floor division keeps the result an int under Python 3.
        return os.stat(filename).st_size // block_size + 1


def parse_memory(string):
    """Convert a memory size such as ``'100M'`` into a number of bytes.

    Args:
        string: a decimal integer, optionally followed by a
            case-insensitive ``k``, ``m`` or ``g`` suffix (powers of 1024).
            Surrounding whitespace is ignored.

    Returns:
        The size in bytes as an int.

    Raises:
        ValueError: if the string is empty or its numeric part is not an
            integer.
    """
    text = string.strip()
    if not text:
        # Indexing text[-1] below would raise a confusing IndexError.
        raise ValueError('memory size must not be empty')

    multipliers = {'k': 1024, 'm': 1024 ** 2, 'g': 1024 ** 3}
    factor = multipliers.get(text[-1].lower())
    if factor is None:
        return int(text)
    return int(text[:-1]) * factor


def main(argv=None):
    """Command-line entry point: sort a file of integers externally.

    Args:
        argv: argument list to parse; ``None`` uses ``sys.argv[1:]``.
            With no arguments the defaults (block size 300, file
            ``num1.txt``) apply.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-m',
                        '--mem',
                        help='amount of memory to use for sorting',
                        default='300')
    parser.add_argument('filename',
                        metavar='<filename>',
                        nargs='?',
                        default='num1.txt',
                        help='name of file to sort')
    args = parser.parse_args(argv)

    sorter = ExternalSort(parse_memory(args.mem))
    sorter.sort(args.filename)


if __name__ == '__main__':
    main()

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/568016

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX