App下載

Java 使用 Jackson編寫大型 JSON 文件

別動我的月亮 2021-09-24 10:38:01 瀏覽數(shù) (2100)
反饋

有時(shí)您需要將大量數(shù)據(jù)導(dǎo)出為 JSON 到一個(gè)文件。也許是“將所有數(shù)據(jù)導(dǎo)出到 JSON”,或者 GDPR“可移植性權(quán)利”,您實(shí)際上需要這樣做。

與任何大型數(shù)據(jù)集一樣,您不能將其全部放入內(nèi)存并將其寫入文件。這需要一段時(shí)間,它從數(shù)據(jù)庫中讀取大量條目,您需要小心不要使此類導(dǎo)出使整個(gè)系統(tǒng)過載或耗盡內(nèi)存。

幸運(yùn)的是,在 ?JacksonSequenceWriter?和可選的管道流的幫助下,這樣做相當(dāng)簡單。這是它的樣子:

private ObjectMapper jsonMapper =new ObjectMapper();
private ExecutorService executorService = Executors.newFixedThreadPool(5);
 
@Async
public ListenableFuture<Boolean> export(UUID customerId) {
    try (PipedInputStream in =new PipedInputStream();
            PipedOutputStream pipedOut =new PipedOutputStream(in);
            GZIPOutputStream out =new GZIPOutputStream(pipedOut)) {
     
        Stopwatch stopwatch = Stopwatch.createStarted();
 
        ObjectWriter writer = jsonMapper.writer().withDefaultPrettyPrinter();
 
        try(SequenceWriter sequenceWriter = writer.writeValues(out)) {
            sequenceWriter.init(true);
         
            Future<?> storageFuture = executorService.submit(() ->
                   storageProvider.storeFile(getFilePath(customerId), in));
 
            int batchCounter =0;
            while (true) {
                List<Record> batch = readDatabaseBatch(batchCounter++);
                for (Record record : batch) {
                    sequenceWriter.write(entry);
                }
                if (batch.isEmpty()) {
                    // if there are no more batches, stop.
                    break;
                }
            }
 
            // wait for storing to complete
            storageFuture.get();
 
            // send the customer a notification and a download link
            notifyCustomer(customerId);
        } 
 
        logger.info("Exporting took {} seconds", stopwatch.stop().elapsed(TimeUnit.SECONDS));
 
        return AsyncResult.forValue(true);
    }catch (Exception ex) {
        logger.error("Failed to export data", ex);
        return AsyncResult.forValue(false);
    }
}

代碼做了幾件事:

  • 使用 ?SequenceWriter ?連續(xù)寫入記錄。它使用 ?OutputStream ?進(jìn)行初始化,所有內(nèi)容都寫入其中。這可以是簡單的 ?FileOutputStream?,也可以是下面討論的管道流。請注意,這里的命名有點(diǎn)誤導(dǎo)——?writeValues(out)?聽起來你是在指示作者現(xiàn)在寫點(diǎn)什么;相反,它將其配置為稍后使用特定的流。
  • 用?SequenceWriter?初始化?true?,意思是“包裹在數(shù)組中”。您正在編寫許多相同的記錄,因此它們應(yīng)該在最終的 JSON 中表示一個(gè)數(shù)組。
  • 使用?PipedOutputStream?和?PipedInputStream?將?SequenceWriter?鏈接到?InputStream?然后傳遞給存儲服務(wù)的 ?an ?。如果我們明確地處理文件,就沒有必要了——只需傳遞 ?aFileOutputStream?就可以了。但是,您可能希望以不同的方式存儲文件,例如在 Amazon S3 中,并且 ?putObject ?調(diào)用需要一個(gè) ?InputStream?,從中讀取數(shù)據(jù)并將其存儲在 ?S3 ?中。因此,實(shí)際上,您正在寫入直接寫入 ?InputStream ?的 ?OutputStream?,當(dāng)嘗試從中讀取時(shí),會將所有內(nèi)容寫入另一個(gè) ?OutputStream?
  • 存儲文件是在單獨(dú)的線程中調(diào)用的,這樣寫入文件不會阻塞當(dāng)前線程,其目的是從數(shù)據(jù)庫中讀取。同樣,如果使用簡單的 ?FileOutputStream?,則不需要這樣做。
  • 整個(gè)方法被標(biāo)記為?@Async (spring) ?以便它不會阻塞執(zhí)行——它在準(zhǔn)備好時(shí)被調(diào)用并完成(使用具有有限線程池的內(nèi)部 Spring 執(zhí)行程序服務(wù))
  • 數(shù)據(jù)庫批量讀取代碼這里沒有顯示,因?yàn)樗驍?shù)據(jù)庫而異。關(guān)鍵是,您應(yīng)該批量獲取數(shù)據(jù),而不是 ?SELECT * FROM X?。
  • ?OutputStream ?被包裹在 ?GZIPOutputStream ?中,因?yàn)橄?JSON 這樣帶有重復(fù)元素的文本文件可以從壓縮中顯著受益

主要工作是由 Jackson 的 SequenceWriter 完成的,需要清楚的點(diǎn)是 - 不要假設(shè)您的數(shù)據(jù)會適合內(nèi)存。它幾乎從不這樣做,因此以批處理和增量寫入的方式進(jìn)行所有操作。


0 人點(diǎn)贊