/*
 * Decompiled with CFR 0.152.
 */
package com.github.lhotari.reactive.pulsar.internal.adapter;

import com.github.lhotari.reactive.pulsar.adapter.MessageResult;
import com.github.lhotari.reactive.pulsar.adapter.ReactiveMessageConsumer;
import com.github.lhotari.reactive.pulsar.adapter.ReactiveMessageHandler;
import com.github.lhotari.reactive.pulsar.internal.adapter.DefaultReactiveMessageHandlerBuilder;
import com.github.lhotari.reactive.pulsar.internal.adapter.InKeyOrderMessageProcessors;
import com.github.lhotari.reactive.pulsar.internal.adapter.InflightLimiter;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import reactor.util.retry.Retry;

class DefaultReactiveMessageHandler<T>
implements ReactiveMessageHandler {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultReactiveMessageHandler.class);
    private static final String INFLIGHT_LIMITER_CONTEXT_KEY = DefaultReactiveMessageHandlerBuilder.class.getName() + ".INFLIGHT_LIMITER_CONTEXT_KEY";
    private final AtomicReference<Disposable> killSwitch = new AtomicReference();
    private final Mono<Void> pipeline;
    private final Function<Message<T>, Mono<Void>> messageHandler;
    private final BiConsumer<Message<T>, Throwable> errorLogger;
    private final Retry pipelineRetrySpec;
    private final Duration handlingTimeout;
    private final Function<Flux<Message<T>>, Flux<MessageResult<Void>>> streamingMessageHandler;
    private final boolean keyOrdered;
    private final int concurrency;
    private final int maxInflight;

    public DefaultReactiveMessageHandler(ReactiveMessageConsumer<T> messageConsumer, Function<Message<T>, Mono<Void>> messageHandler, BiConsumer<Message<T>, Throwable> errorLogger, Retry pipelineRetrySpec, Duration handlingTimeout, Function<Mono<Void>, Mono<Void>> transformer, Function<Flux<Message<T>>, Flux<MessageResult<Void>>> streamingMessageHandler, boolean keyOrdered, int concurrency, int maxInflight) {
        this.messageHandler = messageHandler;
        this.errorLogger = errorLogger;
        this.pipelineRetrySpec = pipelineRetrySpec;
        this.handlingTimeout = handlingTimeout;
        this.streamingMessageHandler = streamingMessageHandler;
        this.keyOrdered = keyOrdered;
        this.concurrency = concurrency;
        this.maxInflight = maxInflight;
        this.pipeline = messageConsumer.consumeMessages(this::createMessageConsumer).then().transform(transformer).transform(this::decoratePipeline);
    }

    private Mono<Void> decorateMessageHandler(Mono<Void> messageHandler) {
        if (this.handlingTimeout != null) {
            messageHandler = messageHandler.timeout(this.handlingTimeout);
        }
        if (this.maxInflight > 0) {
            messageHandler = messageHandler.transformDeferredContextual((original, context) -> {
                InflightLimiter inflightLimiter = (InflightLimiter)context.get((Object)INFLIGHT_LIMITER_CONTEXT_KEY);
                return inflightLimiter.transform(original);
            });
        }
        return messageHandler;
    }

    private Mono<Void> decoratePipeline(Mono<Void> pipeline) {
        if (this.maxInflight > 0) {
            Mono finalPipeline = pipeline;
            pipeline = Mono.using(() -> new InflightLimiter(this.maxInflight), inflightLimiter -> finalPipeline.contextWrite((ContextView)Context.of((Object)INFLIGHT_LIMITER_CONTEXT_KEY, (Object)inflightLimiter)), InflightLimiter::dispose);
        }
        if (this.pipelineRetrySpec != null) {
            return pipeline.retryWhen(this.pipelineRetrySpec);
        }
        return pipeline;
    }

    private Flux<MessageResult<Void>> createMessageConsumer(Flux<Message<T>> messageFlux) {
        if (this.messageHandler != null) {
            if (this.streamingMessageHandler != null) {
                throw new IllegalStateException("messageHandler and streamingMessageHandler cannot be set at the same time.");
            }
            if (this.concurrency > 1) {
                if (this.keyOrdered) {
                    return InKeyOrderMessageProcessors.processInKeyOrderConcurrently(messageFlux, this::handleMessage, Schedulers.parallel(), this.concurrency);
                }
                return messageFlux.flatMap(message -> this.handleMessage((Message<T>)message).subscribeOn(Schedulers.parallel()), this.concurrency);
            }
            return messageFlux.concatMap(this::handleMessage);
        }
        return Objects.requireNonNull(this.streamingMessageHandler, "streamingMessageHandler or messageHandler must be set").apply(messageFlux);
    }

    private Mono<MessageResult<Void>> handleMessage(Message<T> message) {
        return this.messageHandler.apply(message).transform(this::decorateMessageHandler).thenReturn(MessageResult.acknowledge(message.getMessageId())).onErrorResume(throwable -> {
            if (this.errorLogger != null) {
                try {
                    this.errorLogger.accept((Message<Message>)message, (Throwable)throwable);
                }
                catch (Exception e) {
                    LOG.error("Error in calling error logger", (Throwable)e);
                }
            } else {
                LOG.error("Message handling for message id {} failed.", (Object)message.getMessageId(), throwable);
            }
            return Mono.just(MessageResult.negativeAcknowledge(message.getMessageId()));
        });
    }

    @Override
    public ReactiveMessageHandler start() {
        if (this.killSwitch.get() != null) {
            throw new IllegalStateException("Message handler is already running.");
        }
        Disposable disposable = this.pipeline.subscribe(null, this::logError, this::logUnexpectedCompletion);
        if (!this.killSwitch.compareAndSet(null, disposable)) {
            disposable.dispose();
            throw new IllegalStateException("Message handler was already running.");
        }
        return this;
    }

    private void logError(Throwable throwable) {
        LOG.error("ReactiveMessageHandler was unexpectedly terminated.", throwable);
    }

    private void logUnexpectedCompletion() {
        if (this.isRunning()) {
            LOG.error("ReactiveMessageHandler was unexpectedly completed.");
        }
    }

    @Override
    public ReactiveMessageHandler stop() {
        Disposable disposable = this.killSwitch.getAndSet(null);
        if (disposable != null) {
            disposable.dispose();
        }
        return this;
    }

    @Override
    public boolean isRunning() {
        return this.killSwitch.get() != null;
    }

    @Override
    public void close() throws Exception {
        this.stop();
    }
}

