方法client.downloadFile
会不断发射FilePart
,需要collec
写入到文件中即可。
在接收FilePart
期间会有网络等其他异常,现在直接用onErrorResume
从 offset 开始请求返回新的 Flux 会有一个问题。
第一次异常会进入onErrorResume
返回新的 Flux ,由于新的 Flux 没有声明onErrorResume
就噶了
我也不可能在新的 Flux 里声明onErrorResume
,无限套娃了属于是。
client.downloadFile(fileReferenceId)
.publishOn(Schedulers.fromExecutor(Executors.newVirtualThreadPerTaskExecutor()))
.timeout(Duration.ofMinutes(3))
.onErrorResume(RpcException::class.java) {
if (it.error.errorCode() == TIMEOUT_CODE && monitoredChannel.isDone().not()) {
log.warn("Download timeout, resuming: $fileDownloadPath")
return@onErrorResume client.downloadFile(
fileReferenceId,
monitoredChannel.getDownloadedBytes(),
MAX_FILE_PART_SIZE,
true
)
}
Flux.error(it)
}
.collect({ monitoredChannel }, { fc, filePart ->
fc.write(filePart.bytes.nioBuffer())
})
.doOnSuccess {
tempDownloadPath.moveTo(fileDownloadPath)
downloadCounting.incrementAndGet()
log.info("Downloaded file: $fileDownloadPath")
}
.doOnError {
log.error("Error downloading file:$fileDownloadPath", it)
}
.onErrorMap {
wrapRetryableExceptionIfNeeded(it)
}
.doFinally {
runCatching {
closePath(fileDownloadPath)
}.onFailure {
log.error("Error closing file channel", it)
}
hashingPathMapping.remove(hashing)
}
.block()
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.