/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.postprocessing;

import ai.grakn.engine.postprocessing.Cache;
import ai.grakn.engine.postprocessing.ConceptFixer;
import ai.grakn.engine.util.ConfigProperties;
import ai.grakn.factory.GraphFactory;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostProcessing {
    private static final String CASTING_STAGE = "Scanning for duplicate castings . . .";
    private static final String RESOURCE_STAGE = "Scanning for duplicate resources . . .";
    private static PostProcessing instance = null;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private final Logger LOG = LoggerFactory.getLogger(PostProcessing.class);
    private ExecutorService postpool = Executors.newFixedThreadPool(ConfigProperties.getInstance().getAvailableThreads());
    private ExecutorService statDump = Executors.newSingleThreadExecutor();
    private Set<Future> futures;
    private String currentStage;
    private Cache cache = Cache.getInstance();

    private PostProcessing() {
        this.futures = ConcurrentHashMap.newKeySet();
        this.isRunning.set(false);
    }

    public static synchronized PostProcessing getInstance() {
        if (instance == null) {
            instance = new PostProcessing();
        }
        return instance;
    }

    public void run() {
        if (!this.isRunning.get()) {
            this.LOG.info("Starting maintenance.");
            this.isRunning.set(true);
            this.statDump.submit(this::dumpStats);
            this.performTasks();
            this.futures = ConcurrentHashMap.newKeySet();
            this.isRunning.set(false);
            this.LOG.info("Maintenance completed.");
        }
    }

    public void stop() {
        if (this.isRunning.get()) {
            this.LOG.warn("Shutting down running tasks");
            this.futures.forEach(f -> f.cancel(true));
            this.postpool.shutdownNow();
            this.statDump.shutdownNow();
        }
        this.isRunning.set(false);
    }

    public void reset() {
        this.isRunning.set(false);
        this.futures.clear();
        this.postpool = Executors.newFixedThreadPool(ConfigProperties.getInstance().getAvailableThreads());
        this.statDump = Executors.newSingleThreadExecutor();
    }

    private void performTasks() {
        this.currentStage = CASTING_STAGE;
        this.LOG.info(this.currentStage);
        this.performCastingFix();
        this.waitToContinue();
        this.currentStage = RESOURCE_STAGE;
        this.LOG.info(this.currentStage);
        this.performResourceFix();
        this.waitToContinue();
    }

    private void performCastingFix() {
        this.cache.getKeyspaces().parallelStream().forEach(keyspace -> {
            try {
                HashSet<String> castingIds = new HashSet<String>();
                castingIds.addAll(this.cache.getCastingJobs((String)keyspace));
                for (String castingId : castingIds) {
                    this.futures.add(this.postpool.submit(() -> ConceptFixer.checkCasting(this.cache, GraphFactory.getInstance().getGraphBatchLoading((String)keyspace), castingId)));
                }
            }
            catch (RuntimeException e) {
                this.LOG.error("Error while trying to perform post processing on graph [" + keyspace + "]", (Throwable)e);
            }
        });
    }

    private void performResourceFix() {
        this.cache.getKeyspaces().parallelStream().forEach(keyspace -> {
            try {
                this.futures.add(this.postpool.submit(() -> ConceptFixer.checkResources(this.cache, GraphFactory.getInstance().getGraphBatchLoading((String)keyspace), this.cache.getResourceJobs((String)keyspace))));
            }
            catch (RuntimeException e) {
                this.LOG.error("Error while trying to perform post processing on graph [" + keyspace + "]", (Throwable)e);
            }
        });
    }

    private void waitToContinue() {
        for (Future future : this.futures) {
            try {
                future.get(4L, TimeUnit.HOURS);
            }
            catch (InterruptedException | ExecutionException e) {
                this.LOG.error("Error while waiting for future: ", (Throwable)e);
            }
            catch (TimeoutException e) {
                this.LOG.warn("Timeout exception waiting for future to complete", (Throwable)e);
            }
        }
        this.futures.clear();
    }

    private void dumpStats() {
        while (this.isRunning.get()) {
            this.LOG.info("--------------------Current Status of Post Processing--------------------");
            this.dumpStatsType("Casting");
            this.dumpStatsType("Resources");
            this.LOG.info("Save in Progress: " + this.cache.isSaveInProgress());
            this.LOG.info("Current Stage: " + this.currentStage);
            this.LOG.info("-------------------------------------------------------------------------");
            try {
                Thread.sleep(30000L);
            }
            catch (InterruptedException e) {
                this.LOG.error("Exception", (Throwable)e);
            }
        }
    }

    private void dumpStatsType(String typeName) {
        long total = 0L;
        this.LOG.info(typeName + " Jobs:");
        for (String keyspace : this.cache.getKeyspaces()) {
            long numJobs = 0L;
            if (typeName.equals("Casting")) {
                numJobs = this.cache.getCastingJobs(keyspace).size();
            } else if (typeName.equals("Resources")) {
                numJobs = this.cache.getCastingJobs(keyspace).size();
            }
            this.LOG.info("        Post processing step [" + typeName + " for Graph [" + keyspace + "] has jobs : " + numJobs);
            total += numJobs;
        }
        this.LOG.info("    Total " + typeName + " Jobs: " + total);
    }
}

