/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.amqp.impl;

import io.vertx.amqp.AmqpClientOptions;
import io.vertx.amqp.AmqpConnection;
import io.vertx.amqp.AmqpReceiver;
import io.vertx.amqp.AmqpReceiverOptions;
import io.vertx.amqp.AmqpSender;
import io.vertx.amqp.AmqpSenderOptions;
import io.vertx.amqp.impl.AmqpClientImpl;
import io.vertx.amqp.impl.AmqpReceiverImpl;
import io.vertx.amqp.impl.AmqpSenderImpl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonLinkOptions;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.impl.ProtonConnectionImpl;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.engine.EndpointState;

public class AmqpConnectionImpl
implements AmqpConnection {
    public static final String PRODUCT = "vertx-amqp-client";
    public static final Symbol PRODUCT_KEY = Symbol.valueOf((String)"product");
    private final AmqpClientOptions options;
    private final AtomicBoolean closed = new AtomicBoolean();
    private final AtomicReference<ProtonConnection> connection = new AtomicReference();
    private final Context context;
    private final List<AmqpSender> senders = new CopyOnWriteArrayList<AmqpSender>();
    private final List<AmqpReceiver> receivers = new CopyOnWriteArrayList<AmqpReceiver>();
    private Handler<Throwable> exceptionHandler;

    AmqpConnectionImpl(Context context, AmqpClientImpl client, AmqpClientOptions options, ProtonClient proton, Handler<AsyncResult<AmqpConnection>> connectionHandler) {
        this.options = options;
        this.context = context;
        this.runOnContext((Handler<Void>)((Handler)x -> this.connect(client, Objects.requireNonNull(proton, "proton cannot be `null`"), (Handler<AsyncResult<AmqpConnection>>)Objects.requireNonNull(connectionHandler, "connection handler cannot be `null`"))));
    }

    private void connect(AmqpClientImpl client, ProtonClient proton, Handler<AsyncResult<AmqpConnection>> connectionHandler) {
        proton.connect((ProtonClientOptions)this.options, this.options.getHost(), this.options.getPort(), this.options.getUsername(), this.options.getPassword(), ar -> {
            if (ar.succeeded()) {
                if (!this.connection.compareAndSet(null, (ProtonConnection)ar.result())) {
                    connectionHandler.handle((Object)Future.failedFuture((String)"Unable to connect - already holding a connection"));
                    return;
                }
                HashMap<Symbol, String> map = new HashMap<Symbol, String>();
                map.put(PRODUCT_KEY, PRODUCT);
                if (this.options.getContainerId() != null) {
                    this.connection.get().setContainer(this.options.getContainerId());
                }
                if (this.options.getVirtualHost() != null) {
                    this.connection.get().setHostname(this.options.getVirtualHost());
                }
                this.connection.get().setProperties(map).disconnectHandler(ignored -> {
                    try {
                        this.onDisconnect();
                    }
                    finally {
                        this.closed.set(true);
                    }
                }).closeHandler(x -> {
                    try {
                        this.onDisconnect();
                    }
                    finally {
                        this.closed.set(true);
                    }
                }).openHandler(conn -> {
                    if (conn.succeeded()) {
                        client.register(this);
                        this.closed.set(false);
                        connectionHandler.handle((Object)Future.succeededFuture((Object)this));
                    } else {
                        this.closed.set(true);
                        connectionHandler.handle((Object)conn.mapEmpty());
                    }
                });
                this.connection.get().open();
            } else {
                connectionHandler.handle((Object)ar.mapEmpty());
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onDisconnect() {
        Handler<Throwable> h = null;
        ProtonConnection conn = this.connection.getAndSet(null);
        AmqpConnectionImpl amqpConnectionImpl = this;
        synchronized (amqpConnectionImpl) {
            if (this.exceptionHandler != null) {
                h = this.exceptionHandler;
            }
        }
        if (h != null) {
            String message = this.getErrorMessage(conn);
            h.handle((Object)new Exception(message));
        }
    }

    private String getErrorMessage(ProtonConnection conn) {
        String message = "Connection disconnected";
        if (conn != null) {
            if (conn.getCondition() != null && conn.getCondition().getDescription() != null) {
                message = message + " - " + conn.getCondition().getDescription();
            } else if (conn.getRemoteCondition() != null && conn.getRemoteCondition().getDescription() != null) {
                message = message + " - " + conn.getRemoteCondition().getDescription();
            }
        }
        return message;
    }

    void runOnContext(Handler<Void> action) {
        this.context.runOnContext(action);
    }

    void runWithTrampoline(Handler<Void> action) {
        if (Vertx.currentContext() == this.context) {
            action.handle(null);
        } else {
            this.runOnContext(action);
        }
    }

    private boolean isLocalOpen() {
        ProtonConnection conn = this.connection.get();
        return conn != null && ((ProtonConnectionImpl)conn).getLocalState() == EndpointState.ACTIVE;
    }

    private boolean isRemoteOpen() {
        ProtonConnection conn = this.connection.get();
        return conn != null && ((ProtonConnectionImpl)conn).getRemoteState() == EndpointState.ACTIVE;
    }

    @Override
    public synchronized AmqpConnection exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    @Override
    public AmqpConnection close(Handler<AsyncResult<Void>> done) {
        this.context.runOnContext(ignored -> {
            ProtonConnection actualConnection = this.connection.get();
            if (actualConnection == null || this.closed.get() || !this.isLocalOpen() && !this.isRemoteOpen()) {
                if (done != null) {
                    done.handle((Object)Future.succeededFuture());
                }
                return;
            }
            this.closed.set(true);
            Future future = Future.future();
            if (done != null) {
                future.onComplete(done);
            }
            if (actualConnection.isDisconnected()) {
                future.complete();
            } else {
                try {
                    actualConnection.disconnectHandler(con -> {
                        future.tryFail(this.getErrorMessage((ProtonConnection)con));
                        this.closed.set(true);
                    }).closeHandler(res -> {
                        this.closed.set(true);
                        if (res.succeeded()) {
                            future.tryComplete();
                        } else {
                            future.tryFail(res.cause());
                        }
                    }).close();
                }
                catch (Exception e) {
                    future.fail((Throwable)e);
                }
            }
        });
        return this;
    }

    void unregister(AmqpSender sender) {
        this.senders.remove(sender);
    }

    void unregister(AmqpReceiver receiver) {
        this.receivers.remove(receiver);
    }

    @Override
    public AmqpConnection createDynamicReceiver(Handler<AsyncResult<AmqpReceiver>> completionHandler) {
        return this.createReceiver(null, new AmqpReceiverOptions().setDynamic(true), completionHandler);
    }

    @Override
    public AmqpConnection createReceiver(String address, Handler<AsyncResult<AmqpReceiver>> completionHandler) {
        ProtonLinkOptions opts = new ProtonLinkOptions();
        this.runWithTrampoline((Handler<Void>)((Handler)x -> {
            ProtonConnection conn = this.connection.get();
            if (conn == null) {
                completionHandler.handle((Object)Future.failedFuture((String)"Not connected"));
            } else {
                ProtonReceiver receiver = conn.createReceiver(address, opts);
                new AmqpReceiverImpl(Objects.requireNonNull(address, "The address must not be `null`"), this, new AmqpReceiverOptions(), receiver, (Handler<AsyncResult<AmqpReceiver>>)Objects.requireNonNull(completionHandler, "The completion handler must not be `null`"));
            }
        }));
        return this;
    }

    @Override
    public AmqpConnection createReceiver(String address, AmqpReceiverOptions receiverOptions, Handler<AsyncResult<AmqpReceiver>> completionHandler) {
        ProtonLinkOptions opts = new ProtonLinkOptions();
        AmqpReceiverOptions recOpts = receiverOptions == null ? new AmqpReceiverOptions() : receiverOptions;
        opts.setDynamic(recOpts.isDynamic()).setLinkName(recOpts.getLinkName());
        this.runWithTrampoline((Handler<Void>)((Handler)v -> {
            ProtonConnection conn = this.connection.get();
            if (conn == null) {
                completionHandler.handle((Object)Future.failedFuture((String)"Not connected"));
            } else {
                ProtonReceiver receiver = conn.createReceiver(address, opts);
                if (receiverOptions != null) {
                    if (receiverOptions.getQos() != null) {
                        receiver.setQoS(ProtonQoS.valueOf((String)receiverOptions.getQos().toUpperCase()));
                    }
                    this.configureTheSource(recOpts, receiver);
                }
                new AmqpReceiverImpl(address, this, recOpts, receiver, completionHandler);
            }
        }));
        return this;
    }

    private void configureTheSource(AmqpReceiverOptions receiverOptions, ProtonReceiver receiver) {
        Source source = (Source)receiver.getSource();
        List<String> capabilities = receiverOptions.getCapabilities();
        if (!capabilities.isEmpty()) {
            source.setCapabilities((Symbol[])capabilities.stream().map(Symbol::valueOf).toArray(Symbol[]::new));
        }
        if (receiverOptions.isDurable()) {
            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
            source.setDurable(TerminusDurability.UNSETTLED_STATE);
        }
    }

    @Override
    public AmqpConnection createSender(String address, Handler<AsyncResult<AmqpSender>> completionHandler) {
        Objects.requireNonNull(address, "The address must be set");
        return this.createSender(address, new AmqpSenderOptions(), completionHandler);
    }

    @Override
    public AmqpConnection createSender(String address, AmqpSenderOptions options, Handler<AsyncResult<AmqpSender>> completionHandler) {
        if (address == null && !options.isDynamic()) {
            throw new IllegalArgumentException("Address must be set if the link is not dynamic");
        }
        Objects.requireNonNull(completionHandler, "The completion handler must be set");
        this.runWithTrampoline((Handler<Void>)((Handler)x -> {
            ProtonConnection conn = this.connection.get();
            if (conn == null) {
                completionHandler.handle((Object)Future.failedFuture((String)"Not connected"));
            } else {
                ProtonSender sender;
                if (options != null) {
                    ProtonLinkOptions opts = new ProtonLinkOptions();
                    opts.setLinkName(options.getLinkName());
                    opts.setDynamic(options.isDynamic());
                    sender = conn.createSender(address, opts);
                    sender.setAutoDrained(options.isAutoDrained());
                } else {
                    sender = conn.createSender(address);
                }
                AmqpSenderImpl.create(sender, this, completionHandler);
            }
        }));
        return this;
    }

    @Override
    public AmqpConnection createAnonymousSender(Handler<AsyncResult<AmqpSender>> completionHandler) {
        Objects.requireNonNull(completionHandler, "The completion handler must be set");
        this.runWithTrampoline((Handler<Void>)((Handler)x -> {
            ProtonConnection conn = this.connection.get();
            if (conn == null) {
                completionHandler.handle((Object)Future.failedFuture((String)"Not connected"));
            } else {
                ProtonSender sender = conn.createSender(null);
                AmqpSenderImpl.create(sender, this, completionHandler);
            }
        }));
        return this;
    }

    @Override
    public boolean isDisconnected() {
        ProtonConnection current = this.connection.get();
        if (current != null) {
            return current.isDisconnected();
        }
        return true;
    }

    public ProtonConnection unwrap() {
        return this.connection.get();
    }

    public AmqpClientOptions options() {
        return this.options;
    }

    void register(AmqpSenderImpl sender) {
        this.senders.add(sender);
    }

    void register(AmqpReceiverImpl receiver) {
        this.receivers.add(receiver);
    }
}

