/*
 * Decompiled with CFR 0.152.
 */
package org.hibernate.search.backend.elasticsearch.orchestration.impl;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import org.hibernate.search.backend.elasticsearch.logging.impl.Log;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.AbstractElasticsearchSharedWorkOrchestrator;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchAccumulatingWorkOrchestrator;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchSharedWorkOrchestrator;
import org.hibernate.search.engine.common.spi.ErrorHandler;
import org.hibernate.search.util.common.impl.Closer;
import org.hibernate.search.util.common.impl.Executors;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;

class ElasticsearchBatchingSharedWorkOrchestrator
extends AbstractElasticsearchSharedWorkOrchestrator
implements ElasticsearchSharedWorkOrchestrator,
AutoCloseable {
    private static final Log log = (Log)LoggerFactory.make(Log.class, (MethodHandles.Lookup)MethodHandles.lookup());
    private final ElasticsearchAccumulatingWorkOrchestrator delegate;
    private final ErrorHandler errorHandler;
    private final int changesetsPerBatch;
    private final BlockingQueue<AbstractElasticsearchSharedWorkOrchestrator.Changeset> changesetQueue;
    private final List<AbstractElasticsearchSharedWorkOrchestrator.Changeset> changesetBuffer;
    private final AtomicBoolean processingScheduled;
    private ExecutorService executor;
    private final Phaser phaser = new Phaser(){

        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            return false;
        }
    };

    public ElasticsearchBatchingSharedWorkOrchestrator(String name, int maxChangesetsPerBatch, boolean fair, ElasticsearchAccumulatingWorkOrchestrator delegate, ErrorHandler errorHandler) {
        super(name);
        this.delegate = delegate;
        this.errorHandler = errorHandler;
        this.changesetsPerBatch = maxChangesetsPerBatch;
        this.changesetQueue = new ArrayBlockingQueue<AbstractElasticsearchSharedWorkOrchestrator.Changeset>(maxChangesetsPerBatch, fair);
        this.changesetBuffer = new ArrayList<AbstractElasticsearchSharedWorkOrchestrator.Changeset>(maxChangesetsPerBatch);
        this.processingScheduled = new AtomicBoolean(false);
    }

    @Override
    public void start() {
        this.executor = Executors.newFixedThreadPool((int)1, (String)this.getName());
    }

    public ElasticsearchSharedWorkOrchestrator createChild(String name) {
        return new ChildOrchestrator(name);
    }

    @Override
    protected void doSubmit(AbstractElasticsearchSharedWorkOrchestrator.Changeset changeset) throws InterruptedException {
        this.changesetQueue.put(changeset);
        this.ensureProcessingScheduled();
    }

    private void ensureProcessingScheduled() {
        block10: {
            if (!this.processingScheduled.get()) {
                this.phaser.register();
                try {
                    if (this.processingScheduled.compareAndSet(false, true)) {
                        try {
                            this.executor.submit(this::processBatch);
                            break block10;
                        }
                        catch (Throwable e) {
                            try {
                                this.processingScheduled.set(false);
                            }
                            catch (Throwable e2) {
                                e.addSuppressed(e2);
                            }
                            throw e;
                        }
                    }
                    this.phaser.arriveAndDeregister();
                }
                catch (Throwable e) {
                    try {
                        this.phaser.arriveAndDeregister();
                    }
                    catch (Throwable e2) {
                        e.addSuppressed(e2);
                    }
                    throw e;
                }
            }
        }
    }

    @Override
    public void awaitCompletion() throws InterruptedException {
        int phaseBeforeUnarrivedPartiesCheck = this.phaser.getPhase();
        if (this.phaser.getUnarrivedParties() > 0) {
            this.phaser.awaitAdvanceInterruptibly(phaseBeforeUnarrivedPartiesCheck);
        }
    }

    @Override
    protected void doClose() {
        try (Closer closer = new Closer();){
            closer.push(ElasticsearchBatchingSharedWorkOrchestrator::awaitCompletionBeforeClose, (Object)this);
            closer.push(ExecutorService::shutdownNow, (Object)this.executor);
            closer.push(Phaser::forceTermination, (Object)this.phaser);
        }
    }

    private void awaitCompletionBeforeClose() {
        try {
            this.awaitCompletion();
        }
        catch (InterruptedException e) {
            log.interruptedWhileWaitingForIndexActivity(this.getName(), e);
            Thread.currentThread().interrupt();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processBatch() {
        try {
            CompletableFuture<?> future;
            try {
                ElasticsearchAccumulatingWorkOrchestrator elasticsearchAccumulatingWorkOrchestrator = this.delegate;
                synchronized (elasticsearchAccumulatingWorkOrchestrator) {
                    this.delegate.reset();
                    this.changesetBuffer.clear();
                    this.changesetQueue.drainTo(this.changesetBuffer, this.changesetsPerBatch);
                    for (AbstractElasticsearchSharedWorkOrchestrator.Changeset changeset : this.changesetBuffer) {
                        try {
                            changeset.submitTo(this.delegate);
                        }
                        catch (Throwable e) {
                            changeset.getFuture().completeExceptionally(e);
                            throw e;
                        }
                    }
                    future = this.delegate.executeSubmitted();
                }
            }
            finally {
                try {
                    this.processingScheduled.set(false);
                    if (!this.changesetQueue.isEmpty()) {
                        this.ensureProcessingScheduled();
                    }
                }
                catch (Throwable e) {
                    this.errorHandler.handleException("Error while ensuring the next submitted asynchronous Elasticsearch works will be processed", e);
                }
            }
            future.join();
        }
        catch (Throwable e) {
            this.errorHandler.handleException("Error while processing Elasticsearch works", e);
        }
        finally {
            this.phaser.arriveAndDeregister();
        }
    }

    private class ChildOrchestrator
    extends AbstractElasticsearchSharedWorkOrchestrator
    implements ElasticsearchSharedWorkOrchestrator {
        protected ChildOrchestrator(String name) {
            super(name);
        }

        @Override
        public void start() {
        }

        @Override
        protected void doSubmit(AbstractElasticsearchSharedWorkOrchestrator.Changeset changeset) {
            ElasticsearchBatchingSharedWorkOrchestrator.this.submit(changeset);
        }

        @Override
        public void awaitCompletion() throws InterruptedException {
            ElasticsearchBatchingSharedWorkOrchestrator.this.awaitCompletion();
        }

        @Override
        protected void doClose() {
            ElasticsearchBatchingSharedWorkOrchestrator.this.awaitCompletionBeforeClose();
        }
    }
}

