/*
 * Decompiled with CFR 0.152.
 */
package apoc.periodic;

import apoc.Pools;
import apoc.periodic.BatchAndTotalCollector;
import apoc.periodic.BatchAndTotalResult;
import apoc.periodic.BatchMode;
import apoc.util.Util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.TerminationGuard;

public class PeriodicUtils {
    // Holder for static helpers only; the private constructor prevents instantiation.
    private PeriodicUtils() {
    }

    /**
     * Applies {@code consumer} to {@code tx} and {@code params}, then reports the outcome to {@code collector}.
     *
     * <p>On success the optional {@code localCount} is bumped by one, the returned statistics are handed to
     * the collector and {@code returnValue} is returned. On any exception the collector is told that
     * {@code batch.size()} operations failed, the batch is recorded as failed parameters, the error is
     * tallied through {@link #recordError(Map, Exception)} and the exception is rethrown unchanged.
     *
     * @param tx          transaction passed to the consumer
     * @param consumer    operation to execute
     * @param params      parameters passed to the consumer
     * @param batch       rows this invocation belongs to; used only for failure reporting
     * @param returnValue value returned when the consumer completes without throwing
     * @param localCount  optional per-batch counter incremented after a successful call; may be {@code null}
     * @param collector   accumulator for statistics and failures
     * @return {@code returnValue} when no exception was thrown
     */
    private static long executeAndReportErrors(Transaction tx, BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer, Map<String, Object> params, List<Map<String, Object>> batch, int returnValue, AtomicLong localCount, BatchAndTotalCollector collector) {
        try {
            final QueryStatistics queryStats = consumer.apply(tx, params);
            if (null != localCount) {
                localCount.incrementAndGet();
            }
            collector.updateStatistics(queryStats);
        }
        catch (Exception failure) {
            // Account for the failure before propagating it to the caller.
            final int failedOperations = batch.size();
            collector.incrementFailedOps(failedOperations);
            collector.amendFailedParamsMap(batch);
            recordError(collector.getOperationErrors(), failure);
            throw failure;
        }
        return returnValue;
    }

    /**
     * Tallies one occurrence of the root-cause message of {@code e} in {@code executionErrors}.
     *
     * <p>The key is the message of the root cause of {@code e}. Two fallbacks keep error reporting from
     * failing itself: when no root cause is reported the exception itself is used, and when the chosen
     * throwable carries no message its class name is used as the key, so a {@code null} key is never
     * offered to the map (maps such as {@code ConcurrentHashMap} reject {@code null} keys).
     *
     * @param executionErrors map from error message to number of occurrences; updated in place
     * @param e               the exception to record
     */
    public static void recordError(Map<String, Long> executionErrors, Exception e) {
        Throwable rootCause = ExceptionUtils.getRootCause(e);
        if (rootCause == null) {
            // Some commons-lang3 versions return null when the throwable has no cause.
            rootCause = e;
        }
        String msg = rootCause.getMessage();
        if (msg == null) {
            // e.g. a bare NullPointerException: keep the entry identifiable instead of using a null key.
            msg = rootCause.getClass().getName();
        }
        executionErrors.merge(msg, 1L, Long::sum);
    }

