/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.netty;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpExecutionStrategyInfluencer;
import io.servicetalk.http.api.HttpHeaders;
import io.servicetalk.http.api.HttpLifecycleObserver;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.netty.NoopHttpLifecycleObserver;
import io.servicetalk.http.utils.BeforeFinallyHttpOperator;
import io.servicetalk.transport.api.ConnectionInfo;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class AbstractLifecycleObserverHttpFilter
implements HttpExecutionStrategyInfluencer {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLifecycleObserverHttpFilter.class);
    static final ContextMap.Key<Consumer<ConnectionInfo>> ON_CONNECTION_SELECTED_CONSUMER = ContextMap.Key.newKey("ON_CONNECTION_SELECTED_CONSUMER", Consumer.class);
    private final HttpLifecycleObserver observer;
    private final boolean client;

    AbstractLifecycleObserverHttpFilter(HttpLifecycleObserver observer, boolean client) {
        this.observer = Objects.requireNonNull(observer);
        this.client = client;
    }

    final Single<StreamingHttpResponse> trackLifecycle(@Nullable ConnectionInfo connInfo, StreamingHttpRequest request, Function<StreamingHttpRequest, Single<StreamingHttpResponse>> responseFunction) {
        return Single.defer(() -> {
            Single responseSingle;
            boolean clearAsyncContext;
            HttpLifecycleObserver.HttpExchangeObserver onExchange = AbstractLifecycleObserverHttpFilter.safeReport(this.observer::onNewExchange, this.observer, "onNewExchange", NoopHttpLifecycleObserver.NoopHttpExchangeObserver.INSTANCE);
            if (connInfo != null) {
                AbstractLifecycleObserverHttpFilter.safeReport(onExchange::onConnectionSelected, connInfo, (Object)onExchange, "onConnectionSelected");
                clearAsyncContext = false;
            } else {
                AsyncContext.put(ON_CONNECTION_SELECTED_CONSUMER, selectedConnection -> AbstractLifecycleObserverHttpFilter.safeReport(onExchange::onConnectionSelected, selectedConnection, (Object)onExchange, "onConnectionSelected"));
                clearAsyncContext = true;
            }
            final ExchangeContext exchangeContext = new ExchangeContext(onExchange, this.client, clearAsyncContext);
            final HttpLifecycleObserver.HttpRequestObserver onRequest = AbstractLifecycleObserverHttpFilter.safeReport(onExchange::onRequest, request, onExchange, "onRequest", NoopHttpLifecycleObserver.NoopHttpRequestObserver.INSTANCE);
            StreamingHttpRequest transformed = request.transformMessageBody(p -> {
                if (this.client) {
                    p = p.beforeSubscriber(() -> {
                        exchangeContext.requestMessageBodyStarts();
                        return NoopSubscriber.INSTANCE;
                    });
                }
                return p.beforeOnNext(item -> {
                    if (item instanceof Buffer) {
                        AbstractLifecycleObserverHttpFilter.safeReport(onRequest::onRequestData, (Buffer)item, (Object)onRequest, "onRequestData");
                    } else if (item instanceof HttpHeaders) {
                        AbstractLifecycleObserverHttpFilter.safeReport(onRequest::onRequestTrailers, (HttpHeaders)item, (Object)onRequest, "onRequestTrailers");
                    } else {
                        LOGGER.warn("Programming mistake: unexpected message body item is received on the request: {}", (Object)item.getClass().getName());
                    }
                }).beforeFinally(new TerminalSignalConsumer(){

                    @Override
                    public void onComplete() {
                        AbstractLifecycleObserverHttpFilter.safeReport(onRequest::onRequestComplete, onRequest, "onRequestComplete");
                        exchangeContext.decrementRemaining();
                    }

                    @Override
                    public void onError(Throwable cause) {
                        AbstractLifecycleObserverHttpFilter.safeReport(onRequest::onRequestError, cause, onRequest, "onRequestError");
                        exchangeContext.decrementRemaining();
                    }

                    @Override
                    public void cancel() {
                        AbstractLifecycleObserverHttpFilter.safeReport(onRequest::onRequestCancel, onRequest, "onRequestCancel");
                        exchangeContext.decrementRemaining();
                    }
                });
            });
            try {
                responseSingle = (Single)responseFunction.apply(transformed);
            }
            catch (Throwable t) {
                onExchange.onResponseError(t);
                return Single.failed(t).shareContextOnSubscribe();
            }
            return responseSingle.liftSync(new BeforeFinallyHttpOperator(exchangeContext, true)).map(resp -> {
                exchangeContext.onResponse((HttpResponseMetaData)resp);
                return resp.transformMessageBody(p -> p.beforeOnNext(exchangeContext::onResponseBody));
            }).shareContextOnSubscribe();
        });
    }

    @Override
    public final HttpExecutionStrategy requiredOffloads() {
        return HttpExecutionStrategies.offloadNone();
    }

    private static <T> T safeReport(Supplier<T> supplier, Object observer, String eventName, T defaultValue) {
        try {
            return Objects.requireNonNull(supplier.get());
        }
        catch (Throwable unexpected) {
            LOGGER.warn("Unexpected exception from {} while reporting a '{}' event", observer, eventName, unexpected);
            return defaultValue;
        }
    }

    private static <T, A> T safeReport(Function<A, T> function, A argument, Object observer, String eventName, T defaultValue) {
        try {
            return Objects.requireNonNull(function.apply(argument));
        }
        catch (Throwable unexpected) {
            LOGGER.warn("Unexpected exception from {} while reporting a '{}' event", observer, eventName, unexpected);
            return defaultValue;
        }
    }

    private static <T> void safeReport(Consumer<T> consumer, T t, Object observer, String eventName) {
        try {
            consumer.accept(t);
        }
        catch (Throwable unexpected) {
            LOGGER.warn("Unexpected exception from {} while reporting a '{}' event", observer, eventName, unexpected);
        }
    }

    private static void safeReport(Runnable runnable, Object observer, String eventName) {
        try {
            runnable.run();
        }
        catch (Throwable unexpected) {
            LOGGER.warn("Unexpected exception from {} while reporting a '{}' event", observer, eventName, unexpected);
        }
    }

    private static void safeReport(Consumer<Throwable> onError, Throwable t, Object observer, String eventName) {
        try {
            onError.accept(t);
        }
        catch (Throwable unexpected) {
            unexpected.addSuppressed(t);
            LOGGER.warn("Unexpected exception from {} while reporting a '{}' event", observer, eventName, unexpected);
        }
    }

    private static final class NoopSubscriber
    implements PublisherSource.Subscriber<Object> {
        static final NoopSubscriber INSTANCE = new NoopSubscriber();

        private NoopSubscriber() {
        }

        @Override
        public void onSubscribe(PublisherSource.Subscription subscription) {
        }

        @Override
        public void onNext(@Nullable Object o) {
        }

        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onComplete() {
        }
    }

    private static final class ExchangeContext
    implements TerminalSignalConsumer {
        private static final AtomicIntegerFieldUpdater<ExchangeContext> remainingUpdater = AtomicIntegerFieldUpdater.newUpdater(ExchangeContext.class, "remaining");
        private final HttpLifecycleObserver.HttpExchangeObserver onExchange;
        private final boolean clearAsyncContext;
        @Nullable
        private HttpLifecycleObserver.HttpResponseObserver onResponse;
        private volatile int remaining;

        private ExchangeContext(HttpLifecycleObserver.HttpExchangeObserver onExchange, boolean client, boolean clearAsyncContext) {
            this.onExchange = onExchange;
            this.remaining = client ? 1 : 2;
            this.clearAsyncContext = clearAsyncContext;
        }

        void onResponse(HttpResponseMetaData responseMetaData) {
            this.onResponse = (HttpLifecycleObserver.HttpResponseObserver)AbstractLifecycleObserverHttpFilter.safeReport(this.onExchange::onResponse, responseMetaData, this.onExchange, "onResponse", NoopHttpLifecycleObserver.NoopHttpResponseObserver.INSTANCE);
        }

        void onResponseBody(Object item) {
            assert (this.onResponse != null);
            if (item instanceof Buffer) {
                AbstractLifecycleObserverHttpFilter.safeReport(this.onResponse::onResponseData, (Buffer)item, this.onResponse, "onResponseData");
            } else if (item instanceof HttpHeaders) {
                AbstractLifecycleObserverHttpFilter.safeReport(this.onResponse::onResponseTrailers, (HttpHeaders)item, this.onResponse, "onResponseTrailers");
            } else {
                LOGGER.warn("Programming mistake: unexpected message body item is received on the response: {}", (Object)item.getClass().getName());
            }
        }

        @Override
        public void onComplete() {
            if (this.onResponse != null) {
                AbstractLifecycleObserverHttpFilter.safeReport(this.onResponse::onResponseComplete, this.onResponse, "onResponseComplete");
            }
            this.decrementRemaining();
        }

        @Override
        public void onError(Throwable t) {
            if (this.onResponse == null) {
                AbstractLifecycleObserverHttpFilter.safeReport(this.onExchange::onResponseError, t, this.onExchange, "onResponseError");
            } else {
                AbstractLifecycleObserverHttpFilter.safeReport(this.onResponse::onResponseError, t, this.onResponse, "onResponseError");
            }
            this.decrementRemaining();
        }

        @Override
        public void cancel() {
            if (this.onResponse == null) {
                AbstractLifecycleObserverHttpFilter.safeReport(this.onExchange::onResponseCancel, this.onExchange, "onResponseCancel");
            } else {
                AbstractLifecycleObserverHttpFilter.safeReport(this.onResponse::onResponseCancel, this.onResponse, "onResponseCancel");
            }
            this.decrementRemaining();
        }

        void requestMessageBodyStarts() {
            remainingUpdater.incrementAndGet(this);
        }

        void decrementRemaining() {
            if (remainingUpdater.decrementAndGet(this) == 0) {
                AbstractLifecycleObserverHttpFilter.safeReport(this.onExchange::onExchangeFinally, this.onExchange, "onExchangeFinally");
                if (this.clearAsyncContext) {
                    AsyncContext.remove(ON_CONNECTION_SELECTED_CONSUMER);
                }
            }
        }
    }
}

