/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.query.subscription;

import io.axoniq.axonserver.grpc.query.QueryProviderInbound;
import io.axoniq.axonserver.grpc.query.QueryProviderOutbound;
import io.axoniq.axonserver.grpc.query.SubscriptionQuery;
import io.axoniq.axonserver.grpc.query.SubscriptionQueryRequest;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.axonframework.axonserver.connector.Publisher;
import org.axonframework.axonserver.connector.query.subscription.DisposableResult;
import org.axonframework.axonserver.connector.query.subscription.SubscriptionMessageSerializer;
import org.axonframework.common.Registration;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;

/**
 * Handles the subscription-query part of a {@link QueryProviderInbound} message: it runs the request
 * against the local {@link QueryBus} and publishes the serialized outcome through the configured
 * {@link Publisher}.
 * <p>
 * Active subscriptions are tracked per context and, within a context, per subscription identifier, so
 * they can be cancelled on an explicit unsubscribe request or when the application disconnects from a
 * context.
 */
public class SubscriptionQueryRequestTarget {
    private static final Logger logger = LoggerFactory.getLogger(SubscriptionQueryRequestTarget.class);

    private final QueryBus localSegment;
    private final Publisher<QueryProviderOutbound> publisher;
    private final SubscriptionMessageSerializer serializer;
    /** Active subscriptions: context -> (subscription identifier -> subscription). */
    private final Map<String, Map<String, SubscriptionQueryResult<QueryResponseMessage<Object>, SubscriptionQueryUpdateMessage<Object>>>> subscriptions;

    /**
     * Creates a target that starts with no tracked subscriptions.
     *
     * @param localSegment the query bus on which subscription queries are executed
     * @param publisher    the publisher used to send serialized results and updates
     * @param serializer   converts between gRPC subscription messages and query bus messages
     */
    public SubscriptionQueryRequestTarget(QueryBus localSegment, Publisher<QueryProviderOutbound> publisher, SubscriptionMessageSerializer serializer) {
        this.localSegment = localSegment;
        this.publisher = publisher;
        this.serializer = serializer;
        this.subscriptions = new ConcurrentHashMap<>();
    }

    /**
     * Dispatches the subscription query request carried by the given inbound message to the matching
     * handler (subscribe, get initial result or unsubscribe). Any other request case is ignored.
     * <p>
     * Exceptions thrown while handling the request are logged at warn level and not propagated.
     *
     * @param context              the context the request belongs to
     * @param queryProviderInbound the inbound message holding the subscription query request
     */
    public void onSubscriptionQueryRequest(String context, QueryProviderInbound queryProviderInbound) {
        SubscriptionQueryRequest subscriptionQuery = queryProviderInbound.getSubscriptionQueryRequest();
        try {
            switch (subscriptionQuery.getRequestCase()) {
                case SUBSCRIBE:
                    subscribe(context, subscriptionQuery.getSubscribe());
                    break;
                case GET_INITIAL_RESULT:
                    getInitialResult(context, subscriptionQuery.getGetInitialResult());
                    break;
                case UNSUBSCRIBE:
                    unsubscribe(context, subscriptionQuery.getUnsubscribe());
                    break;
                default:
                    // Every other request case is not handled by this target.
                    break;
            }
        } catch (Exception e) {
            logger.warn("Error handling SubscriptionQueryRequest.", e);
        }
    }

    /**
     * Opens a subscription query on the local segment, forwards its updates, failure and completion to
     * the publisher, and tracks the subscription under the given context and its identifier.
     * <p>
     * A subscribe request for an identifier that is already tracked in the context is ignored. Opening
     * a second subscription would start another update stream that is never stored and therefore could
     * never be cancelled.
     */
    // The generic signatures of QueryBus#subscriptionQuery, SubscriptionMessageSerializer#deserialize
    // and DisposableResult are declared outside this file, so the raw usage is kept as it was here.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void subscribe(String context, SubscriptionQuery subscriptionQuery) {
        String subscriptionId = subscriptionQuery.getSubscriptionIdentifier();
        Map<String, SubscriptionQueryResult<QueryResponseMessage<Object>, SubscriptionQueryUpdateMessage<Object>>> contextSubscriptions =
                subscriptions.computeIfAbsent(context, k -> new ConcurrentHashMap<>());
        if (contextSubscriptions.containsKey(subscriptionId)) {
            logger.debug("Ignoring subscribe for already registered subscriptionId {}", subscriptionId);
            return;
        }

        SubscriptionQueryResult result = localSegment.subscriptionQuery(serializer.deserialize(subscriptionQuery));
        Disposable disposable = result.updates().subscribe(
                u -> publisher.publish(serializer.serialize((SubscriptionQueryUpdateMessage<?>) u, subscriptionId)),
                e -> publisher.publish(serializer.serializeCompleteExceptionally(subscriptionId, (Throwable) e)),
                () -> publisher.publish(serializer.serializeComplete(subscriptionId)));
        Registration registration = () -> {
            disposable.dispose();
            return true;
        };

        SubscriptionQueryResult<QueryResponseMessage<Object>, SubscriptionQueryUpdateMessage<Object>> tracked =
                new DisposableResult(result, registration);
        if (contextSubscriptions.putIfAbsent(subscriptionId, tracked) != null) {
            // Another thread registered the same identifier in the meantime. Release the subscription
            // created here the same way unsubscribe releases a tracked one.
            tracked.cancel();
        }
    }

    /**
     * Publishes the initial result of a tracked subscription. When no subscription is tracked for the
     * given context and identifier, a warning is logged and nothing is published.
     */
    private void getInitialResult(String context, SubscriptionQuery query) {
        String subscriptionId = query.getSubscriptionIdentifier();
        SubscriptionQueryResult<QueryResponseMessage<Object>, SubscriptionQueryUpdateMessage<Object>> subscription =
                subscriptions.getOrDefault(context, Collections.emptyMap()).get(subscriptionId);
        if (subscription == null) {
            logger.warn("No subscription with id {} found in context {}; cannot provide initial result.",
                        subscriptionId, context);
            return;
        }
        subscription.initialResult().subscribe(
                i -> publisher.publish(serializer.serialize(i, subscriptionId)),
                // The exception is passed as the trailing argument so the cause is logged as well.
                e -> logger.debug("Error in initial result for subscription id: {}", subscriptionId, e));
    }

    /**
     * Stops tracking the subscription with the given identifier and cancels it. Does nothing when the
     * context or the subscription is unknown.
     */
    private void unsubscribe(String context, SubscriptionQuery unsubscribe) {
        String subscriptionId = unsubscribe.getSubscriptionIdentifier();
        logger.debug("unsubscribe locally subscriptionId {}", subscriptionId);
        Optional.ofNullable(subscriptions.get(context))
                .map(contextSubscriptions -> contextSubscriptions.remove(subscriptionId))
                .ifPresent(Registration::cancel);
    }

    /**
     * Cancels and forgets every subscription tracked for the given context. Subscriptions belonging to
     * other contexts are left untouched.
     *
     * @param context the context the application disconnected from
     */
    public void onApplicationDisconnected(String context) {
        Map<String, SubscriptionQueryResult<QueryResponseMessage<Object>, SubscriptionQueryUpdateMessage<Object>>> removed =
                subscriptions.remove(context);
        if (removed != null) {
            removed.values().forEach(Registration::cancel);
        }
    }
}

