/*
 * Decompiled with CFR 0.152.
 */
package apoc.periodic;

import apoc.Pools;
import apoc.periodic.BatchAndTotalCollector;
import apoc.periodic.BatchAndTotalResult;
import apoc.periodic.BatchMode;
import apoc.periodic.PeriodicUtils;
import apoc.util.Util;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.QueryExecutionType;
import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.schema.ConstraintDefinition;
import org.neo4j.graphdb.schema.IndexDefinition;
import org.neo4j.graphdb.schema.Schema;
import org.neo4j.internal.helpers.collection.Iterables;
import org.neo4j.internal.helpers.collection.Iterators;
import org.neo4j.internal.helpers.collection.Pair;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Admin;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Mode;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

public class Periodic {
    /** Finds the word {@code runtime} followed by optional whitespace and {@code =}, ignoring case. */
    public static final Pattern RUNTIME_PATTERN = Pattern.compile("\\bruntime\\s*=", Pattern.CASE_INSENSITIVE);
    /** Finds the standalone word {@code cypher}, ignoring case. */
    public static final Pattern CYPHER_PREFIX_PATTERN = Pattern.compile("\\bcypher\\b", Pattern.CASE_INSENSITIVE);
    /** Text that {@link #slottedRuntime(String)} prepends to, or substitutes into, a statement. */
    public static final String CYPHER_RUNTIME_SLOTTED = "cypher runtime=slotted ";
    /** Finds the word {@code limit} with a whitespace character on each side, ignoring case. */
    static final Pattern LIMIT_PATTERN = Pattern.compile("\\slimit\\s", Pattern.CASE_INSENSITIVE);
    // Used to run statements in their own transactions (executeTransactionally) and to validate queries.
    @Context
    public GraphDatabaseService db;
    // Polled through Util.transactionIsTerminated to stop looping/batching early.
    @Context
    public TerminationGuard terminationGuard;
    // Logger handed to background tasks and used for progress messages.
    @Context
    public Log log;
    // Source of the scheduled executor service and of the job list (JobInfo -> Future).
    @Context
    public Pools pools;
    // Transaction used for the schema operations in truncate and for the outer query in iterate.
    @Context
    public Transaction tx;

    /**
     * Deletes all relationships and then all nodes by delegating to
     * {@link #iterate(String, String, Map)}, passing {@code config} through unchanged.
     * When the {@code dropSchema} entry of {@code config} evaluates to true, every
     * constraint and then every index reported by the transaction's schema is dropped.
     *
     * @param config configuration forwarded to {@code iterate}; also read for {@code dropSchema}
     */
    @Admin
    @Procedure(mode=Mode.SCHEMA)
    @Description(value="apoc.periodic.truncate({config}) - removes all entities (and optionally indexes and constraints) from db using the apoc.periodic.iterate under the hood")
    public void truncate(@Name(value="config", defaultValue="{}") Map<String, Object> config) {
        // Relationships go first, nodes second.
        this.iterate("MATCH ()-[r]->() RETURN id(r) as id", "MATCH ()-[r]->() WHERE id(r) = id DELETE r", config);
        this.iterate("MATCH (n) RETURN id(n) as id", "MATCH (n) WHERE id(n) = id DELETE n", config);

        boolean dropSchema = Util.toBoolean(config.get("dropSchema"));
        if (!dropSchema) {
            return;
        }
        Schema schema = this.tx.schema();
        for (ConstraintDefinition constraint : schema.getConstraints()) {
            constraint.drop();
        }
        // The index list is fetched only after the constraints have been dropped.
        for (IndexDefinition index : schema.getIndexes()) {
            index.drop();
        }
    }

    /**
     * Streams every entry of the pools' job list, refreshing each {@link JobInfo}'s
     * {@code done}/{@code cancelled} flags from its associated future.
     *
     * @return one {@code JobInfo} per registered job
     */
    @Procedure
    @Description(value="apoc.periodic.list - list all jobs")
    public Stream<JobInfo> list() {
        return this.pools.getJobList().entrySet().stream().map(entry -> {
            JobInfo job = (JobInfo)entry.getKey();
            Future state = (Future)entry.getValue();
            // update() mutates the stored key object and returns it.
            return job.update(state);
        });
    }

    /**
     * Runs {@code statement} again and again, each run in its own transaction on the pools'
     * scheduled executor, until a run reports no updates or the termination guard fires.
     *
     * <p>Each run receives the caller's parameters merged with {@code _count} (the update
     * count of the previous run) and {@code _total} (the running total so far). An exception
     * thrown inside a run is counted in {@code failedBatches}/{@code batchErrors} and treated
     * as 0 updates, which ends the loop. {@code failedCommits}/{@code commitErrors} are filled
     * by {@code Util.getFuture} (presumably when waiting on the future fails; confirm against
     * that helper).
     *
     * @param statement  statement returning a single numeric column; must contain the word
     *                   {@code limit} surrounded by whitespace
     * @param parameters parameters for the statement; {@code null} is treated as empty
     * @return a single {@link RundownResult} summarising the loop (runtime is in seconds)
     * @throws IllegalArgumentException if the statement contains no {@code limit}
     */
    @Procedure(mode=Mode.WRITE)
    @Description(value="apoc.periodic.commit(statement,params) - runs the given statement in separate transactions until it returns 0")
    public Stream<RundownResult> commit(@Name(value="statement") String statement, @Name(value="params", defaultValue="{}") Map<String, Object> parameters) throws ExecutionException, InterruptedException {
        this.validateQuery(statement);
        // Keep the declared key/value types so the map can be merged with the per-run window.
        Map<String, Object> params = parameters == null ? Collections.<String, Object>emptyMap() : parameters;
        long total = 0L;
        long executions = 0L;
        long updates = 0L;
        long start = System.nanoTime();
        if (!LIMIT_PATTERN.matcher(statement).find()) {
            throw new IllegalArgumentException("the statement sent to apoc.periodic.commit must contain a `limit`");
        }
        AtomicInteger batches = new AtomicInteger();
        AtomicInteger failedCommits = new AtomicInteger();
        ConcurrentHashMap<String, Long> commitErrors = new ConcurrentHashMap<String, Long>();
        AtomicInteger failedBatches = new AtomicInteger();
        ConcurrentHashMap<String, Long> batchErrors = new ConcurrentHashMap<String, Long>();
        do {
            Map<String, Object> window = Util.map("_count", updates, "_total", total);
            updates = Util.getFuture(this.pools.getScheduledExecutorService().submit(() -> {
                batches.incrementAndGet();
                try {
                    return this.executeNumericResultStatement(statement, Util.merge(window, params));
                }
                catch (Exception e) {
                    failedBatches.incrementAndGet();
                    Periodic.recordError(batchErrors, e);
                    return 0L;
                }
            }), commitErrors, failedCommits, 0L);
            total += updates;
            // Only runs that actually changed something count as executions.
            if (updates > 0L) {
                ++executions;
            }
        } while (updates > 0L && !Util.transactionIsTerminated(this.terminationGuard));
        long timeTaken = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start);
        boolean wasTerminated = Util.transactionIsTerminated(this.terminationGuard);
        return Stream.of(new RundownResult(total, executions, timeTaken, batches.get(), failedBatches.get(), batchErrors, failedCommits.get(), commitErrors, wasTerminated));
    }

    /**
     * Increments the counter stored under the root-cause message of {@code e}.
     *
     * <p>The key is never {@code null}: if no root cause is reported the exception itself is
     * used, and if the chosen throwable has no message its class name is used instead. This
     * matters because callers pass {@code ConcurrentHashMap} instances, which reject null keys.
     *
     * @param executionErrors map from error message to number of occurrences; updated in place
     * @param e               the exception to record
     */
    private static void recordError(Map<String, Long> executionErrors, Exception e) {
        Throwable root = ExceptionUtils.getRootCause(e);
        if (root == null) {
            // Some commons-lang3 versions return null when the throwable has no cause.
            root = e;
        }
        String msg = root.getMessage();
        if (msg == null) {
            // e.g. a bare NullPointerException carries no message.
            msg = root.getClass().getName();
        }
        executionErrors.merge(msg, 1L, Long::sum);
    }

    /**
     * Executes {@code statement} in its own transaction and adds up the values of its only
     * result column, each value being cast to {@code Long}.
     *
     * @param statement  statement producing exactly one column
     * @param parameters parameters passed to the statement
     * @return the sum of all values in that column
     */
    private long executeNumericResultStatement(@Name(value="statement") String statement, @Name(value="params") Map<String, Object> parameters) {
        Long columnSum = (Long)this.db.executeTransactionally(statement, parameters, rows -> {
            // Iterables.single is applied to the column list to obtain the one column name.
            String onlyColumn = (String)Iterables.single((Iterable)rows.columns());
            return rows.columnAs(onlyColumn).stream().mapToLong(value -> (Long)value).sum();
        });
        return columnSum;
    }

    /**
     * Removes the job registered under {@code name} from the job list and cancels its
     * future without interrupting it ({@code cancel(false)}).
     *
     * @param name name of the job to cancel
     * @return the job's info with refreshed flags, or an empty stream if no such job exists
     */
    @Procedure
    @Description(value="apoc.periodic.cancel(name) - cancel job with the given name")
    public Stream<JobInfo> cancel(@Name(value="name") String name) {
        JobInfo key = new JobInfo(name);
        Future removed = this.pools.getJobList().remove(key);
        if (removed == null) {
            return Stream.empty();
        }
        removed.cancel(false);
        return Stream.of(key.update(removed));
    }

    /**
     * Validates {@code statement} and submits it as a one-off background job under
     * {@code name}. A failure inside the job is logged as a warning and rethrown wrapped
     * in a {@code RuntimeException}.
     *
     * @param name      job name
     * @param statement statement to execute in its own transaction
     * @return the info object of the submitted job
     */
    @Procedure(mode=Mode.WRITE)
    @Description(value="apoc.periodic.submit('name',statement) - submit a one-off background statement")
    public Stream<JobInfo> submit(@Name(value="name") String name, @Name(value="statement") String statement) {
        this.validateQuery(statement);
        Runnable job = () -> {
            try {
                this.db.executeTransactionally(statement);
            }
            catch (Exception failure) {
                this.log.warn("in background task via submit", (Throwable)failure);
                throw new RuntimeException(failure);
            }
        };
        return Stream.of(this.submit(name, job, this.log));
    }

    /**
     * Validates {@code statement} and schedules it to run repeatedly under {@code name},
     * starting with no initial delay and pausing {@code rate} seconds between runs.
     *
     * @param name      job name
     * @param statement statement to execute on every run
     * @param rate      pause between runs, in seconds
     * @param config    optional settings; the {@code params} entry supplies the statement's
     *                  parameters (missing or {@code null} means no parameters)
     * @return the info object of the scheduled job
     */
    @Procedure(mode=Mode.WRITE)
    @Description(value="apoc.periodic.repeat('name',statement,repeat-rate-in-seconds, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement.")
    public Stream<JobInfo> repeat(@Name(value="name") String name, @Name(value="statement") String statement, @Name(value="rate") long rate, @Name(value="config", defaultValue="{}") Map<String, Object> config) {
        this.validateQuery(statement);
        // config values are untyped, so the params entry needs an explicit cast;
        // an explicit null entry is treated like a missing one.
        Object rawParams = config.get("params");
        @SuppressWarnings("unchecked")
        Map<String, Object> params = rawParams == null ? Collections.<String, Object>emptyMap() : (Map<String, Object>)rawParams;
        JobInfo info = this.schedule(name, () -> this.db.executeTransactionally(statement, params), 0L, rate);
        return Stream.of(info);
    }

    /**
     * Delegates to {@code Util.validateQuery} for this procedure's database, passing an
     * empty {@code QueryType} array.
     *
     * @param statement the statement to validate
     */
    private void validateQuery(String statement) {
        QueryExecutionType.QueryType[] noQueryTypes = new QueryExecutionType.QueryType[0];
        Util.validateQuery(this.db, statement, noQueryTypes);
    }

    /**
     * Validates {@code statement} and submits a {@link Countdown} task for it; the task
     * re-submits itself after {@code rate} seconds for as long as the statement returns a
     * value greater than 0.
     *
     * @param name      job name
     * @param statement statement returning a single numeric column
     * @param rate      pause between runs, in seconds
     * @return the info object of the submitted job, with {@code rate} filled in
     */
    @Procedure(mode=Mode.WRITE)
    @Description(value="apoc.periodic.countdown('name',statement,repeat-rate-in-seconds) submit a repeatedly-called background statement until it returns 0")
    public Stream<JobInfo> countdown(@Name(value="name") String name, @Name(value="statement") String statement, @Name(value="rate") long rate) {
        this.validateQuery(statement);
        Countdown task = new Countdown(name, statement, rate, this.log);
        JobInfo submitted = this.submit(name, task, this.log);
        submitted.rate = rate;
        return Stream.of(submitted);
    }

    /**
     * Registers {@code task} as the job named {@code name} and submits it to the pools'
     * scheduled executor. A previously registered job with the same name is removed from
     * the job list and, if it has not finished, cancelled without interruption.
     *
     * @param name job name, used as the job-list key
     * @param task work to run
     * @param log  logger used by the wrapper around {@code task}
     * @return the info object under which the new future is registered
     */
    public <T> JobInfo submit(String name, Runnable task, Log log) {
        JobInfo info = new JobInfo(name);
        Future previous = this.pools.getJobList().remove(info);
        boolean stillRunning = previous != null && !previous.isDone();
        if (stillRunning) {
            previous.cancel(false);
        }
        Runnable logged = Periodic.wrapTask(name, task, log);
        Future<?> started = this.pools.getScheduledExecutorService().submit(logged);
        this.pools.getJobList().put(info, started);
        return info;
    }

    /**
     * Registers {@code task} as the job named {@code name} and schedules it with a fixed
     * delay between runs. A previously registered job with the same name is removed from
     * the job list and, if it has not finished, cancelled without interruption.
     *
     * <p>{@code repeat} is checked before the existing job is touched, so an invalid value
     * no longer cancels a running job and then fails in the executor.
     *
     * @param name   job name, used as the job-list key
     * @param task   work to run
     * @param delay  initial delay in seconds
     * @param repeat pause between the end of one run and the start of the next, in seconds
     * @return the info object under which the new future is registered
     * @throws IllegalArgumentException if {@code repeat} is not greater than 0
     */
    public JobInfo schedule(String name, Runnable task, long delay, long repeat) {
        if (repeat <= 0L) {
            // scheduleWithFixedDelay would reject this too, but only after the old job is gone.
            throw new IllegalArgumentException("repeat rate must be > 0 seconds, but was " + repeat);
        }
        JobInfo info = new JobInfo(name, delay, repeat);
        Future future = this.pools.getJobList().remove(info);
        if (future != null && !future.isDone()) {
            future.cancel(false);
        }
        Runnable wrappingTask = Periodic.wrapTask(name, task, this.log);
        ScheduledFuture<?> newFuture = this.pools.getScheduledExecutorService().scheduleWithFixedDelay(wrappingTask, delay, repeat, TimeUnit.SECONDS);
        this.pools.getJobList().put(info, newFuture);
        return info;
    }

    /**
     * Wraps {@code task} so that its start and successful end are logged at debug level
     * and any exception it throws is logged at error level before being rethrown.
     *
     * @param name job name used in the log messages
     * @param task work to run
     * @param log  logger receiving the messages
     * @return the logging wrapper
     */
    private static Runnable wrapTask(String name, Runnable task, Log log) {
        return () -> {
            log.debug("Executing task " + name);
            try {
                task.run();
            }
            catch (Exception failure) {
                String message = "Error while executing task " + name + " because of the following exception (the task will be killed):";
                log.error(message, (Throwable)failure);
                throw failure;
            }
            // Reached only when the task completed without throwing.
            log.debug("Executed task " + name);
        };
    }

    /**
     * Runs {@code cypherIterate} in the procedure's transaction (with the slotted-runtime
     * prefix applied by {@link #slottedRuntime(String)}) and hands its rows, in batches, to
     * {@code PeriodicUtils.iterateAndExecuteBatchedInSeparateThread}, which executes the
     * prepared form of {@code cypherAction} for them.
     *
     * <p>Config entries read here: {@code batchSize} (default 10000 in code, although the
     * procedure description mentions 1000), {@code concurrency} (default 50),
     * {@code parallel} (default false), {@code retries} (default 0), {@code failedParams}
     * (default -1) and {@code params} (default empty; an explicit {@code null} is treated
     * as empty). The batch mode is derived by {@code BatchMode.fromConfig}.
     *
     * @param cypherIterate statement producing the rows to process
     * @param cypherAction  statement applied to the rows
     * @param config        configuration map, see above
     * @return the result stream produced by {@code PeriodicUtils}
     * @throws IllegalArgumentException if {@code batchSize} is smaller than 1 or does not fit
     *                                  into an {@code int}, or if {@code concurrency} is
     *                                  smaller than 1
     */
    @Procedure(mode=Mode.WRITE)
    @Description(value="apoc.periodic.iterate('statement returning items', 'statement per item', {batchSize:1000,iterateList:true,parallel:false,params:{},concurrency:50,retries:0}) YIELD batches, total - run the second statement for each item returned by the first statement. Returns number of batches and total processed rows")
    public Stream<BatchAndTotalResult> iterate(@Name(value="cypherIterate") String cypherIterate, @Name(value="cypherAction") String cypherAction, @Name(value="config") Map<String, Object> config) {
        this.validateQuery(cypherIterate);
        long batchSize = Util.toLong(config.getOrDefault("batchSize", 10000));
        if (batchSize < 1L) {
            throw new IllegalArgumentException("batchSize parameter must be > 0");
        }
        if (batchSize > Integer.MAX_VALUE) {
            // The batch size is narrowed to int below; refuse values that would be truncated.
            throw new IllegalArgumentException("batchSize parameter must be <= " + Integer.MAX_VALUE);
        }
        int concurrency = Util.toInteger(config.getOrDefault("concurrency", 50));
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency parameter must be > 0");
        }
        boolean parallel = Util.toBoolean(config.getOrDefault("parallel", false));
        long retries = Util.toLong(config.getOrDefault("retries", 0));
        int failedParams = Util.toInteger(config.getOrDefault("failedParams", -1));
        BatchMode batchMode = BatchMode.fromConfig(config);
        // config values are untyped, so the params entry needs an explicit cast.
        Object rawParams = config.get("params");
        @SuppressWarnings("unchecked")
        Map<String, Object> params = rawParams == null ? Collections.<String, Object>emptyMap() : (Map<String, Object>)rawParams;
        try (Result result = this.tx.execute(Periodic.slottedRuntime(cypherIterate), params)) {
            Pair<String, Boolean> prepared = PeriodicUtils.prepareInnerStatement(cypherAction, batchMode, result.columns(), "_batch");
            String innerStatement = (String)prepared.first();
            boolean iterateList = (Boolean)prepared.other();
            this.log.info("starting batching from `%s` operation using iteration `%s` in separate thread", new Object[]{cypherIterate, cypherAction});
            return PeriodicUtils.iterateAndExecuteBatchedInSeparateThread(this.db, this.terminationGuard, this.log, this.pools, (int)batchSize, parallel, iterateList, retries, (Iterator<Map<String, Object>>)result, (tx, p) -> {
                Result r = tx.execute(innerStatement, Util.merge(params, p));
                // Drain the result so the statement runs to completion before reading statistics.
                Iterators.count((Iterator)r);
                return r.getQueryStatistics();
            }, concurrency, failedParams);
        }
    }

    /**
     * Returns {@code cypherIterate} with the {@code cypher runtime=slotted } option applied,
     * unless the statement already contains a {@code runtime=} setting.
     *
     * <p>If the standalone word {@code cypher} occurs within the first 15 characters, its
     * first occurrence in the statement is replaced by {@link #CYPHER_RUNTIME_SLOTTED};
     * otherwise that text is prepended.
     *
     * <p>NOTE(review): both checks are plain regex searches, so an identifier spelled
     * {@code cypher} near the start, or a property comparison such as {@code runtime = 1},
     * also triggers them. Confirm whether that is acceptable.
     *
     * @param cypherIterate the statement to adapt
     * @return the adapted statement
     */
    static String slottedRuntime(String cypherIterate) {
        boolean runtimeAlreadySet = RUNTIME_PATTERN.matcher(cypherIterate).find();
        if (runtimeAlreadySet) {
            return cypherIterate;
        }
        int headLength = Math.min(15, cypherIterate.length());
        String head = cypherIterate.substring(0, headLength);
        if (CYPHER_PREFIX_PATTERN.matcher(head).find()) {
            return CYPHER_PREFIX_PATTERN.matcher(cypherIterate).replaceFirst(CYPHER_RUNTIME_SLOTTED);
        }
        return CYPHER_RUNTIME_SLOTTED + cypherIterate;
    }

    /**
     * Applies {@code consumer} to {@code tx} and {@code params} and reports the outcome to
     * {@code collector}.
     *
     * <p>On success {@code localCount} (when non-null) is incremented, the returned
     * statistics are added to the collector and {@code returnValue} is returned. On failure
     * the whole batch is counted as failed, its parameters are recorded, the error is
     * tallied in the collector's operation errors and the exception is rethrown.
     *
     * @param tx          transaction to run in
     * @param consumer    the operation to execute
     * @param params      parameters for the operation
     * @param batch       the batch the operation belongs to (used for failure reporting)
     * @param returnValue value to return on success
     * @param localCount  optional counter incremented on success; may be {@code null}
     * @param collector   receiver of statistics and failure information
     * @return {@code returnValue}
     */
    private static long executeAndReportErrors(Transaction tx, BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer, Map<String, Object> params, List<Map<String, Object>> batch, int returnValue, AtomicLong localCount, BatchAndTotalCollector collector) {
        try {
            QueryStatistics stats = consumer.apply(tx, params);
            if (localCount != null) {
                localCount.incrementAndGet();
            }
            collector.updateStatistics(stats);
            return returnValue;
        }
        catch (Exception failure) {
            int failedOps = batch.size();
            collector.incrementFailedOps(failedOps);
            collector.amendFailedParamsMap(batch);
            Periodic.recordError(collector.getOperationErrors(), failure);
            throw failure;
        }
    }

    /**
     * Background task that executes a statement and, while the statement's numeric result
     * is greater than 0, arranges for itself to be submitted again after {@code rate} seconds.
     */
    private class Countdown
    implements Runnable {
        private final String name;
        private final String statement;
        private final long rate;
        private final transient Log log;

        /**
         * @param name      job name used when re-submitting
         * @param statement statement returning a single numeric column
         * @param rate      delay before the next submission, in seconds
         * @param log       logger passed on to the submission
         */
        public Countdown(String name, String statement, long rate, Log log) {
            this.name = name;
            this.statement = statement;
            this.rate = rate;
            this.log = log;
        }

        /** Runs the statement once and schedules the next submission if the result is positive. */
        @Override
        public void run() {
            long remaining = Periodic.this.executeNumericResultStatement(this.statement, Collections.emptyMap());
            if (remaining <= 0L) {
                // Nothing left to do: the countdown ends here.
                return;
            }
            Periodic.this.pools.getScheduledExecutorService().schedule(() -> Periodic.this.submit(this.name, this, this.log), this.rate, TimeUnit.SECONDS);
        }
    }

    /**
     * Describes a background job. Two instances are equal when their names are equal;
     * the remaining fields do not take part in {@code equals}/{@code hashCode}.
     */
    public static class JobInfo {
        /** Job name; the only field used for equality and hashing. */
        public final String name;
        /** Initial delay given at construction (0 when not given). */
        public long delay;
        /** Repeat rate given at construction (0 when not given). */
        public long rate;
        /** Value of {@code Future.isDone()} at the last {@link #update(Future)} call. */
        public boolean done;
        /** Value of {@code Future.isCancelled()} at the last {@link #update(Future)} call. */
        public boolean cancelled;

        /** Creates an info object with the given name and zero delay and rate. */
        public JobInfo(String name) {
            this(name, 0L, 0L);
        }

        /** Creates an info object with the given name, delay and rate. */
        public JobInfo(String name, long delay, long rate) {
            this.name = name;
            this.delay = delay;
            this.rate = rate;
        }

        /**
         * Copies the done/cancelled state of {@code future} into this object.
         *
         * @param future the future to read
         * @return this object
         */
        public JobInfo update(Future future) {
            this.cancelled = future.isCancelled();
            this.done = future.isDone();
            return this;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof JobInfo)) {
                return false;
            }
            JobInfo other = (JobInfo)o;
            return this.name.equals(other.name);
        }

        @Override
        public int hashCode() {
            return this.name.hashCode();
        }
    }

    /**
     * Batch executor that runs the consumer once per row of the batch and returns how many
     * rows were executed.
     */
    static class OneByOneExecuteBatch
    extends ExecuteBatch {
        OneByOneExecuteBatch(TerminationGuard terminationGuard, BatchAndTotalCollector collector, List<Map<String, Object>> batch, BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer) {
            super(terminationGuard, collector, batch, consumer);
        }

        /**
         * Executes every row of the batch in {@code txInThread}.
         *
         * @param txInThread transaction to run the rows in
         * @return the number of rows executed; 0 if the transaction was already terminated
         */
        @Override
        public final Long apply(Transaction txInThread) {
            if (Util.transactionIsTerminated(this.terminationGuard)) {
                return 0L;
            }
            AtomicLong localCount = new AtomicLong(this.collector.getCount());
            return this.batch.stream().mapToLong(row -> this.executeRow(txInThread, localCount, row)).sum();
        }

        /**
         * Executes a single row; contributes 0 when the counter is at a multiple of 1000
         * and the termination guard reports termination, otherwise 1 on success.
         */
        private long executeRow(Transaction txInThread, AtomicLong localCount, Map<String, Object> row) {
            boolean atCheckpoint = localCount.get() % 1000L == 0L;
            if (atCheckpoint && Util.transactionIsTerminated(this.terminationGuard)) {
                return 0L;
            }
            Map<String, Object> extras = Util.map("_count", localCount.get(), "_batch", this.batch);
            Map<String, Object> params = Util.merge(row, extras);
            return Periodic.executeAndReportErrors(txInThread, this.consumer, params, this.batch, 1, localCount, this.collector);
        }
    }

    /**
     * Batch executor that runs the consumer once for the whole batch, passing the batch
     * as the {@code _batch} parameter.
     */
    static class ListExecuteBatch
    extends ExecuteBatch {
        ListExecuteBatch(TerminationGuard terminationGuard, BatchAndTotalCollector collector, List<Map<String, Object>> batch, BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer) {
            super(terminationGuard, collector, batch, consumer);
        }

        /**
         * Executes the batch in {@code txInThread}.
         *
         * @param txInThread transaction to run the batch in
         * @return the batch size on success; 0 if the transaction was already terminated
         */
        @Override
        public final Long apply(Transaction txInThread) {
            boolean terminated = Util.transactionIsTerminated(this.terminationGuard);
            if (!terminated) {
                Map<String, Object> params = Util.map("_count", this.collector.getCount(), "_batch", this.batch);
                int rows = this.batch.size();
                return Periodic.executeAndReportErrors(txInThread, this.consumer, params, this.batch, rows, null, this.collector);
            }
            return 0L;
        }
    }

    /**
     * Base class for batch executors: holds the collaborators a batch needs and lets them
     * be dropped again through {@link #release()}.
     */
    static abstract class ExecuteBatch
    implements Function<Transaction, Long> {
        /** Guard polled to detect termination. */
        protected TerminationGuard terminationGuard;
        /** Receiver of counts, statistics and failures. */
        protected BatchAndTotalCollector collector;
        /** Rows belonging to this batch. */
        protected List<Map<String, Object>> batch;
        /** Operation executed for the batch. */
        protected BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer;

        ExecuteBatch(TerminationGuard terminationGuard, BatchAndTotalCollector collector, List<Map<String, Object>> batch, BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer) {
            this.batch = batch;
            this.consumer = consumer;
            this.collector = collector;
            this.terminationGuard = terminationGuard;
        }

        /** Clears all four references held by this object. */
        public void release() {
            this.batch = null;
            this.consumer = null;
            this.collector = null;
            this.terminationGuard = null;
        }
    }

    /** Immutable summary returned by {@code commit}. */
    public static class RundownResult {
        /** Sum of the values reported by all runs. */
        public final long updates;
        /** Number of runs that reported a value greater than 0. */
        public final long executions;
        /** Elapsed time as supplied by the caller ({@code commit} passes whole seconds). */
        public final long runtime;
        /** Number of runs started. */
        public final long batches;
        /** Number of runs that threw an exception. */
        public final long failedBatches;
        /** Error message to occurrence count for failed runs. */
        public final Map<String, Long> batchErrors;
        /** Failure count supplied as {@code failedCommits}. */
        public final long failedCommits;
        /** Error message to occurrence count supplied as {@code commitErrors}. */
        public final Map<String, Long> commitErrors;
        /** Whether termination was detected. */
        public final boolean wasTerminated;

        /** Stores every argument in the field of the corresponding name ({@code total} goes to {@code updates}, {@code timeTaken} to {@code runtime}). */
        public RundownResult(long total, long executions, long timeTaken, long batches, long failedBatches, Map<String, Long> batchErrors, long failedCommits, Map<String, Long> commitErrors, boolean wasTerminated) {
            // Counters
            this.updates = total;
            this.executions = executions;
            this.batches = batches;
            this.failedBatches = failedBatches;
            this.failedCommits = failedCommits;
            // Error tallies
            this.batchErrors = batchErrors;
            this.commitErrors = commitErrors;
            // Timing and termination state
            this.runtime = timeTaken;
            this.wasTerminated = wasTerminated;
        }
    }
}

