package com.fabarta.arcgraph.data.common;

import com.fabarta.arcgraph.data.config.ImportExportConfig;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:com/fabarta/arcgraph/data/common/RecordsHandler.class */
public class RecordsHandler implements Flow.Subscriber<CSVRecord> {
    private Flow.Subscription subscription;
    private final int batchSize;
    private ImportExportConfig importExportConfig;
    private final AtomicLong atomicLong = new AtomicLong();
    private ConcurrentLinkedQueue<CSVRecord> csvRecordsQueue = new ConcurrentLinkedQueue<>();
    private final SubmissionPublisher publisher = new SubmissionPublisher();

    public RecordsHandler(ImportExportConfig importExportConfig) {
        this.importExportConfig = importExportConfig;
        this.batchSize = this.importExportConfig.getBatchSize();
    }

    public RecordsHandler(int i) {
        this.batchSize = i;
    }

    public Flow.Publisher asPublisher() {
        return this.publisher;
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1L);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public synchronized void onNext(CSVRecord cSVRecord) {
        this.csvRecordsQueue.add(cSVRecord);
        this.atomicLong.addAndGet(1L);
        if (this.csvRecordsQueue.size() > this.batchSize) {
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < this.batchSize && i < this.csvRecordsQueue.size(); i++) {
                arrayList.add(this.csvRecordsQueue.poll());
            }
            this.publisher.submit(arrayList);
        }
        this.subscription.request(1L);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        th.printStackTrace();
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        while (!this.csvRecordsQueue.isEmpty()) {
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < this.batchSize && i < this.csvRecordsQueue.size(); i++) {
                arrayList.add(this.csvRecordsQueue.poll());
            }
            this.publisher.submit(arrayList);
        }
        this.publisher.close();
    }
}
