/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.query.impl.massindex;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.hibernate.search.engine.spi.EntityIndexBinding;
import org.hibernate.search.spi.IndexedTypeIdentifier;
import org.hibernate.search.spi.SearchIntegrator;
import org.hibernate.search.spi.impl.PojoIndexedTypeIdentifier;
import org.infinispan.AdvancedCache;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.time.TimeService;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.jmx.annotations.ManagedOperation;
import org.infinispan.manager.ClusterExecutor;
import org.infinispan.query.MassIndexer;
import org.infinispan.query.backend.KeyTransformationHandler;
import org.infinispan.query.impl.massindex.IndexUpdater;
import org.infinispan.query.impl.massindex.IndexWorker;
import org.infinispan.query.impl.massindex.MassIndexStrategy;
import org.infinispan.query.impl.massindex.MassIndexStrategyFactory;
import org.infinispan.query.logging.Log;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.function.TriConsumer;
import org.infinispan.util.logging.LogFactory;

@MBean(objectName="MassIndexer", description="Component that rebuilds the index from the cached data")
public class DistributedExecutorMassIndexer
implements MassIndexer {
    private static final Log LOG = (Log)LogFactory.getLog(DistributedExecutorMassIndexer.class, Log.class);
    private final AdvancedCache<?, ?> cache;
    private final SearchIntegrator searchIntegrator;
    private final IndexUpdater indexUpdater;
    private final ClusterExecutor executor;
    private final ExecutorService localExecutor;

    public DistributedExecutorMassIndexer(AdvancedCache cache, SearchIntegrator searchIntegrator, KeyTransformationHandler keyTransformationHandler, TimeService timeService) {
        this.cache = cache;
        this.searchIntegrator = searchIntegrator;
        this.indexUpdater = new IndexUpdater(searchIntegrator, keyTransformationHandler, timeService);
        this.executor = cache.getCacheManager().executor();
        this.localExecutor = (ExecutorService)cache.getCacheManager().getGlobalComponentRegistry().getComponent(ExecutorService.class, "org.infinispan.executors.persistence");
    }

    @Override
    @ManagedOperation(description="Starts rebuilding the index", displayName="Rebuild index")
    public void start() {
        CompletionStages.join(this.executeInternal());
    }

    @Override
    public CompletableFuture<Void> startAsync() {
        return this.executeInternal();
    }

    @Override
    public CompletableFuture<Void> reindex(Object ... keys) {
        CompletableFuture<Void> future = null;
        HashSet<Object> everywhereSet = new HashSet<Object>();
        HashSet<Object> primeownerSet = new HashSet<Object>();
        for (Object key : keys) {
            if (this.cache.containsKey(key)) {
                Class<?> indexedType = this.cache.get(key).getClass();
                EntityIndexBinding entityIndexBinding = (EntityIndexBinding)this.searchIntegrator.getIndexBindings().get((IndexedTypeIdentifier)new PojoIndexedTypeIdentifier(indexedType));
                MassIndexStrategy strategy = MassIndexStrategyFactory.calculateStrategy(entityIndexBinding, this.cache.getCacheConfiguration());
                MassIndexStrategy.IndexingExecutionMode indexingStrategy = strategy.getIndexingStrategy();
                switch (indexingStrategy) {
                    case ALL: {
                        everywhereSet.add(key);
                        break;
                    }
                    case PRIMARY_OWNER: {
                        primeownerSet.add(key);
                    }
                }
                continue;
            }
            LOG.warn("cache contains no mapping for the key");
        }
        TriConsumer triConsumer = (a, v, t) -> {
            if (t != null) {
                throw new CacheException(t);
            }
        };
        if (everywhereSet.size() > 0) {
            IndexWorker indexWorkEverywhere = new IndexWorker(this.cache.getName(), null, false, false, false, everywhereSet);
            future = this.executor.submitConsumer((Function)indexWorkEverywhere, triConsumer);
        }
        if (primeownerSet.size() > 0) {
            HashMap targets = new HashMap();
            DistributionManager distributionManager = this.cache.getDistributionManager();
            if (distributionManager != null) {
                ArrayList<CompletableFuture> futures;
                LocalizedCacheTopology localizedCacheTopology = this.cache.getDistributionManager().getCacheTopology();
                for (Object e : primeownerSet) {
                    Address primary = localizedCacheTopology.getDistribution(e).primary();
                    HashSet keysForAddress = (HashSet)targets.get(primary);
                    if (keysForAddress == null) {
                        keysForAddress = new HashSet();
                        targets.put(primary, keysForAddress);
                    }
                    keysForAddress.add(e);
                }
                if (future != null) {
                    futures = new ArrayList(targets.size() + 1);
                    futures.add(future);
                } else {
                    futures = new ArrayList<CompletableFuture>(targets.size());
                }
                for (Map.Entry entry : targets.entrySet()) {
                    IndexWorker indexWorkEverywhere = new IndexWorker(this.cache.getName(), null, false, false, false, (Set)entry.getValue());
                    futures.add(this.executor.filterTargets(Collections.singleton((Address)entry.getKey())).submitConsumer((Function)indexWorkEverywhere, triConsumer));
                }
                future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
            } else {
                IndexWorker indexWorkEverywhere = new IndexWorker(this.cache.getName(), null, false, false, false, primeownerSet);
                CompletableFuture localFuture = this.executor.submitConsumer((Function)indexWorkEverywhere, triConsumer);
                future = future != null ? CompletableFuture.allOf(future, localFuture) : localFuture;
            }
        }
        return future != null ? future : CompletableFutures.completedNull();
    }

    private CompletableFuture<Void> executeInternal() {
        ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
        LinkedList<IndexedTypeIdentifier> toFlush = new LinkedList<IndexedTypeIdentifier>();
        TriConsumer triConsumer = (a, v, t) -> {
            if (t != null) {
                throw new CacheException(t);
            }
        };
        for (IndexedTypeIdentifier indexedType : this.searchIntegrator.getIndexBindings().keySet()) {
            EntityIndexBinding indexBinding = (EntityIndexBinding)this.searchIntegrator.getIndexBindings().get(indexedType);
            MassIndexStrategy strategy = MassIndexStrategyFactory.calculateStrategy(indexBinding, this.cache.getCacheConfiguration());
            boolean workerClean = true;
            boolean workerFlush = true;
            if (strategy.getCleanStrategy() == MassIndexStrategy.CleanExecutionMode.ONCE_BEFORE) {
                this.indexUpdater.purge(indexedType);
                workerClean = false;
            }
            if (strategy.getFlushStrategy() == MassIndexStrategy.FlushExecutionMode.ONCE_AFTER) {
                toFlush.add(indexedType);
                workerFlush = false;
            }
            MassIndexStrategy.IndexingExecutionMode indexingStrategy = strategy.getIndexingStrategy();
            IndexWorker indexWork = new IndexWorker(this.cache.getName(), indexedType, workerFlush, workerClean, indexingStrategy == MassIndexStrategy.IndexingExecutionMode.PRIMARY_OWNER, null);
            futures.add(this.executor.submitConsumer((Function)indexWork, triConsumer));
        }
        CompletableFuture<Void> compositeFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        BiConsumer<Void, Throwable> flushIfNeeded = (v, t) -> {
            for (IndexedTypeIdentifier type : toFlush) {
                this.indexUpdater.flush(type);
            }
        };
        return compositeFuture.whenCompleteAsync((BiConsumer)flushIfNeeded, (Executor)this.localExecutor);
    }
}

