/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.client.datamovement.impl;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.ResourceNotFoundException;
import com.marklogic.client.datamovement.DataMovementException;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.Forest;
import com.marklogic.client.datamovement.ForestConfiguration;
import com.marklogic.client.datamovement.JobTicket;
import com.marklogic.client.datamovement.QueryBatch;
import com.marklogic.client.datamovement.QueryBatchException;
import com.marklogic.client.datamovement.QueryBatchListener;
import com.marklogic.client.datamovement.QueryBatcher;
import com.marklogic.client.datamovement.QueryBatcherListener;
import com.marklogic.client.datamovement.QueryEvent;
import com.marklogic.client.datamovement.QueryFailureListener;
import com.marklogic.client.datamovement.impl.BatcherImpl;
import com.marklogic.client.datamovement.impl.DataMovementManagerImpl;
import com.marklogic.client.datamovement.impl.QueryBatchImpl;
import com.marklogic.client.impl.AbstractSearchQueryDefinition;
import com.marklogic.client.impl.HandleAccessor;
import com.marklogic.client.impl.HandleImplementation;
import com.marklogic.client.impl.QueryManagerImpl;
import com.marklogic.client.impl.UrisHandle;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.StringHandle;
import com.marklogic.client.io.marker.StructureWriteHandle;
import com.marklogic.client.query.RawQueryDefinition;
import com.marklogic.client.query.SearchQueryDefinition;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryBatcherImpl
extends BatcherImpl
implements QueryBatcher {
    private static Logger logger = LoggerFactory.getLogger(QueryBatcherImpl.class);
    private String queryMethod;
    private SearchQueryDefinition query;
    private SearchQueryDefinition originalQuery;
    private Boolean filtered;
    private Iterator<String> iterator;
    private boolean threadCountSet = false;
    private List<QueryBatchListener> urisReadyListeners = new ArrayList<QueryBatchListener>();
    private List<QueryFailureListener> failureListeners = new ArrayList<QueryFailureListener>();
    private List<QueryBatcherListener> jobCompletionListeners = new ArrayList<QueryBatcherListener>();
    private QueryThreadPoolExecutor threadPool;
    private boolean consistentSnapshot = false;
    private final AtomicLong batchNumber = new AtomicLong(0L);
    private final AtomicLong resultsSoFar = new AtomicLong(0L);
    private final AtomicLong serverTimestamp = new AtomicLong(-1L);
    private final AtomicReference<List<DatabaseClient>> clientList = new AtomicReference();
    private Map<Forest, AtomicLong> forestResults = new HashMap<Forest, AtomicLong>();
    private Map<Forest, AtomicBoolean> forestIsDone = new HashMap<Forest, AtomicBoolean>();
    private Map<Forest, AtomicInteger> retryForestMap = new HashMap<Forest, AtomicInteger>();
    private AtomicBoolean runJobCompletionListeners = new AtomicBoolean(false);
    private final Object lock = new Object();
    private final Map<Forest, List<QueryTask>> blackListedTasks = new HashMap<Forest, List<QueryTask>>();
    private boolean isSingleThreaded = false;
    private long maxUris = Long.MAX_VALUE;
    private long maxBatches = Long.MAX_VALUE;
    private int maxDocToUriBatchRatio;
    private int docToUriBatchRatio;
    private int defaultDocBatchSize;
    private int maxUriBatchSize;

    QueryBatcherImpl(SearchQueryDefinition originalQuery, DataMovementManager moveMgr, ForestConfiguration forestConfig, String serializedCtsQuery, Boolean filtered, int maxDocToUriBatchRatio, int defaultDocBatchSize, int maxUriBatchSize) {
        this(moveMgr, forestConfig, maxDocToUriBatchRatio, defaultDocBatchSize, maxUriBatchSize);
        if (serializedCtsQuery != null && serializedCtsQuery.length() > 0 && originalQuery instanceof AbstractSearchQueryDefinition && ((AbstractSearchQueryDefinition)originalQuery).canSerializeQueryAsJSON()) {
            QueryManagerImpl queryMgr = (QueryManagerImpl)this.getPrimaryClient().newQueryManager();
            this.queryMethod = "POST";
            this.query = queryMgr.newRawCtsQueryDefinition(new StringHandle(serializedCtsQuery).withFormat(Format.JSON));
            this.originalQuery = originalQuery;
            if (filtered != null) {
                this.filtered = filtered;
            }
        } else {
            this.initQuery(originalQuery);
        }
    }

    public QueryBatcherImpl(SearchQueryDefinition query, DataMovementManager moveMgr, ForestConfiguration forestConfig) {
        this(moveMgr, forestConfig);
        this.initQuery(query);
    }

    public QueryBatcherImpl(Iterator<String> iterator, DataMovementManager moveMgr, ForestConfiguration forestConfig) {
        this(moveMgr, forestConfig);
        this.iterator = iterator;
    }

    private QueryBatcherImpl(DataMovementManager moveMgr, ForestConfiguration forestConfig, int maxDocToUriBatchRatio, int defaultDocBatchSize, int maxUriBatchSize) {
        this(moveMgr, forestConfig);
        this.maxDocToUriBatchRatio = maxDocToUriBatchRatio;
        this.defaultDocBatchSize = defaultDocBatchSize;
        this.maxUriBatchSize = maxUriBatchSize;
        this.withBatchSize(defaultDocBatchSize);
    }

    private QueryBatcherImpl(DataMovementManager moveMgr, ForestConfiguration forestConfig) {
        super(moveMgr);
        this.withForestConfig(forestConfig);
    }

    public ThreadPoolExecutor getThreadPool() {
        return this.threadPool;
    }

    private void initQuery(SearchQueryDefinition query) {
        if (query == null) {
            throw new IllegalArgumentException("Cannot create QueryBatcher with null query");
        }
        this.queryMethod = Long.compareUnsigned(this.getMoveMgr().getServerVersion(), Long.parseUnsignedLong("10000500")) >= 0 ? "POST" : "GET";
        this.query = query;
        if (query instanceof RawQueryDefinition) {
            RawQueryDefinition rawQuery = (RawQueryDefinition)query;
            StructureWriteHandle handle = rawQuery.getHandle();
            HandleImplementation baseHandle = HandleAccessor.checkHandle(handle, "queryBatcher");
            Format inputFormat = baseHandle.getFormat();
            switch (inputFormat) {
                case UNKNOWN: {
                    baseHandle.setFormat(Format.XML);
                    break;
                }
                case JSON: 
                case XML: {
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("Only XML and JSON raw query definitions are possible.");
                }
            }
        }
    }

    @Override
    public QueryBatcherImpl onUrisReady(QueryBatchListener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("listener must not be null");
        }
        this.urisReadyListeners.add(listener);
        return this;
    }

    @Override
    public QueryBatcherImpl onQueryFailure(QueryFailureListener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("listener must not be null");
        }
        this.failureListeners.add(listener);
        return this;
    }

    @Override
    public void retry(QueryEvent queryEvent) {
        this.retry(queryEvent, false);
    }

    @Override
    public void retryWithFailureListeners(QueryEvent queryEvent) {
        this.retry(queryEvent, true);
    }

    private void retry(QueryEvent queryEvent, boolean callFailListeners) {
        if (this.isStopped()) {
            logger.warn("Job is now stopped, aborting the retry");
            return;
        }
        Forest retryForest = null;
        for (Forest forest : this.getForestConfig().listForests()) {
            if (!forest.equals(queryEvent.getForest())) continue;
            retryForest = forest;
            break;
        }
        if (retryForest == null) {
            throw new IllegalStateException("Forest for queryEvent (" + queryEvent.getForest().getForestName() + ") is not in current getForestConfig()");
        }
        this.forestIsDone.get(retryForest).set(false);
        this.retryForestMap.get(retryForest).incrementAndGet();
        long start = queryEvent.getForestResultsSoFar() + 1L;
        logger.trace("retryForest {} on retryHost {} at start {}", new Object[]{retryForest.getForestName(), retryForest.getPreferredHost(), start});
        QueryTask runnable = new QueryTask(this.getMoveMgr(), this, retryForest, this.queryMethod, this.query, this.filtered, queryEvent.getForestBatchNumber(), start, null, queryEvent.getLastUriForForest(), queryEvent.getJobBatchNumber(), callFailListeners);
        runnable.run();
    }

    @Override
    public void retryListener(QueryBatch batch, QueryBatchListener queryBatchListener) {
        Forest[] forests;
        DatabaseClient client = null;
        for (Forest forest : forests = batch.getBatcher().getForestConfig().listForests()) {
            if (!forest.equals(batch.getForest())) continue;
            client = this.getMoveMgr().getForestClient(forest);
        }
        QueryBatchImpl retryBatch = new QueryBatchImpl().withClient(client).withBatcher(batch.getBatcher()).withTimestamp(batch.getTimestamp()).withServerTimestamp(batch.getServerTimestamp()).withItems((String[])batch.getItems()).withJobTicket(batch.getJobTicket()).withJobBatchNumber(batch.getJobBatchNumber()).withJobResultsSoFar(batch.getJobResultsSoFar()).withForestBatchNumber(batch.getForestBatchNumber()).withForestResultsSoFar(batch.getForestResultsSoFar()).withForest(batch.getForest()).withJobTicket(batch.getJobTicket());
        queryBatchListener.processEvent(retryBatch);
    }

    @Override
    public QueryBatchListener[] getQuerySuccessListeners() {
        return this.getUrisReadyListeners();
    }

    @Override
    public QueryBatchListener[] getUrisReadyListeners() {
        return this.urisReadyListeners.toArray(new QueryBatchListener[this.urisReadyListeners.size()]);
    }

    @Override
    public QueryFailureListener[] getQueryFailureListeners() {
        return this.failureListeners.toArray(new QueryFailureListener[this.failureListeners.size()]);
    }

    @Override
    public void setUrisReadyListeners(QueryBatchListener ... listeners) {
        this.requireNotStarted();
        this.urisReadyListeners.clear();
        if (listeners != null) {
            for (QueryBatchListener listener : listeners) {
                this.urisReadyListeners.add(listener);
            }
        }
    }

    @Override
    public void setQueryFailureListeners(QueryFailureListener ... listeners) {
        this.requireNotStarted();
        this.failureListeners.clear();
        if (listeners != null) {
            for (QueryFailureListener listener : listeners) {
                this.failureListeners.add(listener);
            }
        }
    }

    @Override
    public QueryBatcher withJobName(String jobName) {
        this.requireNotStarted();
        super.withJobName(jobName);
        return this;
    }

    @Override
    public QueryBatcher withJobId(String jobId) {
        this.requireNotStarted();
        this.setJobId(jobId);
        return this;
    }

    @Override
    public QueryBatcher withBatchSize(int docBatchSize) {
        if (docBatchSize > this.maxUriBatchSize) {
            logger.debug("docBatchSize is beyond maxDocBatchSize, which is {}.", (Object)this.maxUriBatchSize);
        }
        if (docBatchSize < 1) {
            throw new IllegalArgumentException("docBatchSize cannot be less than 1");
        }
        this.requireNotStarted();
        super.withBatchSize(docBatchSize);
        this.docToUriBatchRatio = Math.min(this.maxDocToUriBatchRatio, this.maxUriBatchSize / docBatchSize);
        if (this.docToUriBatchRatio == 0) {
            this.docToUriBatchRatio = 1;
        }
        return this;
    }

    @Override
    public QueryBatcher withBatchSize(int docBatchSize, int docToUriBatchRatio) {
        if (docToUriBatchRatio > this.maxDocToUriBatchRatio) {
            throw new IllegalArgumentException("docToUriBatchRatio is beyond maxDocToUriBatchRatio");
        }
        if (docBatchSize * docToUriBatchRatio > this.maxUriBatchSize) {
            throw new IllegalArgumentException("docToUriBatchRatio is beyond maxUriBatchSize/docBatchSize");
        }
        if (docToUriBatchRatio < 1) {
            throw new IllegalArgumentException("docToUriBatchRatio is less than 1");
        }
        this.withBatchSize(docBatchSize);
        this.docToUriBatchRatio = docToUriBatchRatio;
        return this;
    }

    @Override
    public int getDocToUriBatchRatio() {
        return this.docToUriBatchRatio;
    }

    @Override
    public int getDefaultDocBatchSize() {
        return this.defaultDocBatchSize;
    }

    @Override
    public int getMaxUriBatchSize() {
        return this.maxUriBatchSize;
    }

    @Override
    public int getMaxDocToUriBatchRatio() {
        return this.maxDocToUriBatchRatio;
    }

    @Override
    public QueryBatcher withThreadCount(int threadCount) {
        if (threadCount <= 0) {
            throw new IllegalArgumentException("threadCount must be 1 or greater");
        }
        if (this.threadPool != null) {
            int currentThreadCount = this.getThreadCount();
            logger.info("Adjusting thread pool size from {} to {}", (Object)currentThreadCount, (Object)threadCount);
            if (threadCount >= currentThreadCount) {
                this.threadPool.setMaximumPoolSize(threadCount);
                this.threadPool.setCorePoolSize(threadCount);
            } else {
                this.threadPool.setCorePoolSize(threadCount);
                this.threadPool.setMaximumPoolSize(threadCount);
            }
        } else {
            this.threadCountSet = true;
        }
        super.withThreadCount(threadCount);
        return this;
    }

    @Override
    public QueryBatcher withConsistentSnapshot() {
        this.requireNotStarted();
        this.consistentSnapshot = true;
        return this;
    }

    @Override
    public Long getServerTimestamp() {
        long val = this.serverTimestamp.get();
        return val > -1L ? Long.valueOf(val) : null;
    }

    @Override
    public boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        this.requireJobStarted();
        return this.threadPool.awaitTermination(timeout, unit);
    }

    @Override
    public boolean awaitCompletion() {
        try {
            return this.awaitCompletion(Long.MAX_VALUE, TimeUnit.DAYS);
        }
        catch (InterruptedException e) {
            return false;
        }
    }

    @Override
    public boolean isStopped() {
        return this.threadPool != null && this.threadPool.isTerminated();
    }

    @Override
    public JobTicket getJobTicket() {
        this.requireJobStarted();
        return super.getJobTicket();
    }

    private void requireJobStarted() {
        if (this.threadPool == null) {
            throw new IllegalStateException("Job not started. First call DataMovementManager.startJob(QueryBatcher)");
        }
    }

    private void requireNotStarted() {
        if (this.threadPool != null) {
            throw new IllegalStateException("Configuration cannot be changed after startJob has been called");
        }
    }

    @Override
    public synchronized void start(JobTicket ticket) {
        if (this.threadPool != null) {
            logger.warn("startJob called more than once");
            return;
        }
        if (this.getBatchSize() <= 0) {
            this.withBatchSize(1);
            logger.warn("docBatchSize should be 1 or greater--setting docBatchSize to 1");
        }
        super.setJobTicket(ticket);
        this.initialize();
        for (QueryBatchListener urisReadyListener : this.urisReadyListeners) {
            urisReadyListener.initializeListener(this);
        }
        super.setJobStartTime();
        this.setStartedToTrue();
        if (this.maxBatches < Long.MAX_VALUE) {
            this.setMaxUris(this.getMaxBatches());
        }
        if (this.query != null) {
            this.startQuerying();
        } else if (this.iterator != null) {
            this.startIterating();
        } else {
            throw new IllegalStateException("Cannot start QueryBatcher without query or iterator");
        }
    }

    private synchronized void initialize() {
        Forest[] forests = this.getForestConfig().listForests();
        if (!this.threadCountSet) {
            if (this.query != null) {
                logger.warn("threadCount not set--defaulting to number of forests ({})", (Object)forests.length);
                this.withThreadCount(forests.length * this.docToUriBatchRatio);
            } else {
                int hostCount = this.clientList.get().size();
                logger.warn("threadCount not set--defaulting to number of hosts ({})", (Object)hostCount);
                this.withThreadCount(hostCount);
            }
            this.threadCountSet = true;
        }
        if (this.getThreadCount() == 1) {
            this.isSingleThreaded = true;
        }
        logger.info("Starting job forest length={}, docBatchSize={}, docToUriBatchRatio={}, threadCount={}, onUrisReady listeners={}, failure listeners={}", new Object[]{forests.length, this.getBatchSize(), this.getDocToUriBatchRatio(), this.getThreadCount(), this.urisReadyListeners.size(), this.failureListeners.size()});
        this.threadPool = new QueryThreadPoolExecutor(this.getThreadCount(), forests.length, this.getDocToUriBatchRatio(), this);
    }

    @Override
    public synchronized QueryBatcher withForestConfig(ForestConfiguration forestConfig) {
        boolean started;
        super.withForestConfig(forestConfig);
        Forest[] forests = this.forests(forestConfig);
        HashSet<Forest> oldForests = new HashSet<Forest>(this.forestResults.keySet());
        HashMap<String, Forest> hosts = new HashMap<String, Forest>();
        HashMap<Forest, AtomicLong> newForestResults = new HashMap<Forest, AtomicLong>();
        HashMap<Forest, AtomicBoolean> newForestIsDone = new HashMap<Forest, AtomicBoolean>();
        HashMap<Forest, AtomicInteger> newRetryForestMap = new HashMap<Forest, AtomicInteger>();
        for (Forest forest : forests) {
            if (forest.getPreferredHost() == null) {
                throw new IllegalStateException("Hostname must not be null for any forest");
            }
            hosts.put(forest.getPreferredHost(), forest);
            if (newForestResults.get(forest) == null) {
                newForestResults.put(forest, new AtomicLong());
            }
            if (newForestIsDone.get(forest) == null) {
                newForestIsDone.put(forest, new AtomicBoolean(false));
            }
            if (newRetryForestMap.get(forest) != null) continue;
            newRetryForestMap.put(forest, new AtomicInteger(0));
        }
        this.forestResults = newForestResults;
        this.forestIsDone = newForestIsDone;
        this.retryForestMap = newRetryForestMap;
        Set<String> hostNames = hosts.keySet();
        logger.info("(withForestConfig) Using forests on {} hosts for \"{}\"", hostNames, (Object)forests[0].getDatabaseName());
        List<DatabaseClient> newClientList = this.clients(hostNames);
        this.clientList.set(newClientList);
        boolean bl = started = this.threadPool != null;
        if (started && oldForests.size() > 0) {
            this.calculateDeltas(oldForests, forests);
        }
        return this;
    }

    private synchronized void calculateDeltas(Set<Forest> oldForests, Forest[] forests) {
        HashSet<Forest> addedForests = new HashSet<Forest>();
        HashSet<Forest> restartedForests = new HashSet<Forest>();
        HashSet<Forest> blackListedForests = new HashSet<Forest>(oldForests);
        for (Forest forest : forests) {
            if (!oldForests.contains(forest)) {
                addedForests.add(forest);
            }
            if (this.blackListedTasks.get(forest) != null) {
                restartedForests.add(forest);
            }
            blackListedForests.remove(forest);
        }
        if (blackListedForests.size() > 0) {
            DataMovementManagerImpl moveMgrImpl = this.getMoveMgr();
            String primaryHost = moveMgrImpl.getPrimaryClient().getHost();
            if (this.getHostNames(blackListedForests).contains(primaryHost)) {
                int randomPos = new Random().nextInt(this.clientList.get().size());
                moveMgrImpl.setPrimaryClient(this.clientList.get().get(randomPos));
            }
        }
        this.cleanupExistingTasks(addedForests, restartedForests, blackListedForests);
    }

    private synchronized void cleanupExistingTasks(Set<Forest> addedForests, Set<Forest> restartedForests, Set<Forest> blackListedForests) {
        if (blackListedForests.size() > 0) {
            logger.warn("removing jobs related to hosts [{}] from the queue", this.getHostNames(blackListedForests));
            ArrayList tasks = new ArrayList();
            this.threadPool.getQueue().drainTo(tasks);
            for (Runnable task : tasks) {
                QueryTask queryTask;
                if (task instanceof QueryTask && blackListedForests.contains((queryTask = (QueryTask)task).forest)) {
                    List<QueryTask> blackListedTaskList = this.blackListedTasks.get(queryTask.forest);
                    if (blackListedTaskList == null) {
                        blackListedTaskList = new ArrayList<QueryTask>();
                        this.blackListedTasks.put(queryTask.forest, blackListedTaskList);
                    }
                    blackListedTaskList.add(queryTask);
                    continue;
                }
                this.threadPool.execute(task);
            }
        }
        if (addedForests.size() > 0) {
            logger.warn("adding jobs for forests [{}] to the queue", this.getForestNames(addedForests));
        }
        for (Forest forest : addedForests) {
            this.threadPool.execute(new QueryTask(this.getMoveMgr(), this, forest, this.queryMethod, this.query, this.filtered, 1L, 1L, null));
        }
        if (restartedForests.size() > 0) {
            logger.warn("re-adding jobs related to forests [{}] to the queue", this.getForestNames(restartedForests));
        }
        for (Forest forest : restartedForests) {
            List<QueryTask> blackListedTaskList = this.blackListedTasks.get(forest);
            if (blackListedTaskList == null) continue;
            for (QueryTask task : blackListedTaskList) {
                this.threadPool.execute(task);
            }
            blackListedTaskList.clear();
        }
    }

    private List<String> getForestNames(Collection<Forest> forests) {
        return forests.stream().map(forest -> forest.getForestName()).collect(Collectors.toList());
    }

    private List<String> getHostNames(Collection<Forest> forests) {
        return forests.stream().map(forest -> forest.getPreferredHost()).distinct().collect(Collectors.toList());
    }

    private synchronized void startQuerying() {
        Forest[] forests = this.getForestConfig().listForests();
        boolean runInApplicationThread = this.consistentSnapshot && forests.length > 1;
        for (Forest forest : forests) {
            QueryTask runnable = new QueryTask(this.getMoveMgr(), this, forest, this.queryMethod, this.query, this.filtered, 1L, 1L, null);
            if (runInApplicationThread) {
                runInApplicationThread = false;
                runnable.run();
                continue;
            }
            this.threadPool.execute(runnable);
        }
    }

    private void shutdownIfAllForestsAreDone() {
        for (AtomicBoolean isDone : this.forestIsDone.values()) {
            if (isDone.get()) continue;
            return;
        }
        if (this.runJobCompletionListeners.compareAndSet(false, true)) {
            this.runJobCompletionListeners();
        }
        this.threadPool.shutdown();
    }

    private void runJobCompletionListeners() {
        for (QueryBatcherListener listener : this.jobCompletionListeners) {
            try {
                listener.processEvent(this);
            }
            catch (Throwable e) {
                logger.error("Exception thrown by an onJobCompletion listener", e);
            }
        }
        super.setJobEndTime();
    }

    private void startIterating() {
        this.threadPool.execute(new IteratorTask(this));
    }

    @Override
    public void stop() {
        this.setStoppedToTrue();
        if (this.threadPool != null) {
            this.threadPool.shutdownNow();
        }
        super.setJobEndTime();
        if (this.query != null) {
            for (AtomicBoolean isDone : this.forestIsDone.values()) {
                if (isDone.get()) continue;
                logger.warn("QueryBatcher instance \"{}\" stopped before all results were retrieved", (Object)this.getJobName());
                break;
            }
        } else if (this.iterator != null && this.iterator.hasNext()) {
            logger.warn("QueryBatcher instance \"{}\" stopped before all results were processed", (Object)this.getJobName());
        }
        this.closeAllListeners();
    }

    private void closeAllListeners() {
        for (QueryBatchListener queryBatchListener : this.getUrisReadyListeners()) {
            if (!(queryBatchListener instanceof AutoCloseable)) continue;
            try {
                ((AutoCloseable)((Object)queryBatchListener)).close();
            }
            catch (Exception e) {
                logger.error("onUrisReady listener cannot be closed", (Throwable)e);
            }
        }
        for (QueryFailureListener queryFailureListener : this.getQueryFailureListeners()) {
            if (!(queryFailureListener instanceof AutoCloseable)) continue;
            try {
                ((AutoCloseable)((Object)queryFailureListener)).close();
            }
            catch (Exception e) {
                logger.error("onQueryFailure listener cannot be closed", (Throwable)e);
            }
        }
    }

    protected void finalize() {
        if (!this.isStoppedTrue()) {
            logger.warn("QueryBatcher instance \"{}\" was never cleanly stopped.  You should call dataMovementManager.stopJob.", (Object)this.getJobName());
        }
    }

    @Override
    public QueryBatcher onJobCompletion(QueryBatcherListener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("listener must not be null");
        }
        this.jobCompletionListeners.add(listener);
        return this;
    }

    @Override
    public QueryBatcherListener[] getQueryJobCompletionListeners() {
        return this.jobCompletionListeners.toArray(new QueryBatcherListener[this.jobCompletionListeners.size()]);
    }

    @Override
    public void setQueryJobCompletionListeners(QueryBatcherListener ... listeners) {
        this.requireNotStarted();
        this.jobCompletionListeners.clear();
        if (listeners != null) {
            for (QueryBatcherListener listener : listeners) {
                this.jobCompletionListeners.add(listener);
            }
        }
    }

    @Override
    public Calendar getJobStartTime() {
        if (!this.isStarted()) {
            return null;
        }
        return super.getJobStartTime();
    }

    @Override
    public Calendar getJobEndTime() {
        if (!this.isStopped()) {
            return null;
        }
        return super.getJobEndTime();
    }

    @Override
    public void setMaxBatches(long maxBatches) {
        Long max_limit = Long.MAX_VALUE / (long)this.getBatchSize();
        if (maxBatches > max_limit) {
            throw new IllegalArgumentException("Number of batches cannot be more than " + max_limit);
        }
        this.maxBatches = maxBatches;
        if (this.isStarted()) {
            this.setMaxUris(maxBatches);
        }
    }

    private void setMaxUris(long maxBatches) {
        this.maxUris = maxBatches * (long)this.getBatchSize();
    }

    @Override
    public long getMaxBatches() {
        return this.maxBatches;
    }

    @Override
    public void setMaxBatches() {
        this.maxBatches = -1L;
        if (this.isStarted()) {
            this.setMaxUris(this.getMaxBatches());
        }
    }

    private class QueryThreadPoolExecutor
    extends ThreadPoolExecutor {
        private Object objectToNotifyFrom;

        QueryThreadPoolExecutor(int threadCount, int forestsLength, int docToUriBatchRatio, Object objectToNotifyFrom) {
            super(threadCount, threadCount, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(forestsLength * docToUriBatchRatio * 2 + threadCount), new BlockingRunsPolicy());
            this.objectToNotifyFrom = objectToNotifyFrom;
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            boolean returnValue = super.awaitTermination(timeout, unit);
            logger.info("Job complete, jobBatchNumber={}, jobResultsSoFar={}", (Object)QueryBatcherImpl.this.batchNumber.get(), (Object)QueryBatcherImpl.this.resultsSoFar.get());
            return returnValue;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            Object object = QueryBatcherImpl.this.lock;
            synchronized (object) {
                QueryBatcherImpl.this.lock.notify();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void terminated() {
            super.terminated();
            Object object = this.objectToNotifyFrom;
            synchronized (object) {
                this.objectToNotifyFrom.notifyAll();
            }
            object = QueryBatcherImpl.this.lock;
            synchronized (object) {
                QueryBatcherImpl.this.lock.notify();
            }
        }
    }

    private class BlockingRunsPolicy
    implements RejectedExecutionHandler {
        private BlockingRunsPolicy() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                return;
            }
            try {
                Object object = QueryBatcherImpl.this.lock;
                synchronized (object) {
                    while (executor.getQueue().remainingCapacity() == 0) {
                        QueryBatcherImpl.this.lock.wait();
                    }
                    if (!executor.isShutdown()) {
                        executor.execute(runnable);
                    }
                }
            }
            catch (InterruptedException e) {
                logger.warn("Thread interrupted while waiting for the work queue to become empty" + e);
            }
        }
    }

    private class IteratorTask
    implements Runnable {
        private QueryBatcher batcher;

        IteratorTask(QueryBatcher batcher) {
            this.batcher = batcher;
        }

        @Override
        public void run() {
            try {
                boolean lastBatch = false;
                ArrayList uriQueue = new ArrayList(QueryBatcherImpl.this.getBatchSize());
                while (QueryBatcherImpl.this.iterator.hasNext() && !lastBatch) {
                    uriQueue.add(QueryBatcherImpl.this.iterator.next());
                    if (!QueryBatcherImpl.this.iterator.hasNext()) {
                        lastBatch = true;
                    }
                    if (uriQueue.size() != QueryBatcherImpl.this.getBatchSize() && QueryBatcherImpl.this.iterator.hasNext() && !lastBatch) continue;
                    final ArrayList uris = uriQueue;
                    final boolean finalLastBatch = lastBatch;
                    final long results = QueryBatcherImpl.this.resultsSoFar.addAndGet(uris.size());
                    if (QueryBatcherImpl.this.maxUris <= results) {
                        lastBatch = true;
                    }
                    uriQueue = new ArrayList(QueryBatcherImpl.this.getBatchSize());
                    Runnable processBatch = new Runnable(){

                        @Override
                        public void run() {
                            QueryBatchImpl batch = new QueryBatchImpl().withBatcher(IteratorTask.this.batcher).withTimestamp(Calendar.getInstance()).withJobTicket(QueryBatcherImpl.this.getJobTicket());
                            try {
                                long currentBatchNumber = QueryBatcherImpl.this.batchNumber.incrementAndGet();
                                List currentClientList = (List)QueryBatcherImpl.this.clientList.get();
                                int clientIndex = (int)(currentBatchNumber % (long)currentClientList.size());
                                DatabaseClient client = (DatabaseClient)currentClientList.get(clientIndex);
                                batch = batch.withJobBatchNumber(currentBatchNumber).withClient(client).withJobResultsSoFar(results).withItems(uris.toArray(new String[uris.size()]));
                                logger.trace("batch size={}, jobBatchNumber={}, jobResultsSoFar={}", new Object[]{uris.size(), batch.getJobBatchNumber(), batch.getJobResultsSoFar()});
                                for (QueryBatchListener listener : QueryBatcherImpl.this.urisReadyListeners) {
                                    try {
                                        listener.processEvent(batch);
                                    }
                                    catch (Throwable e) {
                                        logger.error("Exception thrown by an onUrisReady listener", e);
                                    }
                                }
                            }
                            catch (Throwable t) {
                                batch = batch.withItems(uris.toArray(new String[uris.size()]));
                                for (QueryFailureListener listener : QueryBatcherImpl.this.failureListeners) {
                                    try {
                                        listener.processFailure(new QueryBatchException(batch, t));
                                    }
                                    catch (Throwable e) {
                                        logger.error("Exception thrown by an onQueryFailure listener", e);
                                    }
                                }
                                logger.warn("Error iterating to queue uris: {}", (Object)t.toString());
                            }
                            if (finalLastBatch) {
                                QueryBatcherImpl.this.runJobCompletionListeners();
                            }
                        }
                    };
                    QueryBatcherImpl.this.threadPool.execute(processBatch);
                    if (!QueryBatcherImpl.this.isSingleThreaded || QueryBatcherImpl.this.threadPool.getQueue().remainingCapacity() > 2 || !QueryBatcherImpl.this.iterator.hasNext()) continue;
                    QueryBatcherImpl.this.threadPool.execute(new IteratorTask(this.batcher));
                    return;
                }
            }
            catch (Throwable t) {
                for (QueryFailureListener listener : QueryBatcherImpl.this.failureListeners) {
                    QueryBatchImpl batch = new QueryBatchImpl().withItems(new String[0]).withClient((DatabaseClient)((List)QueryBatcherImpl.this.clientList.get()).get(0)).withBatcher(this.batcher).withTimestamp(Calendar.getInstance()).withJobResultsSoFar(0L);
                    try {
                        listener.processFailure(new QueryBatchException(batch, t));
                    }
                    catch (Throwable e) {
                        logger.error("Exception thrown by an onQueryFailure listener", e);
                    }
                }
                logger.warn("Error iterating to queue uris: {}", (Object)t.toString());
            }
            QueryBatcherImpl.this.threadPool.shutdown();
        }
    }

    private class QueryTask
    implements Runnable {
        private DataMovementManager moveMgr;
        private QueryBatcherImpl batcher;
        private Forest forest;
        private String queryMethod;
        private SearchQueryDefinition query;
        private Boolean filtered;
        private long forestBatchNum;
        private long start;
        private long retryBatchNumber;
        private boolean callFailListeners;
        private String afterUri;
        private String nextAfterUri;
        private QueryBatchImpl batch;
        private int totalProcessedCount = 0;

        QueryTask(DataMovementManager moveMgr, QueryBatcherImpl batcher, Forest forest, String queryMethod, SearchQueryDefinition query, Boolean filtered, long forestBatchNum, long start, QueryBatchImpl batch) {
            this(moveMgr, batcher, forest, queryMethod, query, filtered, forestBatchNum, start, batch, null, -1L, true);
        }

        QueryTask(DataMovementManager moveMgr, QueryBatcherImpl batcher, Forest forest, String queryMethod, SearchQueryDefinition query, Boolean filtered, long forestBatchNum, long start, QueryBatchImpl batch, String afterUri) {
            this(moveMgr, batcher, forest, queryMethod, query, filtered, forestBatchNum, start, batch, afterUri, -1L, true);
        }

        QueryTask(DataMovementManager moveMgr, QueryBatcherImpl batcher, Forest forest, String queryMethod, SearchQueryDefinition query, Boolean filtered, long forestBatchNum, long start, QueryBatchImpl batch, String afterUri, long retryBatchNumber, boolean callFailListeners) {
            this.moveMgr = moveMgr;
            this.batcher = batcher;
            this.forest = forest;
            this.queryMethod = queryMethod;
            this.query = query;
            this.filtered = filtered;
            this.forestBatchNum = forestBatchNum;
            this.start = start;
            this.retryBatchNumber = retryBatchNumber;
            this.callFailListeners = callFailListeners;
            this.batch = batch;
            if (Long.compareUnsigned(QueryBatcherImpl.this.getMoveMgr().getServerVersion(), Long.parseUnsignedLong("9000900")) >= 0) {
                this.afterUri = afterUri;
            }
        }

        @Override
        public void run() {
            if (this.batcher.isStoppedTrue()) {
                logger.warn("Cancelling task to query forest '{}' forestBatchNum {} with start {} after the job is stopped", new Object[]{this.forest.getForestName(), this.forestBatchNum, this.start});
                return;
            }
            DatabaseClient client = QueryBatcherImpl.this.getMoveMgr().getForestClient(this.forest);
            Calendar queryStart = Calendar.getInstance();
            ArrayList uris = new ArrayList();
            AtomicBoolean isDone = (AtomicBoolean)QueryBatcherImpl.this.forestIsDone.get(this.forest);
            boolean hasLastBatch = false;
            int lastBatchNum = 0;
            if (this.batch == null) {
                this.batch = new QueryBatchImpl().withBatcher(this.batcher).withClient(client).withTimestamp(queryStart).withJobTicket(QueryBatcherImpl.this.getJobTicket()).withForestBatchNumber(this.forestBatchNum).withForest(this.forest);
                QueryManagerImpl queryMgr = (QueryManagerImpl)client.newQueryManager();
                queryMgr.setPageLength(QueryBatcherImpl.this.getBatchSize() * QueryBatcherImpl.this.getDocToUriBatchRatio());
                UrisHandle handle = new UrisHandle();
                if (QueryBatcherImpl.this.consistentSnapshot && QueryBatcherImpl.this.serverTimestamp.get() > -1L) {
                    handle.setPointInTimeQueryTimestamp(QueryBatcherImpl.this.serverTimestamp.get());
                }
                try (UrisHandle results = queryMgr.uris(this.queryMethod, this.query, this.filtered, handle, this.start, this.afterUri, this.forest.getForestName());){
                    if (QueryBatcherImpl.this.consistentSnapshot && QueryBatcherImpl.this.serverTimestamp.get() == -1L && QueryBatcherImpl.this.serverTimestamp.compareAndSet(-1L, results.getServerTimestamp())) {
                        logger.info("Consistent snapshot timestamp=[{}]", (Object)QueryBatcherImpl.this.serverTimestamp);
                    }
                    for (int i = 0; i < QueryBatcherImpl.this.getDocToUriBatchRatio(); ++i) {
                        uris.add(new ArrayList());
                    }
                    this.totalProcessedCount = 0;
                    for (String uri : results) {
                        int batchNum = this.totalProcessedCount / QueryBatcherImpl.this.getBatchSize();
                        ((List)uris.get(batchNum)).add(uri);
                        ++this.totalProcessedCount;
                    }
                    if (this.totalProcessedCount == 0) {
                        isDone.set(true);
                    } else if (this.totalProcessedCount != QueryBatcherImpl.this.docToUriBatchRatio * QueryBatcherImpl.this.getBatchSize()) {
                        hasLastBatch = true;
                        lastBatchNum = this.totalProcessedCount % QueryBatcherImpl.this.getBatchSize() == 0 ? this.totalProcessedCount / QueryBatcherImpl.this.getBatchSize() - 1 : this.totalProcessedCount / QueryBatcherImpl.this.getBatchSize();
                    }
                }
                catch (ResourceNotFoundException e) {
                    isDone.set(true);
                    QueryBatcherImpl.this.shutdownIfAllForestsAreDone();
                    return;
                }
                catch (Throwable t) {
                    logger.error("Query for URIs failed, stopping job; cause: " + t.getMessage(), t);
                    isDone.set(true);
                    QueryBatcherImpl.this.shutdownIfAllForestsAreDone();
                    return;
                }
                this.batch = this.batch.withItems(((List)uris.get(0)).toArray(new String[((List)uris.get(0)).size()])).withServerTimestamp(QueryBatcherImpl.this.serverTimestamp.get()).withJobResultsSoFar(QueryBatcherImpl.this.resultsSoFar.addAndGet(((List)uris.get(0)).size())).withForestResultsSoFar(((AtomicLong)QueryBatcherImpl.this.forestResults.get(this.forest)).addAndGet(((List)uris.get(0)).size()));
                if (hasLastBatch && lastBatchNum == 0) {
                    this.batch.withIsLastBatch(true);
                }
                for (int i = 1; i < QueryBatcherImpl.this.getDocToUriBatchRatio(); ++i) {
                    if (((List)uris.get(i)).size() == 0) continue;
                    QueryBatchImpl docBatch = new QueryBatchImpl().withBatcher(this.batcher).withClient(client).withTimestamp(queryStart).withJobTicket(QueryBatcherImpl.this.getJobTicket()).withForestBatchNumber(this.forestBatchNum).withForest(this.forest).withItems(((List)uris.get(i)).toArray(new String[((List)uris.get(i)).size()])).withServerTimestamp(QueryBatcherImpl.this.serverTimestamp.get()).withJobResultsSoFar(QueryBatcherImpl.this.resultsSoFar.addAndGet(((List)uris.get(i)).size())).withForestResultsSoFar(((AtomicLong)QueryBatcherImpl.this.forestResults.get(this.forest)).addAndGet(((List)uris.get(i)).size()));
                    if (hasLastBatch && lastBatchNum == i) {
                        docBatch.withIsLastBatch(true);
                    }
                    QueryBatcherImpl.this.threadPool.execute(new QueryTask(this.moveMgr, this.batcher, this.forest, QueryBatcherImpl.this.queryMethod, this.query, this.filtered, this.forestBatchNum + (long)i, this.start, docBatch, this.nextAfterUri));
                }
            }
            if (((String[])this.batch.getItems()).length != 0) {
                this.processDocs(this.batch);
            }
            if (QueryBatcherImpl.this.maxUris <= QueryBatcherImpl.this.resultsSoFar.longValue()) {
                isDone.set(true);
            }
            if (this.totalProcessedCount == QueryBatcherImpl.this.getBatchSize() * QueryBatcherImpl.this.getDocToUriBatchRatio()) {
                this.nextAfterUri = (String)((List)uris.get(QueryBatcherImpl.this.getDocToUriBatchRatio() - 1)).get(QueryBatcherImpl.this.getBatchSize() - 1);
                this.launchNextTask();
            }
            if (isDone.get()) {
                QueryBatcherImpl.this.shutdownIfAllForestsAreDone();
            }
        }

        private void processDocs(QueryBatchImpl batch) {
            AtomicBoolean isDone = (AtomicBoolean)QueryBatcherImpl.this.forestIsDone.get(this.forest);
            try {
                batch = this.retryBatchNumber != -1L ? batch.withJobBatchNumber(this.retryBatchNumber) : batch.withJobBatchNumber(QueryBatcherImpl.this.batchNumber.incrementAndGet());
                logger.trace("Uri batch size={}, jobBatchNumber={}, jobResultsSoFar={}, forest={}", new Object[]{((String[])batch.getItems()).length, batch.getJobBatchNumber(), batch.getJobResultsSoFar(), this.forest.getForestName()});
                for (QueryBatchListener listener : QueryBatcherImpl.this.urisReadyListeners) {
                    try {
                        listener.processEvent(batch);
                    }
                    catch (Throwable t) {
                        logger.error("Exception thrown by an onUrisReady listener", t);
                    }
                }
                if (((String[])batch.getItems()).length != QueryBatcherImpl.this.getBatchSize()) {
                    isDone.set(true);
                }
                if (batch.getIsLastBatch()) {
                    isDone.set(true);
                }
            }
            catch (Throwable t) {
                if (this.callFailListeners) {
                    batch = batch.withJobResultsSoFar(QueryBatcherImpl.this.resultsSoFar.get()).withForestResultsSoFar(((AtomicLong)QueryBatcherImpl.this.forestResults.get(this.forest)).get());
                    for (QueryFailureListener listener : QueryBatcherImpl.this.failureListeners) {
                        try {
                            listener.processFailure(new QueryBatchException(batch, t));
                        }
                        catch (Throwable e2) {
                            logger.error("Exception thrown by an onQueryFailure listener", e2);
                        }
                    }
                    if (((AtomicInteger)QueryBatcherImpl.this.retryForestMap.get(this.forest)).get() == 0) {
                        isDone.set(true);
                    } else {
                        ((AtomicInteger)QueryBatcherImpl.this.retryForestMap.get(this.forest)).decrementAndGet();
                    }
                }
                if (t instanceof RuntimeException) {
                    throw (RuntimeException)t;
                }
                throw new DataMovementException("Failed to retry batch", t);
            }
        }

        private void launchNextTask() {
            if (this.batcher.isStoppedTrue()) {
                return;
            }
            AtomicBoolean isDone = (AtomicBoolean)QueryBatcherImpl.this.forestIsDone.get(this.forest);
            if (isDone.get()) {
                QueryBatcherImpl.this.shutdownIfAllForestsAreDone();
                return;
            }
            long nextStart = this.start + (long)(QueryBatcherImpl.this.getBatchSize() * QueryBatcherImpl.this.getDocToUriBatchRatio());
            QueryBatcherImpl.this.threadPool.execute(new QueryTask(this.moveMgr, this.batcher, this.forest, QueryBatcherImpl.this.queryMethod, this.query, this.filtered, this.forestBatchNum + (long)QueryBatcherImpl.this.getBatchSize(), nextStart, null, this.nextAfterUri));
        }
    }
}

