/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusMessageContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

/**
 * Flux operator that settles each {@link ServiceBusMessageContext} once the downstream subscriber has
 * finished handling it.
 *
 * <p>For every element the operator hands the value to the downstream subscriber and then applies the
 * {@code onComplete} function. If the downstream {@code onNext} call throws, the {@code onAbandon} function is
 * applied instead. Handling of each element is bracketed by the supplied {@link Semaphore}.</p>
 */
final class FluxAutoComplete extends FluxOperator<ServiceBusMessageContext, ServiceBusMessageContext> {
    private final Semaphore completionLock;
    private final Function<ServiceBusMessageContext, Mono<Void>> onComplete;
    private final Function<ServiceBusMessageContext, Mono<Void>> onAbandon;
    private final ClientLogger logger = new ClientLogger(FluxAutoComplete.class);

    /**
     * Creates the operator.
     *
     * @param upstream source of message contexts.
     * @param completionLock semaphore held while a single message is passed downstream and settled.
     * @param onComplete function applied to a message after the downstream subscriber handled it.
     * @param onAbandon function applied to a message when the downstream subscriber threw while handling it.
     * @throws NullPointerException if {@code completionLock}, {@code onComplete} or {@code onAbandon} is null.
     */
    FluxAutoComplete(Flux<? extends ServiceBusMessageContext> upstream, Semaphore completionLock, Function<ServiceBusMessageContext, Mono<Void>> onComplete, Function<ServiceBusMessageContext, Mono<Void>> onAbandon) {
        super(upstream);
        // Fail fast here instead of on the first message, matching the checks on the two functions below.
        this.completionLock = Objects.requireNonNull(completionLock, "'completionLock' cannot be null.");
        this.onComplete = Objects.requireNonNull(onComplete, "'onComplete' cannot be null.");
        this.onAbandon = Objects.requireNonNull(onAbandon, "'onAbandon' cannot be null.");
    }

    /**
     * Wraps {@code coreSubscriber} in an {@link AutoCompleteSubscriber} and subscribes that wrapper to the source.
     *
     * @param coreSubscriber the downstream subscriber.
     * @throws NullPointerException if {@code coreSubscriber} is null.
     */
    public void subscribe(CoreSubscriber<? super ServiceBusMessageContext> coreSubscriber) {
        Objects.requireNonNull(coreSubscriber, "'coreSubscriber' cannot be null.");
        AutoCompleteSubscriber subscriber = new AutoCompleteSubscriber(coreSubscriber, this.completionLock, this.onComplete, this.onAbandon, this.logger);
        this.source.subscribe(subscriber);
    }

    /**
     * Subscriber that sits between the source and the downstream subscriber and settles every message
     * after it has been passed downstream.
     */
    static final class AutoCompleteSubscriber extends BaseSubscriber<ServiceBusMessageContext> {
        private final CoreSubscriber<? super ServiceBusMessageContext> downstream;
        private final Function<ServiceBusMessageContext, Mono<Void>> onComplete;
        private final Function<ServiceBusMessageContext, Mono<Void>> onAbandon;
        private final Semaphore semaphore;
        private final ClientLogger logger;

        /**
         * @param downstream subscriber that receives the messages.
         * @param completionLock semaphore acquired around the handling of each message.
         * @param onComplete function applied after the downstream subscriber handled a message.
         * @param onAbandon function applied when the downstream subscriber threw while handling a message.
         * @param logger logger used for diagnostics.
         */
        AutoCompleteSubscriber(CoreSubscriber<? super ServiceBusMessageContext> downstream, Semaphore completionLock, Function<ServiceBusMessageContext, Mono<Void>> onComplete, Function<ServiceBusMessageContext, Mono<Void>> onAbandon, ClientLogger logger) {
            this.downstream = downstream;
            this.onComplete = onComplete;
            this.onAbandon = onAbandon;
            this.semaphore = completionLock;
            this.logger = logger;
        }

        /**
         * Hands this subscriber to the downstream subscriber as its {@link Subscription}.
         *
         * @param subscription the upstream subscription (only logged here).
         */
        protected void hookOnSubscribe(Subscription subscription) {
            this.logger.info("Subscription received. Subscribing downstream. {}", new Object[]{subscription});
            this.downstream.onSubscribe(this);
        }

        /**
         * Passes {@code value} downstream and then settles it: {@code onComplete} when the downstream call
         * returned normally, {@code onAbandon} when it threw.
         *
         * <p>If the thread is interrupted while waiting for the semaphore, the message is still processed
         * (as before), but the permit is not released because it was never obtained, and the thread's
         * interrupt status is restored once processing has finished.</p>
         *
         * @param value the message context to pass downstream and settle.
         */
        protected void hookOnNext(ServiceBusMessageContext value) {
            ServiceBusReceivedMessage message = value.getMessage();
            String sequenceNumber = message != null ? String.valueOf(message.getSequenceNumber()) : "n/a";
            this.logger.atVerbose().addKeyValue("sequenceNumber", sequenceNumber).log("ON NEXT: Passing message downstream.");

            boolean acquired = false;
            boolean interrupted = false;
            try {
                this.semaphore.acquire();
                acquired = true;
            } catch (InterruptedException e) {
                // acquire() clears the interrupt flag when it throws; remember it so it can be restored below.
                interrupted = true;
                this.logger.info("Unable to acquire semaphore.", new Object[]{e});
            }

            try {
                this.downstream.onNext(value);
                this.applyWithCatch(this.onComplete, value, "complete");
            } catch (Exception e) {
                this.logger.atError().addKeyValue("sequenceNumber", sequenceNumber).log("Error occurred processing message. Abandoning.", new Object[]{e});
                this.applyWithCatch(this.onAbandon, value, "abandon");
            } finally {
                this.logger.atVerbose().addKeyValue("sequenceNumber", sequenceNumber).log("ON NEXT: Finished.");
                // Semaphore.release() adds a permit even if none was taken, so only give back what we hold.
                if (acquired) {
                    this.semaphore.release();
                }
                // Restored only now so the blocking settlement call above is not aborted by the pending interrupt.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Logs the error and forwards it to the downstream subscriber.
         *
         * @param throwable the error to forward.
         */
        protected void hookOnError(Throwable throwable) {
            this.logger.error("Error occurred. Passing downstream.", new Object[]{throwable});
            this.downstream.onError(throwable);
        }

        /**
         * Forwards completion to the downstream subscriber.
         */
        protected void hookOnComplete() {
            this.downstream.onComplete();
        }

        /**
         * @return the downstream subscriber's {@link Context}.
         */
        public Context currentContext() {
            return this.downstream.currentContext();
        }

        /**
         * Applies a settlement function to {@code context} and blocks until the returned {@link Mono} finishes.
         * Nothing is done when the context carries a message that is already settled. If applying the function
         * or waiting on it throws, a warning is logged, the upstream subscription is cancelled and the exception
         * is signalled through this subscriber's {@code onError}.
         *
         * @param function settlement function to apply.
         * @param context message context to settle.
         * @param operation name of the operation, used only in the warning message.
         */
        private void applyWithCatch(Function<ServiceBusMessageContext, Mono<Void>> function, ServiceBusMessageContext context, String operation) {
            ServiceBusReceivedMessage message = context.getMessage();
            if (message != null && message.isSettled()) {
                return;
            }
            try {
                function.apply(context).block();
            } catch (Exception e) {
                this.logger.warning("Unable to '{}' message.", new Object[]{operation, e});
                this.upstream().cancel();
                this.onError(e);
            }
        }
    }
}

