/*
 * Decompiled with CFR 0.152.
 */
package com.graphaware.tx.executor.batch;

import com.graphaware.common.log.LoggerFactory;
import com.graphaware.common.util.BlockingArrayBlockingQueue;
import com.graphaware.tx.executor.NullItem;
import com.graphaware.tx.executor.batch.DisposableBatchTransactionExecutor;
import com.graphaware.tx.executor.batch.UnitOfWork;
import com.graphaware.tx.executor.single.KeepCalmAndCarryOn;
import com.graphaware.tx.executor.single.SimpleTransactionExecutor;
import com.graphaware.tx.executor.single.TransactionExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.logging.Log;

/**
 * Batch executor that takes its input from an {@link Iterable} and applies a {@link UnitOfWork}
 * to every item, grouping up to {@code batchSize} items into a single transaction.
 * <p>
 * Input is read by a separate producer thread (see {@link #populateQueue()}) and handed over
 * through a bounded queue; the thread calling {@link #doExecute()} consumes the queue
 * (see {@link #processQueue()}).
 *
 * @param <T> type of the input items.
 */
public class IterableInputBatchTransactionExecutor<T> extends DisposableBatchTransactionExecutor {
    private static final Log LOG = LoggerFactory.getLogger(IterableInputBatchTransactionExecutor.class);

    /** Capacity of the hand-over queue between the producer thread and the batch-processing loop. */
    private static final int QUEUE_CAPACITY = 10000;

    /** How long a single poll waits for input before the loop re-checks its conditions. */
    private static final long POLL_TIMEOUT_MS = 100L;

    private final int batchSize;
    private final UnitOfWork<T> unitOfWork;

    /** Number of items taken from the queue and handed to the unit of work, across all batches. */
    protected final AtomicInteger totalSteps = new AtomicInteger(0);
    /** Number of batches (transactions) started so far. */
    protected final AtomicInteger batches = new AtomicInteger(0);
    /** Number of steps that belonged to a batch whose transaction returned a non-null result. */
    protected final AtomicInteger successfulSteps = new AtomicInteger(0);
    protected final Iterable<T> input;
    protected final TransactionExecutor executor;
    /** Set to {@code true} by the producer thread once it stops iterating the input (normally or not). */
    protected final AtomicBoolean finished = new AtomicBoolean(false);
    protected final BlockingArrayBlockingQueue<T> queue = new BlockingArrayBlockingQueue<>(QUEUE_CAPACITY);

    /**
     * Create a batch executor.
     *
     * @param database   database against which the transactions are executed.
     * @param batchSize  maximum number of input items processed in one transaction.
     * @param input      items to process.
     * @param unitOfWork work to perform for each input item.
     */
    public IterableInputBatchTransactionExecutor(GraphDatabaseService database, int batchSize, Iterable<T> input, UnitOfWork<T> unitOfWork) {
        this.batchSize = batchSize;
        this.unitOfWork = unitOfWork;
        this.input = input;
        this.executor = new SimpleTransactionExecutor(database);
    }

    /**
     * Starts the producer thread and then processes the queue on the calling thread
     * until the producer has finished and the queue is empty.
     */
    @Override
    protected void doExecute() {
        populateQueue();
        processQueue();
    }

    /**
     * Starts a new thread that iterates over the input and offers every item to the queue.
     * Any exception thrown while producing is logged, and {@link #finished} is set in all cases
     * so that the consumer does not wait for input forever.
     */
    protected final void populateQueue() {
        new Thread(() -> {
            try {
                for (T item : input) {
                    // presumably blocks while the queue is full (going by the queue's name) - TODO confirm
                    queue.offer(item);
                }
            } catch (Exception e) {
                LOG.warn("Exception while producing input!", (Throwable) e);
            } finally {
                finished.set(true);
            }
        }).start();
    }

    /**
     * Consumes the queue in batches, one transaction per batch, until the producer has finished
     * and the queue is empty. A {@code null} result from the transaction executor is treated as
     * a rolled-back batch. Logs a summary (and a warning if some steps did not succeed) at the end.
     */
    protected final void processQueue() {
        while (notFinished()) {
            final int batchNo = batches.incrementAndGet();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Starting a transaction for batch number " + batchNo);
            }

            // Created outside the lambda so they are effectively final and can be read afterwards.
            final AtomicInteger currentBatchSteps = new AtomicInteger(0);
            final AtomicBoolean polled = new AtomicBoolean(false);

            NullItem result = executor.executeInTransaction(
                    database -> processBatch(database, batchNo, currentBatchSteps, polled),
                    KeepCalmAndCarryOn.getInstance());

            if (result != null) {
                successfulSteps.addAndGet(currentBatchSteps.get());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Committed transaction for batch number " + batchNo);
                }
                continue;
            }

            LOG.warn("Rolled back transaction for batch number " + batchNo);
            if (!polled.get()) {
                // Nothing was taken from the queue in this batch; drop the head so the loop
                // cannot get stuck retrying with the same head item forever.
                LOG.warn("Throwing away the head of the queue as the transaction seems to have failed before polling...");
                queue.poll();
            }
        }

        LOG.debug("Successfully executed " + successfulSteps + " (out of " + totalSteps.get() + " ) steps in " + batches + " batches");
        if (successfulSteps.get() != totalSteps.get()) {
            LOG.warn("Failed to execute " + (totalSteps.get() - successfulSteps.get()) + " steps!");
        }
    }

    /**
     * @return {@code true} while the producer is still running or the queue still holds items.
     */
    private boolean notFinished() {
        return !finished.get() || !queue.isEmpty();
    }

    /**
     * Processes a single batch inside an already running transaction: polls the queue and executes
     * the unit of work for each item until {@code batchSize} steps were performed or no more input
     * is expected.
     *
     * @param database          database passed in by the transaction executor.
     * @param batchNo           number of the current batch, passed through to the unit of work.
     * @param currentBatchSteps counter of steps performed in this batch; incremented per item.
     * @param polled            set to {@code true} as soon as an item has been taken from the queue.
     * @return the {@link NullItem} instance, so that a successful batch yields a non-null result.
     * @throws Exception whatever the unit of work throws.
     */
    private NullItem processBatch(GraphDatabaseService database, int batchNo, AtomicInteger currentBatchSteps, AtomicBoolean polled) throws Exception {
        while (notFinished() && currentBatchSteps.get() < batchSize) {
            T next;
            try {
                next = queue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ignored) {
                // Interruption is deliberately swallowed and the poll retried (unchanged behaviour).
                // NOTE(review): restoring the interrupt flag here would make every subsequent poll
                // throw immediately; aborting on interrupt would need a change in processQueue too.
                continue;
            }

            if (next != null) {
                polled.set(true);
                totalSteps.incrementAndGet();
                unitOfWork.execute(database, next, batchNo, currentBatchSteps.incrementAndGet());
                continue;
            }

            if (finished.get()) {
                // Timed out with the producer done: no more input will arrive.
                break;
            }
            LOG.warn("Waited for over 100ms but no input arrived. Still expecting more input. ");
        }
        return NullItem.getInstance();
    }
}

