package com.fabarta.arcgraph.data.importer;

import com.fabarta.arcgraph.data.common.FileHandler;
import com.fabarta.arcgraph.data.common.ImportEdgeDataJob;
import com.fabarta.arcgraph.data.common.ImportVertexDataJob;
import com.fabarta.arcgraph.data.common.RecordsHandler;
import com.fabarta.arcgraph.data.common.SchemaCache;
import com.fabarta.arcgraph.data.config.ImportExportConfig;
import com.fabarta.arcgraph.data.exporter.ExportJob;
import com.fabarta.arcgraph.data.utils.FileUtils;
import com.fabarta.arcgraph.driver.AuthTokens;
import com.fabarta.arcgraph.driver.Driver;
import com.fabarta.arcgraph.driver.GraphDatabase;
import com.fabarta.arcgraph.driver.Session;
import com.fabarta.arcgraph.driver.SessionConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/fabarta/arcgraph/data/importer/ImportJob.class */
/**
 * Imports vertex and edge CSV files into a graph.
 *
 * <p>A run goes through four stages: collect the per-file import configurations
 * ({@link #checkFiles()}), look up the schemas they reference ({@link #checkSchemas()}),
 * wire one reader/handler/data-job pipeline per file ({@link #generateJobs()}) and finally
 * execute the pipelines, all vertex files first and all edge files afterwards
 * ({@link #runJobs()}).
 *
 * <p>The driver and the first session are opened in the constructor and released when
 * {@link #run()} returns. Instances are not designed for concurrent use.
 */
public class ImportJob {
    private static final Logger logger = LoggerFactory.getLogger(ImportJob.class);

    /** gRPC timeout applied to every session opened by this job (unit as defined by SessionConfig). */
    private static final long GRPC_TIMEOUT = 3600L;

    private final ImportExportConfig importExportConfig;
    private final List<ImportVertexItemConfig> vertexItemConfigs;
    private final List<ImportEdgeItemConfig> edgeItemConfigs;
    private final List<Thread> vertexTasks;
    private final List<Thread> edgeTasks;
    private final String csvSuffixName;
    private final char dataSplit;
    private final String csvNameSplit;
    /** Completion latch of each file's pipeline, keyed by file path. */
    private final Map<String, CountDownLatch> vertexEdgeHandleStatus;
    private final boolean useCustomizedImportConfig;
    private final Driver driver;
    private final Session session;
    /** Sessions opened for the individual data jobs; closed together in {@link #close()}. */
    private final List<Session> createdSessions;
    /** Set once {@link #close()} has run so that a second call is a no-op. */
    private boolean closed;
    /** Set once {@link #abort(Exception)} has run so that later stages are skipped. */
    private boolean aborted;

    /**
     * Creates the job and connects to the server described by the configuration.
     *
     * @param importExportConfig server, graph, CSV and mapping settings for this import
     */
    public ImportJob(ImportExportConfig importExportConfig) {
        this.importExportConfig = importExportConfig;
        // Explicit mappings take precedence over scanning the data folder.
        this.useCustomizedImportConfig = !(importExportConfig.getVertexImportMapping().isEmpty()
                && importExportConfig.getEdgeImportMapping().isEmpty());
        this.csvSuffixName = importExportConfig.getCsvConfig().getCsvSuffixName();
        this.dataSplit = importExportConfig.getCsvConfig().getDataSplit();
        this.csvNameSplit = importExportConfig.getCsvConfig().getCsvNameSplit();
        this.vertexItemConfigs = new ArrayList<>();
        this.edgeItemConfigs = new ArrayList<>();
        this.vertexTasks = new ArrayList<>();
        this.edgeTasks = new ArrayList<>();
        this.vertexEdgeHandleStatus = new HashMap<>();
        this.createdSessions = new ArrayList<>();
        this.driver = GraphDatabase.driver(
                importExportConfig.getServerConfig().getServerAddress(),
                AuthTokens.basic(
                        importExportConfig.getServerConfig().getUserName(),
                        importExportConfig.getServerConfig().getPassword()));
        Session mainSession;
        try {
            mainSession = openSession();
        } catch (RuntimeException e) {
            // Do not leak the driver when the first session cannot be opened.
            this.driver.close();
            throw e;
        }
        this.session = mainSession;
    }

