/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.client.datamovement.impl;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.Transaction;
import com.marklogic.client.datamovement.DataMovementException;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.Forest;
import com.marklogic.client.datamovement.ForestConfiguration;
import com.marklogic.client.datamovement.JobTicket;
import com.marklogic.client.datamovement.WriteBatch;
import com.marklogic.client.datamovement.WriteBatchListener;
import com.marklogic.client.datamovement.WriteBatcher;
import com.marklogic.client.datamovement.WriteEvent;
import com.marklogic.client.datamovement.WriteFailureListener;
import com.marklogic.client.datamovement.impl.BatchWriteSet;
import com.marklogic.client.datamovement.impl.BatcherImpl;
import com.marklogic.client.datamovement.impl.DataMovementManagerImpl;
import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.ServerTransform;
import com.marklogic.client.document.XMLDocumentManager;
import com.marklogic.client.impl.ClientCookie;
import com.marklogic.client.impl.DocumentWriteOperationImpl;
import com.marklogic.client.impl.Utilities;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.marker.AbstractWriteHandle;
import com.marklogic.client.io.marker.ContentHandle;
import com.marklogic.client.io.marker.DocumentMetadataWriteHandle;
import com.marklogic.client.io.marker.StructureReadHandle;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WriteBatcherImpl
extends BatcherImpl
implements WriteBatcher {
    private static Logger logger = LoggerFactory.getLogger(WriteBatcherImpl.class);
    // Number of batches grouped into one transaction; values > 1 switch transactions on in initialize().
    private int transactionSize;
    private String temporalCollection;
    private ServerTransform transform;
    private ForestConfiguration forestConfig;
    // Documents added but not yet assigned to a batch.
    private LinkedBlockingQueue<DocumentWriteOperation> queue = new LinkedBlockingQueue<DocumentWriteOperation>();
    private List<WriteBatchListener> successListeners = new ArrayList<WriteBatchListener>();
    private List<WriteFailureListener> failureListeners = new ArrayList<WriteFailureListener>();
    // Incremented for every new batch; also used to choose the host for that batch.
    private AtomicLong batchNumber = new AtomicLong(0L);
    // Incremented for every added document; reset to zero by flush().
    private AtomicLong batchCounter = new AtomicLong(0L);
    // Running total of documents reported to success listeners.
    private AtomicLong itemsSoFar = new AtomicLong(0L);
    // One entry per host in the rotation; rebuilt by withForestConfig().
    private HostInfo[] hostInfos;
    // volatile: initialize() reads this flag outside the lock before re-checking it
    // inside the lock, so the write must be visible (together with the state set up
    // before it, such as threadPool) to threads that never enter the lock.
    private volatile boolean initialized = false;
    private CompletableThreadPoolExecutor threadPool = null;
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicBoolean started = new AtomicBoolean(false);
    private boolean usingTransactions = false;
    private JobTicket jobTicket;
    private Calendar jobStartTime;
    private Calendar jobEndTime;
    // Optional metadata prepended to every batch as a METADATA_DEFAULT operation.
    private DocumentMetadataHandle defaultMetadata;

    /**
     * Creates a batcher bound to the given manager and immediately applies the
     * forest configuration, which builds the initial host rotation.
     *
     * @param moveMgr      the manager passed on to the superclass constructor
     * @param forestConfig the forests to write to; validated by {@code withForestConfig}
     */
    public WriteBatcherImpl(DataMovementManager moveMgr, ForestConfiguration forestConfig) {
        super(moveMgr);
        withForestConfig(forestConfig);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Performs the one-time start-up work for this batcher. The first caller
     * normalizes batch size and thread count, decides whether transactions are
     * used, creates the thread pool and records the job start time. Later calls
     * return immediately because {@code initialized} is already set.
     */
    public void initialize() {
        if (initialized) {
            return;
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have finished start-up meanwhile.
            if (initialized) {
                return;
            }
            if (getBatchSize() <= 0) {
                withBatchSize(1);
                logger.warn("batchSize should be 1 or greater--setting batchSize to 1");
            }
            if (transactionSize > 1) {
                usingTransactions = true;
            }
            if (getThreadCount() <= 0) {
                int hostCount = hostInfos.length;
                withThreadCount(hostCount);
                logger.warn("threadCount should be 1 or greater--setting threadCount to number of hosts ({})", (Object)hostCount);
            }
            int workers = getThreadCount();
            // Work queue is bounded at three pending tasks per worker thread.
            LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(workers * 3);
            threadPool = new CompletableThreadPoolExecutor(workers, workers, 1L, TimeUnit.MINUTES, workQueue);
            threadPool.allowCoreThreadTimeOut(true);
            initialized = true;
            logger.info("threadCount={}", (Object)getThreadCount());
            logger.info("batchSize={}", (Object)getBatchSize());
            if (usingTransactions) {
                logger.info("transactionSize={}", (Object)transactionSize);
            }
            jobStartTime = Calendar.getInstance();
            started.set(true);
        }
    }

    /**
     * Queues a document that has content only; delegates to the three-argument
     * overload with a {@code null} metadata handle.
     *
     * @param uri           the target URI
     * @param contentHandle the document content
     * @return this batcher
     */
    @Override
    public WriteBatcher add(String uri, AbstractWriteHandle contentHandle) {
        add(uri, null, contentHandle);
        return this;
    }

    /**
     * Queues arbitrary content without metadata; delegates to the
     * three-argument {@code addAs} with a {@code null} metadata handle.
     *
     * @param uri     the target URI
     * @param content the content object to wrap in a handle
     * @return whatever the three-argument overload returns
     */
    @Override
    public WriteBatcher addAs(String uri, Object content) {
        WriteBatcher result = addAs(uri, null, content);
        return result;
    }

    /**
     * Queues one write operation and, whenever the running count of added
     * documents reaches a multiple of the batch size, pulls up to one batch of
     * documents off the queue and submits it to the thread pool.
     *
     * @param writeOperation the operation to queue; its URI and content must be non-null
     * @return this batcher
     * @throws IllegalArgumentException if the URI or content is {@code null}
     * @throws IllegalStateException    if this batcher has been stopped
     */
    @Override
    public WriteBatcher add(DocumentWriteOperation writeOperation) {
        if (writeOperation.getUri() == null) {
            throw new IllegalArgumentException("uri must not be null");
        }
        if (writeOperation.getContent() == null) {
            throw new IllegalArgumentException("contentHandle must not be null");
        }
        initialize();
        requireNotStopped();
        queue.add(writeOperation);
        logger.trace("add uri={}", (Object)writeOperation.getUri());
        long addedSoFar = batchCounter.incrementAndGet();
        if (addedSoFar % (long)getBatchSize() != 0L) {
            // Not on a batch boundary yet; leave the document queued.
            return this;
        }
        BatchWriteSet pending = newBatchWriteSet(false);
        // A batch holding only the default-metadata entry carries no documents.
        int minBatchSize = 0;
        if (defaultMetadata != null) {
            pending.getWriteSet().add(new DocumentWriteOperationImpl(DocumentWriteOperation.OperationType.METADATA_DEFAULT, null, defaultMetadata, null));
            minBatchSize = 1;
        }
        int taken = 0;
        while (taken < getBatchSize()) {
            DocumentWriteOperation next = queue.poll();
            if (next == null) {
                break;
            }
            pending.getWriteSet().add(next);
            ++taken;
        }
        if (pending.getWriteSet().size() > minBatchSize) {
            threadPool.submit(new BatchWriter(pending));
        }
        return this;
    }

    /**
     * Wraps the URI, metadata and content in a {@code DOCUMENT_WRITE} operation
     * and queues it.
     *
     * @param uri            the target URI
     * @param metadataHandle the document metadata, may be {@code null}
     * @param contentHandle  the document content
     * @return this batcher
     */
    @Override
    public WriteBatcher add(String uri, DocumentMetadataWriteHandle metadataHandle, AbstractWriteHandle contentHandle) {
        DocumentWriteOperationImpl operation = new DocumentWriteOperationImpl(DocumentWriteOperation.OperationType.DOCUMENT_WRITE, uri, metadataHandle, contentHandle);
        add(operation);
        return this;
    }

    /**
     * Queues every given write event, in order, using its target URI, metadata
     * and content.
     *
     * @param docs the events to queue
     * @return this batcher
     */
    @Override
    public WriteBatcher add(WriteEvent ... docs) {
        for (int idx = 0; idx < docs.length; ++idx) {
            WriteEvent event = docs[idx];
            add(event.getTargetUri(), event.getMetadata(), event.getContent());
        }
        return this;
    }

    /**
     * Queues arbitrary content. If {@code content} already is a write handle it
     * is used as-is; otherwise a handle for its class is obtained from the
     * handle registry and populated with the content.
     *
     * @param uri            the target URI
     * @param metadataHandle the document metadata, may be {@code null}
     * @param content        the content object or write handle
     * @return this batcher
     * @throws IllegalArgumentException if {@code content} is {@code null}
     */
    @Override
    public WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle, Object content) {
        if (content == null) {
            throw new IllegalArgumentException("content must not be null");
        }
        AbstractWriteHandle handle;
        Class<?> as = content.getClass();
        if (AbstractWriteHandle.class.isAssignableFrom(as)) {
            // Cast to the type that was actually tested; casting to ContentHandle
            // would fail for write handles that are not content handles.
            handle = (AbstractWriteHandle)content;
        } else {
            ContentHandle<?> contentHandle = DatabaseClientFactory.getHandleRegistry().makeHandle(as);
            Utilities.setHandleContent(contentHandle, content);
            handle = contentHandle;
        }
        return this.add(uri, metadataHandle, handle);
    }

    /**
     * Guard for operations that only make sense once the job has been started.
     *
     * @throws IllegalStateException if {@code initialized} is still false
     */
    private void requireInitialized() {
        if (initialized) {
            return;
        }
        throw new IllegalStateException("This operation must be called after starting this job");
    }

    /**
     * Guard for configuration setters, which are rejected once the job is running.
     *
     * @throws IllegalStateException if {@code initialized} is already true
     */
    private void requireNotInitialized() {
        if (!initialized) {
            return;
        }
        throw new IllegalStateException("Configuration cannot be changed after starting this job or calling add or addAs");
    }

    /**
     * Guard for operations that must not run after {@code stop()}.
     *
     * @throws IllegalStateException if this batcher has been stopped
     */
    private void requireNotStopped() {
        if (!isStopped()) {
            return;
        }
        throw new IllegalStateException("This instance has been stopped");
    }

    /**
     * Creates a write set for the next batch, taking the batch number from the
     * shared {@code batchNumber} counter.
     *
     * @param forceNewTransaction passed through to the two-argument overload
     * @return the new write set
     */
    private BatchWriteSet newBatchWriteSet(boolean forceNewTransaction) {
        return newBatchWriteSet(forceNewTransaction, batchNumber.incrementAndGet());
    }

    /**
     * Builds a write set for the given batch number and wires up its
     * before-write, success and failure callbacks.
     *
     * The target host is chosen by taking {@code batchNum} modulo the number of
     * hosts. When transactions are in use, the before-write callback attaches a
     * transaction to the batch, the success callback decides whether to commit,
     * and the failure callback decides whether to roll back.
     * NOTE(review): when BatchWriteSet invokes these callbacks is not visible
     * here; presumably around the actual write -- confirm against BatchWriteSet.
     *
     * @param forceNewTransaction when true, a successful batch attempts a commit
     *                            even if the transaction has not reached transactionSize batches
     * @param batchNum            the batch number stored on the write set and used for host selection
     * @return the configured write set (no documents added yet)
     */
    private BatchWriteSet newBatchWriteSet(boolean forceNewTransaction, long batchNum) {
        // Rotate over hosts by batch number.
        int hostToUse = (int)(batchNum % (long)this.hostInfos.length);
        HostInfo host = this.hostInfos[hostToUse];
        DatabaseClient hostClient = host.client;
        BatchWriteSet batchWriteSet = new BatchWriteSet(this, hostClient.newDocumentManager().newWriteSet(), hostClient, this.getTransform(), this.getTemporalCollection());
        batchWriteSet.setBatchNumber(batchNum);
        if (this.usingTransactions) {
            batchWriteSet.onBeforeWrite(() -> {
                boolean timeForNewTransaction;
                // Per-host counter; every transactionSize-th batch opens a fresh transaction.
                long transactionCount = host.transactionCounter.getAndIncrement();
                boolean bl = timeForNewTransaction = transactionCount % (long)this.getTransactionSize() == 0L;
                if (timeForNewTransaction) {
                    batchWriteSet.setTransactionInfo(this.transactionOpener(host, hostClient, this.transactionSize));
                } else {
                    // Try to join a transaction the host already has; open one if none is available.
                    TransactionInfo transactionInfo = host.getTransactionInfo();
                    if (transactionInfo != null) {
                        batchWriteSet.setTransactionInfo(transactionInfo);
                        transactionInfo.inProcess.incrementAndGet();
                    } else {
                        batchWriteSet.setTransactionInfo(this.transactionOpener(host, hostClient, this.transactionSize));
                    }
                }
            });
        }
        batchWriteSet.onSuccess(() -> {
            boolean timeToCommit = true;
            boolean committed = false;
            if (this.usingTransactions) {
                TransactionInfo transactionInfo = batchWriteSet.getTransactionInfo();
                long batchNumFinished = transactionInfo.batchesFinished.incrementAndGet();
                // Commit is due once transactionSize batches have finished in this transaction.
                boolean bl = timeToCommit = batchNumFinished == (long)this.getTransactionSize();
                if (forceNewTransaction || timeToCommit) {
                    if (transactionInfo.alive.get()) {
                        if (transactionInfo.inProcess.get() <= 1L) {
                            // This batch is the only one still in process: commit now and
                            // report the batches that were held back earlier.
                            host.transactionCounter.set(0L);
                            transactionInfo.transaction.commit();
                            committed = true;
                            this.sendSuccessToListeners(transactionInfo.batches);
                        } else {
                            // Other batches are still in process; park this one and leave
                            // the transaction for later cleanup.
                            transactionInfo.batches.add(batchWriteSet);
                            host.unfinishedTransactions.add(transactionInfo);
                            timeToCommit = false;
                        }
                    }
                } else {
                    // Not yet time to commit; hold the batch until the transaction completes.
                    transactionInfo.batches.add(batchWriteSet);
                }
                transactionInfo.inProcess.decrementAndGet();
            } else {
                // Without transactions every successful batch is reported immediately.
                committed = true;
            }
            if (committed) {
                this.sendSuccessToListeners(batchWriteSet);
            }
        });
        batchWriteSet.onFailure(throwable -> {
            // Reset the counter so the next batch on this host starts a new transaction.
            host.transactionCounter.set(0L);
            if (this.usingTransactions) {
                TransactionInfo transactionInfo = batchWriteSet.getTransactionInfo();
                transactionInfo.throwable.set((Throwable)throwable);
                if (transactionInfo.inProcess.get() <= 1L) {
                    try {
                        logger.warn("Rolling back transaction because of throwable: {}", (Object)throwable.toString());
                        transactionInfo.transaction.rollback();
                    }
                    catch (Throwable t2) {
                        // Keep the original failure as primary; attach the rollback failure to it.
                        throwable.addSuppressed(t2);
                        logger.warn("Failure to rollback transaction: {}", (Object)t2.toString());
                    }
                    // Batches held in this transaction are reported as failed too.
                    this.sendThrowableToListeners((Throwable)throwable, null, (Collection<BatchWriteSet>)transactionInfo.batches);
                } else {
                    host.unfinishedTransactions.add(transactionInfo);
                }
                transactionInfo.inProcess.decrementAndGet();
            }
            this.sendThrowableToListeners((Throwable)throwable, "Error writing batch: {}", batchWriteSet);
        });
        return batchWriteSet;
    }

    /**
     * Registers an additional listener to be notified of successful batches.
     *
     * @param listener the listener to append
     * @return this batcher
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    @Override
    public WriteBatcher onBatchSuccess(WriteBatchListener listener) {
        if (listener != null) {
            successListeners.add(listener);
            return this;
        }
        throw new IllegalArgumentException("listener must not be null");
    }

    /**
     * Registers an additional listener to be notified of failed batches.
     *
     * @param listener the listener to append
     * @return this batcher
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    @Override
    public WriteBatcher onBatchFailure(WriteFailureListener listener) {
        if (listener != null) {
            failureListeners.add(listener);
            return this;
        }
        throw new IllegalArgumentException("listener must not be null");
    }

    /**
     * Retries a batch on the calling thread, keeping the failure handling that
     * new write sets are created with.
     *
     * @param batch the batch to write again
     */
    @Override
    public void retryWithFailureListeners(WriteBatch batch) {
        final boolean callFailListeners = true;
        retry(batch, callFailListeners);
    }

    /**
     * Retries a batch on the calling thread; a failure is rethrown to the
     * caller rather than handled by the default failure callback.
     *
     * @param batch the batch to write again
     */
    @Override
    public void retry(WriteBatch batch) {
        final boolean callFailListeners = false;
        retry(batch, callFailListeners);
    }

    /**
     * Rebuilds a write set from the items of {@code batch}, reusing its job
     * batch number, and runs the write synchronously on the calling thread.
     * Does nothing except log a warning when the job is already stopped.
     *
     * @param batch             the batch to write again
     * @param callFailListeners when false, the failure callback is replaced by
     *                          one that rethrows the failure
     * @throws IllegalArgumentException if {@code batch} is {@code null}
     */
    private void retry(WriteBatch batch, boolean callFailListeners) {
        if (isStopped()) {
            logger.warn("Job is now stopped, aborting the retry");
            return;
        }
        if (batch == null) {
            throw new IllegalArgumentException("batch must not be null");
        }
        // Retries always force a new transaction.
        BatchWriteSet retrySet = newBatchWriteSet(true, batch.getJobBatchNumber());
        if (!callFailListeners) {
            retrySet.onFailure(failure -> {
                if (failure instanceof RuntimeException) {
                    throw (RuntimeException)failure;
                }
                throw new DataMovementException("Failed to retry batch", (Throwable)failure);
            });
        }
        WriteEvent[] items = (WriteEvent[])batch.getItems();
        for (int idx = 0; idx < items.length; ++idx) {
            WriteEvent item = items[idx];
            retrySet.getWriteSet().add(item.getTargetUri(), item.getMetadata(), item.getContent());
        }
        new BatchWriter(retrySet).run();
    }

    /**
     * @return a new array holding the currently registered success listeners
     */
    @Override
    public WriteBatchListener[] getBatchSuccessListeners() {
        WriteBatchListener[] snapshot = new WriteBatchListener[successListeners.size()];
        return successListeners.toArray(snapshot);
    }

    /**
     * @return a new array holding the currently registered failure listeners
     */
    @Override
    public WriteFailureListener[] getBatchFailureListeners() {
        WriteFailureListener[] snapshot = new WriteFailureListener[failureListeners.size()];
        return failureListeners.toArray(snapshot);
    }

    /**
     * Replaces all success listeners. A {@code null} argument simply clears the list.
     *
     * @param listeners the new listeners, in notification order
     * @throws IllegalStateException if the job has already been initialized
     */
    @Override
    public void setBatchSuccessListeners(WriteBatchListener ... listeners) {
        requireNotInitialized();
        successListeners.clear();
        if (listeners == null) {
            return;
        }
        for (int idx = 0; idx < listeners.length; ++idx) {
            successListeners.add(listeners[idx]);
        }
    }

    /**
     * Replaces all failure listeners. A {@code null} argument simply clears the list.
     *
     * @param listeners the new listeners, in notification order
     * @throws IllegalStateException if the job has already been initialized
     */
    @Override
    public void setBatchFailureListeners(WriteFailureListener ... listeners) {
        requireNotInitialized();
        failureListeners.clear();
        if (listeners == null) {
            return;
        }
        for (int idx = 0; idx < listeners.length; ++idx) {
            failureListeners.add(listeners[idx]);
        }
    }

    /**
     * Submits all queued documents for writing without waiting for the writes to finish.
     */
    @Override
    public void flushAsync() {
        final boolean waitForCompletion = false;
        flush(waitForCompletion);
    }

    /**
     * Submits all queued documents for writing and waits for the thread pool to complete them.
     */
    @Override
    public void flushAndWait() {
        final boolean waitForCompletion = true;
        flush(waitForCompletion);
    }

    /**
     * Drains the document queue, splits the drained documents into batches of
     * at most {@code getBatchSize()} and submits each batch to the thread pool.
     * When transactions are in use, any still-open transactions are completed
     * afterwards (inline when waiting, otherwise as a pool task).
     *
     * @param waitForCompletion whether to block until the thread pool reports completion
     * @throws IllegalStateException if the job was never started or is already stopped
     */
    private void flush(boolean waitForCompletion) {
        this.requireInitialized();
        this.requireNotStopped();
        // Restart the per-add count so the next add() begins a fresh batch boundary.
        this.batchCounter.set(0L);
        List<DocumentWriteOperation> docs = new ArrayList<DocumentWriteOperation>();
        this.queue.drainTo(docs);
        logger.info("flushing {} queued docs", (Object)docs.size());
        Iterator<DocumentWriteOperation> iter = docs.iterator();
        boolean forceNewTransaction = true;
        // Counts documents (not batches) handed to the pool, so the warning below
        // reports how many documents were really left unflushed.
        int docsSubmitted = 0;
        while (iter.hasNext()) {
            if (this.isStopped()) {
                logger.warn("Job is now stopped, preventing the flush of {} queued docs", (Object)(docs.size() - docsSubmitted));
                if (waitForCompletion) {
                    this.awaitCompletion();
                }
                return;
            }
            BatchWriteSet writeSet = this.newBatchWriteSet(forceNewTransaction);
            if (this.defaultMetadata != null) {
                writeSet.getWriteSet().add(new DocumentWriteOperationImpl(DocumentWriteOperation.OperationType.METADATA_DEFAULT, null, this.defaultMetadata, null));
            }
            for (int j = 0; j < this.getBatchSize() && iter.hasNext(); ++j) {
                DocumentWriteOperation doc = iter.next();
                writeSet.getWriteSet().add(doc);
                ++docsSubmitted;
            }
            this.threadPool.submit(new BatchWriter(writeSet));
        }
        if (waitForCompletion) {
            this.awaitCompletion();
        }
        if (this.usingTransactions) {
            Runnable cleanupTransactions = () -> {
                this.cleanupUnfinishedTransactions();
                for (HostInfo host : this.hostInfos) {
                    TransactionInfo transactionInfo;
                    // Keep taking transactions from the host until none remain.
                    while ((transactionInfo = host.getTransactionInfoAndDrainPermits()) != null) {
                        this.completeTransaction(transactionInfo);
                    }
                }
            };
            if (waitForCompletion) {
                cleanupTransactions.run();
            } else {
                this.threadPool.submit(cleanupTransactions);
            }
        }
    }

    /**
     * Finishes a transaction that is alive, has no batches in process and has
     * been written to: rolls it back if a failure was recorded, otherwise
     * commits it, and notifies the matching listeners for its held batches.
     *
     * @param transactionInfo the transaction to finish
     * @return true if the transaction was committed or rolled back by this call;
     *         false if it was not eligible or completing it threw
     */
    public boolean completeTransaction(TransactionInfo transactionInfo) {
        try {
            if (!transactionInfo.alive.get() || transactionInfo.inProcess.get() > 0L || !transactionInfo.written.get()) {
                return false;
            }
            if (transactionInfo.throwable.get() == null) {
                transactionInfo.transaction.commit();
                sendSuccessToListeners(transactionInfo.batches);
            } else {
                transactionInfo.transaction.rollback();
                sendThrowableToListeners(transactionInfo.throwable.get(), "Failure during transaction: {}", transactionInfo.batches);
            }
            return true;
        }
        catch (Throwable failure) {
            // Record the failure on the transaction and report it for every held batch.
            transactionInfo.throwable.set(failure);
            sendThrowableToListeners(failure, "Failure to complete transaction: {}", transactionInfo.batches);
            return false;
        }
    }

    /**
     * Reports every batch in the collection to the success listeners, in iteration order.
     *
     * @param batches the batches to report
     */
    private void sendSuccessToListeners(Collection<BatchWriteSet> batches) {
        Iterator<BatchWriteSet> it = batches.iterator();
        while (it.hasNext()) {
            BatchWriteSet each = it.next();
            sendSuccessToListeners(each);
        }
    }

    /**
     * Adds the batch's size to the running item total, stamps that total on the
     * batch and passes the batch to each success listener. A listener that
     * throws is logged and does not stop the remaining listeners.
     *
     * @param batchWriteSet the batch that was written
     */
    private void sendSuccessToListeners(BatchWriteSet batchWriteSet) {
        long runningTotal = itemsSoFar.addAndGet(batchWriteSet.getWriteSet().size());
        batchWriteSet.setItemsSoFar(runningTotal);
        WriteBatch event = batchWriteSet.getBatchOfWriteEvents();
        for (WriteBatchListener listener : successListeners) {
            try {
                listener.processEvent(event);
            }
            catch (Throwable listenerFailure) {
                logger.error("Exception thrown by an onBatchSuccess listener", listenerFailure);
            }
        }
    }

    /**
     * Reports the failure for every batch in the collection, then logs the
     * message once (not once per batch) if one was given.
     *
     * @param t       the failure to report
     * @param message a log format with one placeholder, or {@code null} to skip logging
     * @param batches the batches affected by the failure
     */
    private void sendThrowableToListeners(Throwable t, String message, Collection<BatchWriteSet> batches) {
        Iterator<BatchWriteSet> it = batches.iterator();
        while (it.hasNext()) {
            sendThrowableToListeners(t, null, it.next());
        }
        if (message == null) {
            return;
        }
        logger.warn(message, (Object)t.toString());
    }

    /**
     * Stamps the current item total on the batch and passes the batch and the
     * failure to each failure listener. A listener that throws is logged and
     * does not stop the remaining listeners. Logs the message afterwards if given.
     *
     * @param t             the failure to report
     * @param message       a log format with one placeholder, or {@code null} to skip logging
     * @param batchWriteSet the batch that failed
     */
    private void sendThrowableToListeners(Throwable t, String message, BatchWriteSet batchWriteSet) {
        batchWriteSet.setItemsSoFar(itemsSoFar.get());
        WriteBatch event = batchWriteSet.getBatchOfWriteEvents();
        for (WriteFailureListener listener : failureListeners) {
            try {
                listener.processFailure(event, t);
            }
            catch (Throwable listenerFailure) {
                logger.error("Exception thrown by an onBatchFailure listener", listenerFailure);
            }
        }
        if (message == null) {
            return;
        }
        logger.warn(message, (Object)t.toString());
    }

    /**
     * Records the job ticket and runs the one-time initialization.
     *
     * @param ticket the ticket later returned by {@code getJobTicket()}
     */
    public void start(JobTicket ticket) {
        jobTicket = ticket;
        initialize();
    }

    /**
     * Stops the job: records the end time, marks the batcher stopped, shuts the
     * thread pool down immediately if one was created, and closes closeable listeners.
     */
    public void stop() {
        jobEndTime = Calendar.getInstance();
        stopped.set(true);
        boolean poolExists = threadPool != null;
        if (poolExists) {
            threadPool.shutdownNow();
        }
        closeAllListeners();
    }

    /**
     * Closes every registered success and failure listener that implements
     * {@link AutoCloseable}. A failure to close one listener is logged and the
     * remaining listeners are still processed.
     */
    private void closeAllListeners() {
        WriteBatchListener[] onSuccess = getBatchSuccessListeners();
        for (int idx = 0; idx < onSuccess.length; ++idx) {
            WriteBatchListener candidate = onSuccess[idx];
            if (candidate instanceof AutoCloseable) {
                try {
                    ((AutoCloseable)((Object)candidate)).close();
                }
                catch (Exception closeFailure) {
                    logger.error("onBatchSuccess listener cannot be closed", (Throwable)closeFailure);
                }
            }
        }
        WriteFailureListener[] onFailure = getBatchFailureListeners();
        for (int idx = 0; idx < onFailure.length; ++idx) {
            WriteFailureListener candidate = onFailure[idx];
            if (candidate instanceof AutoCloseable) {
                try {
                    ((AutoCloseable)((Object)candidate)).close();
                }
                catch (Exception closeFailure) {
                    logger.error("onBatchFailure listener cannot be closed", (Throwable)closeFailure);
                }
            }
        }
    }

    /**
     * @return true once {@code stop()} has been called
     */
    @Override
    public boolean isStopped() {
        return stopped.get();
    }

    /**
     * @return true once initialization has completed
     */
    @Override
    public boolean isStarted() {
        return started.get();
    }

    /**
     * @return the ticket recorded by {@code start}
     * @throws IllegalStateException if the job has not been initialized
     */
    @Override
    public JobTicket getJobTicket() {
        requireInitialized();
        return jobTicket;
    }

    /**
     * @return the time initialization ran, or {@code null} if the job has not started
     */
    @Override
    public Calendar getJobStartTime() {
        return isStarted() ? jobStartTime : null;
    }

    /**
     * @return the time {@code stop()} was called, or {@code null} if the job is not stopped
     */
    @Override
    public Calendar getJobEndTime() {
        return isStopped() ? jobEndTime : null;
    }

    /**
     * Waits for the thread pool to report completion, up to the given timeout.
     *
     * @param timeout the maximum time to wait
     * @param unit    the unit of {@code timeout}
     * @return the result of the thread pool's {@code awaitCompletion}
     * @throws InterruptedException  if the wait is interrupted
     * @throws IllegalStateException if the job has not been started, in which
     *                               case there is no thread pool to wait on
     */
    @Override
    public boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        // threadPool is only created by initialize(); fail with a clear message
        // instead of a NullPointerException when called too early.
        this.requireInitialized();
        return this.threadPool.awaitCompletion(timeout, unit);
    }

    /**
     * Waits without a practical time limit for the thread pool to report completion.
     *
     * @return the result of the timed wait, or false if the calling thread was
     *         interrupted while waiting (its interrupt status is then restored)
     */
    @Override
    public boolean awaitCompletion() {
        try {
            return this.awaitCompletion(Long.MAX_VALUE, TimeUnit.DAYS);
        }
        catch (InterruptedException e) {
            logger.debug("awaitCompletion caught InterruptedException");
            // Catching InterruptedException clears the flag; set it again so
            // callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Sets the job name via the superclass.
     *
     * @param jobName the name to assign
     * @return this batcher
     * @throws IllegalStateException if the job has already been initialized
     */
    @Override
    public WriteBatcher withJobName(String jobName) {
        requireNotInitialized();
        super.withJobName(jobName);
        return this;
    }

    /**
     * Sets the job id via the superclass.
     *
     * @param jobId the id to assign
     * @return this batcher
     * @throws IllegalStateException if the job has already been initialized
     */
    @Override
    public WriteBatcher withJobId(String jobId) {
        requireNotInitialized();
        super.withJobId(jobId);
        return this;
    }

    /**
     * Sets the batch size via the superclass.
     *
     * @param batchSize the number of documents per batch
     * @return this batcher
     * @throws IllegalStateException if the job has already been initialized
     */
    @Override
    public WriteBatcher withBatchSize(int batchSize) {
        requireNotInitialized();
        super.withBatchSize(batchSize);
        return this;
    }

    /**
     * Sets the thread count via the superclass.
     *
     * @param threadCount the number of worker threads
     * @return this batcher
     * @throws IllegalStateException if the job has already been initialized
     */
    @Override
    public WriteBatcher withThreadCount(int threadCount) {
        requireNotInitialized();
        super.withThreadCount(threadCount);
        return this;
    }

    /**
     * Sets how many batches share one transaction. Initialization enables
     * transactions only when this value is greater than 1.
     *
     * @param transactionSize the number of batches per transaction
     * @return this batcher
     * @throws IllegalStateException if the job has already been initialized
     */
    public WriteBatcher withTransactionSize(int transactionSize) {
        requireNotInitialized();
        this.transactionSize = transactionSize;
        return this;
    }

    /**
     * @return the configured number of batches per transaction
     */
    public int getTransactionSize() {
        return transactionSize;
    }

    /**
     * Sets the temporal collection handed to every new write set.
     *
     * @param collection the temporal collection name
     * @return this batcher
     * @throws IllegalStateException if the job has already been initialized
     */
    @Override
    public WriteBatcher withTemporalCollection(String collection) {
        requireNotInitialized();
        temporalCollection = collection;
        return this;
    }

    /**
     * @return the configured temporal collection, or {@code null} if none was set
     */
    @Override
    public String getTemporalCollection() {
        return temporalCollection;
    }

    /**
     * Sets the server transform handed to every new write set.
     *
     * @param transform the transform to apply
     * @return this batcher
     * @throws IllegalStateException if the job has already been initialized
     */
    @Override
    public WriteBatcher withTransform(ServerTransform transform) {
        requireNotInitialized();
        this.transform = transform;
        return this;
    }

    /**
     * @return the primary client of the owning data movement manager
     */
    @Override
    public DatabaseClient getPrimaryClient() {
        DatabaseClient primary = getMoveMgr().getPrimaryClient();
        return primary;
    }

    /**
     * @return the configured server transform, or {@code null} if none was set
     */
    @Override
    public ServerTransform getTransform() {
        return transform;
    }

    /**
     * Applies a forest configuration and rebuilds the host rotation from it.
     *
     * Hosts already in the rotation keep their existing {@code HostInfo}; new
     * hosts get a fresh one with a forest client from the manager. If hosts
     * were dropped, the primary client is replaced when it pointed at a dropped
     * host, queued pool tasks are re-submitted (tasks targeting a dropped host
     * are rebuilt against the new rotation), and unfinished transactions on the
     * dropped hosts are cleaned up.
     *
     * @param forestConfig the new forest configuration
     * @return this batcher
     * @throws IllegalArgumentException if {@code forestConfig} is {@code null}
     * @throws IllegalStateException    if there are no forests or a forest has no preferred host
     */
    @Override
    public synchronized WriteBatcher withForestConfig(ForestConfiguration forestConfig) {
        super.withForestConfig(forestConfig);
        if (forestConfig == null) {
            throw new IllegalArgumentException("forestConfig must not be null");
        }
        Forest[] forests = forestConfig.listForests();
        if (forests.length == 0) {
            throw new IllegalStateException("WriteBatcher requires at least one writeable forest");
        }
        // Map each preferred host name to one of its forests.
        HashMap<String, Forest> hosts = new HashMap<String, Forest>();
        for (Forest forest : forests) {
            if (forest.getPreferredHost() == null) {
                throw new IllegalStateException("Hostname must not be null for any forest");
            }
            hosts.put(forest.getPreferredHost(), forest);
        }
        // When a forest prefers its request host and that differs from its host,
        // drop the plain host name from the rotation.
        for (Forest forest : forests) {
            if (forest.getPreferredHostType() != Forest.HostType.REQUEST_HOST
                    || forest.getHost().toLowerCase().equals(forest.getRequestHost().toLowerCase())
                    || !hosts.containsKey(forest.getHost())) {
                continue;
            }
            hosts.remove(forest.getHost());
        }
        HashMap<String, HostInfo> existingHostInfos = new HashMap<String, HostInfo>();
        // Starts as a copy of the current hosts; entries still in use are removed below.
        HashMap<String, HostInfo> removedHostInfos = new HashMap<String, HostInfo>();
        if (this.hostInfos != null) {
            for (HostInfo hostInfo : this.hostInfos) {
                existingHostInfos.put(hostInfo.hostName, hostInfo);
                removedHostInfos.put(hostInfo.hostName, hostInfo);
            }
        }
        logger.info("(withForestConfig) Using forests on {} hosts for \"{}\"", hosts.keySet(), (Object)forests[0].getDatabaseName());
        HostInfo[] newHostInfos = new HostInfo[hosts.size()];
        int i = 0;
        for (String host : hosts.keySet()) {
            if (existingHostInfos.get(host) != null) {
                newHostInfos[i] = existingHostInfos.get(host);
                removedHostInfos.remove(host);
            } else {
                newHostInfos[i] = new HostInfo();
                newHostInfos[i].hostName = host;
                Forest forest = hosts.get(host);
                newHostInfos[i].client = this.getMoveMgr().getForestClient(forest);
                if (this.getMoveMgr().getConnectionType() == DatabaseClient.ConnectionType.DIRECT) {
                    logger.info("Adding DatabaseClient on port {} for host \"{}\" to the rotation", (Object)newHostInfos[i].client.getPort(), (Object)host);
                }
            }
            ++i;
        }
        this.forestConfig = forestConfig;
        this.hostInfos = newHostInfos;
        if (removedHostInfos.size() > 0) {
            DataMovementManagerImpl moveMgrImpl = this.getMoveMgr();
            String primaryHost = moveMgrImpl.getPrimaryClient().getHost();
            if (removedHostInfos.containsKey(primaryHost)) {
                // Take the remainder first and the absolute value second:
                // Math.abs(Integer.MIN_VALUE) is still negative, which would give a
                // negative array index. The remainder always lies strictly between
                // -length and length, so its absolute value is a valid index, and
                // the result is unchanged for every other hash code.
                int randomPos = Math.abs(primaryHost.hashCode() % newHostInfos.length);
                moveMgrImpl.setPrimaryClient(newHostInfos[randomPos].client);
            }
            List<Runnable> tasks = new ArrayList<Runnable>();
            if (this.threadPool != null) {
                this.threadPool.getQueue().drainTo(tasks);
            }
            for (Runnable task : tasks) {
                BatchWriter writerTask;
                if (task instanceof BatchWriter && removedHostInfos.containsKey((writerTask = (BatchWriter)task).writeSet.getClient().getHost())) {
                    // The queued batch targets a dropped host: rebuild it against the
                    // new rotation, keeping its batch number and documents.
                    boolean forceNewTransaction = true;
                    BatchWriteSet writeSet = this.newBatchWriteSet(forceNewTransaction, writerTask.writeSet.getBatchNumber());
                    writeSet.onFailure(throwable -> {
                        if (throwable instanceof RuntimeException) {
                            throw (RuntimeException)throwable;
                        }
                        throw new DataMovementException("Failed to retry batch after failover", (Throwable)throwable);
                    });
                    for (WriteEvent doc : (WriteEvent[])writerTask.writeSet.getBatchOfWriteEvents().getItems()) {
                        writeSet.getWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
                    }
                    BatchWriter retryWriterTask = new BatchWriter(writeSet);
                    Runnable fretryWriterTask = (Runnable)((Object)this.threadPool.submit(retryWriterTask));
                    this.threadPool.replaceTask(writerTask, fretryWriterTask);
                    continue;
                }
                // Any other drained task is re-submitted unchanged.
                Runnable fTask = (Runnable)((Object)this.threadPool.submit(task));
                this.threadPool.replaceTask(task, fTask);
            }
            for (HostInfo removedHostInfo : removedHostInfos.values()) {
                this.cleanupUnfinishedTransactions(removedHostInfo);
            }
        }
        return this;
    }

    /**
     * @return the forest configuration most recently applied via {@code withForestConfig}
     */
    @Override
    public ForestConfiguration getForestConfig() {
        return forestConfig;
    }

    /**
     * Runs the per-host unfinished-transaction cleanup for every host in the rotation.
     */
    private void cleanupUnfinishedTransactions() {
        HostInfo[] current = hostInfos;
        for (int idx = 0; idx < current.length; ++idx) {
            cleanupUnfinishedTransactions(current[idx]);
        }
    }

    /**
     * Walks the host's unfinished transactions. Dead transactions and idle ones
     * that were never written to are dropped from the list; idle, written ones
     * are handed to the thread pool to be completed. Transactions that are
     * already queued for cleanup or still have batches in process are left alone.
     *
     * @param host the host whose unfinished transactions are examined
     */
    private void cleanupUnfinishedTransactions(HostInfo host) {
        for (Iterator<TransactionInfo> it = host.unfinishedTransactions.iterator(); it.hasNext(); ) {
            TransactionInfo pending = it.next();
            if (!pending.alive.get()) {
                it.remove();
                continue;
            }
            boolean busy = pending.queuedForCleanup.get() || pending.inProcess.get() > 0L;
            if (busy) {
                continue;
            }
            if (!pending.written.get()) {
                it.remove();
                continue;
            }
            // Mark first so a concurrent pass does not queue the same transaction twice.
            pending.queuedForCleanup.set(true);
            threadPool.submit(() -> {
                if (completeTransaction(pending)) {
                    host.unfinishedTransactions.remove(pending);
                } else {
                    // Could not complete yet; allow a later pass to try again.
                    pending.queuedForCleanup.set(false);
                }
            });
        }
    }

    /**
     * Opens a transaction on the given client and wraps it in a
     * {@code TransactionInfo} registered with the host.
     *
     * The wrapper delegates every call to the real transaction. Its
     * {@code commit} and {@code rollback} first release the transaction info
     * from the host, then flip {@code alive} to false and only perform the real
     * commit/rollback if the flag was previously true, so a transaction is
     * finished at most once through this wrapper.
     *
     * @param host            the host that tracks the new transaction
     * @param client          the client used to open the real transaction
     * @param transactionSize used to seed {@code transactionPermits} with {@code transactionSize - 1}
     * @return the new transaction info, marked alive and with {@code inProcess} incremented once
     */
    public TransactionInfo transactionOpener(final HostInfo host, DatabaseClient client, int transactionSize) {
        final TransactionInfo transactionInfo = new TransactionInfo();
        // NOTE(review): presumably the opening batch consumes one slot, hence size - 1 -- confirm in HostInfo.
        transactionInfo.transactionPermits.set(transactionSize - 1);
        final Transaction realTransaction = client.openTransaction();
        logger.trace("opened transaction {}", (Object)realTransaction.getTransactionId());
        Transaction transaction = new Transaction(){

            @Override
            public void commit() {
                host.releaseTransactionInfo(transactionInfo);
                // getAndSet makes the alive -> dead transition atomic; only the caller that saw true commits.
                boolean alive = transactionInfo.alive.getAndSet(false);
                if (alive) {
                    realTransaction.commit();
                    logger.trace("committed transaction {}", (Object)realTransaction.getTransactionId());
                }
            }

            @Override
            public List<ClientCookie> getCookies() {
                return realTransaction.getCookies();
            }

            @Override
            public String getHostId() {
                return realTransaction.getHostId();
            }

            @Override
            public String getTransactionId() {
                return realTransaction.getTransactionId();
            }

            @Override
            public <T extends StructureReadHandle> T readStatus(T handle) {
                return realTransaction.readStatus(handle);
            }

            @Override
            public void rollback() {
                host.releaseTransactionInfo(transactionInfo);
                // Same at-most-once guard as commit().
                boolean alive = transactionInfo.alive.getAndSet(false);
                if (alive) {
                    realTransaction.rollback();
                    logger.trace("rolled back transaction {}", (Object)realTransaction.getTransactionId());
                }
            }
        };
        transactionInfo.transaction = transaction;
        transactionInfo.alive.set(true);
        // Count the batch that triggered the open as in process.
        transactionInfo.inProcess.incrementAndGet();
        host.addTransactionInfo(transactionInfo);
        // Opening a transaction also triggers a cleanup pass over all hosts.
        this.cleanupUnfinishedTransactions();
        return transactionInfo;
    }

    /**
     * Stores the given metadata handle in the {@code defaultMetadata} field.
     *
     * @param handle the metadata handle to store
     * @return this batcher, to allow call chaining
     */
    @Override
    public WriteBatcher withDefaultMetadata(DocumentMetadataHandle handle) {
        defaultMetadata = handle;
        return this;
    }

    /**
     * Passes every element of the stream to {@code add}.
     *
     * @param operations the write operations to add
     */
    @Override
    public void addAll(Stream<? extends DocumentWriteOperation> operations) {
        operations.forEach(operation -> {
            add(operation);
        });
    }

    /**
     * Returns the metadata handle held in the {@code defaultMetadata} field.
     *
     * @return the stored metadata handle, as last assigned to the field
     */
    @Override
    public DocumentMetadataHandle getDocumentMetadata() {
        return defaultMetadata;
    }

    /**
     * A {@link ThreadPoolExecutor} that tracks every task handed to
     * {@link #execute(Runnable)} until it finishes, so callers can block in
     * {@link #awaitCompletion(long, TimeUnit)} until the tasks that were
     * pending at the time of the call are done, without shutting the pool down.
     */
    public static class CompletableThreadPoolExecutor
    extends ThreadPoolExecutor {
        /** Tasks that have been handed to {@code execute} and not yet reported complete. */
        Set<Runnable> queuedAndExecutingTasks = ConcurrentHashMap.newKeySet();
        /** One snapshot of pending tasks per thread currently inside {@code awaitCompletion}. */
        Map<Thread, ConcurrentLinkedQueue<Runnable>> activeSnapshots = new ConcurrentHashMap<Thread, ConcurrentLinkedQueue<Runnable>>();

        /**
         * Creates the pool with a {@link CompletableRejectedExecutionHandler}
         * as its rejection policy and wires that handler back to this pool.
         */
        public CompletableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> queue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, new CompletableRejectedExecutionHandler());
            ((CompletableRejectedExecutionHandler)this.getRejectedExecutionHandler()).setThreadPool(this);
        }

        /** Records the task as pending, then hands it to the underlying executor. */
        @Override
        public void execute(Runnable r) {
            this.queuedAndExecutingTasks.add(r);
            super.execute(r);
        }

        /** Marks the task complete once a worker has finished running it. */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            this.taskComplete(r);
            super.afterExecute(r, t);
        }

        /**
         * Registers a snapshot for the calling thread and fills it with the
         * tasks that are pending right now.
         *
         * @return the snapshot queue registered for the current thread
         */
        public ConcurrentLinkedQueue<Runnable> snapshotQueuedAndExecutingTasks() {
            ConcurrentLinkedQueue<Runnable> snapshot = new ConcurrentLinkedQueue<Runnable>();
            // Register before copying so completions that happen during the
            // copy can already see (and prune) this snapshot.
            this.activeSnapshots.put(Thread.currentThread(), snapshot);
            snapshot.addAll(this.queuedAndExecutingTasks);
            // Drop anything that finished between being copied and now.
            for (Runnable task : snapshot) {
                if (this.queuedAndExecutingTasks.contains(task)) continue;
                snapshot.remove(task);
            }
            return snapshot;
        }

        /** Unregisters the calling thread's snapshot. */
        public void removeSnapshot() {
            this.activeSnapshots.remove(Thread.currentThread());
        }

        /**
         * Removes the task from the pending set and from every active
         * snapshot; if any snapshot contained it, wakes threads waiting on
         * the task's monitor.
         *
         * @param r the task that has finished (or will never run)
         */
        public void taskComplete(Runnable r) {
            boolean removedFromASnapshot = false;
            this.queuedAndExecutingTasks.remove(r);
            for (ConcurrentLinkedQueue<Runnable> snapshot : this.activeSnapshots.values()) {
                if (!snapshot.remove(r)) continue;
                removedFromASnapshot = true;
            }
            if (removedFromASnapshot) {
                synchronized (r) {
                    r.notifyAll();
                }
            }
        }

        /**
         * Substitutes {@code newTask} for {@code oldTask} in the pending set
         * (only if the old task was present) and in every active snapshot
         * that holds the old task, then wakes threads waiting on
         * {@code oldTask} if any snapshot was changed.
         *
         * @param oldTask the task to stop tracking
         * @param newTask the task to track in its place
         */
        public void replaceTask(Runnable oldTask, Runnable newTask) {
            boolean removedFromASnapshot = false;
            if (this.queuedAndExecutingTasks.remove(oldTask)) {
                this.queuedAndExecutingTasks.add(newTask);
            }
            for (ConcurrentLinkedQueue<Runnable> snapshot : this.activeSnapshots.values()) {
                if (!snapshot.remove(oldTask)) continue;
                snapshot.add(newTask);
                removedFromASnapshot = true;
            }
            if (removedFromASnapshot) {
                synchronized (oldTask) {
                    oldTask.notifyAll();
                }
            }
        }

        /**
         * Blocks until every task that was pending when this method was
         * called has completed, or until the timeout elapses.
         *
         * @param timeout the maximum time to wait, expressed in {@code unit}
         * @param unit the unit of {@code timeout}; must not be null
         * @return {@code true} if all snapshotted tasks completed,
         *         {@code false} if the timeout ran out first
         * @throws IllegalArgumentException if {@code unit} is null
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
            if (unit == null) {
                throw new IllegalArgumentException("unit cannot be null");
            }
            ConcurrentLinkedQueue<Runnable> snapshot = this.snapshotQueuedAndExecutingTasks();
            try {
                // Object.wait takes milliseconds, so the caller's timeout must be
                // converted FROM `unit` TO milliseconds (not the other way round).
                long remainingMillis = unit.toMillis(timeout);
                Runnable task;
                while ((task = snapshot.peek()) != null) {
                    synchronized (task) {
                        while (snapshot.contains(task) && this.queuedAndExecutingTasks.contains(task)) {
                            // Check before waiting: wait(0) would block indefinitely
                            // and a negative argument is rejected by Object.wait.
                            if (remainingMillis <= 0L) {
                                logger.debug("[awaitCompletion] timeout");
                                return false;
                            }
                            long startTime = System.currentTimeMillis();
                            task.wait(remainingMillis);
                            remainingMillis -= System.currentTimeMillis() - startTime;
                        }
                    }
                }
                return true;
            }
            finally {
                this.removeSnapshot();
            }
        }

        /**
         * Shuts the pool down immediately and reports every task that never
         * started as complete so that waiters are released.
         *
         * @return the tasks that were queued but never executed
         */
        @Override
        public List<Runnable> shutdownNow() {
            List<Runnable> tasks = super.shutdownNow();
            for (Runnable task : tasks) {
                this.taskComplete(task);
            }
            return tasks;
        }
    }

    /**
     * Caller-runs rejection policy that also tells the owning
     * {@link CompletableThreadPoolExecutor} when the rejected task has been
     * dealt with, so the task does not stay in the pool's pending set.
     */
    public static class CompletableRejectedExecutionHandler
    extends ThreadPoolExecutor.CallerRunsPolicy {
        /** The pool to notify; assigned via {@link #setThreadPool}. */
        CompletableThreadPoolExecutor threadPool = null;

        /**
         * Sets the pool that is notified after a rejected task is handled.
         *
         * @param threadPool the owning pool
         */
        public void setThreadPool(CompletableThreadPoolExecutor threadPool) {
            this.threadPool = threadPool;
        }

        /**
         * Delegates to the caller-runs policy, then marks the task complete
         * on the owning pool.
         *
         * @param r the rejected task
         * @param e the executor that rejected it
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            try {
                super.rejectedExecution(r, e);
            } finally {
                // Report completion even when the caller-run task throws;
                // otherwise the task would stay in the pending set and later
                // awaitCompletion calls would keep waiting on it.
                this.threadPool.taskComplete(r);
            }
        }
    }

    public static class BatchWriter
    implements Runnable {
        private BatchWriteSet writeSet;

        public BatchWriter(BatchWriteSet writeSet) {
            if (writeSet.getWriteSet().size() == 0) {
                throw new IllegalStateException("Attempt to write an empty batch");
            }
            this.writeSet = writeSet;
        }

        @Override
        public void run() {
            block8: {
                try {
                    TransactionInfo transactionInfo;
                    Runnable onBeforeWrite = this.writeSet.getOnBeforeWrite();
                    if (onBeforeWrite != null) {
                        onBeforeWrite.run();
                    }
                    if ((transactionInfo = this.writeSet.getTransactionInfo()) == null || transactionInfo.alive.get()) {
                        Transaction transaction = null;
                        if (transactionInfo != null) {
                            transaction = transactionInfo.transaction;
                            transactionInfo.written.set(true);
                        }
                        logger.trace("begin write batch {} to forest on host \"{}\"", (Object)this.writeSet.getBatchNumber(), (Object)this.writeSet.getClient().getHost());
                        if (this.writeSet.getTemporalCollection() == null) {
                            this.writeSet.getClient().newDocumentManager().write(this.writeSet.getWriteSet(), this.writeSet.getTransform(), transaction);
                        } else {
                            XMLDocumentManager docMgr = this.writeSet.getClient().newXMLDocumentManager();
                            docMgr.setContentFormat(Format.UNKNOWN);
                            docMgr.write(this.writeSet.getWriteSet(), this.writeSet.getTransform(), transaction, this.writeSet.getTemporalCollection());
                        }
                        this.closeAllHandles();
                        Runnable onSuccess = this.writeSet.getOnSuccess();
                        if (onSuccess != null) {
                            onSuccess.run();
                        }
                        break block8;
                    }
                    throw new DataMovementException("Failed to write because transaction already underwent commit or rollback", null);
                }
                catch (Throwable t) {
                    logger.trace("failed batch sent to forest on host \"{}\"", (Object)this.writeSet.getClient().getHost());
                    Consumer<Throwable> onFailure = this.writeSet.getOnFailure();
                    if (onFailure == null) break block8;
                    onFailure.accept(t);
                }
            }
        }

        private void closeAllHandles() throws Throwable {
            Throwable lastThrowable = null;
            for (DocumentWriteOperation doc : this.writeSet.getWriteSet()) {
                try {
                    if (doc.getContent() instanceof Closeable) {
                        ((Closeable)((Object)doc.getContent())).close();
                    }
                    if (!(doc.getMetadata() instanceof Closeable)) continue;
                    ((Closeable)((Object)doc.getMetadata())).close();
                }
                catch (Throwable t) {
                    logger.error("error calling close()", t);
                    lastThrowable = t;
                }
            }
            if (lastThrowable != null) {
                throw lastThrowable;
            }
        }
    }

    /**
     * Mutable bookkeeping record for one transaction opened by the batcher.
     * All state is held in atomic or concurrent containers.
     */
    public static class TransactionInfo {
        // The transaction handle handed to document writes.
        private Transaction transaction;
        // True until the wrapped transaction is committed or rolled back.
        public AtomicBoolean alive = new AtomicBoolean(false);
        // Set once a batch has started writing through this transaction.
        public AtomicBoolean written = new AtomicBoolean(false);
        public AtomicReference<Throwable> throwable = new AtomicReference<>();
        // Cleanup leaves the transaction alone while this is above zero.
        public AtomicLong inProcess = new AtomicLong(0L);
        public AtomicLong batchesFinished = new AtomicLong(0L);
        // Guards against scheduling the same transaction for cleanup twice.
        public AtomicBoolean queuedForCleanup = new AtomicBoolean(false);
        public ConcurrentLinkedQueue<BatchWriteSet> batches = new ConcurrentLinkedQueue<>();
        // Remaining number of times this transaction may still be handed out.
        private AtomicInteger transactionPermits = new AtomicInteger(0);
    }

    /**
     * Per-host state: the client for that host plus the queues of
     * transactions that can still be handed out and of transactions that
     * still have to be finished.
     */
    public static class HostInfo {
        public String hostName;
        public DatabaseClient client;
        public AtomicLong transactionCounter = new AtomicLong(0L);
        // Transactions that may still have permits left.
        public ConcurrentLinkedDeque<TransactionInfo> transactionInfos = new ConcurrentLinkedDeque<>();
        // Transactions that will receive no further batches and await completion.
        public ConcurrentLinkedQueue<TransactionInfo> unfinishedTransactions = new ConcurrentLinkedQueue<>();

        /**
         * Takes the transaction at the head of the queue and zeroes its
         * permits.
         *
         * @return the polled transaction if it still had permits, otherwise
         *         {@code null} (also when the queue was empty)
         */
        private TransactionInfo getTransactionInfoAndDrainPermits() {
            TransactionInfo head = transactionInfos.poll();
            if (head != null && head.transactionPermits.getAndSet(0) > 0) {
                return head;
            }
            return null;
        }

        /**
         * Hands out a transaction and consumes one of its permits.
         *
         * @return a transaction that had a permit available, or {@code null}
         *         when the queue runs empty
         */
        private TransactionInfo getTransactionInfo() {
            while (true) {
                TransactionInfo candidate = transactionInfos.poll();
                if (candidate == null) {
                    return null;
                }
                int remaining = candidate.transactionPermits.decrementAndGet();
                if (remaining > 0) {
                    // Still usable afterwards: put it back at the front.
                    transactionInfos.addFirst(candidate);
                    return candidate;
                }
                // No permits remain after this one, so the transaction goes
                // on the unfinished list either way.
                unfinishedTransactions.add(candidate);
                if (remaining == 0) {
                    // That was its last permit; the caller may still use it.
                    return candidate;
                }
                // It was queued without any permit left; try the next one.
            }
        }

        /** Appends the transaction to the queue of transactions that can be handed out. */
        public void addTransactionInfo(TransactionInfo transactionInfo) {
            transactionInfos.add(transactionInfo);
        }

        /**
         * Zeroes the transaction's permits and removes it from both queues.
         *
         * @param toRelease the transaction to stop tracking
         */
        public void releaseTransactionInfo(TransactionInfo toRelease) {
            toRelease.transactionPermits.set(0);
            transactionInfos.remove(toRelease);
            unfinishedTransactions.remove(toRelease);
        }
    }
}

