/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.driver.integration.reactive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.junit.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.Values;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.internal.InternalBookmark;
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
import org.neo4j.driver.internal.util.Iterables;
import org.neo4j.driver.internal.util.Matchers;
import org.neo4j.driver.internal.util.Neo4jFeature;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.reactive.RxTransaction;
import org.neo4j.driver.summary.QueryType;
import org.neo4j.driver.summary.ResultSummary;
import org.neo4j.driver.types.Node;
import org.neo4j.driver.util.DatabaseExtension;
import org.neo4j.driver.util.ParallelizableIT;
import org.neo4j.driver.util.TestUtil;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

@EnabledOnNeo4jWith(value=Neo4jFeature.BOLT_V4)
@ParallelizableIT
class RxTransactionIT {
    // Database extension registered with JUnit; setUp() obtains the driver from it.
    @RegisterExtension
    static final DatabaseExtension neo4j = new DatabaseExtension();
    // Reactive session under test; assigned in setUp() before each test method.
    private RxSession session;

    // Explicit no-argument constructor; it performs no initialisation of its own.
    RxTransactionIT() {
    }

    /**
     * Opens a reactive session on the extension's driver before every test
     * and stores it in {@code session}.
     */
    @BeforeEach
    void setUp() {
        session = neo4j.driver().rxSession();
    }

    /**
     * Commits a transaction in which no query was run and checks that the
     * session bookmark afterwards is non-null and differs from the bookmark
     * read before the transaction.
     */
    @Test
    void shouldBePossibleToCommitEmptyTx() {
        Bookmark bookmarkBefore = session.lastBookmark();

        // Typed publishers keep the flatMap parameter an RxTransaction, so commit() resolves.
        Mono<Void> commit = Mono.from(session.beginTransaction())
                .flatMap(tx -> Mono.from(tx.commit()));
        StepVerifier.create(commit).verifyComplete();

        Bookmark bookmarkAfter = session.lastBookmark();
        Assertions.assertNotNull(bookmarkAfter);
        Assertions.assertNotEquals(bookmarkBefore, bookmarkAfter);
    }

    /**
     * Rolls back a transaction in which no query was run and checks that the
     * session bookmark is equal to the one read before the transaction.
     */
    @Test
    void shouldBePossibleToRollbackEmptyTx() {
        Bookmark bookmarkBefore = session.lastBookmark();

        // Typed publishers keep the flatMap parameter an RxTransaction, so rollback() resolves.
        Mono<Void> rollback = Mono.from(session.beginTransaction())
                .flatMap(tx -> Mono.from(tx.rollback()));
        StepVerifier.create(rollback).verifyComplete();

        Bookmark bookmarkAfter = session.lastBookmark();
        Assertions.assertEquals(bookmarkBefore, bookmarkAfter);
    }

    /**
     * Creates one node inside a transaction managed by {@code Flux.usingWhen},
     * expects the created node's id (42) to be emitted, and then checks that
     * exactly one such node is counted by {@code countNodes}.
     */
    @Test
    void shouldBePossibleToRunSingleQueryAndCommit() {
        Flux<Integer> ids = Flux.usingWhen(
                session.beginTransaction(),
                tx -> Flux.from(tx.run("CREATE (n:Node {id: 42}) RETURN n").records())
                        .map(record -> record.get(0).asNode().get("id").asInt()),
                RxTransaction::commit,
                (tx, error) -> tx.rollback(),
                null); // no dedicated cancel handler is supplied

        StepVerifier.create(ids).expectNext(42).verifyComplete();
        Assertions.assertEquals(1, countNodes(42));
    }

    /**
     * Runs the create helper in a transaction, rolls the transaction back and
     * checks that no node with id 4242 is counted afterwards.
     */
    @Test
    void shouldBePossibleToRunSingleQueryAndRollback() {
        RxTransaction transaction = TestUtil.await(Mono.from(session.beginTransaction()));

        assertCanRunCreate(transaction);
        assertCanRollback(transaction);

        Assertions.assertEquals(0, countNodes(4242));
    }

    /**
     * Runs three CREATE queries one after another, awaiting each result's
     * records before issuing the next, then finishes the transaction.
     *
     * @param commit value supplied by the {@code commit} method source; passed on to
     *               {@code assertCanCommitOrRollback} and {@code verifyCommittedOrRolledBack}
     */
    @ParameterizedTest
    @MethodSource("commit")
    void shouldBePossibleToRunMultipleQueries(boolean commit) {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        List<String> queries = Arrays.asList(
                "CREATE (n:Node {id: 1})",
                "CREATE (n:Node {id: 2})",
                "CREATE (n:Node {id: 1})");
        for (String query : queries) {
            // Each result is fully awaited before the next query is issued.
            RxResult cursor = tx.run(query);
            TestUtil.await(cursor.records());
        }

        assertCanCommitOrRollback(commit, tx);
        verifyCommittedOrRolledBack(commit);
    }

    /**
     * Issues three CREATE queries up front and only afterwards awaits the
     * concatenation of their record publishers, then finishes the transaction.
     *
     * @param commit value supplied by the {@code commit} method source; passed on to
     *               {@code assertCanCommitOrRollback} and {@code verifyCommittedOrRolledBack}
     */
    @ParameterizedTest
    @MethodSource("commit")
    void shouldBePossibleToRunMultipleQueriesWithoutWaiting(boolean commit) {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        RxResult first = tx.run("CREATE (n:Node {id: 1})");
        RxResult second = tx.run("CREATE (n:Node {id: 2})");
        RxResult third = tx.run("CREATE (n:Node {id: 1})");

        Flux<Record> allRecords = Flux.from(first.records())
                .concatWith(second.records())
                .concatWith(third.records());
        TestUtil.await(allRecords);

        assertCanCommitOrRollback(commit, tx);
        verifyCommittedOrRolledBack(commit);
    }

