/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.queryhandling;

import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.queryhandling.GenericSubscriptionQueryMessage;
import org.axonframework.queryhandling.SimpleQueryUpdateEmitter;
import org.axonframework.queryhandling.SubscriptionQueryBackpressure;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.UpdateHandlerRegistration;
import org.axonframework.tracing.SpanFactory;
import org.axonframework.tracing.TestSpanFactory;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

/**
 * Tests for {@link SimpleQueryUpdateEmitter}: registering update handlers for subscription queries, emitting
 * updates, completing/cancelling registrations and the spans recorded while emitting.
 * <p>
 * Every test registers a query named {@code "chatMessages"} with payload {@code "some-payload"} and emits with an
 * always-true filter, so the only thing that varies between tests is the declared update response type and the
 * payloads being emitted.
 */
class SimpleQueryUpdateEmitterTest {

    /** Number of updates emitted by each of the concurrent emission tests. */
    private static final int CONCURRENT_UPDATE_COUNT = 100;

    /**
     * Upper bound for the concurrent emission verifications. The emitting tasks' futures are not inspected, so a
     * failing {@code emit} would otherwise leave the verifier waiting forever for updates that never arrive.
     */
    private static final Duration CONCURRENT_VERIFY_TIMEOUT = Duration.ofSeconds(10);

    private final TestSpanFactory spanFactory = new TestSpanFactory();
    private final SimpleQueryUpdateEmitter testSubject =
            SimpleQueryUpdateEmitter.builder().spanFactory(spanFactory).build();

    /**
     * Registers through the overload taking a {@link SubscriptionQueryBackpressure}, emits one update and completes
     * the registration; the update followed by completion is expected on the update stream.
     */
    @Test
    void completingRegistrationOldApi() {
        UpdateHandlerRegistration<Object> result = testSubject.registerUpdateHandler(
                chatMessagesQuery(ResponseTypes.instanceOf(String.class)),
                SubscriptionQueryBackpressure.defaultBackpressure(),
                1024
        );

        testSubject.emit(any -> true, "some-awesome-text");
        result.complete();

        StepVerifier.create(result.getUpdates().map(Message::getPayload))
                    .expectNext("some-awesome-text")
                    .verifyComplete();
    }

    /**
     * Emits {@value #CONCURRENT_UPDATE_COUNT} updates from a thread pool and expects all of them to arrive.
     */
    @Test
    void concurrentUpdateEmitting() {
        UpdateHandlerRegistration<Object> registration = testSubject.registerUpdateHandler(
                chatMessagesQuery(ResponseTypes.instanceOf(String.class)), 128
        );

        emitConcurrentlyAndVerify(registration);
    }

    /**
     * Same as {@link #concurrentUpdateEmitting()}, but registers through the overload taking a
     * {@link SubscriptionQueryBackpressure}.
     */
    @Test
    void concurrentUpdateEmitting_WithBackpressure() {
        UpdateHandlerRegistration<Object> registration = testSubject.registerUpdateHandler(
                chatMessagesQuery(ResponseTypes.instanceOf(String.class)),
                SubscriptionQueryBackpressure.defaultBackpressure(),
                128
        );

        emitConcurrentlyAndVerify(registration);
    }

    /**
     * Cancelling the registration (backpressure overload) must not complete the update stream: the emitted update
     * is expected, after which the verifier has to run into its timeout.
     */
    @Test
    void cancelingRegistrationDoesNotCompleteFluxOfUpdatesOldApi() {
        UpdateHandlerRegistration<Object> result = testSubject.registerUpdateHandler(
                chatMessagesQuery(ResponseTypes.instanceOf(String.class)),
                SubscriptionQueryBackpressure.defaultBackpressure(),
                1024
        );

        testSubject.emit(any -> true, "some-awesome-text");
        result.getRegistration().cancel();

        StepVerifier.create(result.getUpdates().map(Message::getPayload))
                    .expectNext("some-awesome-text")
                    .verifyTimeout(Duration.ofMillis(500L));
    }

    /**
     * Emits one update and completes the registration; the update followed by completion is expected.
     */
    @Test
    void completingRegistration() {
        UpdateHandlerRegistration<Object> result = registerAndSubscribe(ResponseTypes.instanceOf(String.class));

        testSubject.emit(any -> true, "some-awesome-text");
        result.complete();

        StepVerifier.create(result.getUpdates().map(Message::getPayload))
                    .expectNext("some-awesome-text")
                    .verifyComplete();
    }

    /**
     * Verifies that emitting an update completes an internal and a dispatch span with the expected names.
     */
    @Test
    void queryUpdateEmitterIsTraced() {
        UpdateHandlerRegistration<Object> result = registerAndSubscribe(ResponseTypes.instanceOf(String.class));

        testSubject.emit(any -> true, "some-awesome-text");
        result.complete();

        spanFactory.verifySpanCompleted("SimpleQueryUpdateEmitter.emit");
        spanFactory.verifySpanHasType("SimpleQueryUpdateEmitter.emit", TestSpanFactory.TestSpanType.INTERNAL);
        spanFactory.verifySpanCompleted("QueryUpdateEmitter.emit chatMessages");
        spanFactory.verifySpanHasType("QueryUpdateEmitter.emit chatMessages", TestSpanFactory.TestSpanType.DISPATCH);
    }

