/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import com.azure.messaging.servicebus.ServiceBusSessionAcquirer;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import java.time.Duration;
import java.util.function.Supplier;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

final class ServiceBusSessionReactorReceiver
implements AmqpReceiveLink {
    private final ClientLogger logger;
    private final String sessionId;
    private final AmqpReceiveLink sessionLink;
    private final boolean hasIdleTimeout;
    private final Sinks.Many<Boolean> nextItemIdleTimeoutSink = Sinks.many().multicast().onBackpressureBuffer();
    private final Sinks.Empty<Void> terminateEndpointStatesSink = Sinks.empty();
    private final Disposable.Composite disposables = Disposables.composite();

    ServiceBusSessionReactorReceiver(ClientLogger logger, ServiceBusTracer tracer, ServiceBusSessionAcquirer.Session session, Duration sessionIdleTimeout, Duration maxSessionLockRenew) {
        this.logger = logger;
        this.sessionId = session.getId();
        this.sessionLink = session.getLink();
        boolean bl = this.hasIdleTimeout = sessionIdleTimeout != null;
        if (this.hasIdleTimeout) {
            this.disposables.add(Flux.switchOnNext((Publisher)this.nextItemIdleTimeoutSink.asFlux().map(__ -> Mono.delay((Duration)sessionIdleTimeout))).subscribe(v -> {
                this.withLinkInfo(logger.atInfo()).addKeyValue("timeout", (Object)sessionIdleTimeout).log("Did not a receive message within timeout.");
                this.terminateEndpointStatesSink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
            }));
        }
        this.disposables.add(session.beginLockRenew(tracer, maxSessionLockRenew));
    }

    public String getSessionId() {
        return this.sessionId;
    }

    public String getHostname() {
        return this.sessionLink.getHostname();
    }

    public String getConnectionId() {
        return this.sessionLink.getConnectionId();
    }

    public String getLinkName() {
        return this.sessionLink.getLinkName();
    }

    public String getEntityPath() {
        return this.sessionLink.getEntityPath();
    }

    public Flux<AmqpEndpointState> getEndpointStates() {
        Flux endpointStates = this.hasIdleTimeout ? this.sessionLink.getEndpointStates().takeUntilOther((Publisher)this.terminateEndpointStatesSink.asMono()) : this.sessionLink.getEndpointStates();
        return endpointStates.onErrorResume(e -> {
            this.withLinkInfo(this.logger.atWarning()).log("Error occurred. Ending session {}.", new Object[]{this.sessionId, e});
            return Mono.empty();
        });
    }

    public Flux<Message> receive() {
        if (this.hasIdleTimeout) {
            Mono beginIdleTimer = Mono.defer(() -> {
                this.nextItemIdleTimeoutSink.emitNext((Object)true, Sinks.EmitFailureHandler.FAIL_FAST);
                return Mono.empty();
            });
            Flux messages = this.sessionLink.receive();
            return beginIdleTimer.thenMany((Publisher)messages).doOnNext(m -> this.nextItemIdleTimeoutSink.emitNext((Object)true, Sinks.EmitFailureHandler.FAIL_FAST));
        }
        return this.sessionLink.receive();
    }

    public Mono<Void> updateDisposition(String deliveryTag, DeliveryState deliveryState) {
        return this.sessionLink.updateDisposition(deliveryTag, deliveryState);
    }

    public void addCredit(Supplier<Long> creditSupplier) {
        this.sessionLink.addCredit(creditSupplier);
    }

    public void dispose() {
        this.disposables.dispose();
        this.sessionLink.dispose();
    }

    public Mono<Void> closeAsync() {
        this.disposables.dispose();
        return this.sessionLink.closeAsync();
    }

    public Mono<Void> addCredits(int credits) {
        return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new UnsupportedOperationException("addCredits(int) should not be called in V2 route."));
    }

    public int getCredits() {
        throw this.logger.logExceptionAsError((RuntimeException)new UnsupportedOperationException("getCredits() should not be called in V2 route."));
    }

    public void setEmptyCreditListener(Supplier<Integer> creditSupplier) {
        throw this.logger.logExceptionAsError((RuntimeException)new UnsupportedOperationException("setEmptyCreditListener should not be called in V2 route."));
    }

    private LoggingEventBuilder withLinkInfo(LoggingEventBuilder builder) {
        return builder.addKeyValue("sessionId", this.sessionId).addKeyValue("linkName", this.getLinkName());
    }
}

