/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.amqpbridge.impl;

import io.vertx.amqpbridge.impl.AmqpBridgeImpl;
import io.vertx.amqpbridge.impl.MessageTranslatorImpl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.VertxException;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.impl.ProtonSenderImpl;

/**
 * {@link MessageProducer} implementation that converts {@link JsonObject} bodies to AMQP messages
 * and sends them through a {@link ProtonSender} created for a single AMQP address.
 *
 * <p>The mutable state ({@code closed}, {@code exceptionHandler}, {@code drainHandler} and
 * {@code remoteCredit}) is only read or written while holding this object's monitor. All
 * interaction with the underlying sender after construction is either performed inside sender
 * callbacks or handed to {@link AmqpBridgeImpl#runOnContext}.
 */
public class AmqpProducerImpl
implements MessageProducer<JsonObject> {
    private final ProtonSender sender;
    private final MessageTranslatorImpl translator = new MessageTranslatorImpl();
    private final AmqpBridgeImpl bridge;
    private final String amqpAddress;
    /** Set once the producer is closed, either locally via {@link #close()} or by the remote peer. */
    private boolean closed;
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> drainHandler;
    /** Local copy of the sender's remote credit; {@link #writeQueueFull()} is derived from it. */
    private long remoteCredit = 0L;

    /**
     * Creates the producer, wires the sender's close and drain callbacks, and opens the sender.
     *
     * @param bridge the owning bridge; must report that the caller is on its context event loop
     * @param connection the connection used to create the sender
     * @param amqpAddress the AMQP address the sender is created for
     * @throws IllegalStateException if {@code bridge.onContextEventLoop()} returns {@code false}
     */
    public AmqpProducerImpl(AmqpBridgeImpl bridge, ProtonConnection connection, String amqpAddress) {
        if (!bridge.onContextEventLoop()) {
            throw new IllegalStateException("Should be executing on the bridge context thread");
        }
        this.bridge = bridge;
        this.amqpAddress = amqpAddress;
        this.sender = connection.createSender(amqpAddress);
        this.sender.closeHandler(res -> {
            Handler<Throwable> eh = null;
            boolean closeSender = false;
            synchronized (this) {
                // Only react if the producer was not already closed: in that case remember the
                // exception handler (may be null) and mark the producer closed.
                if (!this.closed) {
                    eh = this.exceptionHandler;
                    this.closed = true;
                    closeSender = true;
                }
            }
            // Invoke the user handler outside the lock.
            if (eh != null) {
                if (res.succeeded()) {
                    eh.handle(new VertxException("Producer closed remotely"));
                } else {
                    eh.handle(new VertxException("Producer closed remotely with error", res.cause()));
                }
            }
            if (closeSender) {
                this.sender.close();
            }
        });
        this.sender.sendQueueDrainHandler(s -> {
            Handler<Void> dh;
            synchronized (this) {
                // Refresh the locally tracked credit from the sender before notifying the handler.
                this.remoteCredit = ((ProtonSenderImpl) this.sender).getRemoteCredit();
                dh = this.drainHandler;
            }
            // Invoke the user handler outside the lock.
            if (dh != null) {
                dh.handle(null);
            }
        });
        this.sender.open();
    }

    /**
     * Reports whether the locally tracked remote credit is exhausted.
     *
     * @return {@code true} when the tracked credit is zero or negative
     */
    public synchronized boolean writeQueueFull() {
        return this.remoteCredit <= 0L;
    }

    /**
     * Sends a message without a reply handler.
     *
     * @param messageBody the body to convert and send
     * @return this producer
     */
    public MessageProducer<JsonObject> send(JsonObject messageBody) {
        return this.send(messageBody, null);
    }

    /**
     * Sends a message, optionally registering a handler for a reply.
     *
     * @param messageBody the body to convert and send
     * @param replyHandler handler for the reply, or {@code null} if no reply is expected
     * @return this producer
     */
    public <R> MessageProducer<JsonObject> send(JsonObject messageBody, Handler<AsyncResult<Message<R>>> replyHandler) {
        return this.doSend(messageBody, replyHandler, null);
    }

    /**
     * Converts the body to an AMQP message and hands the actual send to the bridge context.
     *
     * @param messageBody the body to convert and send
     * @param replyHandler handler for the reply, or {@code null}; when non-null the bridge is first
     *        asked to verify that a reply-to address is available
     * @param toAddress address to set on the message, or {@code null} to leave it as converted
     * @return this producer
     */
    protected <R> MessageProducer<JsonObject> doSend(JsonObject messageBody, Handler<AsyncResult<Message<R>>> replyHandler, String toAddress) {
        if (replyHandler != null) {
            this.bridge.verifyReplyToAddressAvailable();
        }
        org.apache.qpid.proton.message.Message msg = this.translator.convertToAmqpMessage(messageBody);
        if (toAddress != null) {
            msg.setAddress(toAddress);
        }
        synchronized (this) {
            // Account for this message up front so writeQueueFull() reflects it immediately; the
            // send itself is handed to bridge.runOnContext below and may not have executed by the
            // time this method returns (NOTE(review): presumably when called off the bridge
            // context - confirm against AmqpBridgeImpl.runOnContext).
            --this.remoteCredit;
        }
        Handler<Void> sendAction = v -> {
            if (replyHandler != null) {
                this.bridge.registerReplyToHandler(msg, replyHandler);
            }
            this.sender.send(msg);
            synchronized (this) {
                // Re-sync the tracked credit with the sender's actual value after the send,
                // replacing the provisional decrement made above.
                this.remoteCredit = ((ProtonSenderImpl) this.sender).getRemoteCredit();
            }
        };
        this.bridge.runOnContext(true, sendAction);
        return this;
    }

    /**
     * Sets the handler notified when the sender is closed remotely while this producer is open.
     *
     * @param handler the handler, or {@code null} to clear it
     * @return this producer
     */
    public synchronized MessageProducer<JsonObject> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    /**
     * Equivalent to {@link #send(JsonObject, Handler)} with a {@code null} reply handler.
     *
     * @param data the body to convert and send
     * @return this producer
     */
    public MessageProducer<JsonObject> write(JsonObject data) {
        return this.send(data, null);
    }

    /**
     * Does nothing: {@link #writeQueueFull()} is derived from the tracked remote credit, not from
     * a configurable queue size.
     *
     * @param maxSize ignored
     * @return this producer
     */
    public MessageProducer<JsonObject> setWriteQueueMaxSize(int maxSize) {
        return this;
    }

    /**
     * Sets the handler invoked from the sender's send-queue drain callback.
     *
     * @param handler the handler, or {@code null} to clear it
     * @return this producer
     */
    public synchronized MessageProducer<JsonObject> drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        return this;
    }

    /**
     * Not supported by this producer.
     *
     * @throws UnsupportedOperationException always
     */
    public MessageProducer<JsonObject> deliveryOptions(DeliveryOptions options) {
        throw new UnsupportedOperationException("DeliveryOptions are not supported by this producer");
    }

    /**
     * @return the AMQP address passed to the constructor
     */
    public String address() {
        return this.amqpAddress;
    }

    /** Equivalent to {@link #close()}. */
    public void end() {
        this.close();
    }

    /**
     * Marks the producer closed and asks the bridge to close the underlying sender on its context.
     * Marking it closed first means the sender's close handler will not notify the exception
     * handler for a close that was initiated locally.
     */
    public void close() {
        synchronized (this) {
            this.closed = true;
        }
        Handler<Void> closeAction = v -> this.sender.close();
        this.bridge.runOnContext(true, closeAction);
    }
}

