/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.implementation.AmqpEndpointStateUtil;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.ReactorDispatcher;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscriber;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.ReplayProcessor;

/**
 * An {@link AmqpReceiveLink} backed by a proton-j {@link Receiver}.
 *
 * <p>Deliveries published by the {@link ReceiveLinkHandler} are decoded into {@link Message} instances and
 * re-published through {@link #receive()}. Endpoint states published by the handler are mapped with
 * {@link AmqpEndpointStateUtil#getConnectionState} and exposed through {@link #getEndpointStates()}.</p>
 */
public class ReactorReceiver implements AmqpReceiveLink {
    // Written by the token manager authorization callbacks wired up in the constructor. It is not read
    // anywhere in this class.
    private final AtomicBoolean hasAuthorized = new AtomicBoolean(true);
    private final String entityPath;
    private final Receiver receiver;
    private final ReceiveLinkHandler handler;
    private final TokenManager tokenManager;
    private final ReactorDispatcher dispatcher;
    // Subscription to the token manager's authorization results; released in both dispose paths.
    private final Disposable subscriptions;
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final EmitterProcessor<Message> messagesProcessor;
    private final ClientLogger logger = new ClientLogger(ReactorReceiver.class);
    private final ReplayProcessor<AmqpEndpointState> endpointStates;
    // Optional callback consulted for more credits when the link reports zero remote credit.
    private final AtomicReference<Supplier<Integer>> creditSupplier = new AtomicReference<>();

