/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.driver.integration.reactive;

import java.util.Collections;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.junit.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.neo4j.driver.Values;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
import org.neo4j.driver.internal.util.Neo4jFeature;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.summary.QueryType;
import org.neo4j.driver.util.DatabaseExtension;
import org.neo4j.driver.util.ParallelizableIT;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

/**
 * Integration tests for the three publishers exposed by {@link RxResult}: {@code keys()},
 * {@code records()} and {@code consume()}.
 *
 * <p>Every test obtains its session from the driver supplied by the shared {@link DatabaseExtension}
 * and drives the publishers with Reactor's {@link StepVerifier}.
 */
@EnabledOnNeo4jWith(Neo4jFeature.BOLT_V4)
@ParallelizableIT
class RxResultIT {
    @RegisterExtension
    static final DatabaseExtension neo4j = new DatabaseExtension();

    /** All four records of the UNWIND query can be streamed. */
    @Test
    void shouldAllowIteratingOverResultStream() {
        RxResult res = sessionRunUnwind();
        verifyCanAccessFullRecords(res);
    }

    /** Streams 100000 records in batches of 100 and expects them back in ascending order. */
    @Test
    void shouldAllowIteratingOverLargeResultStream() {
        int size = 100000;
        RxSession session = neo4j.driver().rxSession();
        RxResult res = session.run("UNWIND range(1, $size) AS x RETURN x", Values.parameters("size", size));

        StepVerifier.FirstStep<Integer> step =
                StepVerifier.create(Flux.from(res.records()).limitRate(100).map(r -> r.get("x").asInt()));
        for (int i = 1; i <= size; ++i) {
            step.expectNext(i);
        }
        step.expectComplete().verify();
    }

    /** Keys, then records, then summary can be read from the same result. */
    @Test
    void shouldReturnKeysRecordsAndSummaryInOrder() {
        RxResult res = sessionRunUnwind();
        verifyCanAccessKeys(res);
        verifyCanAccessFullRecords(res);
        verifyCanAccessSummary(res);
    }

    /** A second subscription to records fails with the "already consumed" error. */
    @Test
    void shouldSecondVisitOfRecordReceiveEmptyRecordStream() {
        RxResult res = sessionRunUnwind();
        verifyCanAccessFullRecords(res);
        verifyRecordsAlreadyDiscarded(res);
    }

    /** After the summary has been read, records are reported as already consumed. */
    @Test
    void shouldReturnKeysSummaryAndDiscardRecords() {
        RxResult res = sessionRunUnwind();
        verifyCanAccessKeys(res);
        verifyCanAccessSummary(res);
        verifyRecordsAlreadyDiscarded(res);
    }

    /** The summary can be read without touching keys or records. */
    @Test
    void shouldAllowOnlySummary() {
        RxResult res = sessionRunUnwind();
        verifyCanAccessSummary(res);
    }

    /** Keys and summary remain readable, repeatedly, after the records were streamed. */
    @Test
    void shouldAllowAccessKeysAndSummaryAfterRecord() {
        RxResult res = sessionRunUnwind();
        verifyCanAccessFullRecords(res);
        verifyCanAccessKeys(res);
        verifyCanAccessSummary(res);
        verifyCanAccessKeys(res);
        verifyCanAccessSummary(res);
    }

    // NOTE(review): despite the name, this asserts that a missing field reads as a null value,
    // not that an error with a helpful message is raised.
    @Test
    void shouldGiveHelpfulFailureMessageWhenAccessNonExistingField() {
        RxSession session = neo4j.driver().rxSession();
        RxResult rs = session.run("CREATE (n:Person {name:$name}) RETURN n", Values.parameters("name", "Tom Hanks"));

        StepVerifier.create(Flux.from(rs.records()).single())
                .assertNext(record -> Assertions.assertTrue(record.get("m").isNull()))
                .expectComplete()
                .verify();
    }