    /**
     * With an {@code Integer} update type, only the {@code Integer} update is expected; the {@code String} update
     * must not show up.
     */
    @Test
    void differentUpdateAreDisambiguatedAndWrongTypesAreFilteredBasedOnQueryTypes() {
        UpdateHandlerRegistration<Object> result = registerAndSubscribe(ResponseTypes.instanceOf(Integer.class));

        testSubject.emit(any -> true, "some-awesome-text");
        testSubject.emit(any -> true, 1234);
        result.complete();

        StepVerifier.create(result.getUpdates().map(Message::getPayload))
                    .expectNext(1234)
                    .verifyComplete();
    }

    /**
     * With a multiple-instances update type, only the array and the list payloads are expected.
     */
    @Test
    void updateResponseTypeFilteringWorksForMultipleInstanceOfWithArrayAndList() {
        UpdateHandlerRegistration<Object> result =
                registerAndSubscribe(ResponseTypes.multipleInstancesOf(String.class));

        emitMixedPayloads();
        result.complete();

        StepVerifier.create(result.getUpdates().map(Message::getPayload))
                    .expectNextMatches(actual -> CoreMatchers.equalTo(new String[]{"array-item-1", "array-item-2"})
                                                             .matches(actual))
                    .expectNextMatches(actual -> CoreMatchers.equalTo(Arrays.asList("list-item-1", "list-item-2"))
                                                             .matches(actual))
                    .verifyComplete();
    }

    /**
     * With an optional update type, only the two {@link Optional} payloads are expected.
     */
    @Test
    void updateResponseTypeFilteringWorksForOptionaInstanceOf() {
        UpdateHandlerRegistration<Object> result =
                registerAndSubscribe(ResponseTypes.optionalInstanceOf(String.class));

        emitMixedPayloads();
        result.complete();

        StepVerifier.create(result.getUpdates().map(Message::getPayload))
                    .expectNext(Optional.of("optional-payload"), Optional.empty())
                    .verifyComplete();
    }

    /**
     * With a publisher update type, only the {@link Flux} and the two {@link Mono} payloads are expected; each
     * received publisher is itself verified for the items it emits.
     */
    @Test
    void updateResponseTypeFilteringWorksForPublisherOf() {
        UpdateHandlerRegistration<Object> result = registerAndSubscribe(ResponseTypes.publisherOf(String.class));

        emitMixedPayloads();
        testSubject.emit(any -> true, Mono.empty());
        result.complete();

        StepVerifier.create(result.getUpdates().map(Message::getPayload))
                    .expectNextMatches(payload -> passes(
                            () -> StepVerifier.create(asPublisher(payload))
                                              .expectNext("flux-item-1", "flux-item-2")
                                              .verifyComplete()
                    ))
                    .expectNextMatches(payload -> passes(
                            () -> StepVerifier.create(asPublisher(payload))
                                              .expectNext("mono-item")
                                              .verifyComplete()
                    ))
                    .expectNextMatches(payload -> passes(
                            () -> StepVerifier.create(asPublisher(payload)).verifyComplete()
                    ))
                    .verifyComplete();
    }

    /**
     * Two list updates emitted for a multiple-instances update type are both expected, in emission order.
     */
    @Test
    void multipleInstanceUpdatesAreDelivered() {
        UpdateHandlerRegistration<Object> result =
                registerAndSubscribe(ResponseTypes.multipleInstancesOf(String.class));

        testSubject.emit(any -> true, Arrays.asList("text1", "text2"));
        testSubject.emit(any -> true, Arrays.asList("text3", "text4"));
        result.complete();

        StepVerifier.create(result.getUpdates().map(Message::getPayload))
                    .expectNext(Arrays.asList("text1", "text2"), Arrays.asList("text3", "text4"))
                    .verifyComplete();
    }

    /**
     * Two {@link Optional} updates emitted for an optional update type are both expected, in emission order.
     */
    @Test
    void optionalUpdatesAreDelivered() {
        UpdateHandlerRegistration<Object> result = testSubject.registerUpdateHandler(
                chatMessagesQuery(ResponseTypes.optionalInstanceOf(String.class),
                                  ResponseTypes.optionalInstanceOf(String.class)),
                1024
        );
        result.getUpdates().subscribe();

        testSubject.emit(any -> true, Optional.of("text1"));
        testSubject.emit(any -> true, Optional.of("text2"));
        result.complete();

        StepVerifier.create(result.getUpdates().map(Message::getPayload))
                    .expectNext(Optional.of("text1"), Optional.of("text2"))
                    .verifyComplete();
    }