    /**
     * Executes the whole import and releases the driver and all sessions afterwards,
     * whether the import succeeded, was aborted or failed with an exception.
     */
    public void run() {
        try {
            checkFiles();
            checkSchemas();
            generateJobs();
            if (this.aborted) {
                // Resources are already closed; starting the readers now could only fail or block.
                logger.error("Import aborted while generating jobs; no import task was started");
                return;
            }
            runJobs();
        } finally {
            close();
        }
    }

    /** Opens a session on the configured graph using the shared gRPC timeout. */
    private Session openSession() {
        return this.driver.session(
                this.importExportConfig.getGraphName(),
                SessionConfig.builder().withGrpcTimeout(GRPC_TIMEOUT).build());
    }

    /** Closes every session and the driver. Calling it more than once has no effect. */
    private void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        // Nested finally blocks: a failing close must not prevent the remaining ones.
        try {
            this.createdSessions.forEach(Session::close);
        } finally {
            try {
                this.session.close();
            } finally {
                this.driver.close();
            }
        }
    }

    /**
     * Logs the failure, marks the job as aborted and releases all resources.
     *
     * @param exc the failure that made the import impossible to continue
     */
    private void abort(Exception exc) {
        logger.error("Import aborted", exc);
        this.aborted = true;
        close();
    }

    /**
     * Collects the per-file import configurations, either from the user supplied
     * mappings or by scanning the data folder for files with the CSV suffix.
     */
    private void checkFiles() {
        if (this.useCustomizedImportConfig) {
            logger.info("CSV status check started from user defined paths");
            this.vertexItemConfigs.addAll(this.importExportConfig.getVertexImportMapping().values());
            this.edgeItemConfigs.addAll(this.importExportConfig.getEdgeImportMapping().values());
        } else {
            logger.info("CSV status check started from directory: {}", this.importExportConfig.getDataFolder());
            List<String> csvFiles = new ArrayList<>(
                    FileUtils.getFilesWithSuffixName(this.importExportConfig.getDataFolder(), this.csvSuffixName));
            for (String csvFile : csvFiles) {
                // 0 marks a vertex file, 1 an edge file; any other result is skipped.
                int fileKind = FileUtils.parseFormatFilename(csvFile, this.csvSuffixName, this.csvNameSplit);
                if (fileKind == 0) {
                    this.vertexItemConfigs.add(
                            FileUtils.getImportVertexItemConfigFromFilePath(csvFile, this.csvSuffixName, this.csvNameSplit));
                } else if (fileKind == 1) {
                    this.edgeItemConfigs.add(
                            FileUtils.getImportEdgeItemConfigFromFilePath(csvFile, this.csvSuffixName, this.csvNameSplit));
                }
            }
        }
        logger.info("CSV status check finished");
    }

    /**
     * Looks up the schema of every vertex and edge type referenced by the collected
     * configurations. The results are discarded here.
     * NOTE(review): presumably this fills the cache and fails for unknown types - confirm
     * against SchemaCache.
     */
    private void checkSchemas() {
        logger.info("Schema status check started");
        SchemaCache schemaCache = SchemaCache.getInstance();
        for (ImportVertexItemConfig vertexConfig : this.vertexItemConfigs) {
            schemaCache.getVertexSchemaByName(this.session, vertexConfig.getVertexTypeName());
        }
        for (ImportEdgeItemConfig edgeConfig : this.edgeItemConfigs) {
            schemaCache.getVertexSchemaByName(this.session, edgeConfig.getSourceVertexTypeName());
            schemaCache.getEdgeSchemaByName(this.session, edgeConfig.getEdgeTypeName());
            schemaCache.getVertexSchemaByName(this.session, edgeConfig.getTargetVertexTypeName());
        }
        logger.info("Schema status check finished");
    }

    /**
     * Builds one pipeline per file (file reader, records handler, data job) and queues a
     * thread for each reader. Every data job gets its own session and a latch that
     * {@link #runJobs()} waits on.
     *
     * <p>If a data job cannot be set up the job is aborted and no further pipelines are
     * generated.
     */
    public void generateJobs() {
        logger.info("Generate jobs started");
        for (ImportVertexItemConfig vertexConfig : this.vertexItemConfigs) {
            CountDownLatch done = new CountDownLatch(1);
            String filePath = vertexConfig.getFilePath();
            this.vertexEdgeHandleStatus.put(filePath, done);
            FileHandler fileHandler = newFileHandler(filePath);
            RecordsHandler recordsHandler = new RecordsHandler(this.importExportConfig);
            fileHandler.asPublisher().subscribe(recordsHandler);
            try {
                Session jobSession = openSession();
                this.createdSessions.add(jobSession);
                recordsHandler.asPublisher().subscribe(new ImportVertexDataJob(
                        jobSession, this.importExportConfig, vertexConfig.getVertexTypeName(), done));
            } catch (IOException e) {
                abort(e);
                return;
            }
            this.vertexTasks.add(new Thread(fileHandler));
        }
        for (ImportEdgeItemConfig edgeConfig : this.edgeItemConfigs) {
            CountDownLatch done = new CountDownLatch(1);
            String filePath = edgeConfig.getFilePath();
            this.vertexEdgeHandleStatus.put(filePath, done);
            FileHandler fileHandler = newFileHandler(filePath);
            RecordsHandler recordsHandler = new RecordsHandler(this.importExportConfig);
            fileHandler.asPublisher().subscribe(recordsHandler);
            try {
                Session jobSession = openSession();
                this.createdSessions.add(jobSession);
                recordsHandler.asPublisher().subscribe(new ImportEdgeDataJob(
                        jobSession,
                        this.importExportConfig,
                        edgeConfig.getSourceVertexTypeName(),
                        edgeConfig.getEdgeTypeName(),
                        edgeConfig.getTargetVertexTypeName(),
                        done));
            } catch (IOException e) {
                abort(e);
                return;
            }
            this.edgeTasks.add(new Thread(fileHandler));
        }
        logger.info("Generate jobs finished");
    }

    /** Creates the reader for one CSV file using the configured header and separator settings. */
    private FileHandler newFileHandler(String filePath) {
        return new FileHandler(filePath, this.importExportConfig.getCsvConfig().isSkipCsvHeader(), this.dataSplit);
    }

    /**
     * Runs the queued reader threads and waits for the matching latches: all vertex files
     * first, then all edge files. An interrupt stops the run and aborts the job.
     */
    private void runJobs() {
        logger.info("Run jobs started");
        try {
            runSequentially(this.vertexTasks);
            for (ImportVertexItemConfig vertexConfig : this.vertexItemConfigs) {
                awaitFile(vertexConfig.getFilePath());
            }
            runSequentially(this.edgeTasks);
            for (ImportEdgeItemConfig edgeConfig : this.edgeItemConfigs) {
                awaitFile(edgeConfig.getFilePath());
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
            abort(e);
            return;
        }
        logger.info("Run jobs finished!");
    }

    /**
     * Starts each task and joins it before starting the next one, so the readers run
     * one at a time.
     *
     * @throws InterruptedException if the calling thread is interrupted while joining
     */
    private static void runSequentially(List<Thread> tasks) throws InterruptedException {
        for (Thread task : tasks) {
            task.start();
            task.join();
        }
    }

    /**
     * Blocks until the latch registered for the given file reaches zero.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void awaitFile(String filePath) throws InterruptedException {
        logger.info("Wait for file: {}", filePath);
        CountDownLatch done = this.vertexEdgeHandleStatus.get(filePath);
        if (done == null) {
            logger.warn("No import task registered for file: {}", filePath);
            return;
        }
        done.await();
    }
}