    // NOTE(review): despite the name, this asserts that a missing node property reads as a null value,
    // not that an error with a helpful message is raised.
    @Test
    void shouldGiveHelpfulFailureMessageWhenAccessNonExistingPropertyOnNode() {
        RxSession session = neo4j.driver().rxSession();
        RxResult rs = session.run("CREATE (n:Person {name:$name}) RETURN n", Values.parameters("name", "Tom Hanks"));

        StepVerifier.create(Flux.from(rs.records()).single())
                .assertNext(record -> Assertions.assertTrue(record.get("n").get("age").isNull()))
                .expectComplete()
                .verify();
    }

    /** The returned column name is visible both through keys() and on the record itself. */
    @Test
    void shouldHaveFieldNamesInResult() {
        RxSession session = neo4j.driver().rxSession();
        RxResult res = session.run("CREATE (n:TestNode {name:'test'}) RETURN n");

        StepVerifier.create(res.keys())
                .expectNext(Collections.singletonList("n"))
                .expectComplete()
                .verify();
        StepVerifier.create(res.records())
                .assertNext(record -> Assertions.assertEquals("[n]", record.keys().toString()))
                .expectComplete()
                .verify();
    }

    /** A query without RETURN yields an empty key list and no records. */
    @Test
    void shouldReturnEmptyKeyAndRecordOnEmptyResult() {
        RxSession session = neo4j.driver().rxSession();
        RxResult rs = session.run("CREATE (n:Person {name:$name})", Values.parameters("name", "Tom Hanks"));

        StepVerifier.create(rs.keys())
                .expectNext(Collections.emptyList())
                .expectComplete()
                .verify();
        StepVerifier.create(rs.records()).expectComplete().verify();
    }

    /**
     * For an invalid query: keys complete with an empty list, records fail, and the summary is
     * still delivered once the record stream has reported the failure.
     */
    @Test
    void shouldOnlyErrorRecordAfterFailure() {
        RxSession session = neo4j.driver().rxSession();
        RxResult result = session.run("INVALID");

        // All three publishers are obtained before the first subscription; StepVerifier does not
        // subscribe until verify() is called.
        StepVerifier keysVerifier = StepVerifier.create(Flux.from(result.keys()))
                .expectNext(Collections.emptyList())
                .expectComplete();
        Publisher<?> records = Flux.from(result.records());
        StepVerifier summaryVerifier = StepVerifier.create(Mono.from(result.consume()))
                .assertNext(summary -> {
                    MatcherAssert.assertThat(summary.query().text(), Matchers.equalTo("INVALID"));
                    Assertions.assertNotNull(summary.server().address());
                    Assertions.assertNotNull(summary.server().version());
                })
                .expectComplete();

        keysVerifier.verify();
        verifyInvalidInputError(records);
        summaryVerifier.verify();
    }

    /**
     * For an invalid query whose records were never subscribed, the summary publisher fails, and
     * fails the same way when the very same publisher is subscribed a second time.
     */
    @Test
    void shouldErrorOnSummaryIfNoRecord() {
        RxSession session = neo4j.driver().rxSession();
        RxResult result = session.run("INVALID");

        // Both publishers are obtained before the first subscription.
        StepVerifier keysVerifier = StepVerifier.create(Flux.from(result.keys()))
                .expectNext(Collections.emptyList())
                .expectComplete();
        Publisher<?> summary = Mono.from(result.consume());

        keysVerifier.verify();
        verifyInvalidInputError(summary);
        verifyInvalidInputError(summary);
    }

    /** Cancelling after the first record still allows the summary to be read. */
    @Test
    void shouldDiscardRecords() {
        RxSession session = neo4j.driver().rxSession();
        RxResult result = session.run("UNWIND [1,2] AS a RETURN a");

        StepVerifier.create(Flux.from(result.records()).limitRate(1).take(1))
                .assertNext(record -> MatcherAssert.assertThat(record.get("a").asInt(), Matchers.equalTo(1)))
                .thenCancel()
                .verify();
        StepVerifier.create(result.consume())
                .assertNext(summary -> {
                    MatcherAssert.assertThat(summary, Matchers.notNullValue());
                    MatcherAssert.assertThat(summary.queryType(), Matchers.equalTo(QueryType.READ_ONLY));
                })
                .expectComplete()
                .verify();
    }

