/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.postprocessing;

import ai.grakn.GraknConfigKey;
import ai.grakn.GraknTx;
import ai.grakn.Keyspace;
import ai.grakn.concept.ConceptId;
import ai.grakn.engine.GraknConfig;
import ai.grakn.engine.factory.EngineGraknTxFactory;
import ai.grakn.engine.lock.LockProvider;
import ai.grakn.engine.postprocessing.GraknTxMutators;
import ai.grakn.engine.postprocessing.RedisCountStorage;
import ai.grakn.kb.log.CommitLog;
import ai.grakn.util.Schema;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.util.Pool;

public class PostProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(PostProcessor.class);
    private final GraknConfig engineConfig;
    private final MetricRegistry metricRegistry;
    private final LockProvider lockProvider;
    private final RedisCountStorage redis;
    private final EngineGraknTxFactory factory;
    @Deprecated
    private static final String LOCK_KEY = "/post-processing-lock";

    private PostProcessor(GraknConfig engineConfig, Pool<Jedis> jedisPool, EngineGraknTxFactory factory, LockProvider lockProvider, MetricRegistry metricRegistry) {
        this.engineConfig = engineConfig;
        this.metricRegistry = metricRegistry;
        this.lockProvider = lockProvider;
        this.redis = RedisCountStorage.create(jedisPool, metricRegistry);
        this.factory = factory;
    }

    public static PostProcessor create(GraknConfig engineConfig, Pool<Jedis> jedisPool, EngineGraknTxFactory factory, LockProvider lockProvider, MetricRegistry metricRegistry) {
        return new PostProcessor(engineConfig, jedisPool, factory, lockProvider, metricRegistry);
    }

    public void updateCounts(Keyspace keyspace, CommitLog commitLog) {
        long shardingThreshold = (Long)this.engineConfig.getProperty(GraknConfigKey.SHARDING_THRESHOLD);
        int maxRetry = (Integer)this.engineConfig.getProperty(GraknConfigKey.LOADER_REPEAT_COMMITS);
        try (Timer.Context context = this.metricRegistry.timer(MetricRegistry.name(PostProcessor.class, (String[])new String[]{"execution"})).time();){
            Map jobs = commitLog.instanceCount();
            this.metricRegistry.histogram(MetricRegistry.name(PostProcessor.class, (String[])new String[]{"jobs"})).update(jobs.size());
            HashSet conceptToShard = new HashSet();
            jobs.forEach((key, value) -> {
                this.metricRegistry.histogram(MetricRegistry.name(PostProcessor.class, (String[])new String[]{"shard-size-increase"})).update(value.longValue());
                Timer.Context contextSingle = this.metricRegistry.timer(MetricRegistry.name(PostProcessor.class, (String[])new String[]{"execution-single"})).time();
                try {
                    if (PostProcessor.updateShardCounts(this.redis, keyspace, key, value, shardingThreshold)) {
                        conceptToShard.add(key);
                    }
                }
                finally {
                    contextSingle.stop();
                }
            });
            conceptToShard.forEach(type -> {
                Timer.Context contextSharding = this.metricRegistry.timer("sharding").time();
                try {
                    this.shardConcept(this.redis, this.factory, keyspace, (ConceptId)type, maxRetry, shardingThreshold);
                }
                finally {
                    contextSharding.stop();
                }
            });
            LOG.debug("Updating instance count successful for {} tasks", (Object)jobs.size());
        }
        catch (Exception e) {
            LOG.error("Could not terminate task", (Throwable)e);
            throw e;
        }
    }

    private static boolean updateShardCounts(RedisCountStorage redis, Keyspace keyspace, ConceptId conceptId, long value, long shardingThreshold) {
        long numInstances;
        long numShards = redis.getCount(RedisCountStorage.getKeyNumShards(keyspace, conceptId));
        if (numShards == 0L) {
            numShards = 1L;
        }
        return (numInstances = redis.adjustCount(RedisCountStorage.getKeyNumInstances(keyspace, conceptId), value)) > shardingThreshold * numShards;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void shardConcept(RedisCountStorage redis, EngineGraknTxFactory factory, Keyspace keyspace, ConceptId conceptId, int maxRetry, long shardingThreshold) {
        Lock engineLock = this.lockProvider.getLock(PostProcessor.getLockingKey(keyspace, conceptId));
        engineLock.lock();
        try {
            if (PostProcessor.updateShardCounts(redis, keyspace, conceptId, 0L, shardingThreshold)) {
                GraknTxMutators.runMutationWithRetry(factory, keyspace, maxRetry, graph -> {
                    graph.admin().shard(conceptId);
                    graph.admin().commitSubmitNoLogs();
                });
                redis.adjustCount(RedisCountStorage.getKeyNumShards(keyspace, conceptId), 1L);
            }
        }
        finally {
            engineLock.unlock();
        }
    }

    private static String getLockingKey(Keyspace keyspace, ConceptId conceptId) {
        return "/updating-instance-count-lock/" + keyspace + "/" + conceptId.getValue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void mergeDuplicateConcepts(GraknTx tx, String conceptIndex, Set<ConceptId> conceptIds) {
        Preconditions.checkNotNull((Object)this.lockProvider, (Object)"Lock provider was null, possible race condition in initialisation");
        if (tx.admin().duplicateResourcesExist(conceptIndex, conceptIds)) {
            Lock indexLock = this.lockProvider.getLock("/post-processing-lock/" + conceptIndex);
            indexLock.lock();
            try {
                boolean commitNeeded = tx.admin().fixDuplicateResources(conceptIndex, conceptIds);
                if (commitNeeded) {
                    this.validateMerged(tx, conceptIndex, conceptIds).ifPresent(message -> {
                        throw new RuntimeException((String)message);
                    });
                    tx.admin().commitSubmitNoLogs();
                }
            }
            finally {
                indexLock.unlock();
            }
        }
    }

    private Optional<String> validateMerged(GraknTx graph, String conceptIndex, Set<ConceptId> conceptIds) {
        int numConceptFound = 0;
        for (ConceptId conceptId : conceptIds) {
            if (graph.getConcept(conceptId) == null || ++numConceptFound <= 1) continue;
            StringBuilder conceptIdValues = new StringBuilder();
            for (ConceptId id : conceptIds) {
                conceptIdValues.append(id.getValue()).append(",");
            }
            return Optional.of("Not all concept were merged. The set of concepts [" + conceptIds.size() + "] with IDs [" + conceptIdValues.toString() + "] matched more than one concept");
        }
        if (graph.admin().getConcept(Schema.VertexProperty.INDEX, (Object)conceptIndex) == null) {
            return Optional.of("The concept index [" + conceptIndex + "] did not return any concept");
        }
        return Optional.empty();
    }
}

