/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.amqp.impl;

import io.vertx.amqp.AmqpConnection;
import io.vertx.amqp.AmqpMessage;
import io.vertx.amqp.AmqpSender;
import io.vertx.amqp.impl.AmqpConnectionImpl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.impl.ProtonSenderImpl;

public class AmqpSenderImpl
implements AmqpSender {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpSender.class);
    private final ProtonSender sender;
    private final AmqpConnectionImpl connection;
    private boolean closed;
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> drainHandler;
    private long remoteCredit = 0L;

    private AmqpSenderImpl(ProtonSender sender, AmqpConnectionImpl connection, Handler<AsyncResult<AmqpSender>> completionHandler) {
        this.sender = sender;
        this.connection = connection;
        ((ProtonSender)sender.closeHandler(res -> this.onClose(sender, (AsyncResult<ProtonSender>)res, false))).detachHandler(res -> this.onClose(sender, (AsyncResult<ProtonSender>)res, true));
        sender.sendQueueDrainHandler(s -> {
            Handler<Void> dh = null;
            AmqpSenderImpl amqpSenderImpl = this;
            synchronized (amqpSenderImpl) {
                this.remoteCredit = ((ProtonSenderImpl)sender).getRemoteCredit();
                if (this.drainHandler != null) {
                    dh = this.drainHandler;
                }
            }
            if (dh != null) {
                dh.handle(null);
            }
        });
        sender.openHandler(done -> {
            if (done.failed()) {
                completionHandler.handle((Object)done.mapEmpty());
            } else {
                connection.register(this);
                completionHandler.handle((Object)Future.succeededFuture((Object)this));
            }
        });
        sender.open();
    }

    static void create(ProtonSender sender, AmqpConnectionImpl connection, Handler<AsyncResult<AmqpSender>> completionHandler) {
        new AmqpSenderImpl(sender, connection, completionHandler);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onClose(ProtonSender sender, AsyncResult<ProtonSender> res, boolean detach) {
        Handler<Throwable> eh = null;
        boolean closeSender = false;
        AmqpSenderImpl amqpSenderImpl = this;
        synchronized (amqpSenderImpl) {
            if (!this.closed && this.exceptionHandler != null) {
                eh = this.exceptionHandler;
            }
            if (!this.closed) {
                this.closed = true;
                closeSender = true;
            }
        }
        if (eh != null) {
            if (res.succeeded()) {
                eh.handle((Object)new Exception("Sender closed remotely"));
            } else {
                eh.handle((Object)new Exception("Sender closed remotely with error", res.cause()));
            }
        }
        if (closeSender) {
            if (detach) {
                sender.detach();
            } else {
                sender.close();
            }
        }
    }

    public synchronized boolean writeQueueFull() {
        return this.remoteCredit <= 0L;
    }

    @Override
    public AmqpConnection connection() {
        return this.connection;
    }

    @Override
    public AmqpSender send(AmqpMessage message) {
        return this.doSend(message, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private AmqpSender doSend(AmqpMessage message, Handler<AsyncResult<Void>> acknowledgmentHandler) {
        AmqpMessage updated = message.address() == null ? AmqpMessage.create(message).address(this.address()).build() : message;
        Handler ack = delivery -> {
            Handler handler = acknowledgmentHandler;
            if (acknowledgmentHandler == null) {
                handler = ar -> {
                    if (ar.failed()) {
                        LOGGER.warn((Object)"Message rejected by remote peer", ar.cause());
                    }
                };
            }
            switch (delivery.getRemoteState().getType()) {
                case Rejected: {
                    handler.handle((Object)Future.failedFuture((String)"message rejected (REJECTED"));
                    break;
                }
                case Modified: {
                    handler.handle((Object)Future.failedFuture((String)"message rejected (MODIFIED)"));
                    break;
                }
                case Released: {
                    handler.handle((Object)Future.failedFuture((String)"message rejected (RELEASED)"));
                    break;
                }
                case Accepted: {
                    handler.handle((Object)Future.succeededFuture());
                    break;
                }
                default: {
                    handler.handle((Object)Future.failedFuture((String)("Unsupported delivery type: " + delivery.getRemoteState().getType())));
                }
            }
        };
        AmqpSenderImpl amqpSenderImpl = this;
        synchronized (amqpSenderImpl) {
            --this.remoteCredit;
        }
        this.connection.runWithTrampoline((Handler<Void>)((Handler)x -> {
            this.sender.send(updated.unwrap(), ack);
            AmqpSenderImpl amqpSenderImpl = this;
            synchronized (amqpSenderImpl) {
                this.remoteCredit = ((ProtonSenderImpl)this.sender).getRemoteCredit();
            }
        }));
        return this;
    }

    @Override
    public synchronized AmqpSender exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    @Override
    public AmqpSender write(AmqpMessage data) {
        return this.doSend(data, null);
    }

    @Override
    public AmqpSender write(AmqpMessage data, Handler<AsyncResult<Void>> handler) {
        return this.doSend(data, handler);
    }

    @Override
    public AmqpSender setWriteQueueMaxSize(int maxSize) {
        return this;
    }

    public void end() {
        this.close(null);
    }

    public void end(Handler<AsyncResult<Void>> handler) {
        this.close(handler);
    }

    public synchronized AmqpSender drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        return this;
    }

    @Override
    public AmqpSender sendWithAck(AmqpMessage message, Handler<AsyncResult<Void>> acknowledgementHandler) {
        return this.doSend(message, acknowledgementHandler);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close(Handler<AsyncResult<Void>> handler) {
        Handler<AsyncResult<Void>> actualHandler = handler == null ? x -> {} : handler;
        AmqpSenderImpl amqpSenderImpl = this;
        synchronized (amqpSenderImpl) {
            if (this.closed) {
                actualHandler.handle((Object)Future.succeededFuture());
                return;
            }
            this.closed = true;
        }
        this.connection.unregister(this);
        this.connection.runWithTrampoline((Handler<Void>)((Handler)x -> {
            if (this.sender.isOpen()) {
                try {
                    ((ProtonSender)this.sender.closeHandler(v -> actualHandler.handle((Object)v.mapEmpty()))).close();
                }
                catch (Exception e) {
                    actualHandler.handle((Object)Future.failedFuture((Throwable)e));
                }
            } else {
                actualHandler.handle((Object)Future.succeededFuture());
            }
        }));
    }

    @Override
    public String address() {
        return this.sender.getRemoteAddress();
    }
}

