V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
allenwuli
V2EX  ›  算法

超大数据量排序算法

  •  
  •   allenwuli · 2019-05-27 14:03:12 +08:00 · 2719 次点击
    这是一个创建于 1790 天前的主题,其中的信息可能已经有所发展或是发生改变。

    比如我有 50G 的数据量,我要对 50G 数据进行排序。但是我的内存只有 16G,这就需要进行分段排序。是否有这样的算法? python 实现最好,请教各位大佬。

    4 条回复    2019-05-31 13:46:04 +08:00
    jmc891205
        1
    jmc891205  
       2019-05-27 14:17:29 +08:00   ❤️ 1
    请以「外部排序」为关键字自行 Google
    allenwuli
        2
    allenwuli  
    OP
       2019-05-27 14:29:43 +08:00
    @jmc891205 谢了老板
    cxtrinityy
        3
    cxtrinityy  
       2019-05-27 14:33:23 +08:00 via Android
    虽然想到了和外部排序类似的路子归并,不过卡在最后归并那,没想起归并是稳定排序,最后可以多路分别从已排序的不同数据段取一部分逐步归并
    allenwuli
        4
    allenwuli  
    OP
       2019-05-31 13:46:04 +08:00
    import os
    import argparse


    class FileSplitter(object):
        """Split a text file of integers into individually sorted block files.

        The input is expected to hold one integer per line. Each block is
        written to the current working directory under a name built from
        ``BLOCK_FILENAME_FORMAT``.
        """

        BLOCK_FILENAME_FORMAT = 'block_{0}.dat'

        def __init__(self, filename):
            """Remember the input ``filename``; no I/O happens here."""
            self.filename = filename
            self.block_filenames = []

        def write_block(self, data, block_number):
            """Write ``data`` to the block file numbered ``block_number``.

            The file name is recorded so it can later be merged and removed.
            """
            filename = self.BLOCK_FILENAME_FORMAT.format(block_number)
            with open(filename, 'w') as file:
                file.write(data)
            self.block_filenames.append(filename)

        def get_block_filenames(self):
            """Return the list of block file names written so far."""
            return self.block_filenames

        def split(self, block_size, sort_key=None):
            """Cut the input into sorted blocks of roughly ``block_size`` each.

            Args:
                block_size: size hint passed to ``readlines``; a block ends
                    once the lines read so far reach about this size.
                sort_key: optional key function applied to the parsed
                    integers; ``None`` sorts by numeric value.

            Raises:
                ValueError: if a line cannot be parsed as an integer.
            """
            block_number = 0
            with open(self.filename) as file:
                while True:
                    lines = file.readlines(block_size)
                    if not lines:
                        break

                    # int() ignores the trailing newline of each line.
                    values = [int(line) for line in lines]
                    # key=None is the plain numeric sort, so one call
                    # covers both the keyed and the un-keyed case.
                    values.sort(key=sort_key)

                    self.write_block(
                        ''.join(str(value) + "\n" for value in values),
                        block_number,
                    )
                    block_number += 1

        def cleanup(self):
            """Delete every block file written by this splitter.

            A plain loop is required: ``map`` is lazy in Python 3, so using
            it purely for its side effect would never remove anything.
            """
            for block_filename in self.block_filenames:
                os.remove(block_filename)


    class NWayMerge(object):
        """Merge strategy that selects the buffer holding the smallest value."""

        def select(self, choices):
            """Return the key whose value in ``choices`` is smallest.

            Args:
                choices: mapping of buffer index to its current value.

            Returns:
                The key of the minimum value; on ties the first key in
                iteration order wins. ``-1`` when ``choices`` is empty.
            """
            # No per-call printing: this runs once for every merged record,
            # so debug output here would flood stdout.
            return min(choices.keys(), key=lambda index: choices[index], default=-1)


    class FilesArray(object):
        """One-line look-ahead buffer over a set of open, line-oriented files.

        ``files`` is indexed by ``0 .. len(files) - 1``. Each slot holds the
        most recently read line, or ``None`` when the slot needs refilling.
        """

        def __init__(self, files):
            """Store ``files`` and start with every slot unfilled."""
            self.files = files
            self.empty = set()
            self.num_buffers = len(files)
            self.buffers = dict.fromkeys(range(self.num_buffers))

        def get_dict(self):
            """Return ``{index: int value}`` for every slot not yet exhausted."""
            current = {}
            for slot in range(self.num_buffers):
                if slot in self.empty:
                    continue
                # Take the text before the newline and parse it as an integer.
                current[slot] = int(self.buffers[slot].split("\n")[0])
            return current

        def refresh(self):
            """Refill unfilled slots; return False once every file is drained.

            A file whose ``readline`` returns an empty string is marked
            exhausted and closed.
            """
            for slot in range(self.num_buffers):
                if self.buffers[slot] is not None or slot in self.empty:
                    continue
                line = self.files[slot].readline()
                self.buffers[slot] = line
                if line == '':
                    self.empty.add(slot)
                    self.files[slot].close()

            return len(self.empty) != self.num_buffers

        def unshift(self, index):
            """Take the buffered line out of slot ``index`` and return it."""
            line, self.buffers[index] = self.buffers[index], None
            return line


    class FileMerger(object):
        """Merge several sorted block files into a single output file."""

        def __init__(self, merge_strategy):
            """Store the strategy object.

            ``merge_strategy.select(mapping)`` is called with a mapping of
            buffer index to current value and must return the index to emit.
            """
            self.merge_strategy = merge_strategy

        def merge(self, filenames, outfilename, buffer_size):
            """Merge ``filenames`` into ``outfilename``.

            Args:
                filenames: paths of the sorted input files.
                outfilename: path of the merged output; it is overwritten.
                buffer_size: buffering argument passed to every ``open``.
            """
            handles = self.get_file_handles(filenames, buffer_size)
            try:
                buffers = FilesArray(handles)
                with open(outfilename, 'w', buffer_size) as outfile:
                    while buffers.refresh():
                        min_index = self.merge_strategy.select(buffers.get_dict())
                        outfile.write(buffers.unshift(min_index))
            finally:
                # Release the inputs even when opening the output or the
                # merge itself fails; closing a closed file is harmless.
                for handle in handles.values():
                    handle.close()

        def get_file_handles(self, filenames, buffer_size):
            """Open every file for reading and return ``{position: handle}``.

            If one file cannot be opened, the handles opened before it are
            closed and the error is re-raised.
            """
            files = {}
            try:
                for position, filename in enumerate(filenames):
                    files[position] = open(filename, 'r', buffer_size)
            except Exception:
                for handle in files.values():
                    handle.close()
                raise

            return files


    class ExternalSort(object):
        """Sort a file too large for memory by splitting it, then merging."""

        def __init__(self, block_size):
            """Store ``block_size``, the size used for each sorted block."""
            self.block_size = block_size

        def sort(self, filename, sort_key=None):
            """Sort ``filename`` and write the result to ``filename + '.out'``.

            Args:
                filename: path of the input file.
                sort_key: optional key function forwarded to the splitter.
            """
            num_blocks = self.get_number_blocks(filename, self.block_size)
            splitter = FileSplitter(filename)
            try:
                splitter.split(self.block_size, sort_key)

                merger = FileMerger(NWayMerge())
                # Share the block budget between the inputs and the output.
                # Clamp to 2: open() rejects buffering=0 in text mode and
                # treats 1 as line buffering, not as a size.
                buffer_size = max(2, int(self.block_size // (num_blocks + 1)))
                merger.merge(splitter.get_block_filenames(), filename + '.out', buffer_size)
            finally:
                # Ask the splitter to clean up even when a step fails.
                splitter.cleanup()

        def get_number_blocks(self, filename, block_size):
            """Return an integer block-count estimate for ``filename``.

            Computed as ``file size // block_size + 1``. Floor division
            keeps the result an ``int`` under Python 3.
            """
            return (os.stat(filename).st_size // block_size) + 1


    def parse_memory(string):
        """Convert a size such as ``'512'``, ``'64k'``, ``'100M'`` or ``'2G'`` to bytes.

        A trailing ``k``, ``m`` or ``g`` (any case) multiplies the number by
        1024, 1024**2 or 1024**3. Without a suffix the number is returned
        as is.

        Raises:
            ValueError: if ``string`` is empty or its numeric part is not an
                integer. An empty string used to raise ``IndexError``, which
                argparse does not turn into a usage error when this function
                is used as ``type=``.
        """
        if not string:
            raise ValueError('memory size must not be empty')

        multipliers = {'k': 1024, 'm': 1024 * 1024, 'g': 1024 * 1024 * 1024}
        multiplier = multipliers.get(string[-1].lower())
        if multiplier is None:
            return int(string)
        return int(string[:-1]) * multiplier


    def main():
        """Command-line entry point: sort the given file of integers.

        With no arguments it keeps the previous hard-coded behaviour: sort
        ``num1.txt`` with a block size of 300. ``--mem`` accepts the
        suffixes understood by ``parse_memory``.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument('-m',
                            '--mem',
                            help='amount of memory to use for sorting',
                            default='300')
        parser.add_argument('filename',
                            metavar='<filename>',
                            nargs='?',
                            default='num1.txt',
                            help='name of file to sort')
        args = parser.parse_args()

        sorter = ExternalSort(parse_memory(args.mem))
        sorter.sort(args.filename)


    # Run the sorter only when executed as a script, not when imported.
    if __name__ == '__main__':
        main()
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   2913 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 30ms · UTC 08:38 · PVG 16:38 · LAX 01:38 · JFK 04:38
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.