    /** The five successful divisions are delivered before the division-by-zero error. */
    @Test
    void shouldStreamCorrectRecordsBackBeforeError() {
        RxSession session = neo4j.driver().rxSession();
        RxResult result = session.run("CYPHER runtime=interpreted UNWIND range(5, 0, -1) AS x RETURN x / x");

        StepVerifier.create(Flux.from(result.records()).map(record -> record.get(0).asInt()))
                .expectNext(1)
                .expectNext(1)
                .expectNext(1)
                .expectNext(1)
                .expectNext(1)
                .expectErrorSatisfies(error ->
                        MatcherAssert.assertThat(error.getMessage(), CoreMatchers.containsString("/ by zero")))
                .verify();
    }

    @Test
    void shouldErrorToAccessRecordAfterSessionClose() {
        RxSession session = neo4j.driver().rxSession();
        RxResult result = session.run("UNWIND [1,2] AS a RETURN a");

        verifyErrorMessageContains(
                Flux.from(session.close()).thenMany(result.records()),
                "session is already closed");
    }

    @Test
    void shouldErrorToAccessKeysAfterSessionClose() {
        RxSession session = neo4j.driver().rxSession();
        RxResult result = session.run("UNWIND [1,2] AS a RETURN a");

        verifyErrorMessageContains(
                Flux.from(session.close()).thenMany(result.keys()),
                "session is already closed");
    }

    @Test
    void shouldErrorToAccessSummaryAfterSessionClose() {
        RxSession session = neo4j.driver().rxSession();
        RxResult result = session.run("UNWIND [1,2] AS a RETURN a");

        verifyErrorMessageContains(
                Flux.from(session.close()).thenMany(result.consume()),
                "session is already closed");
    }

    /** Running a query on a rolled-back transaction and reading its records fails. */
    @Test
    void shouldErrorToAccessRecordAfterTxClose() {
        RxSession session = neo4j.driver().rxSession();

        verifyErrorMessageContains(
                Flux.from(session.beginTransaction())
                        .single()
                        .flatMap(tx -> Flux.from(tx.rollback()).singleOrEmpty().thenReturn(tx))
                        .flatMapMany(tx -> tx.run("UNWIND [1,2] AS a RETURN a").records()),
                "Cannot run more queries");
    }

    /** Running a query on a rolled-back transaction and reading its keys fails. */
    @Test
    void shouldErrorToAccessKeysAfterTxClose() {
        RxSession session = neo4j.driver().rxSession();

        verifyErrorMessageContains(
                Flux.from(session.beginTransaction())
                        .single()
                        .flatMap(tx -> Flux.from(tx.rollback()).singleOrEmpty().thenReturn(tx))
                        .flatMapMany(tx -> tx.run("UNWIND [1,2] AS a RETURN a").keys()),
                "Cannot run more queries");
    }

    /** Running a query on a rolled-back transaction and reading its summary fails. */
    @Test
    void shouldErrorToAccessSummaryAfterTxClose() {
        RxSession session = neo4j.driver().rxSession();

        verifyErrorMessageContains(
                Flux.from(session.beginTransaction())
                        .single()
                        .flatMap(tx -> Flux.from(tx.rollback()).singleOrEmpty().thenReturn(tx))
                        .flatMapMany(tx -> tx.run("UNWIND [1,2] AS a RETURN a").consume()),
                "Cannot run more queries");
    }

    // NOTE(review): this body is identical to shouldErrorToAccessSummaryAfterTxClose and never
    // subscribes to keys(), despite the name -- confirm the intended scenario.
    @Test
    void throwErrorAfterKeys() {
        RxSession session = neo4j.driver().rxSession();

        verifyErrorMessageContains(
                Flux.from(session.beginTransaction())
                        .single()
                        .flatMap(tx -> Flux.from(tx.rollback()).singleOrEmpty().thenReturn(tx))
                        .flatMapMany(tx -> tx.run("UNWIND [1,2] AS a RETURN a").consume()),
                "Cannot run more queries");
    }

