/*
 * Decompiled with CFR 0.152.
 */
package org.hibernate.search.engine.backend.orchestration.spi;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.hibernate.search.engine.environment.thread.spi.ThreadPoolProvider;
import org.hibernate.search.engine.reporting.FailureContext;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.util.common.AssertionFailure;
import org.hibernate.search.util.common.impl.Closer;
import org.hibernate.search.util.common.impl.Futures;

public final class BatchingExecutor<W extends WorkSet<? super P>, P extends WorkProcessor> {
    private final String name;
    private final P processor;
    private final FailureHandler failureHandler;
    private final int maxTasksPerBatch;
    private final BlockingQueue<W> workQueue;
    private final List<W> workBuffer;
    private final AtomicBoolean processingInProgress;
    private ExecutorService executorService;
    private volatile CompletableFuture<?> completionFuture;

    public BatchingExecutor(String name, P processor, int maxTasksPerBatch, boolean fair, FailureHandler failureHandler) {
        this.name = name;
        this.processor = processor;
        this.failureHandler = failureHandler;
        this.maxTasksPerBatch = maxTasksPerBatch;
        this.workQueue = new ArrayBlockingQueue<W>(maxTasksPerBatch, fair);
        this.workBuffer = new ArrayList<W>(maxTasksPerBatch);
        this.processingInProgress = new AtomicBoolean(false);
    }

    public synchronized void start(ThreadPoolProvider threadPoolProvider) {
        this.executorService = threadPoolProvider.newFixedThreadPool(1, this.name);
    }

    public synchronized void stop() {
        try (Closer closer = new Closer();){
            closer.push(ExecutorService::shutdownNow, (Object)this.executorService);
            this.executorService = null;
            this.workQueue.clear();
            if (this.completionFuture != null) {
                this.completionFuture.cancel(false);
                this.completionFuture = null;
            }
        }
    }

    public void submit(W workset) throws InterruptedException {
        if (this.executorService == null) {
            throw new AssertionFailure("Attempt to submit a workset to executor '" + this.name + "', which is stopped There is probably a bug in Hibernate Search, please report it.");
        }
        this.workQueue.put(workset);
        this.ensureProcessingScheduled();
    }

    public CompletableFuture<?> getCompletion() {
        CompletableFuture<?> future = this.completionFuture;
        if (future == null) {
            return CompletableFuture.completedFuture(null);
        }
        return future;
    }

    private void ensureProcessingScheduled() {
        if (this.processingInProgress.compareAndSet(false, true)) {
            try {
                if (this.completionFuture == null) {
                    this.completionFuture = new CompletableFuture();
                }
                this.executorService.submit(this::processBatch);
            }
            catch (Throwable e) {
                try {
                    CompletableFuture<?> future = this.completionFuture;
                    this.completionFuture = null;
                    this.processingInProgress.set(false);
                    future.completeExceptionally(e);
                }
                catch (Throwable e2) {
                    e.addSuppressed(e2);
                }
                throw e;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processBatch() {
        try {
            CompletableFuture<?> batchFuture;
            P p = this.processor;
            synchronized (p) {
                this.processor.beginBatch();
                this.workBuffer.clear();
                this.workQueue.drainTo(this.workBuffer, this.maxTasksPerBatch);
                for (WorkSet workset : this.workBuffer) {
                    try {
                        workset.submitTo(this.processor);
                    }
                    catch (Throwable e) {
                        workset.markAsFailed(e);
                    }
                }
                batchFuture = this.processor.endBatch();
            }
            Futures.unwrappedExceptionJoin(batchFuture);
        }
        catch (Throwable e) {
            FailureContext.Builder contextBuilder = FailureContext.builder();
            contextBuilder.throwable(e);
            contextBuilder.failingOperation("Work processing in executor '" + this.name + "'");
            this.failureHandler.handle(contextBuilder.build());
        }
        finally {
            if (this.workQueue.isEmpty()) {
                CompletableFuture<?> justFinishedQueueFuture = this.completionFuture;
                this.completionFuture = null;
                justFinishedQueueFuture.complete(null);
            }
            this.processingInProgress.set(false);
            if (!this.workQueue.isEmpty()) {
                try {
                    this.ensureProcessingScheduled();
                }
                catch (Throwable e) {
                    FailureContext.Builder contextBuilder = FailureContext.builder();
                    contextBuilder.throwable(e);
                    contextBuilder.failingOperation("Scheduling the next batch in executor '" + this.name + "'");
                    this.failureHandler.handle(contextBuilder.build());
                }
            }
        }
    }

    public static interface WorkSet<P extends WorkProcessor> {
        public void submitTo(P var1);

        public void markAsFailed(Throwable var1);
    }

    public static interface WorkProcessor {
        public void beginBatch();

        public CompletableFuture<?> endBatch();
    }
}

