/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.query.impl.massindex;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.hibernate.search.engine.spi.EntityIndexBinding;
import org.hibernate.search.spi.SearchIntegrator;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.distexec.DefaultExecutorService;
import org.infinispan.distexec.DistributedExecutorService;
import org.infinispan.distexec.DistributedTask;
import org.infinispan.query.MassIndexer;
import org.infinispan.query.impl.massindex.IndexUpdater;
import org.infinispan.query.impl.massindex.IndexWorker;
import org.infinispan.query.impl.massindex.MassIndexStrategy;
import org.infinispan.query.impl.massindex.MassIndexStrategyFactory;
import org.infinispan.query.logging.Log;
import org.infinispan.util.logging.LogFactory;

/**
 * {@link MassIndexer} that re-indexes every type known to the {@link SearchIntegrator} by
 * submitting one {@link IndexWorker} per indexed type through a
 * {@link DistributedExecutorService} created for the cache.
 *
 * <p>Depending on the {@link MassIndexStrategy} computed for each type, purging and flushing
 * are either delegated to the workers or performed once by this class (purge before the
 * workers are submitted, flush after all of them have completed).
 */
public class DistributedExecutorMassIndexer
implements MassIndexer {
    private static final Log LOG = LogFactory.getLog(DistributedExecutorMassIndexer.class, Log.class);
    private final AdvancedCache<?, ?> cache;
    private final SearchIntegrator searchIntegrator;
    private final IndexUpdater indexUpdater;
    private final DistributedExecutorService executor;

    /**
     * @param cache            cache whose entries are re-indexed; also used to build the
     *                         {@link IndexUpdater} and the distributed executor
     * @param searchIntegrator source of the indexed types and their index bindings
     */
    public DistributedExecutorMassIndexer(AdvancedCache<?, ?> cache, SearchIntegrator searchIntegrator) {
        this.cache = cache;
        this.searchIntegrator = searchIntegrator;
        this.indexUpdater = new IndexUpdater(cache);
        this.executor = new DefaultExecutorService(cache);
    }

    /**
     * Runs the mass indexing and blocks until indexing and the final flush are done.
     * Failures surface through {@link CompletableFuture#join()}.
     */
    @Override
    public void start() {
        this.executeInternal(false).join();
    }

    /**
     * Starts the mass indexing without blocking.
     *
     * @return a future that completes once all submitted workers have finished and the final
     *         flush has been executed on a dedicated thread
     */
    @Override
    public CompletableFuture<Void> startAsync() {
        return this.executeInternal(true);
    }

    /**
     * Submits an {@link IndexWorker} for every indexed type and chains the final flush.
     *
     * @param asyncFlush {@code true} to run the final flush on a dedicated single-thread
     *                   executor, {@code false} to run it via
     *                   {@link CompletableFuture#whenComplete}
     * @return the combined future of all workers followed by the flush step
     */
    private CompletableFuture<Void> executeInternal(boolean asyncFlush) {
        List<CompletableFuture<?>> futures = new ArrayList<>();
        List<Class<?>> toFlush = new ArrayList<>();
        for (Class<?> indexedType : this.searchIntegrator.getIndexedTypes()) {
            EntityIndexBinding indexBinding = this.searchIntegrator.getIndexBinding(indexedType);
            MassIndexStrategy strategy = MassIndexStrategyFactory.calculateStrategy(indexBinding, this.cache.getCacheConfiguration());

            // Workers clean and flush by themselves unless the strategy asks for it to be done once here.
            boolean workerClean = true;
            boolean workerFlush = true;
            if (strategy.getCleanStrategy() == MassIndexStrategy.CleanExecutionMode.ONCE_BEFORE) {
                this.indexUpdater.purge(indexedType);
                workerClean = false;
            }
            if (strategy.getFlushStrategy() == MassIndexStrategy.FlushExecutionMode.ONCE_AFTER) {
                toFlush.add(indexedType);
                workerFlush = false;
            }
            boolean primaryOwner = strategy.getIndexingStrategy() == MassIndexStrategy.IndexingExecutionMode.PRIMARY_OWNER;

            // NOTE(review): relies on IndexWorker being usable as a Callable<Void> (the decompiled
            // code cast it to a raw Callable) - confirm against IndexWorker's declaration.
            IndexWorker indexWork = new IndexWorker(indexedType, workerFlush, workerClean, primaryOwner);
            DistributedTask<Void> task = this.executor.createDistributedTaskBuilder(indexWork)
                    .timeout(0L, TimeUnit.NANOSECONDS)
                    .build();
            for (CompletableFuture<Void> future : this.executor.submitEverywhere(task)) {
                futures.add(future.exceptionally(DistributedExecutorMassIndexer::translateFailure));
            }
        }

        CompletableFuture<Void> indexing = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        if (!asyncFlush) {
            // The flush runs whether indexing succeeded or failed; the failure, if any, is kept
            // by the returned future.
            return indexing.whenComplete((ignored, failure) -> this.flushAll(toFlush));
        }

        // The dedicated executor is used for this single flush only, so it is shut down as soon
        // as the flush has run; otherwise every call would leave a worker thread behind.
        ExecutorService flushExecutor = Executors.newSingleThreadExecutor();
        return indexing.whenCompleteAsync((ignored, failure) -> {
            try {
                this.flushAll(toFlush);
            } finally {
                flushExecutor.shutdown();
            }
        }, flushExecutor);
    }

    /** Flushes the index of each given type through the {@link IndexUpdater}. */
    private void flushAll(List<Class<?>> types) {
        for (Class<?> type : types) {
            this.indexUpdater.flush(type);
        }
    }

    /**
     * Failure handler for a single worker future: restores the interrupt flag on
     * {@link InterruptedException} (the future then completes normally with {@code null}),
     * otherwise rethrows the failure through the logger's exception factory, unwrapping a
     * {@link CompletionException} to its cause first.
     */
    private static Void translateFailure(Throwable t) {
        if (t instanceof InterruptedException) {
            Thread.currentThread().interrupt();
            return null;
        }
        Throwable failure = t instanceof CompletionException ? t.getCause() : t;
        throw LOG.errorExecutingMassIndexer(failure);
    }
}