    /**
     * Builds the statement to run for each batch and tells whether it expects the batch as a list parameter.
     *
     * <p>If {@code cypherAction} already aliases one of the {@code columns} from a parameter
     * (matching <code>$name AS</code> or <code>{name} AS</code>), it is returned untouched with {@code false}.
     * Otherwise the result depends on {@code batchMode}:
     * <ul>
     *   <li>{@code SINGLE}: a mapping of every column parameter to its name is prepended; flag {@code false}.</li>
     *   <li>{@code BATCH}: unless the action already unwinds {@code iteratorVariableName}, an
     *       {@code UNWIND} over that parameter plus a per-column mapping is prepended; flag {@code true}.</li>
     *   <li>{@code BATCH_SINGLE}: the action is returned untouched; flag {@code true}.</li>
     * </ul>
     *
     * @param cypherAction         the user supplied action statement
     * @param batchMode            how rows are handed to the action
     * @param columns              column names produced by the driving statement
     * @param iteratorVariableName name of the parameter/variable carrying the batch
     * @return pair of (statement to execute, whether the statement consumes the batch as a list)
     * @throws IllegalArgumentException if {@code batchMode} is not one of the handled modes
     */
    public static Pair<String, Boolean> prepareInnerStatement(String cypherAction, BatchMode batchMode, List<String> columns, String iteratorVariableName) {
        String names = columns.stream().map(Util::quote).collect(Collectors.joining("|"));
        boolean withCheck = PeriodicUtils.regNoCaseMultiLine("[{$](" + names + ")\\}?\\s+AS\\s+").matcher(cypherAction).find();
        if (withCheck) {
            return Pair.of(cypherAction, false);
        }
        // The Pair.of arguments are passed with their real types (String, Boolean); casting them to
        // Object would make the result a Pair<Object, Object>, which does not match the return type.
        switch (batchMode) {
            case SINGLE: {
                String mapped = Util.withMapping(columns.stream(), c -> Util.param(c) + " AS " + Util.quote(c)) + cypherAction;
                return Pair.of(mapped, false);
            }
            case BATCH: {
                if (PeriodicUtils.regNoCaseMultiLine("UNWIND\\s+[{$]" + iteratorVariableName + "\\}?\\s+AS\\s+").matcher(cypherAction).find()) {
                    // The action already unwinds the batch itself.
                    return Pair.of(cypherAction, true);
                }
                String with = Util.withMapping(columns.stream(), c -> Util.quote(iteratorVariableName) + "." + Util.quote(c) + " AS " + Util.quote(c));
                String unwound = "UNWIND " + Util.param(iteratorVariableName) + " AS " + Util.quote(iteratorVariableName) + with + " " + cypherAction;
                return Pair.of(unwound, true);
            }
            case BATCH_SINGLE: {
                return Pair.of(cypherAction, true);
            }
        }
        throw new IllegalArgumentException("Unrecognised batch mode: [" + batchMode + "]");
    }

    /**
     * Compiles {@code pattern} as a case-insensitive, multi-line regular expression in which
     * {@code .} also matches line terminators.
     *
     * @param pattern the regular expression source
     * @return the compiled pattern with {@link Pattern#CASE_INSENSITIVE}, {@link Pattern#MULTILINE}
     *         and {@link Pattern#DOTALL} enabled
     */
    public static Pattern regNoCaseMultiLine(String pattern) {
        // Named flags instead of the opaque numeric value 42 (2 | 8 | 32).
        return Pattern.compile(pattern, Pattern.CASE_INSENSITIVE | Pattern.MULTILINE | Pattern.DOTALL);
    }

