/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation.handler;

import com.azure.core.amqp.implementation.AmqpLoggingUtils;
import com.azure.core.amqp.implementation.handler.LinkHandler;
import com.azure.core.util.logging.LoggingEventBuilder;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/**
 * Link handler for the sending side of an AMQP link.
 *
 * <p>In addition to the endpoint-state tracking inherited from {@link LinkHandler}, this handler publishes two
 * streams: the remote credit reported on each flow event ({@link #getLinkCredits()}) and every delivery reported
 * through {@link #onDelivery(Event)} ({@link #getDeliveredMessages()}).</p>
 */
public class SendLinkHandler extends LinkHandler {
    // Structured-logging keys that are repeated throughout this class.
    private static final String LINK_NAME_KEY = "linkName";
    private static final String ENTITY_PATH_KEY = "entityPath";

    private final String linkName;
    private final String entityPath;
    // Flipped to true the first time the remote side is observed (remote open with a target, or a flow frame).
    private final AtomicBoolean isRemoteActive = new AtomicBoolean();
    // Guards close() so its body runs at most once.
    private final AtomicBoolean isTerminated = new AtomicBoolean();
    // Unicast sink: buffers credits until its subscriber arrives.
    private final Sinks.Many<Integer> creditProcessor = Sinks.many().unicast().onBackpressureBuffer();
    // Multicast sink: deliveries are fanned out to all current subscribers.
    private final Sinks.Many<Delivery> deliveryProcessor = Sinks.many().multicast().onBackpressureBuffer();

    /**
     * Creates a handler for a send link.
     *
     * @param connectionId Identifier of the connection; forwarded to the superclass.
     * @param hostname Hostname of the connection; forwarded to the superclass.
     * @param linkName Name of the send link.
     * @param entityPath Path of the entity this link targets; forwarded to the superclass and used in log entries.
     * @throws NullPointerException if {@code linkName} is null.
     */
    public SendLinkHandler(String connectionId, String hostname, String linkName, String entityPath) {
        super(connectionId, hostname, entityPath);
        this.linkName = Objects.requireNonNull(linkName, "'linkName' cannot be null.");
        this.entityPath = entityPath;
    }

    /**
     * Gets the name of the link supplied at construction.
     *
     * @return The link name.
     */
    public String getLinkName() {
        return this.linkName;
    }

    /**
     * Gets the stream of remote credit values, one per flow event received by {@link #onLinkFlow(Event)}.
     *
     * @return A flux view of the credit sink.
     */
    public Flux<Integer> getLinkCredits() {
        return this.creditProcessor.asFlux();
    }

    /**
     * Gets the stream of deliveries reported through {@link #onDelivery(Event)}.
     *
     * @return A flux view of the delivery sink.
     */
    public Flux<Delivery> getDeliveredMessages() {
        return this.deliveryProcessor.asFlux();
    }

    /**
     * Completes the credit and delivery streams and publishes {@link EndpointState#CLOSED}. Only the first
     * invocation has any effect; later calls return immediately.
     */
    @Override
    public void close() {
        if (this.isTerminated.getAndSet(true)) {
            return;
        }

        this.creditProcessor.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
        this.deliveryProcessor.emitComplete((signalType, emitResult) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(this.logger.atVerbose(), signalType, emitResult)
                .addKeyValue(LINK_NAME_KEY, this.linkName)
                .addKeyValue(ENTITY_PATH_KEY, this.entityPath)
                .log("Unable to emit complete on deliverySink.");
            // Log only; do not retry the completion signal.
            return false;
        });

