/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.driver.integration.reactive;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.DatabaseException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.exceptions.SessionExpiredException;
import org.neo4j.driver.exceptions.TransientException;
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
import org.neo4j.driver.internal.util.Neo4jFeature;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.reactive.RxTransaction;
import org.neo4j.driver.reactive.RxTransactionWork;
import org.neo4j.driver.util.DatabaseExtension;
import org.neo4j.driver.util.ParallelizableIT;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

@EnabledOnNeo4jWith(value=Neo4jFeature.BOLT_V4)
@ParallelizableIT
class RxSessionIT {
    @RegisterExtension
    static final DatabaseExtension neo4j = new DatabaseExtension();

    RxSessionIT() {
    }

    @Test
    void shouldAllowSessionRun() {
        RxSession session = neo4j.driver().rxSession();
        RxResult res = session.run("UNWIND [1,2,3,4] AS a RETURN a");
        StepVerifier.create((Publisher)Flux.from((Publisher)res.records()).map(r -> r.get("a").asInt())).expectNext((Object)1).expectNext((Object)2).expectNext((Object)3).expectNext((Object)4).expectComplete().verify();
    }

    @Test
    void shouldBeAbleToReuseSessionAfterFailure() {
        RxSession session = neo4j.driver().rxSession();
        RxResult res1 = session.run("INVALID");
        StepVerifier.create((Publisher)res1.records()).expectError(ClientException.class).verify();
        RxResult res2 = session.run("RETURN 1");
        StepVerifier.create((Publisher)res2.records()).assertNext(record -> Assertions.assertEquals((long)record.get("1").asLong(), (long)1L)).expectComplete().verify();
    }

    @Test
    void shouldRunAsyncTransactionWithoutRetries() {
        RxSession session = neo4j.driver().rxSession();
        InvocationTrackingWork work = new InvocationTrackingWork("CREATE (:Apa) RETURN 42");
        Publisher publisher = session.writeTransaction((RxTransactionWork)work);
        StepVerifier.create((Publisher)publisher).expectNext((Object)42).verifyComplete();
        Assertions.assertEquals((int)1, (int)work.invocationCount());
        Assertions.assertEquals((long)1L, (long)this.countNodesByLabel("Apa"));
    }

    @Test
    void shouldRunAsyncTransactionWithRetriesOnAsyncFailures() {
        RxSession session = neo4j.driver().rxSession();
        InvocationTrackingWork work = new InvocationTrackingWork("CREATE (:Node) RETURN 24").withAsyncFailures(new RuntimeException[]{new ServiceUnavailableException("Oh!"), new SessionExpiredException("Ah!"), new TransientException("Code", "Message")});
        Publisher publisher = session.writeTransaction((RxTransactionWork)work);
        StepVerifier.create((Publisher)publisher).expectNext((Object)24).verifyComplete();
        Assertions.assertEquals((int)4, (int)work.invocationCount());
        Assertions.assertEquals((long)1L, (long)this.countNodesByLabel("Node"));
        this.assertNoParallelScheduler();
    }

    @Test
    void shouldRunAsyncTransactionWithRetriesOnSyncFailures() {
        RxSession session = neo4j.driver().rxSession();
        InvocationTrackingWork work = new InvocationTrackingWork("CREATE (:Test) RETURN 12").withSyncFailures(new RuntimeException[]{new TransientException("Oh!", "Deadlock!"), new ServiceUnavailableException("Oh! Network Failure")});
        Publisher publisher = session.writeTransaction((RxTransactionWork)work);
        StepVerifier.create((Publisher)publisher).expectNext((Object)12).verifyComplete();
        Assertions.assertEquals((int)3, (int)work.invocationCount());
        Assertions.assertEquals((long)1L, (long)this.countNodesByLabel("Test"));
        this.assertNoParallelScheduler();
    }

    @Test
    void shouldRunAsyncTransactionThatCanNotBeRetried() {
        RxSession session = neo4j.driver().rxSession();
        InvocationTrackingWork work = new InvocationTrackingWork("UNWIND [10, 5, 0] AS x CREATE (:Hi) RETURN 10/x");
        Publisher publisher = session.writeTransaction((RxTransactionWork)work);
        StepVerifier.create((Publisher)publisher).expectNext((Object)1).expectNext((Object)2).expectErrorSatisfies(error -> MatcherAssert.assertThat((Object)error, (Matcher)CoreMatchers.instanceOf(ClientException.class))).verify();
        Assertions.assertEquals((int)1, (int)work.invocationCount());
        Assertions.assertEquals((long)0L, (long)this.countNodesByLabel("Hi"));
        this.assertNoParallelScheduler();
    }

    @Test
    void shouldRunAsyncTransactionThatCanNotBeRetriedAfterATransientFailure() {
        RxSession session = neo4j.driver().rxSession();
        InvocationTrackingWork work = new InvocationTrackingWork("CREATE (:Person) RETURN 1").withSyncFailures(new RuntimeException[]{new TransientException("Oh!", "Deadlock!")}).withAsyncFailures(new RuntimeException[]{new DatabaseException("Oh!", "OutOfMemory!")});
        Publisher publisher = session.writeTransaction((RxTransactionWork)work);
        StepVerifier.create((Publisher)publisher).expectErrorSatisfies(e -> {
            MatcherAssert.assertThat((Object)e, (Matcher)CoreMatchers.instanceOf(DatabaseException.class));
            Assertions.assertEquals((int)1, (int)e.getSuppressed().length);
            MatcherAssert.assertThat((Object)e.getSuppressed()[0], (Matcher)CoreMatchers.instanceOf(TransientException.class));
        }).verify();
        Assertions.assertEquals((int)2, (int)work.invocationCount());
        Assertions.assertEquals((long)0L, (long)this.countNodesByLabel("Person"));
        this.assertNoParallelScheduler();
    }

    private void assertNoParallelScheduler() {
        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
        for (Thread t : threadSet) {
            String name = t.getName();
            MatcherAssert.assertThat((Object)name, (Matcher)CoreMatchers.not((Matcher)CoreMatchers.startsWith((String)"parallel")));
        }
    }

    private long countNodesByLabel(String label) {
        try (Session session = neo4j.driver().session();){
            Result result = session.run("MATCH (n:" + label + ") RETURN count(n)");
            long l = result.single().get(0).asLong();
            return l;
        }
    }

    private static class InvocationTrackingWork
    implements RxTransactionWork<Publisher<Integer>> {
        final String query;
        final AtomicInteger invocationCount;
        Iterator<RuntimeException> asyncFailures = Collections.emptyIterator();
        Iterator<RuntimeException> syncFailures = Collections.emptyIterator();

        InvocationTrackingWork(String query) {
            this.query = query;
            this.invocationCount = new AtomicInteger();
        }

        InvocationTrackingWork withAsyncFailures(RuntimeException ... failures) {
            this.asyncFailures = Arrays.asList(failures).iterator();
            return this;
        }

        InvocationTrackingWork withSyncFailures(RuntimeException ... failures) {
            this.syncFailures = Arrays.asList(failures).iterator();
            return this;
        }

        int invocationCount() {
            return this.invocationCount.get();
        }

        public Publisher<Integer> execute(RxTransaction tx) {
            this.invocationCount.incrementAndGet();
            if (this.syncFailures.hasNext()) {
                throw this.syncFailures.next();
            }
            if (this.asyncFailures.hasNext()) {
                return Mono.error((Throwable)this.asyncFailures.next());
            }
            return Flux.from((Publisher)tx.run(this.query).records()).map(r -> r.get(0).asInt());
        }
    }
}