    /**
     * Drains {@code iterator} in batches of {@code batchsize} rows, submits every batch to an executor
     * through {@code Util.inTxFuture} and aggregates the outcome into a single {@link BatchAndTotalResult}.
     *
     * <p>The loop body runs at least once (unless the guard already reports termination) and repeats
     * while the iterator has more rows. It stops early as soon as the termination guard reports that
     * the surrounding transaction was terminated. In parallel mode a new batch is only submitted while
     * fewer than {@code concurrency} submitted batches are still outstanding; otherwise the loop parks
     * briefly and re-checks.
     *
     * @param db               database handed to {@code Util.inTxFuture}
     * @param terminationGuard guard used to detect termination of the calling transaction
     * @param log              logger for debug and error output
     * @param pools            source of the executor services
     * @param batchsize        maximum number of rows per batch
     * @param parallel         whether to use the default (shared) executor and limit by {@code concurrency},
     *                         or the single executor without that limit
     * @param iterateList      {@code true} to pass the whole batch to the consumer at once,
     *                         {@code false} to call the consumer once per row
     * @param retries          retry count forwarded to {@code Util.inTxFuture}
     * @param iterator         source of rows
     * @param consumer         operation executed for a batch or a row
     * @param concurrency      maximum number of outstanding batches in parallel mode
     * @param failedParams     value forwarded to the {@link BatchAndTotalCollector} constructor
     * @param periodicId       identifier used in debug log messages
     * @return a stream containing exactly one aggregated result
     */
    public static Stream<BatchAndTotalResult> iterateAndExecuteBatchedInSeparateThread(GraphDatabaseService db, TerminationGuard terminationGuard, Log log, Pools pools, int batchsize, boolean parallel, boolean iterateList, long retries, Iterator<Map<String, Object>> iterator, BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer, int concurrency, int failedParams, String periodicId) {
        ExecutorService pool = parallel ? pools.getDefaultExecutorService() : pools.getSingleExecutorService();
        List<Future<Long>> futures = new ArrayList<>(concurrency);
        BatchAndTotalCollector collector = new BatchAndTotalCollector(terminationGuard, failedParams);
        AtomicInteger activeFutures = new AtomicInteger(0);

        // do/while: the loop must end once the iterator is exhausted. A plain
        // "while (!terminated) { ...; if (iterator.hasNext()) continue; }" never exits on exhaustion
        // and keeps submitting empty batches until the transaction is terminated.
        do {
            if (Util.transactionIsTerminated(terminationGuard)) {
                break;
            }
            if (activeFutures.get() < concurrency || !parallel) {
                // A slot is free (or we are sequential, where the single executor queues the work).
                activeFutures.incrementAndGet();
                if (log.isDebugEnabled()) {
                    log.debug("Execute, in periodic iteration with id %s, no %d batch size ", periodicId, batchsize);
                }
                List<Map<String, Object>> batch = Util.take(iterator, batchsize);
                long currentBatchSize = batch.size();
                ExecuteBatch executeBatch = iterateList ? new ListExecuteBatch(terminationGuard, collector, batch, consumer) : new OneByOneExecuteBatch(terminationGuard, collector, batch, consumer);
                futures.add(Util.inTxFuture(log, pool, db, executeBatch, retries, retryCount -> collector.incrementRetried(), onComplete -> {
                    collector.incrementBatches();
                    // Drop the batch's references so finished work does not pin its rows in memory.
                    executeBatch.release();
                    activeFutures.decrementAndGet();
                }));
                collector.incrementCount(currentBatchSize);
                if (log.isDebugEnabled()) {
                    log.debug("Processed in periodic iteration with id %s, %d iterations of %d total", periodicId, batchsize, collector.getCount());
                }
            } else {
                // All slots busy: back off briefly before checking again.
                LockSupport.parkNanos(1000L);
            }
        } while (iterator.hasNext());

        // After termination, outstanding futures are cancelled instead of awaited.
        boolean wasTerminated = Util.transactionIsTerminated(terminationGuard);
        ToLongFunction<Future<Long>> toLongFunction = wasTerminated
                ? f -> Util.getFutureOrCancel(f, collector.getBatchErrors(), collector.getFailedBatches(), 0L)
                : f -> Util.getFuture(f, collector.getBatchErrors(), collector.getFailedBatches(), 0L);
        collector.incrementSuccesses(futures.stream().mapToLong(toLongFunction).sum());
        Util.logErrors("Error during iterate.commit:", collector.getBatchErrors(), log);
        Util.logErrors("Error during iterate.execute:", collector.getOperationErrors(), log);
        if (log.isDebugEnabled()) {
            log.debug("Terminated periodic iteration with id %s with %d executions", periodicId, collector.getCount());
        }
        return Stream.of(collector.getResult());
    }

    /**
     * Submits {@code statement} for background execution under the job name {@code name}.
     *
     * <p>Statement parameters are read from the {@code "params"} entry of {@code config}; a missing or
     * {@code null} entry means "no parameters". Any exception raised while running the statement is
     * logged as a warning and rethrown wrapped in a {@link RuntimeException}.
     *
     * @param name      job name
     * @param statement statement to execute
     * @param config    configuration map; only the {@code "params"} key is read here
     * @param db        database the statement is executed against
     * @param log       logger for failures
     * @param pools     pools used to schedule the job
     * @return a stream containing the {@link JobInfo} of the submitted job
     * @throws ClassCastException if the {@code "params"} entry is present but not a map
     */
    public static Stream<JobInfo> submitProc(String name, String statement, Map<String, Object> config, GraphDatabaseService db, Log log, Pools pools) {
        Object configuredParams = config.get("params");
        // The config value is an untyped Object; it is expected to be a String-keyed map.
        @SuppressWarnings("unchecked")
        final Map<String, Object> params = configuredParams == null
                ? Collections.<String, Object>emptyMap()
                : (Map<String, Object>) configuredParams;
        JobInfo info = PeriodicUtils.submitJob(name, () -> {
            try {
                db.executeTransactionally(statement, params, Result::resultAsString);
            }
            catch (Exception e) {
                log.warn("in background task via submit", (Throwable)e);
                throw new RuntimeException(e);
            }
        }, log, pools);
        return Stream.of(info);
    }