    /**
     * Issues two queries but subscribes to the second result before the first,
     * and expects the records to arrive in subscription order ("Bob", then "Alice").
     *
     * @param commit value supplied by the {@code commit} method source; passed on to
     *               {@code assertCanCommitOrRollback}
     */
    @ParameterizedTest
    @MethodSource("commit")
    void shouldRunQueriesOnResultPublish(boolean commit) {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult cursor1 = tx.run("CREATE (n:Person {name: 'Alice'}) RETURN n.name");
        RxResult cursor2 = tx.run("CREATE (n:Person {name: 'Bob'}) RETURN n.name");

        // Deliberately concatenated in reverse order of the run() calls.
        List<Record> records = TestUtil.await(Flux.from(cursor2.records()).concatWith(cursor1.records()));

        MatcherAssert.assertThat(records.size(), org.hamcrest.Matchers.equalTo(2));
        MatcherAssert.assertThat(records.get(0).get("n.name").asString(), org.hamcrest.Matchers.equalTo("Bob"));
        MatcherAssert.assertThat(records.get(1).get("n.name").asString(), org.hamcrest.Matchers.equalTo("Alice"));

        assertCanCommitOrRollback(commit, tx);
    }

    /**
     * Reads only the keys of a result, finishes the transaction, and then
     * expects subscribing to the records to fail with a message containing
     * "has already been consumed".
     *
     * @param commit value supplied by the {@code commit} method source; passed on to
     *               {@code assertCanCommitOrRollback}
     */
    @ParameterizedTest
    @MethodSource("commit")
    void shouldDiscardOnCommitOrRollback(boolean commit) {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult cursor = tx.run("UNWIND [1,2,3,4] AS a RETURN a");

        // Only the keys are requested; the records are left unread.
        TestUtil.await(Flux.from(cursor.keys()));

        assertCanCommitOrRollback(commit, tx);

        StepVerifier.create(Flux.from(cursor.records()))
                .expectErrorSatisfies(error -> MatcherAssert.assertThat(
                        error.getMessage(), CoreMatchers.containsString("has already been consumed")))
                .verify();
    }

    /**
     * Issues three CREATE queries and awaits only the concatenation of their
     * key publishers (records are never requested), then finishes the transaction.
     *
     * @param commit value supplied by the {@code commit} method source; passed on to
     *               {@code assertCanCommitOrRollback} and {@code verifyCommittedOrRolledBack}
     */
    @ParameterizedTest
    @MethodSource("commit")
    void shouldBePossibleToRunMultipleQueriesWithoutStreaming(boolean commit) {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        RxResult first = tx.run("CREATE (n:Node {id: 1})");
        RxResult second = tx.run("CREATE (n:Node {id: 2})");
        RxResult third = tx.run("CREATE (n:Node {id: 1})");

        Flux<List<String>> allKeys = Flux.from(first.keys())
                .concatWith(second.keys())
                .concatWith(third.keys());
        TestUtil.await(allKeys);

        assertCanCommitOrRollback(commit, tx);
        verifyCommittedOrRolledBack(commit);
    }

    /**
     * After the wrong-query helper has run in the transaction, expects
     * awaiting {@code commit()} to throw a {@link ClientException}.
     */
    @Test
    void shouldFailToCommitAfterSingleWrongQuery() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertFailToRunWrongQuery(tx);

