有時您需要將大量數(shù)據(jù)導出為 JSON 到一個文件。也許是“將所有數(shù)據(jù)導出到 JSON”,或者 GDPR“可移植性權(quán)利”,您實際上需要這樣做。
與任何大型數(shù)據(jù)集一樣,您不能將其全部放入內(nèi)存并將其寫入文件。這需要一段時間,它從數(shù)據(jù)庫中讀取大量條目,您需要小心不要使此類導出使整個系統(tǒng)過載或耗盡內(nèi)存。
幸運的是,在 ?JacksonSequenceWriter
?和可選的管道流的幫助下,這樣做相當簡單。這是它的樣子:
private ObjectMapper jsonMapper =new ObjectMapper();
private ExecutorService executorService = Executors.newFixedThreadPool(5);
@Async
public ListenableFuture<Boolean> export(UUID customerId) {
try (PipedInputStream in =new PipedInputStream();
PipedOutputStream pipedOut =new PipedOutputStream(in);
GZIPOutputStream out =new GZIPOutputStream(pipedOut)) {
Stopwatch stopwatch = Stopwatch.createStarted();
ObjectWriter writer = jsonMapper.writer().withDefaultPrettyPrinter();
try(SequenceWriter sequenceWriter = writer.writeValues(out)) {
sequenceWriter.init(true);
Future<?> storageFuture = executorService.submit(() ->
storageProvider.storeFile(getFilePath(customerId), in));
int batchCounter =0;
while (true) {
List<Record> batch = readDatabaseBatch(batchCounter++);
for (Record record : batch) {
sequenceWriter.write(entry);
}
if (batch.isEmpty()) {
// if there are no more batches, stop.
break;
}
}
// wait for storing to complete
storageFuture.get();
// send the customer a notification and a download link
notifyCustomer(customerId);
}
logger.info("Exporting took {} seconds", stopwatch.stop().elapsed(TimeUnit.SECONDS));
return AsyncResult.forValue(true);
}catch (Exception ex) {
logger.error("Failed to export data", ex);
return AsyncResult.forValue(false);
}
}
代碼做了幾件事:
- 使用 ?
SequenceWriter
?連續(xù)寫入記錄。它使用 ?OutputStream
?進行初始化,所有內(nèi)容都寫入其中。這可以是簡單的 ?FileOutputStream
?,也可以是下面討論的管道流。請注意,這里的命名有點誤導——?writeValues(out)
?聽起來你是在指示作者現(xiàn)在寫點什么;相反,它將其配置為稍后使用特定的流。 - 用?
SequenceWriter
?初始化?true
?,意思是“包裹在數(shù)組中”。您正在編寫許多相同的記錄,因此它們應該在最終的 JSON 中表示一個數(shù)組。 - 使用?
PipedOutputStream
?和?PipedInputStream
?將?SequenceWriter
?鏈接到?InputStream
?然后傳遞給存儲服務的 ?an
?。如果我們明確地處理文件,就沒有必要了——只需傳遞 ?aFileOutputStream
?就可以了。但是,您可能希望以不同的方式存儲文件,例如在 Amazon S3 中,并且 ?putObject
?調(diào)用需要一個 ?InputStream
?,從中讀取數(shù)據(jù)并將其存儲在 ?S3
?中。因此,實際上,您正在寫入直接寫入 ?InputStream
?的 ?OutputStream
?,當嘗試從中讀取時,會將所有內(nèi)容寫入另一個 ?OutputStream
? - 存儲文件是在單獨的線程中調(diào)用的,這樣寫入文件不會阻塞當前線程,其目的是從數(shù)據(jù)庫中讀取。同樣,如果使用簡單的 ?
FileOutputStream
?,則不需要這樣做。 - 整個方法被標記為?
@Async (spring)
?以便它不會阻塞執(zhí)行——它在準備好時被調(diào)用并完成(使用具有有限線程池的內(nèi)部 Spring 執(zhí)行程序服務) - 數(shù)據(jù)庫批量讀取代碼這里沒有顯示,因為它因數(shù)據(jù)庫而異。關(guān)鍵是,您應該批量獲取數(shù)據(jù),而不是 ?
SELECT * FROM X
?。 - ?
OutputStream
?被包裹在 ?GZIPOutputStream
?中,因為像 JSON 這樣帶有重復元素的文本文件可以從壓縮中顯著受益
主要工作是由 Jackson 的 SequenceWriter 完成的,需要清楚的點是 - 不要假設您的數(shù)據(jù)會適合內(nèi)存。它幾乎從不這樣做,因此以批處理和增量寫入的方式進行所有操作。