/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.task.postprocessing;

import ai.grakn.GraknConfigKey;
import ai.grakn.GraknTxType;
import ai.grakn.Keyspace;
import ai.grakn.concept.ConceptId;
import ai.grakn.engine.GraknConfig;
import ai.grakn.engine.KeyspaceStore;
import ai.grakn.engine.factory.EngineGraknTxFactory;
import ai.grakn.engine.task.BackgroundTask;
import ai.grakn.engine.task.postprocessing.IndexPostProcessor;
import ai.grakn.kb.internal.EmbeddedGraknTx;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background task that walks every keyspace reported by the {@link KeyspaceStore},
 * pops pending indices from the {@link IndexPostProcessor} and schedules, on an
 * internal scheduled thread pool, the merging of the duplicate concepts recorded
 * under each index.
 *
 * <p>The pool size and the per-run cap on scheduled jobs both come from
 * {@code GraknConfigKey.POST_PROCESSOR_POOL_SIZE}; the delay applied to every
 * scheduled job comes from {@code GraknConfigKey.POST_PROCESSOR_DELAY} and is
 * interpreted in seconds.
 */
public class PostProcessingTask
implements BackgroundTask {
    private static final Logger LOG = LoggerFactory.getLogger(PostProcessingTask.class);
    private final EngineGraknTxFactory factory;
    private final IndexPostProcessor indexPostProcessor;
    private final ScheduledExecutorService threadPool;
    /** Size of the thread pool and maximum number of index jobs scheduled per keyspace in one run. */
    private final int postProcessingMaxJobs;
    /** Delay, in seconds, between scheduling an index job and its execution. */
    private final int postprocessingDelay;

    /**
     * Creates the task and its scheduled thread pool.
     *
     * @param factory            source of the keyspace store and of write transactions
     * @param indexPostProcessor provider of pending indices / concept ids and of the merge operation
     * @param config             configuration supplying the pool size and the scheduling delay
     */
    public PostProcessingTask(EngineGraknTxFactory factory, IndexPostProcessor indexPostProcessor, GraknConfig config) {
        this.factory = factory;
        this.indexPostProcessor = indexPostProcessor;
        this.postProcessingMaxJobs = (Integer)config.getProperty(GraknConfigKey.POST_PROCESSOR_POOL_SIZE);
        this.threadPool = Executors.newScheduledThreadPool(this.postProcessingMaxJobs);
        this.postprocessingDelay = (Integer)config.getProperty(GraknConfigKey.POST_PROCESSOR_DELAY);
    }

    /**
     * Runs one post-processing pass over all keyspaces currently known to the
     * keyspace store. If the factory does not yet expose a keyspace store, the
     * pass is skipped and only a log line is written.
     */
    @Override
    public void run() {
        UUID executionId = UUID.randomUUID();
        LOG.info("starting post-processing task with ID '{}' ... ", executionId);

        KeyspaceStore keyspaceStore = this.factory.keyspaceStore();
        if (keyspaceStore == null) {
            LOG.info("post-processing '{}': waiting for system keyspace to be ready.", executionId);
            return;
        }

        Set<Keyspace> keyspaces = keyspaceStore.keyspaces();
        String keyspaceNames = keyspaces.stream().map(Keyspace::getValue).collect(Collectors.joining(", "));
        LOG.info("post-processing '{}': attempting to process the following keyspaces: [{}]", executionId, keyspaceNames);

        for (Keyspace keyspace : keyspaces) {
            this.runPostProcessing(executionId, keyspace);
        }
        LOG.info("post-processing task with ID '{}' finished.", executionId);
    }

    /**
     * Pops indices for the given keyspace and schedules one job per index.
     * Stops as soon as no index is returned or once {@code postProcessingMaxJobs}
     * jobs have been scheduled; at least one pop is always attempted.
     *
     * @param executionId identifier of the current run, used only for logging
     * @param keyspace    keyspace whose pending indices are to be processed
     */
    private void runPostProcessing(UUID executionId, Keyspace keyspace) {
        int scheduled = 0;
        do {
            String index = this.indexPostProcessor.popIndex(keyspace);
            if (index == null) {
                // Nothing (more) pending for this keyspace: say so instead of reporting index 'null'.
                LOG.info("post-processing '{}': working on keyspace '{}'. There are no more indices to post-process.", executionId, keyspace.getValue());
                return;
            }
            LOG.info("post-processing '{}': working on keyspace '{}'. The index to be post-processed is '{}'", executionId, keyspace.getValue(), index);
            this.threadPool.schedule(() -> this.processIndex(keyspace, index, executionId), this.postprocessingDelay, TimeUnit.SECONDS);
            scheduled++;
        } while (scheduled < this.postProcessingMaxJobs);
    }

    /**
     * Pops the concept ids recorded under {@code index} and, if there are any,
     * merges the duplicate concepts inside a write transaction and commits it.
     * A {@link RuntimeException} raised while merging or committing is logged
     * together with the affected index and ids and is not rethrown.
     *
     * @param keyspace    keyspace the index belongs to
     * @param index       index whose concept ids are to be merged
     * @param executionId identifier of the current run, used only for logging
     */
    private void processIndex(Keyspace keyspace, String index, UUID executionId) {
        Set<ConceptId> ids = this.indexPostProcessor.popIds(keyspace, index);
        if (ids.isEmpty()) {
            LOG.info("post-processing '{}': there are no concept ids to post-process.", executionId);
            return;
        }
        LOG.info("post-processing '{}': processing {} concept ids...", executionId, ids.size());
        try (EmbeddedGraknTx<?> tx = this.factory.tx(keyspace, GraknTxType.WRITE)) {
            this.indexPostProcessor.mergeDuplicateConcepts(tx, index, ids);
            tx.commit();
        }
        catch (RuntimeException e) {
            String stringIds = ids.stream().map(ConceptId::getValue).collect(Collectors.joining(","));
            // The execution id is passed as a format argument rather than spliced into the pattern.
            LOG.error(String.format("post-processing '%s': Error during post processing index {%s} with ids {%s}", executionId, index, stringIds), e);
        }
    }

    /**
     * Initiates shutdown of the internal thread pool. This does not wait for
     * already scheduled jobs to complete.
     */
    @Override
    public void close() {
        LOG.info("post-processing is shutting down.");
        this.threadPool.shutdown();
    }
}

