/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.ps.xqsync;

import com.marklogic.ps.FileFinder;
import com.marklogic.ps.Session;
import com.marklogic.ps.SimpleLogger;
import com.marklogic.ps.timing.TimedEvent;
import com.marklogic.ps.xqsync.Configuration;
import com.marklogic.ps.xqsync.FatalException;
import com.marklogic.ps.xqsync.InputPackage;
import com.marklogic.ps.xqsync.Monitor;
import com.marklogic.ps.xqsync.PackageTaskFactory;
import com.marklogic.ps.xqsync.SyncException;
import com.marklogic.ps.xqsync.TaskFactory;
import com.marklogic.ps.xqsync.UriQueue;
import com.marklogic.xcc.AdhocQuery;
import com.marklogic.xcc.ContentbaseMetaData;
import com.marklogic.xcc.Request;
import com.marklogic.xcc.RequestOptions;
import com.marklogic.xcc.ResultSequence;
import com.marklogic.xcc.exceptions.StreamingResultException;
import com.marklogic.xcc.exceptions.XQueryException;
import com.marklogic.xcc.exceptions.XccException;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class XQSyncManager {
    // Class-wide logger; assigned from the Configuration in the constructor.
    protected static SimpleLogger logger;
    // XQuery error code that makes queueFromInputConnection() retry without the URI lexicon.
    private static final String ERROR_CODE_MISSING_URI_LEXICON = "XDMP-URILXCNNOTFOUND";
    // Fully-qualified class name; assigned in the static initializer.
    public static final String NAME;
    // Local name of the external XQuery variable declared by START_POSITION_DEFINE_VARIABLE.
    private static final String START_VARIABLE_NAME = "start";
    // Predicate appended to listing queries when a start position is in effect.
    private static final String START_POSITION_PREDICATE = "[position() ge $start]\n";
    // Prolog declaration appended to listing queries when a start position is in effect.
    private static final String START_POSITION_DEFINE_VARIABLE = "declare variable $start as xs:integer external;\n";
    // Session used to list input URIs; null when run() reads from a package or file path instead.
    private Session inputSession;
    private final Configuration configuration;
    // Number of items handed to the URI queue(s) by run().
    private long itemsQueued;
    // Queue currently receiving URIs.
    private UriQueue uriQueue;
    // Previous queue, kept so the next package can wait for it to drain.
    private UriQueue lastUriQueue;
    private Monitor monitor;

    /**
     * Creates a manager driven by the given configuration.
     *
     * <p>As a side effect the class-wide {@code logger} is replaced with the
     * logger supplied by {@code config}.
     *
     * @param config source of all settings and of the logger
     */
    public XQSyncManager(Configuration config) {
        logger = config.getLogger();
        this.configuration = config;
    }

    /**
     * Runs one complete synchronization.
     *
     * <p>Sets up the worker pool, the {@link Monitor} and the first
     * {@link UriQueue}, queues every input item (from the input connection,
     * an input package path, or an input file path), then waits for the
     * monitor's task count to reach the number of queued items, drains and
     * shuts down the queue and the pool, and finally joins the monitor.
     *
     * <p>Any {@link Throwable} is logged and causes the queue and monitor to
     * be halted; it is not rethrown. The task factory and the configuration
     * are closed in all cases.
     */
    public void run() {
        TaskFactory factory = null;
        try {
            int threads = this.configuration.getThreadCount();
            logger.finer("threads = " + threads);
            ArrayBlockingQueue<Runnable> workQueue = null;
            this.inputSession = this.configuration.newInputSession();
            int queueSize = this.configuration.getQueueSize();
            logger.info("starting pool of " + threads + " threads, queue size = " + queueSize);
            // Bounded work queue: when it is full, CallerBlocksPolicy makes the submitter wait.
            workQueue = new ArrayBlockingQueue<Runnable>(queueSize);
            CallerBlocksPolicy policy = new CallerBlocksPolicy();
            // Fixed-size pool (core == max) with a 16 second keep-alive.
            ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 16L, TimeUnit.SECONDS, workQueue, policy);
            ExecutorCompletionService<TimedEvent[]> completionService = new ExecutorCompletionService<TimedEvent[]>(pool);
            this.monitor = new Monitor(this.configuration, pool, completionService, this.configuration.isFatalErrors());
            this.monitor.setPriority(6);
            this.monitor.start();
            factory = new TaskFactory(this.configuration, this.monitor);
            this.newUriQueue(completionService, pool, factory, this.monitor);
            // The output session is opened here only to log version information.
            try (Session outputSession = this.configuration.newOutputSession();){
                if (null != outputSession) {
                    ContentbaseMetaData meta = outputSession.getContentbaseMetaData();
                    logger.info("output version info: client " + meta.getDriverVersionString() + ", server " + meta.getServerVersionString());
                }
            }
            // Pick the input source: connection first, then package path, then plain file path.
            if (null != this.inputSession) {
                ContentbaseMetaData meta = this.inputSession.getContentbaseMetaData();
                logger.info("input version info: client " + meta.getDriverVersionString() + ", server " + meta.getServerVersionString());
                this.itemsQueued = this.queueFromInputConnection();
            } else {
                this.itemsQueued = null != this.configuration.getInputPackagePath() ? this.queueFromInputPackage(this.configuration.getInputPackagePath()) : this.queueFromInputPath(this.configuration.getInputPath());
            }
            // Poll until the monitor has seen exactly as many tasks as were queued.
            while (this.monitor.getTaskCount() != this.itemsQueued) {
                Thread.sleep(125L);
                Thread.yield();
                if (this.monitor.getTaskCount() <= this.itemsQueued) continue;
                // More tasks than queued items means the bookkeeping is broken.
                throw new FatalException("task count mismatch: " + this.itemsQueued + " < " + this.monitor.getTaskCount());
            }
            this.monitor.setFinalTaskCount(this.itemsQueued);
            logger.info("final queue count " + this.itemsQueued);
            this.uriQueue.shutdown();
            logger.fine("queue is shutdown with queue size " + this.uriQueue.getQueueSize());
            // Wait for the URI queue to empty before shutting the pool down.
            do {
                Thread.sleep(125L);
                Thread.yield();
            } while (this.uriQueue.getQueueSize() > 0);
            logger.info("pool ready to shutdown, queue size " + this.uriQueue.getQueueSize());
            pool.shutdown();
            logger.info("waiting for monitor to exit");
            do {
                logger.finest("waiting for monitor " + this.monitor + " " + (null != this.monitor) + " " + this.monitor.isAlive());
                try {
                    Thread.yield();
                    this.monitor.join();
                }
                catch (InterruptedException e) {
                    // Clears the interrupt flag so the loop can keep waiting for the monitor.
                    Thread.interrupted();
                    logger.logException("interrupted", e);
                }
                logger.finest("waiting for monitor " + this.monitor + " " + (null != this.monitor) + " " + this.monitor.isAlive());
            } while (null != this.monitor && this.monitor.isAlive());
        }
        catch (Throwable t) {
            // Top-level boundary: log, then stop the queue and the monitor.
            logger.logException("fatal error", t);
            if (null != this.uriQueue) {
                this.uriQueue.halt();
            }
            if (null != this.monitor) {
                logger.info("halting monitor");
                this.monitor.halt(t);
            }
        }
        finally {
            if (null != factory) {
                logger.info("closing factory");
                factory.close();
                factory = null;
            }
            if (null != this.configuration) {
                this.configuration.close();
            }
        }
        logger.fine("exiting");
    }

    /**
     * Replaces {@code this.uriQueue} with a newly created and started queue,
     * then yields the CPU repeatedly until that queue reports itself active.
     *
     * @param completionService completion service handed to the new queue
     * @param pool              executor handed to the new queue
     * @param factory           task factory handed to the new queue
     * @param monitor           monitor handed to the new queue
     */
    private void newUriQueue(CompletionService<TimedEvent[]> completionService, ThreadPoolExecutor pool, TaskFactory factory, Monitor monitor) {
        LinkedBlockingQueue<String> backingQueue = new LinkedBlockingQueue<String>();
        this.uriQueue = new UriQueue(this.configuration, completionService, pool, factory, monitor, backingQueue);
        this.uriQueue.start();
        for (;;) {
            if (this.uriQueue.isActive()) {
                return;
            }
            Thread.yield();
        }
    }

    /**
     * Queues the contents of an input package, or of every package found
     * beneath a directory.
     *
     * <p>A regular file is treated as a single package. A directory is
     * searched recursively: sub-directories are descended into and regular
     * files whose name ends with the configured package extension are queued,
     * in sorted path order.
     *
     * @param path file-system path of a package file or a directory
     * @return total number of items queued
     * @throws IOException   if the path is missing, unreadable, of an
     *                       unexpected type, or a directory cannot be listed
     * @throws SyncException propagated from package handling
     */
    private long queueFromInputPackage(String path) throws IOException, SyncException {
        logger.fine(path);
        File file = new File(path);
        if (!file.exists()) {
            throw new IOException("missing expected input package path: " + path);
        }
        if (!file.canRead()) {
            throw new IOException("cannot read from input package path: " + path);
        }
        if (file.isFile()) {
            return this.queueFromInputPackageFile(file);
        }
        if (!file.isDirectory()) {
            throw new IOException("unexpected file type: " + file.getCanonicalPath());
        }
        final String extension = Configuration.getPackageFileExtension();
        FileFilter filter = pathname -> pathname.isDirectory() || (pathname.isFile() && pathname.getName().endsWith(extension));
        File[] children = file.listFiles(filter);
        // listFiles returns null (not an empty array) when the directory cannot be read.
        if (null == children) {
            throw new IOException("cannot list input package directory: " + file.getCanonicalPath());
        }
        // Sort so packages are processed in a deterministic order.
        Arrays.sort(children);
        long total = 0L;
        for (File child : children) {
            total += this.queueFromInputPackage(child.getCanonicalPath());
        }
        return total;
    }

    /**
     * Queues every entry of a single input package file.
     *
     * <p>Waits for the previous URI queue (if any) to drain, opens the
     * package, shuts down the current queue, starts a new queue backed by a
     * {@link PackageTaskFactory} for this package, and adds each listed entry
     * to it. One package reference is taken per queued entry; the reference
     * taken when the package is opened is released once queuing is complete.
     *
     * @param path the package file
     * @return number of entries queued
     * @throws IOException   if the package path cannot be resolved or read
     * @throws SyncException propagated from package handling
     */
    private long queueFromInputPackageFile(File path) throws IOException, SyncException {
        logger.fine("listing package " + path);
        // Keep waiting through interrupts, but remember them. Re-setting the
        // flag inside the loop would make every later sleep() throw at once
        // and turn this wait into a busy spin.
        boolean interrupted = false;
        try {
            while (null != this.lastUriQueue && 0 != this.lastUriQueue.getQueueSize()) {
                try {
                    Thread.sleep(125L);
                }
                catch (InterruptedException e) {
                    interrupted = true;
                    logger.warning("interrupted, will continue");
                }
            }
        }
        finally {
            if (interrupted) {
                // Restore the interrupt status for code further up the stack.
                Thread.currentThread().interrupt();
            }
        }
        InputPackage inputPackage = new InputPackage(path.getCanonicalPath(), this.configuration);
        // Reference held for the duration of queuing; released below.
        inputPackage.addReference();
        logger.fine("listing package " + path + " (" + inputPackage.size() + ")");
        if (null != this.uriQueue) {
            this.uriQueue.shutdown();
        }
        this.lastUriQueue = this.uriQueue;
        this.newUriQueue(this.uriQueue, new PackageTaskFactory(this.configuration, this.monitor, inputPackage));
        logger.fine("uriQueue = " + this.uriQueue + ", last = " + this.lastUriQueue);
        Iterator<String> iter = inputPackage.list().iterator();
        long count = 0L;
        while (iter.hasNext()) {
            String inputPackagePath = iter.next();
            logger.finest("queuing " + count + ": " + inputPackagePath);
            // One reference per queued entry.
            inputPackage.addReference();
            this.uriQueue.add(inputPackagePath);
            ++count;
        }
        this.uriQueue.shutdown();
        inputPackage.closeReference();
        logger.info("queued " + count + " from " + path);
        return count;
    }

    /**
     * Starts a new URI queue that reuses the completion service, pool and
     * monitor of an existing queue but uses a different task factory.
     *
     * @param old     queue whose collaborators are reused
     * @param factory task factory for the new queue
     */
    private void newUriQueue(UriQueue old, TaskFactory factory) {
        CompletionService<TimedEvent[]> sharedCompletionService = old.getCompletionService();
        ThreadPoolExecutor sharedPool = old.getPool();
        Monitor sharedMonitor = old.getMonitor();
        this.newUriQueue(sharedCompletionService, sharedPool, factory, sharedMonitor);
    }

    /**
     * Queues URIs from the input connection, using the URI lexicon first and
     * retrying without it when the server reports that the lexicon is missing.
     *
     * @return number of URIs queued
     * @throws XccException if listing fails for any reason other than a
     *                      missing URI lexicon on the first attempt
     */
    private long queueFromInputConnection() throws XccException {
        try {
            return this.queueFromInputConnection(true);
        }
        catch (XQueryException e) {
            boolean lexiconMissing = ERROR_CODE_MISSING_URI_LEXICON.equals(e.getCode());
            if (!lexiconMissing) {
                logger.logException("error queuing from input connection", e);
                throw e;
            }
            logger.warning("Enable the document uri lexicon on " + this.inputSession.getContentBaseName() + " to speed up synchronization.");
            return this.queueFromInputConnection(false);
        }
    }

    /**
     * Lists input URIs and adds each one to the current URI queue.
     *
     * <p>Exactly one input selector is used, in this order of precedence:
     * explicit document URIs, collection URIs, directory URIs, user-supplied
     * queries, and finally "all documents". A warning is logged when
     * lower-precedence selectors are also configured. Explicit document URIs
     * are queued directly without contacting the server; every other selector
     * issues one request per configured value.
     *
     * @param useLexicon whether listing queries should use the URI lexicon
     * @return number of URIs queued
     * @throws XccException if a listing request fails
     */
    private long queueFromInputConnection(boolean useLexicon) throws XccException {
        String[] collectionUris = this.configuration.getInputCollectionUris();
        String[] directoryUris = this.configuration.getInputDirectoryUris();
        String[] documentUris = this.configuration.getInputDocumentUris();
        String[] userQuery = this.configuration.getInputQuery();
        if (null != documentUris) {
            if (null != collectionUris || null != directoryUris || null != userQuery) {
                logger.warning("conflicting properties: only using INPUT_DOCUMENT_URIS");
            }
        } else if (null != collectionUris) {
            if (null != directoryUris || null != userQuery) {
                logger.warning("conflicting properties: only using INPUT_COLLECTION_URI");
            }
        } else if (null != directoryUris && null != userQuery) {
            logger.warning("conflicting properties: only using INPUT_DIRECTORY_URI");
        }
        Long startPosition = this.configuration.getStartPosition();
        if (null != startPosition) {
            logger.info("using INPUT_START_POSITION=" + startPosition);
        }
        long count = 0L;
        if (null != documentUris) {
            // NOTE(review): the start position is compared against a 0-based
            // index here, while the XQuery paths use 1-based position() -
            // confirm which is intended.
            for (int i = 0; i < documentUris.length; ++i) {
                if (null != startPosition && (long)i < startPosition) continue;
                this.uriQueue.add(documentUris[i]);
                ++count;
            }
            return count;
        }
        RequestOptions opts = this.inputSession.getDefaultRequestOptions();
        logger.fine("buffer size = " + opts.getResultBufferSize() + ", caching = " + opts.getCacheResult());
        opts.setCacheResult(this.configuration.isInputQueryCachable());
        opts.setResultBufferSize(this.configuration.inputQueryBufferSize());
        logger.info("buffer size = " + opts.getResultBufferSize() + ", caching = " + opts.getCacheResult());
        // The number of requests comes from the selector that is actually
        // used. Taking it from a lower-precedence array would index past the
        // end of the selected one. With no selector, one request lists everything.
        int size;
        if (null != collectionUris) {
            size = collectionUris.length;
        } else if (null != directoryUris) {
            size = directoryUris.length;
        } else if (null != userQuery) {
            size = userQuery.length;
        } else {
            size = 1;
        }
        try {
            for (int i = 0; i < size; ++i) {
                // Only the selected selector is passed on; the others are
                // ignored by getRequest() whenever a higher-precedence one is set.
                String collectionUri = null;
                String directoryUri = null;
                String query = null;
                if (null != collectionUris) {
                    collectionUri = collectionUris[i];
                } else if (null != directoryUris) {
                    directoryUri = directoryUris[i];
                } else if (null != userQuery) {
                    query = userQuery[i];
                }
                Request request = this.getRequest(collectionUri, directoryUri, query, startPosition, useLexicon);
                request.setOptions(opts);
                try (ResultSequence rs = this.inputSession.submitRequest(request)) {
                    while (rs.hasNext()) {
                        String uri = rs.next().asString();
                        if (0L == count) {
                            logger.info("queuing first task: " + uri);
                        }
                        logger.finest("queuing " + count + ": " + uri);
                        this.uriQueue.add(uri);
                        ++count;
                    }
                }
            }
        }
        catch (StreamingResultException e) {
            logger.info("count = " + count);
            logger.warning("Listing input URIs probably timed out: try setting INPUT_QUERY_CACHABLE or INPUT_QUERY_BUFFER_BYTES");
            throw e;
        }
        return count;
    }

    /*
     * Unable to fully structure code
     */
    private Request getRequest(String collectionUri, String directoryUri, String userQuery, Long startPosition, boolean useLexicon) throws XccException {
        v0 = hasStart = startPosition != null && startPosition > 1L;
        if (collectionUri != null) {
            request = this.getCollectionRequest(collectionUri, hasStart, useLexicon);
            if (this.configuration.isDeleteOutputCollection()) {
                outputSession = this.configuration.newOutputSession();
                var9_9 = null;
                try {
                    if (outputSession == null) ** GOTO lbl38
                    XQSyncManager.logger.info("deleting collection " + collectionUri + " on output connection");
                    outputSession.deleteCollection(collectionUri);
                }
                catch (Throwable var10_11) {
                    var9_9 = var10_11;
                    throw var10_11;
                }
                finally {
                    if (outputSession != null) {
                        if (var9_9 != null) {
                            try {
                                outputSession.close();
                            }
                            catch (Throwable var10_10) {
                                var9_9.addSuppressed(var10_10);
                            }
                        } else {
                            outputSession.close();
                        }
                    }
                }
            }
        } else if (directoryUri != null) {
            request = this.getDirectoryRequest(directoryUri, hasStart, useLexicon);
        } else if (userQuery != null) {
            XQSyncManager.logger.info("listing query: " + userQuery);
            if (hasStart) {
                XQSyncManager.logger.warning("ignoring start value in user-supplied query");
                hasStart = false;
            }
            request = this.inputSession.newAdhocQuery(userQuery);
        } else {
            request = this.getUrisRequest(hasStart, useLexicon);
        }
lbl38:
        // 6 sources

        if (hasStart) {
            request.setNewIntegerVariable("start", startPosition.longValue());
        }
        return request;
    }

    /**
     * Builds the request that lists the URI of every document.
     *
     * @param hasStart   whether the query declares and applies {@code $start}
     * @param useLexicon whether to list via {@code cts:uris} instead of
     *                   iterating over {@code doc()}
     * @return an ad-hoc query on the input session
     */
    private Request getUrisRequest(boolean hasStart, boolean useLexicon) {
        String startDeclaration = hasStart ? START_POSITION_DEFINE_VARIABLE : "";
        String startPredicate = hasStart ? START_POSITION_PREDICATE : "";
        StringBuilder xquery = new StringBuilder("xquery version \"1.0-ml\";\n");
        xquery.append(startDeclaration);
        if (!useLexicon) {
            logger.info("listing all documents (no uri lexicon)");
            xquery.append("for $i in doc()").append(startPredicate).append(" return string(xdmp:node-uri($i))");
        } else {
            logger.info("listing all documents (with uri lexicon)");
            xquery.append("cts:uris('', 'document')").append(startPredicate);
        }
        String query = xquery.toString();
        logger.fine(query);
        return this.inputSession.newAdhocQuery(query);
    }

    /**
     * Builds the request that lists the URIs of the documents in a collection.
     *
     * @param uri        collection URI, bound to the external {@code $uri} variable
     * @param hasStart   whether the query declares and applies {@code $start}
     * @param useLexicon whether to list via {@code cts:uris} instead of
     *                   iterating over {@code collection($uri)}
     * @return an ad-hoc query on the input session with {@code $uri} bound
     */
    private Request getCollectionRequest(String uri, boolean hasStart, boolean useLexicon) {
        logger.info("listing collection " + uri);
        String startDeclaration = hasStart ? START_POSITION_DEFINE_VARIABLE : "";
        String startPredicate = hasStart ? START_POSITION_PREDICATE : "";
        StringBuilder xquery = new StringBuilder("xquery version \"1.0-ml\";\ndeclare variable $uri as xs:string external;\n");
        xquery.append(startDeclaration);
        if (useLexicon) {
            xquery.append("cts:uris('', 'document', cts:collection-query($uri))\n").append(startPredicate);
        } else {
            xquery.append("for $i in collection($uri)\n").append(startPredicate).append("return string(xdmp:node-uri($i))\n");
        }
        AdhocQuery listing = this.inputSession.newAdhocQuery(xquery.toString());
        listing.setNewStringVariable("uri", uri);
        return listing;
    }

    /**
     * Builds the request that lists the URIs of the documents beneath a
     * directory, at any depth.
     *
     * @param uri        directory URI; a trailing {@code /} is appended when
     *                   missing before it is bound to {@code $uri}
     * @param hasStart   whether the query declares and applies {@code $start}
     * @param useLexicon whether to list via {@code cts:uris} instead of
     *                   iterating over {@code xdmp:directory}
     * @return an ad-hoc query on the input session with {@code $uri} bound
     */
    private Request getDirectoryRequest(String uri, boolean hasStart, boolean useLexicon) {
        logger.info("listing directory " + uri);
        String startDeclaration = hasStart ? START_POSITION_DEFINE_VARIABLE : "";
        String startPredicate = hasStart ? START_POSITION_PREDICATE : "";
        StringBuilder xquery = new StringBuilder("xquery version \"1.0-ml\";\ndeclare variable $uri as xs:string external;\n");
        xquery.append(startDeclaration);
        if (useLexicon) {
            xquery.append("cts:uris('', 'document', cts:directory-query($uri, 'infinity'))\n").append(startPredicate);
        } else {
            xquery.append("for $i in xdmp:directory($uri, 'infinity')\n").append(startPredicate).append("return string(xdmp:node-uri($i))\n");
        }
        String query = xquery.toString();
        logger.fine(query);
        AdhocQuery listing = this.inputSession.newAdhocQuery(query);
        String directoryUri = uri.endsWith("/") ? uri : uri + "/";
        listing.setNewStringVariable("uri", directoryUri);
        return listing;
    }

    /**
     * Queues the canonical path of every file that {@link FileFinder} reports
     * under the given path, using an exclude pattern for names ending in
     * {@code .metadata}.
     *
     * @param inputPath root path to search
     * @return number of paths queued
     * @throws IOException if searching fails or a canonical path cannot be resolved
     */
    private long queueFromInputPath(String inputPath) throws IOException {
        logger.info("listing from " + inputPath + ", excluding " + "^.+\\.metadata$");
        FileFinder finder = new FileFinder(inputPath, null, "^.+\\.metadata$");
        finder.find();
        long queued = 0L;
        for (Iterator<File> files = finder.list().iterator(); files.hasNext(); ) {
            String canonicalPath = files.next().getCanonicalPath();
            queued++;
            logger.finer("queuing " + queued + ": " + canonicalPath);
            this.uriQueue.add(canonicalPath);
        }
        return queued;
    }

    /**
     * Returns the number of items queued by {@link #run()}.
     *
     * @return the value recorded by {@code run()} after queuing its input;
     *         0 if nothing has been recorded yet
     */
    public long getItemsQueued() {
        return this.itemsQueued;
    }

    // Initializes NAME to this class's fully-qualified name.
    static {
        NAME = XQSyncManager.class.getName();
    }

    /**
     * Rejection handler that, instead of discarding a task the executor
     * turned away, inserts it into the executor's work queue with a blocking
     * {@code put}, so the submitting thread waits until there is room.
     *
     * <p>NOTE(review): the task is queued without checking whether the
     * executor has been shut down - confirm that no submissions can happen
     * after shutdown.
     */
    public static class CallerBlocksPolicy
    implements RejectedExecutionHandler {
        // Work queue of the executor, captured on the first rejection.
        private BlockingQueue<Runnable> queue;
        // True once the "queue is full" message has been logged.
        private boolean warning = false;

        /**
         * Blocks until the rejected task fits into the executor's queue.
         *
         * @param r        the rejected task
         * @param executor the executor that rejected it
         * @throws RejectedExecutionException if the wait is interrupted; the
         *         thread's interrupt status is set again before throwing
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (this.queue == null) {
                this.queue = executor.getQueue();
            }
            this.logFirstRejection();
            try {
                this.queue.put(r);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException(e);
            }
        }

        // Logs the "queue is full" message the first time only.
        private void logFirstRejection() {
            if (this.warning) {
                return;
            }
            logger.fine("queue is full: size = " + this.queue.size() + " (will only appear once!)");
            this.warning = true;
        }
    }
}

