/*
 * Decompiled with CFR 0.152.
 */
package org.hibernate.search.engine.backend.orchestration.spi;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import org.hibernate.search.engine.common.spi.ErrorHandler;
import org.hibernate.search.util.common.AssertionFailure;
import org.hibernate.search.util.common.impl.Closer;
import org.hibernate.search.util.common.impl.Executors;

/**
 * An executor that accepts worksets from any thread, queues them, and has them
 * processed in batches of at most {@code maxTasksPerBatch} worksets by a
 * {@link WorkProcessor}, on an {@link ExecutorService} created in {@link #start()}.
 * <p>
 * {@link #submit(WorkSet)} blocks while the internal queue is full; the queue capacity
 * is {@code maxTasksPerBatch}. {@link #awaitCompletion()} lets callers wait for the
 * batch processing that is scheduled or running at the time of the call.
 *
 * @param <W> The type of worksets accepted by this executor.
 * @param <P> The type of processor the worksets are submitted to.
 */
public final class BatchingExecutor<W extends BatchingExecutor.WorkSet<? super P>, P extends BatchingExecutor.WorkProcessor> {
    private final String name;
    private final P processor;
    private final ErrorHandler errorHandler;
    private final int maxTasksPerBatch;
    private final BlockingQueue<W> workQueue;
    // Only accessed from processBatch(), while holding the processor's monitor.
    private final List<W> workBuffer;
    // True while a processBatch() run has been scheduled and has not yet finished draining the queue.
    private final AtomicBoolean processingScheduled;
    // Written under this object's monitor in start()/stop(), but read without it
    // in submit()/ensureProcessingScheduled(): must be volatile to be visible there.
    private volatile ExecutorService executorService;
    // One party is registered per scheduled processBatch() run.
    // onAdvance() returns false so that the phaser does not terminate
    // when the number of registered parties drops to zero, and can be reused for later batches.
    private final Phaser phaser = new Phaser() {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            return false;
        }
    };

    /**
     * @param name The name of this executor; passed to the thread pool factory and used in error messages.
     * @param processor The processor that worksets are submitted to. Its monitor is held while a batch is being built.
     * @param maxTasksPerBatch The maximum number of worksets per batch, and also the capacity of the work queue.
     * @param fair The fairness policy of the underlying {@link ArrayBlockingQueue}.
     * @param errorHandler The handler notified of exceptions thrown while processing or scheduling batches.
     */
    public BatchingExecutor(String name, P processor, int maxTasksPerBatch, boolean fair, ErrorHandler errorHandler) {
        this.name = name;
        this.processor = processor;
        this.errorHandler = errorHandler;
        this.maxTasksPerBatch = maxTasksPerBatch;
        this.workQueue = new ArrayBlockingQueue<>(maxTasksPerBatch, fair);
        this.workBuffer = new ArrayList<>(maxTasksPerBatch);
        this.processingScheduled = new AtomicBoolean(false);
    }

    /**
     * Creates the underlying executor service. Must be called before {@link #submit(WorkSet)}.
     */
    public synchronized void start() {
        // NOTE(review): presumably a pool of exactly one thread named after this executor -- confirm against Executors.
        this.executorService = Executors.newFixedThreadPool(1, this.name);
    }

    /**
     * Shuts down the underlying executor service immediately, discards queued worksets
     * and releases threads blocked in {@link #awaitCompletion()}.
     */
    public synchronized void stop() {
        // NOTE(review): assumes Closer is generic in the exception type thrown by close() -- confirm.
        try (Closer<RuntimeException> closer = new Closer<>()) {
            closer.push(ExecutorService::shutdownNow, this.executorService);
            this.executorService = null;
            this.workQueue.clear();
            // A batch may have been scheduled without ever getting to run (and thus to arrive at the phaser):
            // force termination so that waiting threads are released anyway.
            closer.push(Phaser::forceTermination, this.phaser);
        }
    }

    /**
     * Queues a workset and makes sure its processing is scheduled.
     * Blocks while the work queue is full.
     *
     * @param workset The workset to submit.
     * @throws InterruptedException If the thread is interrupted while waiting for space in the queue.
     * @throws AssertionFailure If this executor is not started.
     */
    public void submit(W workset) throws InterruptedException {
        if (this.executorService == null) {
            throw new AssertionFailure("Attempt to submit a workset to executor '" + this.name + "', which is stopped."
                    + " There is probably a bug in Hibernate Search, please report it.");
        }
        this.workQueue.put(workset);
        this.ensureProcessingScheduled();
    }

    /**
     * Blocks until the batch processing that is scheduled or running at the time of the call, if any, has finished.
     * Returns immediately when no processing is pending.
     *
     * @throws InterruptedException If the thread is interrupted while waiting.
     */
    public void awaitCompletion() throws InterruptedException {
        // The phase must be read BEFORE checking for unarrived parties:
        // if the phase advances in between, awaitAdvanceInterruptibly returns immediately.
        int phaseBeforeUnarrivedPartiesCheck = this.phaser.getPhase();
        if (this.phaser.getUnarrivedParties() > 0) {
            this.phaser.awaitAdvanceInterruptibly(phaseBeforeUnarrivedPartiesCheck);
        }
    }

    /**
     * Schedules a {@link #processBatch()} run unless one is already scheduled.
     * A phaser party is registered for the scheduled run; it is deregistered here
     * if scheduling turns out to be unnecessary or fails.
     */
    private void ensureProcessingScheduled() {
        if (this.processingScheduled.get()) {
            // Fast path: a scheduled run will pick up the queued worksets.
            return;
        }
        // Register before flipping the flag, so that awaitCompletion() sees an unarrived party
        // as soon as processing is known to be scheduled.
        this.phaser.register();
        try {
            if (this.processingScheduled.compareAndSet(false, true)) {
                try {
                    this.executorService.submit(this::processBatch);
                }
                catch (Throwable e) {
                    try {
                        this.processingScheduled.set(false);
                    }
                    catch (Throwable e2) {
                        e.addSuppressed(e2);
                    }
                    throw e;
                }
            }
            else {
                // Another thread scheduled processing concurrently: our registration is not needed.
                this.phaser.arriveAndDeregister();
            }
        }
        catch (Throwable e) {
            try {
                this.phaser.arriveAndDeregister();
            }
            catch (Throwable e2) {
                e.addSuppressed(e2);
            }
            throw e;
        }
    }

    /**
     * Drains up to {@code maxTasksPerBatch} worksets from the queue, submits them to the processor
     * as one batch, then waits for the batch to complete.
     * <p>
     * If a workset fails to be submitted, the batch is aborted: that workset and every
     * workset drained after it are marked as failed, and the error handler is notified.
     * Never throws: every failure is reported to the error handler.
     */
    private void processBatch() {
        try {
            CompletableFuture<?> batchFuture;
            try {
                synchronized (this.processor) {
                    this.processor.beginBatch();
                    this.workBuffer.clear();
                    this.workQueue.drainTo(this.workBuffer, this.maxTasksPerBatch);
                    int bufferedCount = this.workBuffer.size();
                    for (int i = 0; i < bufferedCount; i++) {
                        try {
                            this.workBuffer.get(i).submitTo(this.processor);
                        }
                        catch (Throwable e) {
                            // The worksets from index i onwards have been removed from the queue
                            // and will never be submitted: notify all of them, not just the failing one.
                            this.markAsFailed(this.workBuffer.subList(i, bufferedCount), e);
                            throw e;
                        }
                    }
                    batchFuture = this.processor.endBatch();
                }
            }
            finally {
                // Reset the flag before waiting for the batch to complete,
                // so that worksets submitted from now on trigger the scheduling of another run.
                try {
                    this.processingScheduled.set(false);
                    if (!this.workQueue.isEmpty()) {
                        // Worksets were left in the queue (batch size limit or concurrent submission).
                        this.ensureProcessingScheduled();
                    }
                }
                catch (Throwable e) {
                    this.errorHandler.handleException("Error while ensuring the next work submitted to executor '" + this.name + "' will be processed", e);
                }
            }
            batchFuture.join();
        }
        catch (Throwable e) {
            this.errorHandler.handleException("Error while processing works in executor '" + this.name + "'", e);
        }
        finally {
            // Matches the registration performed when this run was scheduled.
            this.phaser.arriveAndDeregister();
        }
    }

    /**
     * Marks every given workset as failed with the given cause.
     * Exceptions thrown by {@link WorkSet#markAsFailed(Throwable)} are added to the cause as suppressed
     * so that one misbehaving workset does not prevent the others from being notified.
     */
    private void markAsFailed(List<W> worksets, Throwable cause) {
        for (W workset : worksets) {
            try {
                workset.markAsFailed(cause);
            }
            catch (Throwable e) {
                // Self-suppression is not permitted by Throwable#addSuppressed.
                if (e != cause) {
                    cause.addSuppressed(e);
                }
            }
        }
    }

    /**
     * A set of works to be submitted to a processor as part of a batch.
     *
     * @param <P> The type of processor this workset can be submitted to.
     */
    public static interface WorkSet<P extends WorkProcessor> {
        /**
         * Submits the works of this workset to the given processor.
         *
         * @param processor The processor of the current batch.
         */
        public void submitTo(P processor);

        /**
         * Notifies this workset that it will not be processed because of the given failure.
         *
         * @param t The cause of the failure.
         */
        public void markAsFailed(Throwable t);
    }

    /**
     * A processor of batches of worksets.
     */
    public static interface WorkProcessor {
        /**
         * Called before the worksets of a batch are submitted.
         */
        public void beginBatch();

        /**
         * Called after all the worksets of a batch have been submitted.
         *
         * @return A future that the executor joins on before it considers the batch complete.
         */
        public CompletableFuture<?> endBatch();
    }
}