        this.onNext(EndpointState.CLOSED);
    }

    /**
     * Logs the local target when the link is opened locally. Links that are not a {@link Sender} are ignored.
     *
     * @param event The link event.
     */
    public void onLinkLocalOpen(Event event) {
        final Link link = event.getLink();
        if (link instanceof Sender) {
            this.logger.atVerbose()
                .addKeyValue(LINK_NAME_KEY, link.getName())
                .addKeyValue(ENTITY_PATH_KEY, this.entityPath)
                .addKeyValue("localTarget", link.getTarget())
                .log("onLinkLocalOpen");
        }
    }

    /**
     * Handles the remote side opening the link. When a remote target is present, the handler is marked active and
     * {@link EndpointState#ACTIVE} is published if that has not already happened. When no remote target is present,
     * the event is only logged (with action "waitingForError"). Links that are not a {@link Sender} are ignored.
     *
     * @param event The link event.
     */
    public void onLinkRemoteOpen(Event event) {
        final Link link = event.getLink();
        if (!(link instanceof Sender)) {
            return;
        }

        final LoggingEventBuilder logBuilder = this.logger.atInfo()
            .addKeyValue(LINK_NAME_KEY, link.getName())
            .addKeyValue(ENTITY_PATH_KEY, this.entityPath);

        if (link.getRemoteTarget() != null) {
            logBuilder.addKeyValue("remoteTarget", link.getRemoteTarget());
            if (!this.isRemoteActive.getAndSet(true)) {
                this.onNext(EndpointState.ACTIVE);
            }
        } else {
            logBuilder.addKeyValue("remoteTarget", "n/a").addKeyValue("action", "waitingForError");
        }

        logBuilder.log("onLinkRemoteOpen");
    }

    /**
     * Handles a flow event: marks the link active (publishing {@link EndpointState#ACTIVE} if it was not already
     * active) and publishes the sender's remote credit on the credit stream.
     *
     * @param event The flow event.
     */
    public void onLinkFlow(Event event) {
        if (!this.isRemoteActive.getAndSet(true)) {
            this.onNext(EndpointState.ACTIVE);
        }

        final Sender sender = event.getSender();
        final int credits = sender.getRemoteCredit();

        // Pass the int directly so it boxes to Integer, the element type of the sink.
        this.creditProcessor.emitNext(credits, (signalType, emitResult) -> {
            this.logger.atVerbose()
                .addKeyValue(LINK_NAME_KEY, this.linkName)
                .addKeyValue("emitResult", emitResult)
                .addKeyValue("credits", credits)
                .log("Unable to emit credits.");
            // Log only; do not retry.
            return false;
        });

        this.logger.atVerbose()
            .addKeyValue(LINK_NAME_KEY, this.linkName)
            .addKeyValue("unsettled", sender.getUnsettled())
            .addKeyValue("credits", credits)
            .log("onLinkFlow.");
    }

    /**
     * Delegates to the superclass and then, if the remote side never became active, closes the endpoint states.
     *
     * <p>Note that this path invokes the superclass {@code close()} rather than this class's {@link #close()}
     * override, so the credit and delivery sinks are not completed here.</p>
     *
     * @param event The link event.
     */
    @Override
    public void onLinkLocalClose(Event event) {
        super.onLinkLocalClose(event);

        if (!this.isRemoteActive.get()) {
            this.logger.atInfo()
                .addKeyValue(LINK_NAME_KEY, this.getLinkName())
                .addKeyValue(ENTITY_PATH_KEY, this.entityPath)
                .log("Sender link was never active. Closing endpoint states.");
            super.close();
        }
    }

    /**
     * Publishes the delivery carried by the event, and every subsequent current delivery of the same sender, on the
     * delivery stream. Each delivery is settled after it has been handed to the sink.
     *
     * @param event The delivery event.
     */
    public void onDelivery(Event event) {
        Delivery delivery = event.getDelivery();

        while (delivery != null) {
            final Delivery current = delivery;
            final Sender sender = (Sender) current.getLink();
            final String deliveryTag = new String(current.getTag(), StandardCharsets.UTF_8);

            this.logger.atVerbose()
                .addKeyValue(LINK_NAME_KEY, this.getLinkName())
                .addKeyValue("unsettled", sender.getUnsettled())
                .addKeyValue("credit", sender.getRemoteCredit())
                .addKeyValue("deliveryState", current.getRemoteState())
                .addKeyValue("delivery.isBuffered", current.isBuffered())
                .addKeyValue("delivery.id", deliveryTag)
                .log("onDelivery");

            // Pass the Delivery itself; the sink's element type is Delivery, not Object.
            this.deliveryProcessor.emitNext(current, (signalType, emitResult) -> {
                this.logger.atWarning()
                    .addKeyValue(LINK_NAME_KEY, this.getLinkName())
                    .addKeyValue("emitResult", emitResult)
                    .addKeyValue("delivery.id", deliveryTag)
                    .log("Unable to emit delivery.");
                // Ask the sink to retry only when the failure was an overflow.
                return emitResult == Sinks.EmitResult.FAIL_OVERFLOW;
            });

            current.settle();
            // Move on to whatever delivery the sender now reports as current; null ends the loop.
            delivery = sender.current();
        }
    }
}