    /**
     * Registers {@code task} as the job called {@code name}, replacing any job already registered
     * under an equal {@link JobInfo}.
     *
     * <p>A previously registered future is removed from the job list and, if it has not finished,
     * cancelled without interruption. The task is wrapped with {@link #wrapTask(String, Runnable, Log)},
     * submitted to the scheduled executor of {@code pools}, and the new future is stored in the job list.
     *
     * @param name  job name
     * @param task  work to run
     * @param log   logger passed to the task wrapper
     * @param pools provider of the job list and the scheduled executor
     * @return the {@link JobInfo} key under which the job was registered
     */
    public static <T> JobInfo submitJob(String name, Runnable task, Log log, Pools pools) {
        final JobInfo jobKey = new JobInfo(name);

        final Future<?> previous = pools.getJobList().remove(jobKey);
        final boolean previousStillPending = previous != null && !previous.isDone();
        if (previousStillPending) {
            previous.cancel(false);
        }

        final Runnable loggedTask = PeriodicUtils.wrapTask(name, task, log);
        final Future<?> scheduled = pools.getScheduledExecutorService().submit(loggedTask);
        pools.getJobList().put(jobKey, scheduled);
        return jobKey;
    }

    /**
     * Decorates {@code task} with logging.
     *
     * <p>The returned runnable logs a debug line before and after running the task. If the task throws
     * an {@link Exception}, an error is logged with the exception attached and the same exception is
     * rethrown; the trailing debug line is then not written.
     *
     * @param name task name used in the log messages
     * @param task the work to run
     * @param log  logger to write to
     * @return a runnable that runs {@code task} with the logging described above
     */
    public static Runnable wrapTask(String name, Runnable task, Log log) {
        return new Runnable() {
            @Override
            public void run() {
                log.debug("Executing task " + name);
                try {
                    task.run();
                }
                catch (Exception failure) {
                    log.error("Error while executing task " + name + " because of the following exception (the task will be killed):", (Throwable)failure);
                    throw failure;
                }
                log.debug("Executed task " + name);
            }
        };
    }

    /**
     * Batch executor that passes the entire batch to the consumer in a single call.
     *
     * <p>The consumer receives the parameters {@code _count} (the collector's current count) and
     * {@code _batch} (the rows rebound to the executing transaction).
     */
    static class ListExecuteBatch
    extends ExecuteBatch {
        ListExecuteBatch(TerminationGuard terminationGuard, BatchAndTotalCollector collector, List<Map<String, Object>> batch, BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer) {
            super(terminationGuard, collector, batch, consumer);
        }

        /**
         * Executes the whole batch inside {@code txInThread}.
         *
         * @param txInThread transaction owned by the executing thread
         * @return the batch size on success, or {@code 0} when the termination guard reports termination
         */
        @Override
        public final Long apply(Transaction txInThread) {
            if (!Util.transactionIsTerminated(this.terminationGuard)) {
                List<Map<String, Object>> reboundRows = this.rebindBatch(txInThread);
                Map<String, Object> params = Util.map("_count", this.collector.getCount(), "_batch", reboundRows);
                long processed = PeriodicUtils.executeAndReportErrors(txInThread, this.consumer, params, reboundRows, reboundRows.size(), null, this.collector);
                return processed;
            }
            return 0L;
        }
    }