    /**
     * Cancelling the registration must not complete the update stream: the emitted update is expected, after which
     * the verifier has to run into its timeout.
     */
    @Test
    void cancelingRegistrationDoesNotCompleteFluxOfUpdates() {
        UpdateHandlerRegistration<Object> result = registerAndSubscribe(ResponseTypes.instanceOf(String.class));

        testSubject.emit(any -> true, "some-awesome-text");
        result.getRegistration().cancel();

        StepVerifier.create(result.getUpdates().map(Message::getPayload))
                    .expectNext("some-awesome-text")
                    .verifyTimeout(Duration.ofMillis(500L));
    }

    /**
     * Builds the {@code "chatMessages"} query with a multiple-instances-of-{@code String} initial response type.
     *
     * @param updateType the update response type to declare on the query
     * @return the subscription query message
     */
    private static SubscriptionQueryMessage<?, ?, ?> chatMessagesQuery(ResponseType<?> updateType) {
        return chatMessagesQuery(ResponseTypes.multipleInstancesOf(String.class), updateType);
    }

    /**
     * Builds the {@code "chatMessages"} query with payload {@code "some-payload"}.
     *
     * @param initialType the initial response type to declare on the query
     * @param updateType  the update response type to declare on the query
     * @return the subscription query message
     */
    private static SubscriptionQueryMessage<?, ?, ?> chatMessagesQuery(ResponseType<?> initialType,
                                                                       ResponseType<?> updateType) {
        return new GenericSubscriptionQueryMessage<>("some-payload", "chatMessages", initialType, updateType);
    }

    /**
     * Registers an update handler (buffer size 1024) for the {@code "chatMessages"} query and subscribes to its
     * updates once before anything is emitted.
     * NOTE(review): the early subscription presumably is what makes emitted updates available to the later
     * verifier subscription - confirm against the emitter's implementation.
     *
     * @param updateType the update response type to declare on the query
     * @return the registration of the update handler
     */
    private UpdateHandlerRegistration<Object> registerAndSubscribe(ResponseType<?> updateType) {
        UpdateHandlerRegistration<Object> registration =
                testSubject.registerUpdateHandler(chatMessagesQuery(updateType), 1024);
        registration.getUpdates().subscribe();
        return registration;
    }

    /**
     * Emits one payload of every shape used by the response type filtering tests, in a fixed order: a
     * {@code String}, an {@code Integer}, a filled and an empty {@link Optional}, a {@code String[]}, a list, a
     * {@link Flux} and a {@link Mono}.
     */
    private void emitMixedPayloads() {
        testSubject.emit(any -> true, "some-awesome-text");
        testSubject.emit(any -> true, 1234);
        testSubject.emit(any -> true, Optional.of("optional-payload"));
        testSubject.emit(any -> true, Optional.empty());
        testSubject.emit(any -> true, new String[]{"array-item-1", "array-item-2"});
        testSubject.emit(any -> true, Arrays.asList("list-item-1", "list-item-2"));
        testSubject.emit(any -> true, Flux.just("flux-item-1", "flux-item-2"));
        testSubject.emit(any -> true, Mono.just("mono-item"));
    }

    /**
     * Submits {@value #CONCURRENT_UPDATE_COUNT} emit tasks to a fixed thread pool, then expects that many updates
     * on the given registration, completes all registrations and expects the stream to complete.
     *
     * @param registration the registration whose updates are verified
     */
    private void emitConcurrentlyAndVerify(UpdateHandlerRegistration<Object> registration) {
        ExecutorService executors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            for (int i = 0; i < CONCURRENT_UPDATE_COUNT; i++) {
                executors.submit(() -> testSubject.emit(q -> true, "Update"));
            }
        } finally {
            // Already submitted tasks still run; this only makes sure the pool is released in every case.
            executors.shutdown();
        }

        StepVerifier.create(registration.getUpdates())
                    .expectNextCount(CONCURRENT_UPDATE_COUNT)
                    .then(() -> testSubject.complete(q -> true))
                    .expectComplete()
                    .verify(CONCURRENT_VERIFY_TIMEOUT);
    }

    /**
     * Runs the given verification and reports whether it finished without throwing an {@link Exception}.
     * Only {@code Exception}s are turned into {@code false}; an {@link AssertionError} raised by the verification
     * is not caught and propagates to the caller.
     *
     * @param verification the verification to run
     * @return {@code true} if the verification returned normally, {@code false} if it threw an {@code Exception}
     */
    private static boolean passes(Runnable verification) {
        try {
            verification.run();
        } catch (Exception e) {
            return false;
        }
        return true;
    }

    /**
     * Casts an update payload to a {@link Publisher}. A payload of another type results in a
     * {@link ClassCastException}.
     *
     * @param payload the update payload received from the update stream
     * @return the payload as a publisher of objects
     */
    @SuppressWarnings("unchecked") // Element type is unknown here; the nested StepVerifier checks the actual items.
    private static Publisher<Object> asPublisher(Object payload) {
        return (Publisher<Object>) payload;
    }
}