        Assertions.assertThrows(ClientException.class, () -> TestUtil.await(tx.commit()));
    }

    /**
     * After the wrong-query helper has run in the transaction, checks that
     * the transaction can still be rolled back.
     */
    @Test
    void shouldAllowRollbackAfterSingleWrongQuery() {
        RxTransaction transaction = TestUtil.await(Mono.from(session.beginTransaction()));

        assertFailToRunWrongQuery(transaction);
        assertCanRollback(transaction);
    }

    /**
     * Runs the create and return-one helpers followed by the wrong-query
     * helper, then expects awaiting {@code commit()} to throw a {@link ClientException}.
     */
    @Test
    void shouldFailToCommitAfterCoupleCorrectAndSingleWrongQuery() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertCanRunCreate(tx);
        assertCanRunReturnOne(tx);
        assertFailToRunWrongQuery(tx);

        Assertions.assertThrows(ClientException.class, () -> TestUtil.await(tx.commit()));
    }

    /**
     * Runs the create and return-one helpers followed by the wrong-query
     * helper, then checks that the transaction can still be rolled back.
     */
    @Test
    void shouldAllowRollbackAfterCoupleCorrectAndSingleWrongQuery() {
        RxTransaction transaction = TestUtil.await(Mono.from(session.beginTransaction()));

        assertCanRunCreate(transaction);
        assertCanRunReturnOne(transaction);
        assertFailToRunWrongQuery(transaction);

        assertCanRollback(transaction);
    }

    /**
     * After the wrong-query helper has run, expects a further query's records
     * to fail with a message starting with
     * "Cannot run more queries in this transaction"; rollback must still work.
     */
    @Test
    void shouldNotAllowNewQueriesAfterAnIncorrectQuery() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        assertFailToRunWrongQuery(tx);

        RxResult followUp = tx.run("CREATE ()");
        Exception failure = Assertions.assertThrows(Exception.class, () -> TestUtil.await(followUp.records()));
        MatcherAssert.assertThat(
                failure.getMessage(),
                org.hamcrest.Matchers.startsWith("Cannot run more queries in this transaction"));

        assertCanRollback(tx);
    }

    /**
     * Opens a session configured with the bookmark parsed from "InvalidBookmark"
     * and expects beginning a transaction to throw a {@link ClientException}
     * whose message contains that bookmark text.
     */
    @Test
    void shouldFailBoBeginTxWithInvalidBookmark() {
        SessionConfig config = SessionConfig.builder()
                .withBookmarks(new Bookmark[]{InternalBookmark.parse("InvalidBookmark")})
                .build();
        RxSession invalidBookmarkSession = neo4j.driver().rxSession(config);

        ClientException failure = Assertions.assertThrows(
                ClientException.class, () -> TestUtil.await(invalidBookmarkSession.beginTransaction()));
        MatcherAssert.assertThat(failure.getMessage(), org.hamcrest.Matchers.containsString("InvalidBookmark"));
    }

    /**
     * Commits a transaction once, then expects a second {@code commit()} to
     * signal a {@link ClientException} whose message starts with
     * "Can't commit, transaction has been committed".
     */
    @Test
    void shouldFailToCommitWhenCommitted() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        assertCanRunCreate(tx);
        assertCanCommit(tx);

        Mono<Void> secondCommit = Mono.from(tx.commit());

        // The typed verifier gives the lambda a Throwable, so getMessage() resolves.
        StepVerifier.create(secondCommit).expectErrorSatisfies(error -> {
            MatcherAssert.assertThat(error, org.hamcrest.Matchers.instanceOf(ClientException.class));
            MatcherAssert.assertThat(
                    error.getMessage(),
                    org.hamcrest.Matchers.startsWith("Can't commit, transaction has been committed"));
        }).verify();
    }

    /**
     * Rolls a transaction back once, then expects a second {@code rollback()}
     * to signal a {@link ClientException} whose message starts with
     * "Can't rollback, transaction has been rolled back".
     */
    @Test
    void shouldFailToRollbackWhenRolledBack() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        assertCanRunCreate(tx);
        assertCanRollback(tx);

        Mono<Void> secondRollback = Mono.from(tx.rollback());

        // The typed verifier gives the lambda a Throwable, so getMessage() resolves.
        StepVerifier.create(secondRollback).expectErrorSatisfies(error -> {
            MatcherAssert.assertThat(error, org.hamcrest.Matchers.instanceOf(ClientException.class));
            MatcherAssert.assertThat(
                    error.getMessage(),
                    org.hamcrest.Matchers.startsWith("Can't rollback, transaction has been rolled back"));
        }).verify();
    }

    /**
     * Rolls a transaction back, then expects awaiting {@code commit()} to throw
     * a {@link ClientException} mentioning "transaction has been rolled back".
     */
    @Test
    void shouldFailToCommitWhenRolledBack() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        assertCanRunCreate(tx);
        assertCanRollback(tx);

        ClientException failure =
                Assertions.assertThrows(ClientException.class, () -> TestUtil.await(tx.commit()));
        MatcherAssert.assertThat(
                failure.getMessage(),
                org.hamcrest.Matchers.containsString("transaction has been rolled back"));
    }

    /**
     * Commits a transaction, then expects awaiting {@code rollback()} to throw
     * a {@link ClientException} mentioning "transaction has been committed".
     */
    @Test
    void shouldFailToRollbackWhenCommitted() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        assertCanRunCreate(tx);
        assertCanCommit(tx);

        ClientException failure =
                Assertions.assertThrows(ClientException.class, () -> TestUtil.await(tx.rollback()));
        MatcherAssert.assertThat(
                failure.getMessage(),
                org.hamcrest.Matchers.containsString("transaction has been committed"));
    }

    /**
     * Runs the query "WRONG" inside a {@code Flux.usingWhen}-managed
     * transaction and expects the resulting flux to fail with a message
     * containing "Invalid input".
     */
    @Test
    void shouldAllowRollbackAfterFailedCommit() {
        Flux<Record> records = Flux.usingWhen(
                session.beginTransaction(),
                tx -> Flux.from(tx.run("WRONG").records()),
                RxTransaction::commit,
                (tx, error) -> tx.rollback(),
                null); // no dedicated cancel handler is supplied

        StepVerifier.create(records).verifyErrorSatisfies(error -> MatcherAssert.assertThat(
                error.getMessage(), org.hamcrest.Matchers.containsString("Invalid input")));
    }

    /**
     * Checks that the keys of a query with aliased columns are the aliases,
     * in the order the columns appear in the query.
     */
    @Test
    void shouldExposeQueryKeysForColumnsWithAliases() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult result = tx.run("RETURN 1 AS one, 2 AS two, 3 AS three, 4 AS five");

        List<String> keys = TestUtil.await(Mono.from(result.keys()));

        Assertions.assertEquals(Arrays.asList("one", "two", "three", "five"), keys);
        assertCanRollback(tx);
    }

    /**
     * Checks that the keys of a query with unaliased literal columns are the
     * literal texts "1", "2", "3", "5".
     */
    @Test
    void shouldExposeQueryKeysForColumnsWithoutAliases() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult result = tx.run("RETURN 1, 2, 3, 5");

        List<String> keys = TestUtil.await(Mono.from(result.keys()));

        Assertions.assertEquals(Arrays.asList("1", "2", "3", "5"), keys);
        assertCanRollback(tx);
    }

    /**
     * Runs a parameterised CREATE query, consumes its summary and checks the
     * query, the update counters, the query type, and that neither plan,
     * profile nor notifications are present.
     */
    @Test
    void shouldExposeResultSummaryForSimpleQuery() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        String query = "CREATE (p1:Person {name: $name1})-[:KNOWS]->(p2:Person {name: $name2}) RETURN p1, p2";
        Value params = Values.parameters("name1", "Bob", "name2", "John");

        RxResult result = tx.run(query, params);
        TestUtil.await(result.records());
        ResultSummary summary = TestUtil.await(Mono.from(result.consume()));

        // Query and counters
        Assertions.assertEquals(new Query(query, params), summary.query());
        Assertions.assertEquals(2, summary.counters().nodesCreated());
        Assertions.assertEquals(2, summary.counters().labelsAdded());
        Assertions.assertEquals(2, summary.counters().propertiesSet());
        Assertions.assertEquals(1, summary.counters().relationshipsCreated());
        Assertions.assertEquals(QueryType.READ_WRITE, summary.queryType());

        // No plan, profile or notifications for a plain query
        Assertions.assertFalse(summary.hasPlan());
        Assertions.assertFalse(summary.hasProfile());
        Assertions.assertNull(summary.plan());
        Assertions.assertNull(summary.profile());
        Assertions.assertEquals(0, summary.notifications().size());
        MatcherAssert.assertThat(summary, Matchers.containsResultAvailableAfterAndResultConsumedAfter());

        assertCanRollback(tx);
    }

    /**
     * Runs an EXPLAIN query, consumes its summary and checks that a plan
     * (whose lower-cased text contains "scan") is present, no profile is
     * present, and no updates were counted.
     */
    @Test
    void shouldExposeResultSummaryForExplainQuery() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        String query = "EXPLAIN MATCH (n) RETURN n";

        RxResult result = tx.run(query);
        TestUtil.await(result.records());
        ResultSummary summary = TestUtil.await(Mono.from(result.consume()));

        // Query and counters
        Assertions.assertEquals(new Query(query), summary.query());
        Assertions.assertEquals(0, summary.counters().nodesCreated());
        Assertions.assertEquals(0, summary.counters().propertiesSet());
        Assertions.assertEquals(QueryType.READ_ONLY, summary.queryType());

        // Plan is present, profile is not
        Assertions.assertTrue(summary.hasPlan());
        Assertions.assertFalse(summary.hasProfile());
        Assertions.assertNotNull(summary.plan());
        String planAsString = summary.plan().toString().toLowerCase();
        MatcherAssert.assertThat(planAsString, org.hamcrest.Matchers.containsString("scan"));
        Assertions.assertNull(summary.profile());
        Assertions.assertEquals(0, summary.notifications().size());
        MatcherAssert.assertThat(summary, Matchers.containsResultAvailableAfterAndResultConsumedAfter());

        assertCanRollback(tx);
    }

    /**
     * Runs a PROFILE MERGE query, consumes its summary and checks the update
     * counters and that both plan and profile are present, the lower-cased
     * profile text containing "hits".
     */
    @Test
    void shouldExposeResultSummaryForProfileQuery() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        String query = "PROFILE MERGE (n {name: $name}) ON CREATE SET n.created = timestamp() ON MATCH SET n.counter = coalesce(n.counter, 0) + 1";
        Value params = Values.parameters("name", "Bob");

        RxResult result = tx.run(query, params);
        TestUtil.await(result.records());
        ResultSummary summary = TestUtil.await(Mono.from(result.consume()));

        // Query and counters
        Assertions.assertEquals(new Query(query, params), summary.query());
        Assertions.assertEquals(1, summary.counters().nodesCreated());
        Assertions.assertEquals(2, summary.counters().propertiesSet());
        Assertions.assertEquals(0, summary.counters().relationshipsCreated());
        Assertions.assertEquals(QueryType.WRITE_ONLY, summary.queryType());

        // Both plan and profile are present
        Assertions.assertTrue(summary.hasPlan());
        Assertions.assertTrue(summary.hasProfile());
        Assertions.assertNotNull(summary.plan());
        Assertions.assertNotNull(summary.profile());
        MatcherAssert.assertThat(
                summary.profile().toString().toLowerCase(),
                org.hamcrest.Matchers.containsString("hits"));
        Assertions.assertEquals(0, summary.notifications().size());
        MatcherAssert.assertThat(summary, Matchers.containsResultAvailableAfterAndResultConsumedAfter());

        assertCanRollback(tx);
    }

    /**
     * Takes only the first of three records (with a request rate of 1),
     * expects "a" followed by completion, and checks that the transaction can
     * still be rolled back.
     */
    @Test
    void shouldCancelRecordStream() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult result = tx.run("UNWIND ['a', 'b', 'c'] AS x RETURN x");

        // Typed flux keeps the map parameter a Record, so get(0) resolves.
        Flux<String> abc = Flux.from(result.records())
                .limitRate(1)
                .take(1L)
                .map(record -> record.get(0).asString());

        StepVerifier.create(abc).expectNext("a").verifyComplete();
        assertCanRollback(tx);
    }

    /**
     * Delegates to {@code testForEach} with a MATCH on an unusual label and an
     * expected record count of zero.
     */
    @Test
    void shouldForEachWithEmptyCursor() {
        String query = "MATCH (n:SomeReallyStrangeLabel) RETURN n";
        testForEach(query, 0);
    }

    /**
     * Delegates to {@code testForEach} with a query that creates and returns
     * 12555 nodes, expecting the same number of records to be seen.
     */
    @Test
    void shouldForEachWithNonEmptyCursor() {
        String query = "UNWIND range(1, 12555) AS x CREATE (n:Node {id: x}) RETURN n";
        testForEach(query, 12555);
    }

    /**
     * Throws a fixed {@link RuntimeException} from a {@code doOnNext} callback
     * on the record stream and expects exactly that exception to be the
     * error signalled by the flux.
     */
    @Test
    void shouldFailForEachWhenActionFails() {
        RuntimeException e = new RuntimeException();

        Flux<Record> records = Flux.usingWhen(
                session.beginTransaction(),
                tx -> Flux.from(tx.run("RETURN 'Hi!'").records()).doOnNext(record -> {
                    throw e;
                }),
                RxTransaction::commit,
                (tx, error) -> tx.rollback(),
                null); // no dedicated cancel handler is supplied

        StepVerifier.create(records)
                .expectErrorSatisfies(error -> Assertions.assertEquals(e, error))
                .verify();
    }

    /**
     * Delegates to {@code testList} with a CREATE query that has no RETURN
     * clause, expecting an empty list of values.
     */
    @Test
    void shouldConvertToListWithEmptyCursor() {
        String query = "CREATE (:Person)-[:KNOWS]->(:Person)";
        testList(query, Collections.emptyList());
    }

    /**
     * Delegates to {@code testList} with a query returning a mix of integers
     * and strings, expecting them back as longs and strings in order.
     */
    @Test
    void shouldConvertToListWithNonEmptyCursor() {
        String query = "UNWIND [1, '1', 2, '2', 3, '3'] AS x RETURN x";
        testList(query, Arrays.asList(1L, "1", 2L, "2", 3L, "3"));
    }

    /**
     * Maps the records of a query without a RETURN clause to maps and checks
     * that the awaited list is empty.
     */
    @Test
    void shouldConvertToTransformedListWithEmptyCursor() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult result = tx.run("CREATE ()");

        // Only the size matters here, so the element type is left as a wildcard.
        List<?> maps = TestUtil.await(Flux.from(result.records()).map(record -> record.get(0).asMap()));

        Assertions.assertEquals(0, maps.size());
        assertCanRollback(tx);
    }

    /**
     * Maps each of three string records to the string with "!" appended and
     * checks the awaited list is ["a!", "b!", "c!"].
     */
    @Test
    void shouldConvertToTransformedListWithNonEmptyCursor() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult result = tx.run("UNWIND ['a', 'b', 'c'] AS x RETURN x");

        // Typed flux keeps the map parameter a Record, so get(0) resolves.
        List<String> strings =
                TestUtil.await(Flux.from(result.records()).map(record -> record.get(0).asString() + "!"));

        Assertions.assertEquals(Arrays.asList("a!", "b!", "c!"), strings);
        assertCanRollback(tx);
    }

    /**
     * Throws a fixed {@link RuntimeException} from a {@code map} function on
     * the record stream and expects exactly that exception to be the error
     * signalled by the flux.
     */
    @Test
    void shouldFailWhenListTransformationFunctionFails() {
        RuntimeException e = new RuntimeException();

        Flux<Object> records = Flux.usingWhen(
                session.beginTransaction(),
                // The mapper never returns, so its result type is stated explicitly.
                tx -> Flux.from(tx.run("RETURN 'Hi!'").records()).<Object>map(record -> {
                    throw e;
                }),
                RxTransaction::commit,
                (tx, error) -> tx.rollback(),
                null); // no dedicated cancel handler is supplied

        StepVerifier.create(records)
                .expectErrorSatisfies(error -> Assertions.assertEquals(e, error))
                .verify();
    }

    /**
     * Stops the database when the record stream is subscribed to and expects
     * a {@link ServiceUnavailableException} from awaiting the records or the
     * subsequent commit; afterwards the rollback helper is still invoked.
     */
    @Test
    void shouldFailToCommitWhenServerIsRestarted() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult result = tx.run("RETURN 1");

        Assertions.assertThrows(ServiceUnavailableException.class, () -> {
            // The database is stopped at the moment of subscription.
            Flux<Record> recordsWithStop =
                    Flux.from(result.records()).doOnSubscribe(subscription -> neo4j.stopDb());
            TestUtil.await(recordsWithStop);
            TestUtil.await(tx.commit());
        });

        assertCanRollback(tx);
    }

    /**
     * Applies {@code single()} to the records of a query with no matches and
     * expects a {@link NoSuchElementException} mentioning "Source was empty".
     */
    @Test
    void shouldFailSingleWithEmptyCursor() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult result = tx.run("MATCH (n:NoSuchLabel) RETURN n");

        NoSuchElementException failure = Assertions.assertThrows(
                NoSuchElementException.class,
                () -> TestUtil.await(Flux.from(result.records()).single()));

        MatcherAssert.assertThat(failure.getMessage(), org.hamcrest.Matchers.containsString("Source was empty"));
        assertCanRollback(tx);
    }

    /**
     * Applies {@code single()} to a two-record stream and expects an
     * {@link IndexOutOfBoundsException} whose message starts with
     * "Source emitted more than one item".
     */
    @Test
    void shouldFailSingleWithMultiRecordCursor() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult result = tx.run("UNWIND ['a', 'b'] AS x RETURN x");

        IndexOutOfBoundsException failure = Assertions.assertThrows(
                IndexOutOfBoundsException.class,
                () -> TestUtil.await(Flux.from(result.records()).single()));

        MatcherAssert.assertThat(
                failure.getMessage(),
                org.hamcrest.Matchers.startsWith("Source emitted more than one item"));
        assertCanRollback(tx);
    }

    /**
     * Applies {@code single()} to a one-record stream and checks that the
     * record's first value is "Hello!".
     */
    @Test
    void shouldReturnSingleWithSingleRecordCursor() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult result = tx.run("RETURN 'Hello!'");

        Record onlyRecord = TestUtil.await(Flux.from(result.records()).single());

        Assertions.assertEquals("Hello!", onlyRecord.get(0).asString());
        assertCanRollback(tx);
    }

    /**
     * Runs a query that divides by zero for its only row and expects
     * {@code single()} to surface a {@link ClientException} mentioning "/ by zero".
     */
    @Test
    void shouldPropagateFailureFromFirstRecordInSingleAsync() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult result = tx.run("UNWIND [0] AS x RETURN 10 / x");

        ClientException failure = Assertions.assertThrows(
                ClientException.class,
                () -> TestUtil.await(Flux.from(result.records()).single()));

        MatcherAssert.assertThat(failure.getMessage(), org.hamcrest.Matchers.containsString("/ by zero"));
        assertCanRollback(tx);
    }

    /**
     * Runs a query that divides by zero for its second row and expects
     * {@code single()} to surface a {@link ClientException} mentioning "/ by zero".
     */
    @Test
    void shouldPropagateFailureFromSecondRecordInSingleAsync() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult result = tx.run("UNWIND [1, 0] AS x RETURN 10 / x");

        ClientException failure = Assertions.assertThrows(
                ClientException.class,
                () -> TestUtil.await(Flux.from(result.records()).single()));

        MatcherAssert.assertThat(failure.getMessage(), org.hamcrest.Matchers.containsString("/ by zero"));
        assertCanRollback(tx);
    }

    /**
     * Delegates to {@code testConsume} with a MATCH query on a label named
     * "NoSuchLabel".
     */
    @Test
    void shouldConsumeEmptyCursor() {
        String query = "MATCH (n:NoSuchLabel) RETURN n";
        testConsume(query);
    }

    /**
     * Delegates to {@code testConsume} with a query returning the single value 42.
     */
    @Test
    void shouldConsumeNonEmptyCursor() {
        String query = "RETURN 42";
        testConsume(query);
    }

    /**
     * Creates a node, finishes the transaction, checks through the session that
     * the node count is 1 when {@code commit} is true and 0 otherwise, and then
     * expects a further query on the finished transaction to fail with a
     * {@link ClientException}.
     *
     * @param commit value supplied by the {@code commit} method source; passed on to
     *               {@code assertCanCommitOrRollback}
     */
    @ParameterizedTest
    @MethodSource("commit")
    void shouldFailToRunQueryAfterCommit(boolean commit) {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult created = tx.run("CREATE (:MyLabel)");
        TestUtil.await(created.records());

        assertCanCommitOrRollback(commit, tx);

        Record countRecord = TestUtil.await(
                Flux.from(session.run("MATCH (n:MyLabel) RETURN count(n)").records()).single());
        int expectedCount = commit ? 1 : 0;
        Assertions.assertEquals(expectedCount, countRecord.get(0).asInt());

        ClientException failure = Assertions.assertThrows(
                ClientException.class,
                () -> TestUtil.await(tx.run("CREATE (:MyOtherLabel)").records()));
        MatcherAssert.assertThat(
                failure.getMessage(),
                org.hamcrest.Matchers.containsString("Cannot run more queries in this transaction, it has been "));
    }

    /**
     * After the wrong-query helper has run, expects a further query to fail
     * with a {@link ClientException} whose message starts with
     * "Cannot run more queries in this transaction"; rollback must still work.
     */
    @Test
    void shouldFailToRunQueryWhenTerminated() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        assertFailToRunWrongQuery(tx);

        ClientException failure = Assertions.assertThrows(
                ClientException.class,
                () -> TestUtil.await(tx.run("CREATE (:MyOtherLabel)").records()));
        MatcherAssert.assertThat(
                failure.getMessage(),
                org.hamcrest.Matchers.startsWith("Cannot run more queries in this transaction"));

        assertCanRollback(tx);
    }

    /**
     * Runs a CREATE query in a {@code Flux.usingWhen}-managed transaction and
     * checks that the session bookmark afterwards is non-null and differs from
     * the bookmark read beforehand.
     */
    @Test
    void shouldUpdateSessionBookmarkAfterCommit() {
        Bookmark bookmarkBefore = session.lastBookmark();

        // Typed resource publisher keeps the closure parameter an RxTransaction.
        Flux<Record> created = Flux.usingWhen(
                session.beginTransaction(),
                tx -> tx.run("CREATE (:MyNode)").records(),
                RxTransaction::commit,
                (tx, error) -> tx.rollback(),
                null); // no dedicated cancel handler is supplied
        TestUtil.await(created);

        Bookmark bookmarkAfter = session.lastBookmark();
        Assertions.assertNotNull(bookmarkAfter);
        Assertions.assertNotEquals(bookmarkBefore, bookmarkAfter);
    }

    /**
     * Issues four queries, the third of which divides by zero, and expects
     * awaiting the concatenated record streams to throw a
     * {@link ClientException} with the exact message "/ by zero"; rollback
     * must still work.
     */
    @Test
    void shouldFailToCommitWhenQueriesFailAndErrorNotConsumed() throws InterruptedException {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        RxResult first = tx.run("CREATE (:TestNode)");
        RxResult second = tx.run("CREATE (:TestNode)");
        RxResult failing = tx.run("RETURN 10 / 0");
        RxResult fourth = tx.run("CREATE (:TestNode)");

        Flux<Record> records = Flux.from(first.records())
                .concatWith(second.records())
                .concatWith(failing.records())
                .concatWith(fourth.records());

        ClientException failure =
                Assertions.assertThrows(ClientException.class, () -> TestUtil.await(records));
        Assertions.assertEquals("/ by zero", failure.getMessage());

        assertCanRollback(tx);
    }

    /**
     * Issues four queries returning 1..4 but subscribes to their record
     * streams in reverse order, and expects the values to arrive as 4, 3, 2, 1.
     */
    @Test
    void shouldNotRunUntilPublisherIsConnected() throws Throwable {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        RxResult result1 = tx.run("RETURN 1");
        RxResult result2 = tx.run("RETURN 2");
        RxResult result3 = tx.run("RETURN 3");
        RxResult result4 = tx.run("RETURN 4");

        // Concatenated in reverse order of the run() calls.
        Flux<Record> records = Flux.from(result4.records())
                .concatWith(result3.records())
                .concatWith(result2.records())
                .concatWith(result1.records());

        StepVerifier.create(records.map(record -> record.get(0).asInt()))
                .expectNext(4)
                .expectNext(3)
                .expectNext(2)
                .expectNext(1)
                .verifyComplete();

        assertCanRollback(tx);
    }

    /**
     * Calls {@code run} with the query "RETURN ILLEGAL" without ever
     * subscribing to the result, then checks that the transaction can still be
     * finished.
     *
     * @param commit value supplied by the {@code commit} method source; passed on to
     *               {@code assertCanCommitOrRollback}
     */
    @ParameterizedTest
    @MethodSource("commit")
    void shouldNotPropagateRunFailureIfNotExecuted(boolean commit) {
        RxTransaction transaction = TestUtil.await(Mono.from(session.beginTransaction()));

        // The returned result is intentionally discarded and never subscribed to.
        transaction.run("RETURN ILLEGAL");

        assertCanCommitOrRollback(commit, transaction);
    }

    /**
     * Awaits the keys of a divide-by-zero query first, then expects awaiting
     * the records to throw a {@link ClientException} mentioning "/ by zero".
     */
    @Test
    void shouldPropagateRunFailureOnRecord() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult result = tx.run("RETURN 42 / 0");

        TestUtil.await(result.keys());

        ClientException failure =
                Assertions.assertThrows(ClientException.class, () -> TestUtil.await(result.records()));
        MatcherAssert.assertThat(failure.getMessage(), org.hamcrest.Matchers.containsString("/ by zero"));

        assertCanRollback(tx);
    }

    /**
     * Consumes the failure of a query (error code containing "TypeError") and
     * then expects {@code commit()} to throw a {@link ClientException} whose
     * message starts with "Transaction can't be committed".
     */
    @Test
    void shouldFailToCommitWhenPullAllFailureIsConsumed() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult result = tx.run("FOREACH (value IN [1,2, 'aaa'] | CREATE (:Person {name: 10 / value}))");

        ClientException queryFailure =
                Assertions.assertThrows(ClientException.class, () -> TestUtil.await(result.records()));
        MatcherAssert.assertThat(queryFailure.code(), org.hamcrest.Matchers.containsString("TypeError"));

        ClientException commitFailure =
                Assertions.assertThrows(ClientException.class, () -> TestUtil.await(tx.commit()));
        MatcherAssert.assertThat(
                commitFailure.getMessage(),
                org.hamcrest.Matchers.startsWith("Transaction can't be committed"));
    }

    /**
     * Consumes the failure of a query (error code containing "TypeError") and
     * then checks that the transaction can still be rolled back.
     */
    @Test
    void shouldBeAbleToRollbackWhenPullAllFailureIsConsumed() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult result = tx.run("FOREACH (value IN [1,2, 'aaa'] | CREATE (:Person {name: 10 / value}))");

        ClientException queryFailure =
                Assertions.assertThrows(ClientException.class, () -> TestUtil.await(result.records()));
        MatcherAssert.assertThat(queryFailure.code(), org.hamcrest.Matchers.containsString("TypeError"));

        assertCanRollback(tx);
    }

    /**
     * Expects the records of a query to fail with an error code containing
     * "SyntaxError", then awaits {@code consume()} on the same result without
     * expecting an exception, and finally rolls back.
     */
    @Test
    void shouldNotPropagateRunFailureFromSummary() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxResult result = tx.run("RETURN Wrong");

        ClientException queryFailure =
                Assertions.assertThrows(ClientException.class, () -> TestUtil.await(result.records()));
        MatcherAssert.assertThat(queryFailure.code(), org.hamcrest.Matchers.containsString("SyntaxError"));

        // Awaiting the summary is not wrapped in assertThrows: it must not rethrow.
        TestUtil.await(result.consume());

        assertCanRollback(tx);
    }

    /**
     * Counts the nodes labelled {@code Node} whose {@code id} property equals the given value.
     *
     * <p>The count query is run directly on {@code this.session} and the single
     * resulting record is awaited synchronously.
     *
     * @param id value bound to the {@code $id} query parameter
     * @return the value of {@code count(n)} from the single returned record
     */
    private int countNodes(Object id) {
        RxResult result = this.session.run("MATCH (n:Node {id: $id}) RETURN count(n)", Values.parameters("id", id));
        // Keep the publisher generically typed: with a raw Publisher the lambda parameter
        // degrades to Object and record.get(0) no longer type-checks.
        Mono<Integer> count = Flux.from(result.records()).single().map(record -> record.get(0).asInt());
        return TestUtil.await(count);
    }

    /**
     * Runs {@code query} inside a transaction, streams every record, then consumes
     * the summary and verifies both the summary contents and the number of
     * records that were streamed.
     *
     * <p>The transaction is committed when the pipeline completes and rolled back
     * when it fails; no dedicated cancellation handler is supplied ({@code null}).
     *
     * @param query               query text to run; also the text the summary is expected to report
     * @param expectedSeenRecords number of records the query is expected to emit
     */
    private void testForEach(String query, int expectedSeenRecords) {
        // Typed as Flux<ResultSummary> so that the lambda parameters below resolve to
        // RxTransaction / ResultSummary instead of Object.
        Flux<ResultSummary> summary = Flux.usingWhen(
                this.session.beginTransaction(),
                tx -> {
                    RxResult result = tx.run(query);
                    AtomicInteger recordsSeen = new AtomicInteger();
                    return Flux.from(result.records())
                            .doOnNext(record -> recordsSeen.incrementAndGet())
                            // The summary is requested only after the record stream has finished.
                            .then(Mono.from(result.consume()))
                            .doOnSuccess(s -> {
                                Assertions.assertNotNull(s);
                                Assertions.assertEquals(query, s.query().text());
                                Assertions.assertEquals(Collections.emptyMap(), s.query().parameters().asMap());
                                Assertions.assertEquals(expectedSeenRecords, recordsSeen.get());
                            });
                },
                RxTransaction::commit,
                (tx, error) -> tx.rollback(),
                null);
        // Exactly one summary must be emitted before completion.
        StepVerifier.create(summary).expectNextCount(1L).verifyComplete();
    }

    /**
     * Runs {@code query} inside a transaction, collects all records into a list and
     * verifies that the first column of each record, in order, equals {@code expectedList}.
     *
     * <p>The transaction is committed when the pipeline completes and rolled back
     * when it fails; no dedicated cancellation handler is supplied ({@code null}).
     *
     * @param query        query text to run
     * @param expectedList expected first-column values, in emission order
     * @param <T>          element type of the expected list
     */
    private <T> void testList(String query, List<T> expectedList) {
        // Values come back via asObject(), so the actual list is typed as Object.
        List<Object> actualList = new ArrayList<>();
        // Typed as Flux<List<Record>> so the consumer below can iterate the records
        // without the lambda parameter degrading to Object.
        Flux<List<Record>> records = Flux.usingWhen(
                this.session.beginTransaction(),
                tx -> Flux.from(tx.run(query).records()).collectList(),
                RxTransaction::commit,
                (tx, error) -> tx.rollback(),
                null);
        StepVerifier.create(records.single())
                .consumeNextWith(allRecords -> {
                    for (Record record : allRecords) {
                        actualList.add(record.get(0).asObject());
                    }
                })
                .verifyComplete();
        Assertions.assertEquals(expectedList, actualList);
    }

    /**
     * Runs {@code query} inside a transaction, consumes the result without
     * streaming its records and verifies that exactly one non-null summary is produced.
     *
     * <p>The transaction is committed when the pipeline completes and rolled back
     * when it fails; no dedicated cancellation handler is supplied ({@code null}).
     *
     * @param query query text to run
     */
    private void testConsume(String query) {
        // Typed as Flux<ResultSummary> so the tx lambda parameter resolves to RxTransaction
        // instead of Object.
        Flux<ResultSummary> summary = Flux.usingWhen(
                this.session.beginTransaction(),
                tx -> tx.run(query).consume(),
                RxTransaction::commit,
                (tx, error) -> tx.rollback(),
                null);
        StepVerifier.create(summary.single())
                .consumeNextWith(Assertions::assertNotNull)
                .verifyComplete();
    }

    /**
     * Verifies the node counts left behind by a finished transaction.
     *
     * <p>After a commit, two nodes with id 1 and one node with id 2 are expected;
     * after a rollback, none of either. The queries that create those nodes are
     * outside this method (presumably in the calling tests).
     *
     * @param commit {@code true} to verify the committed state, {@code false} to verify the rolled-back state
     */
    private void verifyCommittedOrRolledBack(boolean commit) {
        int expectedWithIdOne = commit ? 2 : 0;
        int expectedWithIdTwo = commit ? 1 : 0;
        Assertions.assertEquals(expectedWithIdOne, this.countNodes(1));
        Assertions.assertEquals(expectedWithIdTwo, this.countNodes(2));
    }

    /**
     * Finishes the transaction in the requested way and asserts that doing so succeeds.
     *
     * @param commit {@code true} to commit, {@code false} to roll back
     * @param tx     transaction to finish
     */
    private void assertCanCommitOrRollback(boolean commit, RxTransaction tx) {
        if (!commit) {
            this.assertCanRollback(tx);
            return;
        }
        this.assertCanCommit(tx);
    }

    /**
     * Commits the transaction and asserts that awaiting the commit publisher
     * yields an empty list, i.e. it emitted no items.
     *
     * @param tx transaction to commit
     */
    private void assertCanCommit(RxTransaction tx) {
        List<?> emitted = TestUtil.await(tx.commit());
        // Raw Matcher kept on purpose so the comparison with an untyped empty list
        // does not depend on generic inference.
        MatcherAssert.assertThat(emitted, (Matcher) org.hamcrest.Matchers.equalTo(Collections.emptyList()));
    }

    /**
     * Rolls the transaction back and asserts that awaiting the rollback publisher
     * yields an empty list, i.e. it emitted no items.
     *
     * @param tx transaction to roll back
     */
    private void assertCanRollback(RxTransaction tx) {
        List<?> emitted = TestUtil.await(tx.rollback());
        // Raw Matcher kept on purpose so the comparison with an untyped empty list
        // does not depend on generic inference.
        MatcherAssert.assertThat(emitted, (Matcher) org.hamcrest.Matchers.equalTo(Collections.emptyList()));
    }

    /**
     * Supplies both values of the "commit" flag, {@code true} first and then {@code false}.
     *
     * <p>NOTE(review): presumably referenced by name from {@code @MethodSource} on the
     * parameterized tests; the usages are outside this part of the file.
     *
     * @return a fresh stream of {@code true, false}
     */
    private static Stream<Boolean> commit() {
        return Stream.of(Boolean.TRUE, Boolean.FALSE);
    }

    /**
     * Runs a CREATE query in the given transaction and asserts that exactly one
     * record comes back, holding a node with the single label {@code Node} and {@code id} 4242.
     *
     * @param tx transaction to run the query in
     */
    private static void assertCanRunCreate(RxTransaction tx) {
        RxResult result = tx.run("CREATE (n:Node {id: 4242}) RETURN n");
        // single() makes the await fail unless exactly one record is emitted.
        Mono<Record> onlyRecord = Flux.from(result.records()).single();
        Node created = TestUtil.await(onlyRecord).get(0).asNode();

        Assertions.assertEquals("Node", Iterables.single(created.labels()));
        Assertions.assertEquals(4242, created.get("id").asInt());
    }

    /**
     * Runs the incomplete query {@code RETURN} in the given transaction and asserts
     * that awaiting its records throws an exception matched by {@code Matchers.syntaxError()}.
     *
     * @param tx transaction to run the query in
     */
    private static void assertFailToRunWrongQuery(RxTransaction tx) {
        RxResult result = tx.run("RETURN");
        Exception failure = Assertions.assertThrows(Exception.class, () -> TestUtil.await(result.records()));
        MatcherAssert.assertThat(failure, org.hamcrest.Matchers.is(Matchers.syntaxError()));
    }

    /**
     * Runs {@code RETURN 42} in the given transaction and asserts that exactly one
     * record is returned and that its first column is the integer 42.
     *
     * @param tx transaction to run the query in
     */
    private void assertCanRunReturnOne(RxTransaction tx) {
        RxResult result = tx.run("RETURN 42");
        // Declared as a list of records: size() and get(0) are not available on a plain Object.
        List<Record> records = TestUtil.await(result.records());
        MatcherAssert.assertThat(records.size(), org.hamcrest.Matchers.equalTo(1));
        Record record = records.get(0);
        Assertions.assertEquals(42, record.get(0).asInt());
    }
}

