/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.query.subscription;

import io.axoniq.axonserver.connector.query.SubscriptionQueryResult;
import io.axoniq.axonserver.grpc.query.QueryUpdate;
import jakarta.annotation.Nonnull;
import org.axonframework.axonserver.connector.event.util.GrpcExceptionParser;
import org.axonframework.axonserver.connector.query.subscription.SubscriptionMessageSerializer;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryResponseMessages;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.queryhandling.tracing.QueryBusSpanFactory;
import org.axonframework.tracing.Span;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/**
 * {@link SubscriptionQueryResponseMessages} implementation backed by an Axon Server connector
 * {@link SubscriptionQueryResult}.
 * <p>
 * The initial result is exposed through a {@link Mono} built from {@code result.initialResult()}, and the
 * updates are exposed through a {@link Flux} that pulls {@link QueryUpdate}s from {@code result.updates()}
 * according to the demand signalled by the downstream subscriber.
 */
public class AxonServerSubscriptionQueryResponseMessages
implements SubscriptionQueryResponseMessages {
    private static final Logger logger = LoggerFactory.getLogger(AxonServerSubscriptionQueryResponseMessages.class);

    private final Mono<QueryResponseMessage> initialResult;
    private final SubscriptionQueryResult result;
    private final Flux<SubscriptionQueryUpdateMessage> updates;

    /**
     * Builds the initial-result {@link Mono} and the updates {@link Flux} on top of the given connector
     * {@code result}.
     *
     * @param queryMessage           the subscription query the updates belong to; passed to the span factory
     *                               for every published update
     * @param result                 the connector-level subscription query result to read from
     * @param subscriptionSerializer used to deserialize both the initial response and each update
     * @param spanFactory            creates the span in which each update is handed to the sink
     * @param parentSpan             span that records an initial-result failure and is ended when the
     *                               initial result terminates
     */
    public AxonServerSubscriptionQueryResponseMessages(SubscriptionQueryMessage queryMessage, SubscriptionQueryResult result, SubscriptionMessageSerializer subscriptionSerializer, QueryBusSpanFactory spanFactory, Span parentSpan) {
        this.result = result;
        // The Flux must be built here rather than in a field initializer: it captures constructor
        // parameters, which are not in scope for field initializers.
        this.updates = Flux.<SubscriptionQueryUpdateMessage>create(fluxSink -> {
            fluxSink.onRequest(count -> {
                // Emit at most 'count' updates that are already buffered. A long counter is used
                // because the demand is a long.
                for (long emitted = 0L; emitted < count; emitted++) {
                    QueryUpdate next = result.updates().nextIfAvailable();
                    if (next == null) {
                        // Nothing buffered: terminate the Flux only if the update stream is closed,
                        // otherwise wait for the onAvailable callback.
                        if (result.updates().isClosed()) {
                            completeFlux(fluxSink, result.updates().getError().orElse(null));
                        }
                        break;
                    }
                    publishQueryUpdate(queryMessage, subscriptionSerializer, spanFactory, fluxSink, next);
                }
            });
            fluxSink.onDispose(() -> {
                logger.debug("Flux was disposed. Will close subscription query");
                result.updates().close();
            });
            result.updates().onAvailable(() -> {
                if (fluxSink.requestedFromDownstream() > 0L) {
                    QueryUpdate next = result.updates().nextIfAvailable();
                    if (next != null) {
                        publishQueryUpdate(queryMessage, subscriptionSerializer, spanFactory, fluxSink, next);
                    }
                } else {
                    logger.trace("Not sending update to Flux Sink. Not enough info requested");
                }
                // Checked regardless of demand, so a closed stream terminates the Flux.
                if (result.updates().isClosed()) {
                    completeFlux(fluxSink, result.updates().getError().orElse(null));
                }
            });
        }).doOnError(e -> result.updates().close());
        this.initialResult = Mono.fromCompletionStage(result::initialResult)
                .onErrorMap(GrpcExceptionParser::parse)
                .doOnError(parentSpan::recordException)
                .doOnTerminate(parentSpan::end)
                .map(subscriptionSerializer::deserialize);
    }

    /**
     * Deserializes {@code next} and pushes the resulting message into {@code fluxSink}, doing so inside a
     * span created by {@code spanFactory} for that message and the originating {@code queryMessage}.
     */
    private void publishQueryUpdate(SubscriptionQueryMessage queryMessage, SubscriptionMessageSerializer subscriptionSerializer, QueryBusSpanFactory spanFactory, FluxSink<SubscriptionQueryUpdateMessage> fluxSink, QueryUpdate next) {
        SubscriptionQueryUpdateMessage message = subscriptionSerializer.deserialize(next);
        spanFactory.createSubscriptionQueryProcessUpdateSpan(message, queryMessage).run(() -> fluxSink.next(message));
    }

    /**
     * Terminates {@code fluxSink}: with {@code error} when it is non-null, otherwise with a normal
     * completion signal.
     */
    private void completeFlux(FluxSink<SubscriptionQueryUpdateMessage> fluxSink, Throwable error) {
        if (error != null) {
            fluxSink.error(error);
        } else {
            fluxSink.complete();
        }
    }

    /**
     * Returns the initial result as a {@link Flux} wrapping the initial-result {@link Mono}.
     *
     * @return a {@link Flux} of the deserialized initial response
     */
    @Nonnull
    public Flux<QueryResponseMessage> initialResult() {
        return Flux.from(this.initialResult);
    }

    /**
     * Returns the {@link Flux} of deserialized updates read from the connector's update stream.
     *
     * @return the updates {@link Flux} created at construction time
     */
    @Nonnull
    public Flux<SubscriptionQueryUpdateMessage> updates() {
        return this.updates;
    }

    /**
     * Closes the connector's update stream.
     */
    public void close() {
        this.result.updates().close();
    }
}

