/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.queryhandling;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.axonframework.messaging.Message;
import org.axonframework.queryhandling.GenericStreamingQueryMessage;
import org.axonframework.queryhandling.NoHandlerForQueryException;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryExecutionException;
import org.axonframework.queryhandling.QueryHandler;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.SimpleQueryBus;
import org.axonframework.queryhandling.StreamingQueryMessage;
import org.axonframework.queryhandling.annotation.AnnotationQueryHandlerAdapter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

class StreamingQueryTest {
    private final SimpleQueryBus queryBus = SimpleQueryBus.builder().build();
    private final MyQueryHandler myQueryHandler = new MyQueryHandler();
    private final AnnotationQueryHandlerAdapter<MyQueryHandler> annotationQueryHandlerAdapter = new AnnotationQueryHandlerAdapter((Object)this.myQueryHandler);
    private final ErrorQueryHandler errorQueryHandler = new ErrorQueryHandler();
    private final AnnotationQueryHandlerAdapter<ErrorQueryHandler> errorQueryHandlerAdapter = new AnnotationQueryHandlerAdapter((Object)this.errorQueryHandler);
    private static final ConcurrentLinkedQueue<String> handlersInvoked = new ConcurrentLinkedQueue();

    StreamingQueryTest() {
    }

    @BeforeEach
    void setUp() {
        this.annotationQueryHandlerAdapter.subscribe((QueryBus)this.queryBus);
    }

    @AfterEach
    void reset() {
        this.myQueryHandler.errorThrown.set(false);
    }

    private <Q, R> Flux<R> streamingQueryPayloads(StreamingQueryMessage<Q, R> queryMessage) {
        return this.streamingQuery(queryMessage).map(Message::getPayload);
    }

    private <Q, R> Flux<QueryResponseMessage<R>> streamingQuery(StreamingQueryMessage<Q, R> queryMessage) {
        return Flux.from((Publisher)this.queryBus.streamingQuery(queryMessage));
    }

