如果读文件的速度比处理的快怎么办

2020-02-21 21:59:55 +08:00
 23571113

我要处理一个大文件, 用的是 libuv(reactor 模型)来处理异步 I/O,但是一个问题是,我非阻塞分段读了文件之后(文件很大),放到线程池中做一些修改,然后按顺序非阻塞写到另外一个文件里.
问题一是 写文件是远程写,而读文件是本地读,这样非阻塞 I/O 会导致内存爆炸.
问题二是 既然是非阻塞 I/O 我怎么,输出怎么拼回去.

6405 次点击
所在节点    C++
40 条回复
23571113
2020-02-22 11:30:10 +08:00
@secondwtq
@catror
@msg7086
@laminux29
@FrankHB
@boywhp
@Cbdy
@rapiz
@abutter
对了,客户还要能够恢复数据. 我现在愁死了,不知道如何下手. 大佬们帮帮忙看看我该从哪里开始.
abutter
2020-02-22 11:38:32 +08:00
你的需求我是没有完全看懂,啥叫能够“然后能够做到把不同文件的相似数据块给去掉”?不重复传递一样的数据?

云存储提供的 API 不同,可能实现方式不同。
23571113
2020-02-22 11:43:00 +08:00
@abutter 比如两个文件有 4KB 的数据是相同的,我只存 4KB 数据到云上,而不是存 8KB.
abutter
2020-02-22 11:56:22 +08:00
librsync 应该有你需要的,用来确定文件是否变化,以及哪些部分变化。

还有一个问题,要看你是的云服务商提供的 API。否则需要加 meta 文件来解决。
FrankHB
2020-02-22 13:07:48 +08:00
Content-agnostic target deduplication... 没做过不清楚细节,之前找到过的也是文件系统级的。不过这种需求如果不是服务商给特别的支持,可以考虑用户侧自己把要传输的文件都压缩了,顺便也会 dedup。
恢复数据是指什么?你这备份数据的时候不会把原件给扔了吧。如果是指传输中断可以恢复进度,做日志实现事务。
laminux29
2020-02-22 14:23:33 +08:00
@FrankHB 做编程就像建房砌砖一样。小房子,不做设计,先砌砖当然可行。但大型工程,必然要先有设计图。
Samuelcc
2020-02-22 18:23:00 +08:00
背压下
Sasasu
2020-02-22 18:30:18 +08:00
异步里很常见简单一个问题,除非你的程序是单纯的数据库 gui 的 api 都会有这个问题。

常见的思路是把线程分两组,一组用于异步事件轮询,一组用于阻塞任务。无论是 CPU 密集还是 IO 密集都可以这样搞,IO 密集阻塞如果去不掉的话就开一堆线程去做,CPU 密集就少开点线程。
比如
http://docs.libuv.org/en/v1.x/threadpool.html This thread pool is internally used to run all file system operations, as well as getaddrinfo and getnameinfo requests.
https://docs.rs/tokio-threadpool/0.1.18/tokio_threadpool/ Backup threads are intended only to support the blocking API.
asio 应该可以兼容手写的 thread poll

更 "工程" 的思路是实现抢占,比如 Go,只有一个 pool 然后一堆协程去抢,当然就会出现程序性能下限高上线极低。
Mutoo
2020-02-22 18:38:52 +08:00
生产者消费者 + 流模型,可以参考一下 node 的 steam 的实现,readable 和 writable 设计的非常好。
对其设定缓冲区(buffer/highWaterMark)。之间用 pipe 串连起来,完全是异步实现。
Sasasu
2020-02-22 18:41:23 +08:00
> 然后按顺序非阻塞写到另外一个文件里.

接受者一个持有一个当前 index,发送者把自己的 index - 1 一个 CAS 到接受者上
失败就休眠,等发送者唤醒自己
成功就返回成功,并唤醒发送者

发送者写完一个块就唤醒等在自己身上的接受者
Sasasu
2020-02-22 18:43:38 +08:00
writer 一开始一个持有一个当前 index = 0

worker 把自己的 index - 1 用 CAS 推到 writer 上
> 失败就休眠,等 writer 唤醒自己
> 成功就返回成功,并唤醒 writer

writer 写完一个块就唤醒等在自己身上的 worker
23571113
2020-02-22 19:54:56 +08:00
@Sasasu 我觉得 writer 不应该和线程池之间共用 index 之类的概念,因为他们是不同的东西. 我觉得按顺序完成应该由线程池来保证的. 比如线程池内部有两个变量, 一个是递增的给任务分配的任务号, 一个用来判断当前可以执行结束的任务号.
Sasasu
2020-02-22 21:06:23 +08:00
原理都差不多,但是没见任何 runtime 有现成的功能....
outoftimeerror
2020-02-22 22:01:43 +08:00
reactive stream 了解一下,akka-stream 提供了实现方式
DelayNoMore
2020-02-22 22:50:42 +08:00
如果是 go,用 waitgroup 搞定
reus
2020-02-23 08:11:37 +08:00
你们都是纸上谈兵……
实际上一个 content addressable 系统,一百来行就能实现核心功能
https://play.golang.org/p/LJWU2ut6_TH
https://gist.github.com/reusee/fac296fd9b19d1b5a78650816518ebe2
reus
2020-02-23 12:21:30 +08:00
这个实际上就是和 bt 类似的东西,将文件或者文件夹分成多个块,每个块计算哈希,然后种子文件里只记录哈希和文件夹结构,这样就能用种子去下载整个文件。存储就直接用 kv 类就行,键是哈希,值是哈希对应的内容,这样相同的内容自然就只存一份了。
git 也是类似的思路,这些系统有个统称是 content addressable system。不难做的,用 go 吧。你看我花个十几二十分钟都做出个原型了,你们还在讨论 io 模型,讨论流程图……
23571113
2020-02-23 12:35:33 +08:00
@reus 基本是这样的,但是有些策略, 比如 chunk 划分不应该用定长的, 如果两份相同的数据, 把一份前面加一个字节, 定长的 chunk 就找不到重复的. (你可以搜一下 content defined chunking).
reus
2020-02-23 16:01:24 +08:00
@23571113 学习了
hardwork
2020-03-02 11:29:17 +08:00
大致的模型,主线程分段读,每读一段分发到 uv_queue_work 去处理,设置完成回调,回调中做发送处理。
问题 1, 读处理要控制内存,可以简单控制一下任务数量,发出去一个任务加一,发送完成一个减一,达到上限就休息一下,更精细的就记录内存使用量。精确等待控制可以用 cond signal 来做。否则 sleep 一下也可以。
问题 2, 顺序写回在回调里做,弄个数组存完成任务序号,回调里顺序检测数组从头开始发送,碰到还没完成的回调返回即可。记录一下上次顺序完成序号,下次检测从上次完成序号开始。

跑回调的线程和发任务的线程是同一个,所以上面检测都是线程安全的。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/646485

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX