/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.driver.integration;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.junit.MatcherAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.neo4j.driver.Driver;
import org.neo4j.driver.QueryRunner;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.Transaction;
import org.neo4j.driver.Values;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.exceptions.TransientException;
import org.neo4j.driver.util.DaemonThreadFactory;
import org.neo4j.driver.util.DatabaseExtension;
import org.neo4j.driver.util.ParallelizableIT;
import org.neo4j.driver.util.TestUtil;

@ParallelizableIT
class SessionResetIT {
    /** Equals the number of lines generated for the temporary CSV file (10000). */
    private static final int CSV_FILE_SIZE = 10000;

    /** Equals the upper bound of the {@code range(1, 10)} clause in the periodic-commit query template. */
    private static final int LOAD_CSV_BATCH_SIZE = 10;

    /** Short write query: creates a single node. */
    private static final String SHORT_QUERY_1 = "CREATE (n:Node {name: 'foo', occupation: 'bar'})";

    /** Short read query: counts the nodes named 'foo'. */
    private static final String SHORT_QUERY_2 = "MATCH (n:Node {name: 'foo'}) RETURN count(n)";

    /** Query that creates and deletes a node for every value of a large range. */
    private static final String LONG_QUERY = "UNWIND range(0, 10000000) AS i CREATE (n:Node {idx: i}) DELETE n";

    /** Periodic-commit LOAD CSV query; the single {@code %s} placeholder receives the CSV file URI. */
    private static final String LONG_PERIODIC_COMMIT_QUERY_TEMPLATE = "USING PERIODIC COMMIT 1 LOAD CSV FROM '%s' AS line "
            + "UNWIND range(1, 10) AS index "
            + "CREATE (n:Node {id: index, name: line[0], occupation: line[1]})";

    /** Number of worker tasks submitted by the random-termination stress test. */
    private static final int STRESS_TEST_THREAD_COUNT = 2 * Runtime.getRuntime().availableProcessors();

    /** How long the random-termination stress test keeps resetting sessions, in milliseconds. */
    private static final long STRESS_TEST_DURATION_MS = TimeUnit.SECONDS.toMillis(5L);

    /** Queries the stress test picks from. */
    private static final String[] STRESS_TEST_QUERIES = {SHORT_QUERY_1, SHORT_QUERY_2, LONG_QUERY};

    @RegisterExtension
    static final DatabaseExtension neo4j = new DatabaseExtension();

    /** Executor for background work; created in {@code setUp()} and shut down in {@code tearDown()}. */
    private ExecutorService executor;

    /** Package-private no-arg constructor with an empty body; no state is initialized here. */
    SessionResetIT() {
    }

    /**
     * Creates a new cached thread pool before each test. Its threads come from the factory returned by
     * {@code DaemonThreadFactory.daemon(...)}, which is given this class's simple name plus "-thread".
     */
    @BeforeEach
    void setUp() {
        String threadName = getClass().getSimpleName() + "-thread";
        executor = Executors.newCachedThreadPool(DaemonThreadFactory.daemon(threadName));
    }

    /** Shuts the executor down immediately after each test, if one was created. */
    @AfterEach
    void tearDown() {
        if (executor == null) {
            return;
        }
        executor.shutdownNow();
    }

    /** Runs {@code LONG_QUERY} as an auto-commit query and checks that a session reset terminates it. */
    @Test
    void shouldTerminateAutoCommitQuery() {
        final boolean autoCommit = true;
        testQueryTermination(LONG_QUERY, autoCommit);
    }

    /** Runs {@code LONG_QUERY} inside an explicit transaction and checks that a session reset terminates it. */
    @Test
    void shouldTerminateQueryInUnmanagedTransaction() {
        final boolean autoCommit = false;
        testQueryTermination(LONG_QUERY, autoCommit);
    }

    /**
     * Starts the periodic-commit LOAD CSV query on another thread, resets its session, and expects the
     * query future to fail with a {@link Neo4jException}. Afterwards no query may remain active and at
     * most 100000 nodes may exist.
     */
    @Test
    void shouldTerminatePeriodicCommitQueryRandomly() {
        Assumptions.assumeTrue(neo4j.isNeo4j44OrEarlier());

        String query = longPeriodicCommitQuery();
        Future<Void> queryResult = runQueryInDifferentThreadAndResetSession(query, true);

        ExecutionException e =
                Assertions.assertThrows(ExecutionException.class, () -> queryResult.get(1L, TimeUnit.MINUTES));
        MatcherAssert.assertThat(e.getCause(), Matchers.instanceOf(Neo4jException.class));

        awaitNoActiveQueries();

        long maxExpectedNodes = 100000L;
        MatcherAssert.assertThat(countNodes(), Matchers.lessThanOrEqualTo(maxExpectedNodes));
    }

    /** Runs the random-termination stress test with auto-commit queries. */
    @Test
    void shouldTerminateAutoCommitQueriesRandomly() throws Exception {
        final boolean autoCommit = true;
        testRandomQueryTermination(autoCommit);
    }

    /** Runs the random-termination stress test with queries executed in explicit transactions. */
    @Test
    void shouldTerminateQueriesInUnmanagedTransactionsRandomly() throws Exception {
        final boolean autoCommit = false;
        testRandomQueryTermination(autoCommit);
    }

    /**
     * Schedules a background task that waits and then resets the given session.
     *
     * <p>The session is reset even if the wait is interrupted. The interrupt status is restored only
     * after the reset has been attempted, so the reset call itself does not start on a thread that is
     * already flagged as interrupted.
     *
     * @param session the session to reset
     * @param timeout how long to wait before the reset, in seconds
     */
    private void resetSessionAfterTimeout(Session session, int timeout) {
        this.executor.submit(() -> {
            boolean interrupted = false;
            try {
                // TimeUnit does the seconds-to-millis conversion in long arithmetic, so large values
                // cannot overflow the way the int expression "timeout * 1000" can.
                TimeUnit.SECONDS.sleep(timeout);
            } catch (InterruptedException e) {
                interrupted = true;
            }
            try {
                session.reset();
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    /** A session must still run auto-commit queries after {@code reset()} was called on it. */
    @Test
    void shouldAllowMoreQueriesAfterSessionReset() {
        try (Session session = neo4j.driver().session()) {
            Result beforeReset = session.run("RETURN 1");
            beforeReset.consume();

            session.reset();

            Result afterReset = session.run("RETURN 2");
            afterReset.consume();
        }
    }

    /**
     * A session must be able to begin and commit a new transaction after {@code reset()} was called
     * on it following a committed transaction.
     */
    @Test
    void shouldAllowMoreTxAfterSessionReset() {
        try (Session session = neo4j.driver().session()) {
            try (Transaction tx = session.beginTransaction()) {
                tx.run("RETURN 1");
                tx.commit();
            }

            session.reset();

            // Second transaction on the same session, opened after the reset.
            try (Transaction tx = session.beginTransaction()) {
                tx.run("RETURN 2");
                tx.commit();
            }
        }
    }

    /**
     * After the session is reset, running a query in a transaction that was begun before the reset
     * must fail with a message starting with "Cannot run more queries in this transaction".
     */
    @Test
    void shouldMarkTxAsFailedAndDisallowRunAfterSessionReset() {
        try (Session session = neo4j.driver().session()) {
            Transaction tx = session.beginTransaction();
            session.reset();

            Exception e = Assertions.assertThrows(Exception.class, () -> {
                tx.run("RETURN 1");
                tx.commit();
            });

            String message = e.getMessage();
            MatcherAssert.assertThat(message, CoreMatchers.startsWith("Cannot run more queries in this transaction"));
        }
    }

    /**
     * A session must be able to begin and commit a new transaction after {@code reset()} was called
     * while an earlier transaction was still open.
     */
    @Test
    void shouldAllowMoreTxAfterSessionResetInTx() {
        try (Session session = neo4j.driver().session()) {
            // The first transaction is only opened so that the reset happens while it is active.
            try (Transaction ignore = session.beginTransaction()) {
                session.reset();
            }

            try (Transaction tx = session.beginTransaction()) {
                tx.run("RETURN 2");
                tx.commit();
            }
        }
    }

    /**
     * Runs the lock-wait reset scenario with an updater that changes the node id through an
     * auto-commit query on its own session.
     */
    @Test
    void resetShouldStopQueryWaitingForALock() throws Exception {
        NodeIdUpdater autoCommitUpdater = new NodeIdUpdater() {
            @Override
            void performUpdate(
                    Driver driver,
                    int nodeId,
                    int newNodeId,
                    AtomicReference<Session> usedSessionRef,
                    CountDownLatch latchToWait)
                    throws Exception {
                try (Session session = driver.session()) {
                    // Publish the session so the test can reset it from another thread.
                    usedSessionRef.set(session);
                    latchToWait.await();
                    updateNodeId(session, nodeId, newNodeId).consume();
                }
            }
        };
        testResetOfQueryWaitingForLock(autoCommitUpdater);
    }

    /**
     * Runs the lock-wait reset scenario with an updater that changes the node id inside an explicit
     * transaction on its own session.
     */
    @Test
    void resetShouldStopTransactionWaitingForALock() throws Exception {
        this.testResetOfQueryWaitingForLock(new NodeIdUpdater() {

            @Override
            public void performUpdate(
                    Driver driver,
                    int nodeId,
                    int newNodeId,
                    AtomicReference<Session> usedSessionRef,
                    CountDownLatch latchToWait)
                    throws Exception {
                // Use the driver handed in by the caller, like the sibling updaters do, instead of
                // reaching back to neo4j.driver() directly.
                try (Session session = driver.session();
                        Transaction tx = session.beginTransaction()) {
                    // Publish the session so the test can reset it from another thread.
                    usedSessionRef.set(session);
                    latchToWait.await();
                    Result result = SessionResetIT.updateNodeId(tx, nodeId, newNodeId);
                    result.consume();
                }
            }
        });
    }

    /**
     * Runs the lock-wait reset scenario with an updater that changes the node id through
     * {@code session.writeTransaction(...)}, and verifies the transaction work ran exactly once.
     */
    @Test
    void resetShouldStopWriteTransactionWaitingForALock() throws Exception {
        AtomicInteger invocationsOfWork = new AtomicInteger();

        NodeIdUpdater writeTransactionUpdater = new NodeIdUpdater() {
            @Override
            public void performUpdate(
                    Driver driver,
                    int nodeId,
                    int newNodeId,
                    AtomicReference<Session> usedSessionRef,
                    CountDownLatch latchToWait)
                    throws Exception {
                try (Session session = driver.session()) {
                    // Publish the session so the test can reset it from another thread.
                    usedSessionRef.set(session);
                    latchToWait.await();

                    session.writeTransaction(tx -> {
                        invocationsOfWork.incrementAndGet();
                        updateNodeId(tx, nodeId, newNodeId).consume();
                        return null;
                    });
                }
            }
        };
        testResetOfQueryWaitingForLock(writeTransactionUpdater);

        Assertions.assertEquals(1, invocationsOfWork.get());
    }

    /**
     * Resetting a session that has not run anything must leave it usable: a transaction creating one
     * node commits, and a following count query returns 1.
     */
    @Test
    void shouldBeAbleToRunMoreQueriesAfterResetOnNoErrorState() {
        try (Session session = neo4j.driver().session()) {
            session.reset();

            Transaction tx = session.beginTransaction();
            tx.run("CREATE (n:FirstNode)");
            tx.commit();

            long nodes = session.run("MATCH (n) RETURN count(n)")
                    .single()
                    .get("count(n)")
                    .asLong();
            MatcherAssert.assertThat(nodes, CoreMatchers.equalTo(1L));
        }
    }

    /**
     * Resetting the session before the first query of an open transaction must make that query fail
     * with a {@link ClientException} mentioning "Cannot run more queries in this transaction".
     */
    @Test
    void shouldHandleResetBeforeRun() {
        try (Session session = neo4j.driver().session();
                Transaction tx = session.beginTransaction()) {
            session.reset();

            ClientException e =
                    Assertions.assertThrows(ClientException.class, () -> tx.run("CREATE (n:FirstNode)"));
            String message = e.getMessage();
            MatcherAssert.assertThat(
                    message, CoreMatchers.containsString("Cannot run more queries in this transaction"));
        }
    }

    /**
     * Resets a session from one thread while another thread holds an uncommitted transaction on it.
     *
     * <p>The transaction thread creates a {@code FirstNode}, signals {@code beforeCommit}, waits for
     * the reset, attempts to commit, and then creates a {@code SecondNode} in a new transaction. The
     * test expects no {@code FirstNode} and exactly one {@code SecondNode} to exist afterwards.
     */
    @Test
    void shouldHandleResetFromMultipleThreads() throws Throwable {
        CountDownLatch beforeCommit = new CountDownLatch(1);
        CountDownLatch afterReset = new CountDownLatch(1);

        // Close the shared session once both tasks are done so it is not leaked.
        try (Session session = neo4j.driver().session()) {
            Future<Void> txFuture = this.executor.submit(() -> {
                Transaction tx1 = session.beginTransaction();
                tx1.run("CREATE (n:FirstNode)");
                beforeCommit.countDown();
                afterReset.await();
                try {
                    tx1.commit();
                } catch (Neo4jException ignored) {
                    // Deliberately ignored: the session was reset between run and commit, and the
                    // FirstNode count assertion below checks that nothing was persisted.
                }
                try (Transaction tx2 = session.beginTransaction()) {
                    tx2.run("CREATE (n:SecondNode)");
                    tx2.commit();
                }
                return null;
            });

            Future<Void> resetFuture = this.executor.submit(() -> {
                beforeCommit.await();
                session.reset();
                afterReset.countDown();
                return null;
            });

            this.executor.shutdown();
            this.executor.awaitTermination(20L, TimeUnit.SECONDS);

            // Surface any failure from either task.
            txFuture.get(20L, TimeUnit.SECONDS);
            resetFuture.get(20L, TimeUnit.SECONDS);
        }

        Assertions.assertEquals(0L, this.countNodes("FirstNode"));
        Assertions.assertEquals(1L, this.countNodes("SecondNode"));
    }

    /**
     * Shared scenario for the "reset while waiting for a lock" tests.
     *
     * <p>A node with id 42 is created. The test thread updates it inside an open transaction, and the
     * given updater tries to update the same node from another thread once {@code nodeLocked} is
     * released. After a pause the updater's session is reset and its work is expected to fail as
     * terminated. The test transaction then commits and the node must carry the id it wrote.
     *
     * @param nodeIdUpdater strategy performing the competing update on another thread
     * @throws Exception if the test thread is interrupted or the scenario fails
     */
    private void testResetOfQueryWaitingForLock(NodeIdUpdater nodeIdUpdater) throws Exception {
        Assumptions.assumeTrue(neo4j.isNeo4j44OrEarlier());
        int nodeId = 42;
        int newNodeId1 = 4242;
        int newNodeId2 = 424242;

        this.createNodeWithId(nodeId);

        CountDownLatch nodeLocked = new CountDownLatch(1);
        AtomicReference<Session> otherSessionRef = new AtomicReference<>();

        try (Session session = neo4j.driver().session();
                Transaction tx = session.beginTransaction()) {
            Future<Void> txResult = nodeIdUpdater.update(nodeId, newNodeId1, otherSessionRef, nodeLocked);

            // Update the node in this still-open transaction before releasing the updater.
            Result result = SessionResetIT.updateNodeId(tx, nodeId, newNodeId2);
            result.consume();

            nodeLocked.countDown();
            // Pause before the reset so the updater has time to start its competing update.
            Thread.sleep(2000L);
            otherSessionRef.get().reset();

            SessionResetIT.assertTransactionTerminated(txResult);
            tx.commit();
        }

        try (Session session = neo4j.driver().session()) {
            Result result = session.run("MATCH (n) RETURN n.id AS id");
            int value = result.single().get("id").asInt();
            Assertions.assertEquals(newNodeId2, value);
        }
    }

    /**
     * Creates an unlabeled node whose {@code id} property is the given value.
     *
     * @param id value stored in the node's {@code id} property
     */
    private void createNodeWithId(int id) {
        try (Session session = neo4j.driver().session()) {
            // The result is not consumed explicitly here.
            session.run("CREATE (n {id: $id})", Values.parameters("id", id));
        }
    }

    /**
     * Runs a query that sets the {@code id} property of all nodes matching {@code currentId} to
     * {@code newId}.
     *
     * @param queryRunner session or transaction used to run the query
     * @param currentId id to match
     * @param newId id to set
     * @return the result of the update query, not yet consumed by this method
     */
    private static Result updateNodeId(QueryRunner queryRunner, int currentId, int newId) {
        String query = "MATCH (n {id: $currentId}) SET n.id = $newId";
        return queryRunner.run(query, Values.parameters("currentId", currentId, "newId", newId));
    }

    /**
     * Asserts that the given work fails within 20 seconds with an {@link ExecutionException} whose
     * cause is a {@link TransientException} with a message starting with
     * "The transaction has been terminated".
     *
     * @param work future of the work expected to be terminated
     */
    private static void assertTransactionTerminated(Future<Void> work) {
        ExecutionException e =
                Assertions.assertThrows(ExecutionException.class, () -> work.get(20L, TimeUnit.SECONDS));

        Throwable cause = e.getCause();
        MatcherAssert.assertThat(cause, CoreMatchers.instanceOf(TransientException.class));
        MatcherAssert.assertThat(
                cause.getMessage(), CoreMatchers.startsWith("The transaction has been terminated"));
    }

    /**
     * Stress scenario: {@code STRESS_TEST_THREAD_COUNT} workers keep running random queries while this
     * thread resets one of the currently running sessions every 30 ms.
     *
     * <p>The loop ends when {@code STRESS_TEST_DURATION_MS} has elapsed or a worker sets the
     * {@code stop} flag. All worker futures are then awaited and no query may remain active.
     *
     * @param autoCommit {@code true} to run queries in auto-commit mode, {@code false} to run them in
     *     explicit transactions
     * @throws Exception if this thread is interrupted while sleeping or a worker failed
     */
    private void testRandomQueryTermination(boolean autoCommit) throws Exception {
        Assumptions.assumeTrue(neo4j.isNeo4j44OrEarlier());

        Set<Session> runningSessions = Collections.newSetFromMap(new ConcurrentHashMap<>());
        AtomicBoolean stop = new AtomicBoolean();
        ArrayList<Future<?>> futures = new ArrayList<>();

        for (int i = 0; i < STRESS_TEST_THREAD_COUNT; ++i) {
            futures.add(this.executor.submit(() -> {
                ThreadLocalRandom random = ThreadLocalRandom.current();
                while (!stop.get()) {
                    this.runRandomQuery(autoCommit, random, runningSessions, stop);
                }
            }));
        }

        long deadline = System.currentTimeMillis() + STRESS_TEST_DURATION_MS;
        while (!stop.get()) {
            if (System.currentTimeMillis() > deadline) {
                stop.set(true);
            }
            // One more reset still happens in the iteration that sets the stop flag.
            SessionResetIT.resetAny(runningSessions);
            TimeUnit.MILLISECONDS.sleep(30L);
        }

        TestUtil.awaitAllFutures(futures);
        this.awaitNoActiveQueries();
    }

    /**
     * Opens a session, registers it in {@code runningSessions}, runs one randomly chosen stress query
     * and finally unregisters and closes the session.
     *
     * <p>An error is swallowed when the {@code stop} flag is already set or when
     * {@code isAcceptable(error)} holds. Otherwise the flag is set and the error is rethrown.
     *
     * @param autoCommit whether to run the query in auto-commit mode
     * @param random source of randomness for picking the query
     * @param runningSessions shared set of sessions that the resetting thread picks from
     * @param stop shared flag that ends the stress test
     */
    private void runRandomQuery(boolean autoCommit, Random random, Set<Session> runningSessions, AtomicBoolean stop) {
        try {
            Session session = neo4j.driver().session();
            runningSessions.add(session);
            try {
                // NOTE(review): the exclusive bound "length - 1" means the last entry of
                // STRESS_TEST_QUERIES is never selected - confirm whether that is intended.
                int queryIndex = random.nextInt(STRESS_TEST_QUERIES.length - 1);
                runQuery(session, STRESS_TEST_QUERIES[queryIndex], autoCommit);
            } finally {
                runningSessions.remove(session);
                session.close();
            }
        } catch (Throwable error) {
            boolean tolerated = stop.get() || isAcceptable(error);
            if (!tolerated) {
                stop.set(true);
                throw error;
            }
        }
    }

    /**
     * Runs the query on another thread, resets its session, and expects the query future to fail
     * within 10 seconds with a {@link Neo4jException} as cause. Finally waits until no query is active.
     *
     * @param query query to run and terminate
     * @param autoCommit whether to run the query in auto-commit mode
     */
    private void testQueryTermination(String query, boolean autoCommit) {
        Assumptions.assumeTrue(neo4j.isNeo4j44OrEarlier());

        Future<Void> queryResult = runQueryInDifferentThreadAndResetSession(query, autoCommit);

        ExecutionException e =
                Assertions.assertThrows(ExecutionException.class, () -> queryResult.get(10L, TimeUnit.SECONDS));
        Throwable cause = e.getCause();
        MatcherAssert.assertThat(cause, Matchers.instanceOf(Neo4jException.class));

        awaitNoActiveQueries();
    }

    /**
     * Starts the query asynchronously on a new session, waits until the query shows up among the
     * active queries, and then resets that session from the calling thread.
     *
     * @param query query to run
     * @param autoCommit whether to run the query in auto-commit mode
     * @return future of the asynchronous query execution
     */
    private Future<Void> runQueryInDifferentThreadAndResetSession(String query, boolean autoCommit) {
        AtomicReference<Session> sessionRef = new AtomicReference<>();

        // NOTE(review): the session opened inside this task is never closed by this method.
        CompletableFuture<Void> queryResult = CompletableFuture.runAsync(() -> {
            Session session = neo4j.driver().session();
            sessionRef.set(session);
            SessionResetIT.runQuery(session, query, autoCommit);
        });

        this.awaitActiveQueriesToContain(query);

        Session session = sessionRef.get();
        Assertions.assertNotNull(session);
        session.reset();

        return queryResult;
    }

    /**
     * Runs the query on the given session, either as an auto-commit query that is consumed, or inside
     * an explicit transaction that is committed.
     *
     * @param session session to run the query on
     * @param query query text
     * @param autoCommit {@code true} for auto-commit mode, {@code false} for an explicit transaction
     */
    private static void runQuery(Session session, String query, boolean autoCommit) {
        if (!autoCommit) {
            try (Transaction tx = session.beginTransaction()) {
                tx.run(query);
                tx.commit();
            }
            return;
        }
        session.run(query).consume();
    }

    /** Waits, via {@code TestUtil.awaitCondition}, until the active query count reported for the driver is zero. */
    private void awaitNoActiveQueries() {
        TestUtil.awaitCondition(() -> {
            return TestUtil.activeQueryCount(neo4j.driver()) == 0;
        });
    }

    /**
     * Waits, via {@code TestUtil.awaitCondition}, until at least one active query name contains the
     * given text.
     *
     * @param value text to look for in the active query names
     */
    private void awaitActiveQueriesToContain(String value) {
        TestUtil.awaitCondition(() -> {
            return TestUtil.activeQueryNames(neo4j.driver()).stream().anyMatch(name -> name.contains(value));
        });
    }

    /**
     * Counts all nodes regardless of label.
     *
     * @return number of nodes
     */
    private long countNodes() {
        final String anyLabel = null;
        return countNodes(anyLabel);
    }

    /**
     * Counts nodes, optionally restricted to one label.
     *
     * @param label label to filter by, or {@code null} to count all nodes
     * @return number of matching nodes
     */
    private long countNodes(String label) {
        String labelFilter = label == null ? "" : ":" + label;
        String query = "MATCH (n" + labelFilter + ") RETURN count(n) AS result";
        try (Session session = neo4j.driver().session()) {
            return session.run(query).single().get(0).asLong();
        }
    }

    /**
     * Picks any session from the set and, if this call manages to remove it from the set, resets it
     * via {@code resetSafely}. Does nothing when the set is empty or the removal fails.
     *
     * @param sessions shared set of currently running sessions
     */
    private static void resetAny(Set<Session> sessions) {
        sessions.stream()
                .findAny()
                .filter(sessions::remove)
                .ifPresent(SessionResetIT::resetSafely);
    }

    /**
     * Resets the session if it is open. A {@link ClientException} thrown along the way is ignored
     * when the session is no longer open afterwards, and rethrown otherwise.
     *
     * @param session session to reset
     */
    private static void resetSafely(Session session) {
        try {
            if (session.isOpen()) {
                session.reset();
            }
        } catch (ClientException e) {
            if (session.isOpen()) {
                throw e;
            }
            // The session is closed by now, so the failure is tolerated.
        }
    }

    /**
     * Decides whether an error raised by a stress-test worker may be ignored. The decision is made on
     * the root cause of the given error.
     *
     * @param error error caught by a worker
     * @return {@code true} if the root cause is a transaction-terminated error, a
     *     {@link ServiceUnavailableException}, a {@link ClientException} or a
     *     {@link ClosedChannelException}
     */
    private static boolean isAcceptable(Throwable error) {
        Throwable rootCause = error;
        for (Throwable cause = rootCause.getCause(); cause != null; cause = rootCause.getCause()) {
            rootCause = cause;
        }

        if (isTransactionTerminatedException(rootCause)) {
            return true;
        }
        if (rootCause instanceof ServiceUnavailableException) {
            return true;
        }
        if (rootCause instanceof ClientException) {
            return true;
        }
        return rootCause instanceof ClosedChannelException;
    }

    /**
     * Tells whether the error reports a terminated transaction.
     *
     * <p>That is the case when it is a {@link TransientException} whose message starts with
     * "The transaction has been terminated", or when it is any error whose message starts with
     * "Trying to execute query in a terminated transaction". Errors without a message never match.
     *
     * @param error error to inspect
     * @return {@code true} if the error matches one of the two message patterns
     */
    private static boolean isTransactionTerminatedException(Throwable error) {
        String message = error.getMessage();
        if (message == null) {
            // Throwable.getMessage() may return null; without this guard the prefix checks would
            // throw NullPointerException.
            return false;
        }
        return (error instanceof TransientException && message.startsWith("The transaction has been terminated"))
                || message.startsWith("Trying to execute query in a terminated transaction");
    }

    /**
     * Creates the temporary CSV file and builds the periodic-commit query that loads it.
     *
     * @return {@code LONG_PERIODIC_COMMIT_QUERY_TEMPLATE} formatted with the CSV file URI
     */
    private static String longPeriodicCommitQuery() {
        return String.format(LONG_PERIODIC_COMMIT_QUERY_TEMPLATE, createTmpCsvFile());
    }

    /**
     * Generates {@code CSV_FILE_SIZE} lines of the form {@code Foo-i, Bar-i} and hands them to
     * {@code neo4j.addImportFile} with the prefix "iris" and the suffix ".csv".
     *
     * @return the string returned by {@code neo4j.addImportFile}, which callers use as the file URI
     * @throws UncheckedIOException if adding the import file fails with an {@link IOException}
     */
    private static String createTmpCsvFile() {
        String lineSeparator = System.lineSeparator();
        String lines = IntStream.range(0, CSV_FILE_SIZE)
                .mapToObj(i -> "Foo-" + i + ", Bar-" + i)
                .collect(Collectors.joining(lineSeparator));

        String fileUri;
        try {
            fileUri = neo4j.addImportFile("iris", ".csv", lines);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return fileUri;
    }

    /**
     * Strategy for updating a node id on the test executor. Subclasses define how the update is
     * performed; {@link #update} submits that work to the enclosing test's executor.
     */
    private abstract class NodeIdUpdater {
        private NodeIdUpdater() {}

        /**
         * Submits {@link #performUpdate} to the enclosing test's executor, passing it the driver
         * returned by {@code neo4j.driver()}.
         *
         * @param nodeId id of the node to update
         * @param newNodeId id to assign
         * @param usedSessionRef reference handed to the implementation to publish its session
         * @param latchToWait latch handed to the implementation
         * @return future of the submitted work; it completes with {@code null} on success
         */
        final Future<Void> update(
                int nodeId, int newNodeId, AtomicReference<Session> usedSessionRef, CountDownLatch latchToWait) {
            return executor.submit(() -> {
                performUpdate(neo4j.driver(), nodeId, newNodeId, usedSessionRef, latchToWait);
                return null;
            });
        }

        /**
         * Performs the actual node id update.
         *
         * @param driver driver to open a session with
         * @param nodeId id of the node to update
         * @param newNodeId id to assign
         * @param usedSessionRef reference in which to publish the session used
         * @param latchToWait latch to await before running the update
         * @throws Exception if the update fails
         */
        abstract void performUpdate(
                Driver driver,
                int nodeId,
                int newNodeId,
                AtomicReference<Session> usedSessionRef,
                CountDownLatch latchToWait)
                throws Exception;
    }
}