    @Test
    void streamingFluxResults() {
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "fluxQuery", String.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage)).expectNext((Object)"a", (Object)"b", (Object)"c", (Object)"d").verifyComplete();
    }

    @Test
    void switchHandlerOnError() {
        handlersInvoked.removeIf(n -> true);
        this.errorQueryHandlerAdapter.subscribe((QueryBus)this.queryBus);
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "listQuery", String.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage)).expectNext((Object)"a", (Object)"b", (Object)"c", (Object)"d").verifyComplete();
        ArrayList<String> handlers_invoked = new ArrayList<String>(handlersInvoked);
        Assertions.assertEquals(Arrays.asList("handler_error", "handler_healthy"), handlers_invoked);
    }

    @Test
    void optionalResults() {
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "optionalResultQuery", String.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage)).expectNext((Object)"optional").verifyComplete();
    }

    @Test
    void emptyOptionalResults() {
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "emptyOptionalResultQuery", String.class);
        StepVerifier.create(this.streamingQuery((StreamingQueryMessage)queryMessage)).expectComplete().verify();
    }

    @Test
    void streamingListResults() {
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "listQuery", String.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage)).expectNext((Object)"a", (Object)"b", (Object)"c", (Object)"d").verifyComplete();
    }

    @Test
    void streamingStreamResults() {
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "streamQuery", String.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage)).expectNext((Object)"a", (Object)"b", (Object)"c", (Object)"d").verifyComplete();
    }

    @Test
    void streamingSingleResult() {
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "singleResultQuery", String.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage)).expectNext((Object)"lonely").verifyComplete();
    }

    @Test
    void streamingCompletableFutureResult() {
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "completableFutureQuery", String.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage)).expectNext((Object)"future").verifyComplete();
    }

    @Test
    void streamingFluxAfterHandlerCompletes() {
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "streamingAfterHandlerCompletesQuery", Long.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage)).expectNext((Object)0L, (Object)1L, (Object)2L, (Object)3L, (Object)4L).verifyComplete();
    }

    @Test
    void streamingMonoResult() {
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "monoQuery", String.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage)).expectNext((Object)"helloMono").verifyComplete();
    }

    @Test
    void streamingNullResult() {
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "nullQuery", String.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage)).expectComplete().verify();
    }

    @Test
    void errorResult() {
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "exceptionQuery", String.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage)).expectErrorMatches(t -> t instanceof QueryExecutionException && t.getMessage().startsWith("Error starting stream")).verify();
    }

    @Test
    void throttledFluxQuery() {
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "throttledFluxQuery", Long.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage)).expectNext((Object[])new Long[]{0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L}).verifyComplete();
    }

    @Test
    void backpressureFluxQuery() {
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "backPressure", Long.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage), (long)10L).expectNextCount(10L).thenRequest(10L).expectNextCount(10L).thenCancel().verify();
    }

    @Test
    void dispatchInterceptor() {
        AtomicBoolean hasBeenCalled = new AtomicBoolean();
        this.queryBus.registerDispatchInterceptor(messages -> {
            hasBeenCalled.set(true);
            return (i, m) -> m;
        });
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "fluxQuery", String.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage)).expectNext((Object)"a", (Object)"b", (Object)"c", (Object)"d").verifyComplete();
        Assertions.assertTrue((boolean)hasBeenCalled.get());
    }

    @Test
    void handlerInterceptor() {
        this.queryBus.registerHandlerInterceptor((unitOfWork, interceptorChain) -> ((Flux)interceptorChain.proceed()).map(it -> "a"));
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "fluxQuery", String.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage)).expectNext((Object)"a", (Object)"a", (Object)"a", (Object)"a").verifyComplete();
    }

    @Test
    void errorStream() {
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "errorStream", String.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage)).verifyErrorMatches(t -> t instanceof RuntimeException && t.getMessage().equals("oops"));
    }

    @Test
    void queryNotExists() {
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "queryNotExists", String.class);
        StepVerifier.create(this.streamingQueryPayloads((StreamingQueryMessage)queryMessage)).verifyErrorMatches(t -> t instanceof NoHandlerForQueryException);
    }

    @Test
    void resubscribeWorksEvenWhenAnErrorHasBeenCashed() {
        GenericStreamingQueryMessage queryMessage = new GenericStreamingQueryMessage((Object)"criteria", "exceptionQueryOnce", String.class);
        Flux flux = this.streamingQueryPayloads((StreamingQueryMessage)queryMessage);
        StepVerifier.create(flux).expectErrorMatches(t -> t instanceof QueryExecutionException && t.getMessage().startsWith("Error starting stream")).verify();
        StepVerifier.create(flux).expectNext((Object)"correctNow").verifyComplete();
    }

    private static class MyQueryHandler {
        AtomicBoolean errorThrown = new AtomicBoolean(false);

        private MyQueryHandler() {
        }

        @QueryHandler(queryName="fluxQuery")
        public Flux<String> fluxQuery(String criteria) {
            return Flux.just((Object[])new String[]{"a", "b", "c", "d"});
        }

        @QueryHandler(queryName="listQuery")
        public List<String> listQuery(String criteria) {
            handlersInvoked.add("handler_healthy");
            return Arrays.asList("a", "b", "c", "d");
        }

        @QueryHandler(queryName="streamQuery")
        public Stream<String> streamQuery(String criteria) {
            return Stream.of("a", "b", "c", "d");
        }

        @QueryHandler(queryName="singleResultQuery")
        public String singleResultQuery(String criteria) {
            return "lonely";
        }

        @QueryHandler(queryName="optionalResultQuery")
        public Optional<String> optionalResultQuery(String criteria) {
            return Optional.of("optional");
        }

        @QueryHandler(queryName="emptyOptionalResultQuery")
        public Optional<String> emptyOptionalResultQuery(String criteria) {
            return Optional.empty();
        }

        @QueryHandler(queryName="completableFutureQuery")
        public CompletableFuture<String> completableFutureQuery(String criteria) {
            return CompletableFuture.completedFuture("future");
        }

        @QueryHandler(queryName="streamingAfterHandlerCompletesQuery")
        public Flux<Long> streamingAfterHandlerCompletesQuery(String criteria) {
            return Flux.interval((Duration)Duration.ofSeconds(1L)).take(5L);
        }

        @QueryHandler(queryName="monoQuery")
        public Mono<String> monoQuery(String criteria) {
            return Mono.fromCallable(() -> "helloMono").delayElement(Duration.ofMillis(100L));
        }

        @QueryHandler(queryName="nullQuery")
        public Flux<String> nullQuery(String criteria) {
            return null;
        }

        @QueryHandler(queryName="exceptionQuery")
        public Flux<String> exceptionQuery(String criteria) {
            throw new RuntimeException("oops");
        }

        @QueryHandler(queryName="throttledFluxQuery")
        public Flux<Long> throttledFlux(String criteria) {
            return Flux.interval((Duration)Duration.ofMillis(100L)).window(2).take(4L).flatMap(Function.identity());
        }

        @QueryHandler(queryName="backPressure")
        public Flux<Long> backPressureQuery(String criteria) {
            return Flux.create(longFluxSink -> longFluxSink.onRequest(r -> LongStream.range(0L, r).forEach(arg_0 -> ((FluxSink)longFluxSink).next(arg_0))));
        }

        @QueryHandler(queryName="errorStream")
        public Flux<String> errorStream(String criteria) {
            return Flux.error((Throwable)new RuntimeException("oops"));
        }

        @QueryHandler(queryName="exceptionQueryOnce")
        public Flux<String> exceptionQueryOnce(String criteria) {
            if (this.errorThrown.compareAndSet(false, true)) {
                throw new RuntimeException("oops");
            }
            return Flux.just((Object)"correctNow");
        }
    }

    private static class ErrorQueryHandler {
        private ErrorQueryHandler() {
        }

        @QueryHandler(queryName="listQuery")
        public Flux<String> listQuery(String criteria) {
            handlersInvoked.add("handler_error");
            throw new RuntimeException("ooops");
        }
    }
}