    /**
     * Batch executor that calls the consumer once per row of the batch.
     *
     * <p>Each call receives the row's own entries merged with {@code _count} (a running counter that
     * starts at the collector's current count and advances after every successful call) and
     * {@code _batch} (all rows of the batch, rebound to the executing transaction).
     */
    static class OneByOneExecuteBatch
    extends ExecuteBatch {
        OneByOneExecuteBatch(TerminationGuard terminationGuard, BatchAndTotalCollector collector, List<Map<String, Object>> batch, BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer) {
            super(terminationGuard, collector, batch, consumer);
        }

        /**
         * Executes every row of the batch inside {@code txInThread}.
         *
         * @param txInThread transaction owned by the executing thread
         * @return the number of rows executed successfully; {@code 0} if termination was already reported
         */
        @Override
        public final Long apply(Transaction txInThread) {
            if (Util.transactionIsTerminated(this.terminationGuard)) {
                return 0L;
            }
            AtomicLong localCount = new AtomicLong(this.collector.getCount());
            List<Map<String, Object>> rows = this.rebindBatch(txInThread);
            long succeeded = 0L;
            for (Map<String, Object> row : rows) {
                // The termination guard is only consulted when the running count is a multiple of 1000.
                boolean atCheckpoint = localCount.get() % 1000L == 0L;
                if (atCheckpoint && Util.transactionIsTerminated(this.terminationGuard)) {
                    // Skipped rows contribute nothing to the total.
                    continue;
                }
                Map<String, Object> extras = Util.map("_count", localCount.get(), "_batch", rows);
                Map<String, Object> params = Util.merge(row, extras);
                succeeded += PeriodicUtils.executeAndReportErrors(txInThread, this.consumer, params, rows, 1, localCount, this.collector);
            }
            return succeeded;
        }
    }

    /**
     * Base class for the per-batch work item: a function from the executing transaction to a
     * {@code Long} result, holding the batch rows and the collaborators needed to process them.
     *
     * <p>The fields are intentionally non-final so that {@link #release()} can clear them.
     */
    static abstract class ExecuteBatch
    implements Function<Transaction, Long> {
        /** Guard used by subclasses to detect termination. */
        protected TerminationGuard terminationGuard;
        /** Accumulator for counts, statistics and errors. */
        protected BatchAndTotalCollector collector;
        /** Rows of this batch; only reachable through {@link #rebindBatch(Transaction)}. */
        private List<Map<String, Object>> batch;
        /** Operation executed against the transaction. */
        protected BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer;

        ExecuteBatch(TerminationGuard terminationGuard, BatchAndTotalCollector collector, List<Map<String, Object>> batch, BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer) {
            this.consumer = consumer;
            this.batch = batch;
            this.collector = collector;
            this.terminationGuard = terminationGuard;
        }

        /**
         * Returns this batch's rows after passing them through {@code Util.rebindRows} for {@code tx}.
         *
         * @param tx the transaction the rows should be rebound to
         * @return the rebound rows
         */
        protected List<Map<String, Object>> rebindBatch(Transaction tx) {
            List<Map<String, Object>> rebound = Util.rebindRows(tx, this.batch);
            return rebound;
        }

        /**
         * Clears every reference held by this work item. The instance must not be applied afterwards.
         */
        public void release() {
            this.consumer = null;
            this.batch = null;
            this.collector = null;
            this.terminationGuard = null;
        }
    }

    /**
     * Describes a background job. Identity (for {@link #equals(Object)} and {@link #hashCode()}) is
     * defined by {@link #name} alone, so two instances with the same name address the same job.
     */
    public static class JobInfo {
        @Description(value="The name of the job.")
        public final String name;
        @Description(value="The delay on the job.")
        public long delay;
        @Description(value="The rate of the job.")
        public long rate;
        @Description(value="If the job has completed.")
        public boolean done;
        @Description(value="If the job has been cancelled.")
        public boolean cancelled;

        /**
         * Creates a job description with the given name; {@code delay} and {@code rate} stay {@code 0}.
         *
         * @param name the job name
         */
        public JobInfo(String name) {
            this.name = name;
        }

        /**
         * Creates a job description with the given name, delay and rate.
         *
         * @param name  the job name
         * @param delay the delay of the job
         * @param rate  the rate of the job
         */
        public JobInfo(String name, long delay, long rate) {
            this.name = name;
            this.delay = delay;
            this.rate = rate;
        }

        /**
         * Copies the completion and cancellation state of {@code future} into this instance.
         *
         * @param future the future backing this job
         * @return this instance, for chaining
         */
        public JobInfo update(Future<?> future) {
            this.done = future.isDone();
            this.cancelled = future.isCancelled();
            return this;
        }

        /**
         * Two job descriptions are equal when their names are equal; a {@code null} name only equals
         * another {@code null} name.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof JobInfo)) {
                return false;
            }
            return Objects.equals(this.name, ((JobInfo)o).name);
        }

        /** Hash of the name ({@code 0} for a {@code null} name), consistent with {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return Objects.hashCode(this.name);
        }
    }
}