    /**
     * Creates the receiver and immediately subscribes to the handler's deliveries, the handler's endpoint
     * states, and the token manager's authorization results.
     *
     * @param entityPath Path of the entity this link receives from.
     * @param receiver The proton-j receiver that this link wraps.
     * @param handler Handler that publishes deliveries and endpoint states for the link.
     * @param tokenManager Token manager whose authorization results are observed; closed on dispose.
     * @param dispatcher Dispatcher used to schedule work against the receiver.
     */
    protected ReactorReceiver(String entityPath, Receiver receiver, ReceiveLinkHandler handler,
        TokenManager tokenManager, ReactorDispatcher dispatcher) {
        this.entityPath = entityPath;
        this.receiver = receiver;
        this.handler = handler;
        this.tokenManager = tokenManager;
        this.dispatcher = dispatcher;

        this.messagesProcessor = this.handler.getDeliveredMessages()
            .map(this::decodeDelivery)
            .doOnNext(next -> replenishCreditsIfEmpty())
            .subscribeWith(EmitterProcessor.create());

        // Replays the most recent state to late subscribers; starts at UNINITIALIZED.
        this.endpointStates = this.handler.getEndpointStates()
            .map(state -> {
                logger.verbose("connectionId[{}], path[{}], linkName[{}]: State {}",
                    handler.getConnectionId(), entityPath, getLinkName(), state);
                return AmqpEndpointStateUtil.getConnectionState(state);
            })
            .subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED));

        this.subscriptions = this.tokenManager.getAuthorizationResults().subscribe(
            response -> {
                logger.verbose("Token refreshed: {}", response);
                hasAuthorized.set(true);
            },
            error -> {
                logger.info("connectionId[{}], path[{}], linkName[{}] - tokenRenewalFailure[{}]",
                    handler.getConnectionId(), this.entityPath, getLinkName(), error.getMessage());
                hasAuthorized.set(false);
            },
            () -> hasAuthorized.set(false));
    }

    /**
     * Gets the endpoint states of this link. Each distinct state is emitted at most once per subscriber.
     *
     * @return A stream of distinct endpoint states.
     */
    @Override
    public Flux<AmqpEndpointState> getEndpointStates() {
        return this.endpointStates.distinct();
    }

    /**
     * Gets the stream of messages decoded from deliveries on this link.
     *
     * @return The stream of received messages.
     */
    @Override
    public Flux<Message> receive() {
        return this.messagesProcessor;
    }

    /**
     * Schedules {@code credits} to be added to the link through the dispatcher. Does nothing when this
     * receiver has been disposed. A failure to schedule the work is logged rather than thrown.
     *
     * @param credits Number of credits to add.
     */
    @Override
    public void addCredits(int credits) {
        if (this.isDisposed.get()) {
            return;
        }

        try {
            this.dispatcher.invoke(() -> this.receiver.flow(credits));
        } catch (IOException e) {
            this.logger.warning("Unable to schedule work to add more credits.", e);
        }
    }

    /**
     * Adds {@code credits} to the link by calling the receiver directly on the calling thread, bypassing
     * the dispatcher. Unlike {@link #addCredits(int)}, the disposed state is not checked.
     *
     * <p>NOTE(review): presumably callers must already be running on the reactor thread - confirm.</p>
     *
     * @param credits Number of credits to add.
     */
    @Override
    public void addCreditsInstantly(int credits) {
        this.receiver.flow(credits);
    }

    /**
     * Gets the remote credit currently reported by the underlying receiver.
     *
     * @return The receiver's remote credit.
     */
    @Override
    public int getCredits() {
        return this.receiver.getRemoteCredit();
    }

    /**
     * Sets the supplier that is asked for more credits when a message arrives and the link has no remote
     * credit left. Replaces any previously set supplier.
     *
     * @param creditSupplier Supplier returning the number of credits to add; a {@code null} or
     *     non-positive result adds nothing.
     * @throws NullPointerException if {@code creditSupplier} is null.
     */
    @Override
    public void setEmptyCreditListener(Supplier<Integer> creditSupplier) {
        Objects.requireNonNull(creditSupplier, "'creditSupplier' cannot be null.");
        this.creditSupplier.set(creditSupplier);
    }

    /**
     * Gets the name of the underlying receiver.
     *
     * @return The link name.
     */
    @Override
    public String getLinkName() {
        return this.receiver.getName();
    }

    /**
     * Gets the entity path supplied at construction.
     *
     * @return The entity path.
     */
    @Override
    public String getEntityPath() {
        return this.entityPath;
    }

    /**
     * Gets the hostname reported by the link handler.
     *
     * @return The hostname.
     */
    @Override
    public String getHostname() {
        return this.handler.getHostname();
    }

    /**
     * Gets whether either dispose method has been called.
     *
     * @return {@code true} once disposal has started; {@code false} otherwise.
     */
    public boolean isDisposed() {
        return this.isDisposed.get();
    }

    /**
     * Disposes of the receiver: releases the authorization subscription, completes the message stream,
     * closes the token manager and the receiver, then schedules the receiver to be freed and the handler
     * to be closed. Only the first call to either dispose method has an effect.
     */
    public void dispose() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }

        this.subscriptions.dispose();
        this.messagesProcessor.onComplete();
        this.tokenManager.close();
        this.receiver.close();

        freeReceiverAndCloseHandler();
    }

    /**
     * Disposes of the receiver with an error condition. If the receiver is not already locally closed, it
     * is closed and {@code condition} is set on it unless a condition is already present. Only the first
     * call to either dispose method has an effect.
     *
     * @param condition Error condition associated with the close operation.
     */
    void dispose(ErrorCondition condition) {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }

        // Release the authorization subscription here as well, so that this path cleans up the same
        // resources as dispose().
        this.subscriptions.dispose();

        this.logger.verbose("connectionId[{}], path[{}], linkName[{}]: setting error condition {}",
            this.handler.getConnectionId(), this.entityPath, getLinkName(), condition);

        if (this.receiver.getLocalState() != EndpointState.CLOSED) {
            this.receiver.close();

            // Do not overwrite a condition that is already recorded on the receiver.
            if (this.receiver.getCondition() == null) {
                this.receiver.setCondition(condition);
            }
        }

        freeReceiverAndCloseHandler();

        this.messagesProcessor.onComplete();
        this.tokenManager.close();
    }

    /**
     * Reads the pending bytes of {@code delivery} from the receiver, advances the receiver, decodes the
     * bytes into a message, and settles the delivery.
     *
     * @param delivery Delivery to decode.
     * @return The decoded message.
     */
    protected Message decodeDelivery(Delivery delivery) {
        final int messageSize = delivery.pending();
        final byte[] buffer = new byte[messageSize];
        final int read = this.receiver.recv(buffer, 0, messageSize);
        this.receiver.advance();

        final Message message = Proton.message();
        message.decode(buffer, 0, read);

        delivery.settle();
        return message;
    }

    @Override
    public String toString() {
        return String.format("link name: [%s], entity path: [%s]", this.receiver.getName(), this.entityPath);
    }

    /**
     * Asks the registered credit supplier for more credits when the link has no remote credit left and
     * this receiver is not disposed. Nothing is added when no supplier is registered or when it returns
     * {@code null} or a non-positive value.
     */
    private void replenishCreditsIfEmpty() {
        if (this.receiver.getRemoteCredit() != 0 || this.isDisposed.get()) {
            return;
        }

        final Supplier<Integer> supplier = this.creditSupplier.get();
        if (supplier == null) {
            return;
        }

        final Integer credits = supplier.get();
        if (credits != null && credits > 0) {
            addCredits(credits);
        }
    }

    /**
     * Schedules the receiver to be freed and the handler to be closed on the dispatcher. If the work
     * cannot be scheduled, the failure is logged and the handler is closed on the calling thread.
     */
    private void freeReceiverAndCloseHandler() {
        try {
            this.dispatcher.invoke(() -> {
                this.receiver.free();
                this.handler.close();
            });
        } catch (IOException e) {
            this.logger.warning("Could not schedule disposing of receiver on ReactorDispatcher.", e);
            this.handler.close();
        }
    }
}