    /** Each call to consume() on a failed result reports the query error again. */
    @Test
    void throwTheSameErrorWhenCallingConsumeMultipleTimes() {
        RxSession session = neo4j.driver().rxSession();
        RxResult result = session.run("Invalid");

        verifyErrorMessageContains(Flux.from(result.consume()), "Invalid");
        verifyErrorMessageContains(Flux.from(result.consume()), "Invalid");
    }

    /** keys() completes with an empty list for a failing query, on every call. */
    @Test
    void keysShouldNotReportRunError() {
        RxSession session = neo4j.driver().rxSession();
        RxResult result = session.run("Invalid");

        StepVerifier.create(Flux.from(result.keys())).expectNext(Collections.emptyList()).verifyComplete();
        StepVerifier.create(Flux.from(result.keys())).expectNext(Collections.emptyList()).verifyComplete();
    }

    /** Only the first records() subscription sees the query error; later ones see "already consumed". */
    @Test
    void throwResultConsumedErrorWhenCallingRecordsMultipleTimes() {
        RxSession session = neo4j.driver().rxSession();
        RxResult result = session.run("Invalid");

        verifyErrorMessageContains(Flux.from(result.records()), "Invalid");
        verifyRecordsAlreadyDiscarded(result);
        verifyRecordsAlreadyDiscarded(result);
    }

    /** Verifies the summary of the query issued by {@link #sessionRunUnwind()}. */
    private void verifyCanAccessSummary(RxResult res) {
        StepVerifier.create(res.consume())
                .assertNext(summary -> {
                    MatcherAssert.assertThat(summary.query().text(), Matchers.equalTo("UNWIND [1,2,3,4] AS a RETURN a"));
                    MatcherAssert.assertThat(summary.counters().nodesCreated(), Matchers.equalTo(0));
                    MatcherAssert.assertThat(summary.queryType(), Matchers.equalTo(QueryType.READ_ONLY));
                })
                .verifyComplete();
    }

    /** Verifies that subscribing to the records of {@code res} fails with the "already consumed" error. */
    private void verifyRecordsAlreadyDiscarded(RxResult res) {
        verifyErrorMessageContains(Flux.from(res.records()), "has already been consumed");
    }

    /** Verifies that {@code res} streams the values 1 to 4 of column {@code a} and then completes. */
    private void verifyCanAccessFullRecords(RxResult res) {
        StepVerifier.create(Flux.from(res.records()).map(r -> r.get("a").asInt()))
                .expectNext(1)
                .expectNext(2)
                .expectNext(3)
                .expectNext(4)
                .expectComplete()
                .verify();
    }

    /** Verifies that the key list of {@code res} is exactly {@code ["a"]}. */
    private void verifyCanAccessKeys(RxResult res) {
        StepVerifier.create(res.keys()).expectNext(Collections.singletonList("a")).verifyComplete();
    }

    /** Opens a new reactive session and prepares the four-row UNWIND query used by most tests. */
    private RxResult sessionRunUnwind() {
        RxSession session = neo4j.driver().rxSession();
        return session.run("UNWIND [1,2,3,4] AS a RETURN a");
    }

    /**
     * Subscribes to {@code publisher} and verifies that it terminates with an error whose message
     * contains {@code expectedFragment}.
     *
     * @param publisher the publisher expected to fail without emitting anything first
     * @param expectedFragment text that must appear in the error message
     */
    private static void verifyErrorMessageContains(Publisher<?> publisher, String expectedFragment) {
        StepVerifier.create(publisher)
                .expectErrorSatisfies(error ->
                        MatcherAssert.assertThat(error.getMessage(), CoreMatchers.containsString(expectedFragment)))
                .verify();
    }

    /**
     * Subscribes to {@code publisher} and verifies that it terminates with a {@link ClientException}
     * whose message contains "Invalid input".
     *
     * @param publisher the publisher expected to fail without emitting anything first
     */
    private static void verifyInvalidInputError(Publisher<?> publisher) {
        StepVerifier.create(publisher)
                .expectErrorSatisfies(error -> {
                    MatcherAssert.assertThat(error, CoreMatchers.instanceOf(ClientException.class));
                    MatcherAssert.assertThat(error.getMessage(), CoreMatchers.containsString("Invalid input"));
                })
                .verify();
    }
}

