/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.rabbitmq.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.impl.ConsumerHandler;
import io.vertx.rabbitmq.impl.Utils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class RabbitMQClientImpl
implements RabbitMQClient,
ShutdownListener {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQClientImpl.class);
    private final Vertx vertx;
    private final JsonObject config;
    private final Integer retries;
    private final boolean includeProperties;
    private Connection connection;
    private Channel channel;

    public RabbitMQClientImpl(Vertx vertx, JsonObject config) {
        this.vertx = vertx;
        this.config = config;
        this.retries = config.getInteger("connectionRetries", null);
        this.includeProperties = config.getBoolean("includeProperties", Boolean.valueOf(false));
    }

    private static Connection newConnection(JsonObject config) throws IOException, TimeoutException {
        Integer networkRecoveryInterval;
        Integer requestedChannelMax;
        Integer handshakeTimeout;
        Integer requestedHeartbeat;
        ConnectionFactory cf = new ConnectionFactory();
        String uri = config.getString("uri");
        if (uri != null) {
            try {
                cf.setUri(uri);
            }
            catch (Exception e) {
                throw new IllegalArgumentException("Invalid rabbitmq connection uri " + uri);
            }
        } else {
            String virtualHost;
            Integer port;
            String host;
            String password;
            String user = config.getString("user");
            if (user != null) {
                cf.setUsername(user);
            }
            if ((password = config.getString("password")) != null) {
                cf.setPassword(password);
            }
            if ((host = config.getString("host")) != null) {
                cf.setHost(host);
            }
            if ((port = config.getInteger("port")) != null) {
                cf.setPort(port.intValue());
            }
            if ((virtualHost = config.getString("virtualHost")) != null) {
                cf.setVirtualHost(virtualHost);
            }
        }
        Integer connectionTimeout = config.getInteger("connectionTimeout");
        if (connectionTimeout != null) {
            cf.setConnectionTimeout(connectionTimeout.intValue());
        }
        if ((requestedHeartbeat = config.getInteger("requestedHeartbeat")) != null) {
            cf.setRequestedHeartbeat(requestedHeartbeat.intValue());
        }
        if ((handshakeTimeout = config.getInteger("handshakeTimeout")) != null) {
            cf.setHandshakeTimeout(handshakeTimeout.intValue());
        }
        if ((requestedChannelMax = config.getInteger("requestedChannelMax")) != null) {
            cf.setRequestedChannelMax(requestedChannelMax.intValue());
        }
        if ((networkRecoveryInterval = config.getInteger("networkRecoveryInterval")) != null) {
            cf.setNetworkRecoveryInterval(networkRecoveryInterval.intValue());
        }
        boolean automaticRecoveryEnabled = config.getBoolean("automaticRecoveryEnabled", Boolean.valueOf(true));
        cf.setAutomaticRecoveryEnabled(automaticRecoveryEnabled);
        return cf.newConnection();
    }

    @Override
    public boolean isConnected() {
        boolean connected = false;
        if (this.connection != null && this.connection.isOpen()) {
            connected = true;
        }
        return connected;
    }

    @Override
    public boolean isOpenChannel() {
        return this.channel != null && this.channel.isOpen();
    }

    @Override
    public void basicAck(long deliveryTag, boolean multiple, Handler<AsyncResult<JsonObject>> resultHandler) {
        this.forChannel(resultHandler, channel -> {
            channel.basicAck(deliveryTag, multiple);
            return null;
        });
    }

    @Override
    public void basicNack(long deliveryTag, boolean multiple, boolean requeue, Handler<AsyncResult<JsonObject>> resultHandler) {
        this.forChannel(resultHandler, channel -> {
            channel.basicNack(deliveryTag, multiple, requeue);
            return null;
        });
    }

    @Override
    public void basicConsume(String queue, String address, Handler<AsyncResult<Void>> resultHandler) {
        this.basicConsume(queue, address, true, resultHandler);
    }

    @Override
    public void basicConsume(String queue, String address, boolean autoAck, Handler<AsyncResult<Void>> resultHandler) {
        this.forChannel(resultHandler, channel -> {
            channel.basicConsume(queue, autoAck, (Consumer)new ConsumerHandler(this.vertx, channel, this.includeProperties, (Handler<AsyncResult<JsonObject>>)((Handler)ar -> {
                if (ar.succeeded()) {
                    this.vertx.eventBus().send(address, ar.result());
                } else {
                    log.error((Object)"Exception occurred inside rabbitmq service consumer.", ar.cause());
                }
            })));
            return null;
        });
    }

    @Override
    public void basicGet(String queue, boolean autoAck, Handler<AsyncResult<JsonObject>> resultHandler) {
        this.forChannel(resultHandler, channel -> {
            GetResponse response = channel.basicGet(queue, autoAck);
            if (response == null) {
                return null;
            }
            JsonObject json = new JsonObject();
            Utils.populate(json, response.getEnvelope());
            if (this.includeProperties) {
                Utils.put("properties", Utils.toJson(response.getProps()), json);
            }
            Utils.put("body", Utils.parse(response.getProps(), response.getBody()), json);
            Utils.put("messageCount", response.getMessageCount(), json);
            return json;
        });
    }

    @Override
    public void basicPublish(String exchange, String routingKey, JsonObject message, Handler<AsyncResult<Void>> resultHandler) {
        this.forChannel(resultHandler, channel -> {
            byte[] body;
            String encoding;
            JsonObject properties = message.getJsonObject("properties");
            String contentType = properties == null ? null : properties.getString("contentType");
            String string = encoding = properties == null ? null : properties.getString("contentEncoding");
            if (contentType != null) {
                switch (contentType) {
                    case "application/json": {
                        body = Utils.encode(encoding, message.getJsonObject("body").toString());
                        break;
                    }
                    case "application/octet-stream": {
                        body = message.getBinary("body");
                        break;
                    }
                    default: {
                        body = Utils.encode(encoding, message.getString("body"));
                        break;
                    }
                }
            } else {
                body = Utils.encode(encoding, message.getString("body"));
            }
            channel.basicPublish(exchange, routingKey, Utils.fromJson(properties), body);
            return null;
        });
    }

    @Override
    public void basicQos(int prefetchCount, Handler<AsyncResult<Void>> resultHandler) {
        this.forChannel(resultHandler, channel -> {
            channel.basicQos(prefetchCount);
            return null;
        });
    }

    @Override
    public void exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Handler<AsyncResult<Void>> resultHandler) {
        this.forChannel(resultHandler, channel -> {
            channel.exchangeDeclare(exchange, type, durable, autoDelete, null);
            return null;
        });
    }

    @Override
    public void exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, String> config, Handler<AsyncResult<Void>> resultHandler) {
        HashMap transformedMap = new HashMap();
        config.forEach(transformedMap::put);
        this.forChannel(resultHandler, channel -> {
            channel.exchangeDeclare(exchange, type, durable, autoDelete, transformedMap);
            return null;
        });
    }

    @Override
    public void exchangeDelete(String exchange, Handler<AsyncResult<Void>> resultHandler) {
        this.forChannel(resultHandler, channel -> {
            channel.exchangeDelete(exchange);
            return null;
        });
    }

    @Override
    public void exchangeBind(String destination, String source, String routingKey, Handler<AsyncResult<Void>> resultHandler) {
        this.forChannel(resultHandler, channel -> {
            channel.exchangeBind(destination, source, routingKey);
            return null;
        });
    }

    @Override
    public void exchangeUnbind(String destination, String source, String routingKey, Handler<AsyncResult<Void>> resultHandler) {
        this.forChannel(resultHandler, channel -> {
            channel.exchangeUnbind(destination, source, routingKey);
            return null;
        });
    }

    @Override
    public void queueDeclareAuto(Handler<AsyncResult<JsonObject>> resultHandler) {
        this.forChannel(resultHandler, channel -> {
            AMQP.Queue.DeclareOk result = channel.queueDeclare();
            return Utils.toJson(result);
        });
    }

    @Override
    public void queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Handler<AsyncResult<JsonObject>> resultHandler) {
        this.forChannel(resultHandler, channel -> {
            AMQP.Queue.DeclareOk result = channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
            return Utils.toJson(result);
        });
    }

    @Override
    public void queueDelete(String queue, Handler<AsyncResult<JsonObject>> resultHandler) {
        this.forChannel(resultHandler, channel -> {
            AMQP.Queue.DeleteOk result = channel.queueDelete(queue);
            return Utils.toJson(result);
        });
    }

    @Override
    public void queueDeleteIf(String queue, boolean ifUnused, boolean ifEmpty, Handler<AsyncResult<JsonObject>> resultHandler) {
        this.forChannel(resultHandler, channel -> {
            AMQP.Queue.DeleteOk result = channel.queueDelete(queue, ifUnused, ifEmpty);
            return Utils.toJson(result);
        });
    }

    @Override
    public void queueBind(String queue, String exchange, String routingKey, Handler<AsyncResult<Void>> resultHandler) {
        this.forChannel(resultHandler, channel -> {
            channel.queueBind(queue, exchange, routingKey);
            return null;
        });
    }

    @Override
    public void messageCount(String queue, Handler<AsyncResult<JsonObject>> resultHandler) {
        this.forChannel(resultHandler, channel -> {
            Long result = channel.messageCount(queue);
            return new JsonObject().put("messageCount", result);
        });
    }

    @Override
    public void start(Handler<AsyncResult<Void>> resultHandler) {
        log.info((Object)"Starting rabbitmq client");
        this.vertx.executeBlocking(future -> {
            try {
                this.connect();
                future.complete();
            }
            catch (IOException | TimeoutException e) {
                log.error((Object)"Could not connect to rabbitmq", (Throwable)e);
                if (this.retries != null && this.retries > 0) {
                    try {
                        this.reconnect();
                    }
                    catch (IOException ioex) {
                        future.fail((Throwable)ioex);
                    }
                }
                future.fail((Throwable)e);
            }
        }, resultHandler);
    }

    @Override
    public void stop(Handler<AsyncResult<Void>> resultHandler) {
        log.info((Object)"Stopping rabbitmq client");
        this.vertx.executeBlocking(future -> {
            try {
                this.disconnect();
                future.complete();
            }
            catch (IOException e) {
                future.fail((Throwable)e);
            }
        }, resultHandler);
    }

    private <T> void forChannel(Handler<AsyncResult<T>> resultHandler, ChannelHandler<T> channelHandler) {
        if (this.connection == null || this.channel == null) {
            resultHandler.handle((Object)Future.failedFuture((String)"Not connected"));
            return;
        }
        if (!this.channel.isOpen()) {
            try {
                log.debug((Object)"channel is close, try create Channel");
                this.channel = this.connection.createChannel();
            }
            catch (IOException e) {
                log.debug((Object)"create channel error");
                resultHandler.handle((Object)Future.failedFuture((Throwable)e));
            }
        }
        this.vertx.executeBlocking(future -> {
            try {
                Object t = channelHandler.handle(this.channel);
                future.complete(t);
            }
            catch (Throwable t) {
                future.fail(t);
            }
        }, resultHandler);
    }

    private void connect() throws IOException, TimeoutException {
        log.debug((Object)"Connecting to rabbitmq...");
        this.connection = RabbitMQClientImpl.newConnection(this.config);
        this.connection.addShutdownListener((ShutdownListener)this);
        this.channel = this.connection.createChannel();
        log.debug((Object)"Connected to rabbitmq !");
    }

    private void disconnect() throws IOException {
        try {
            log.debug((Object)"Disconnecting from rabbitmq...");
            this.connection.close();
            log.debug((Object)"Disconnected from rabbitmq !");
        }
        finally {
            this.connection = null;
            this.channel = null;
        }
    }

    private void reconnect() throws IOException {
        if (this.retries == null || this.retries < 1) {
            return;
        }
        log.info((Object)"Attempting to reconnect to rabbitmq...");
        AtomicInteger attempts = new AtomicInteger(0);
        int retries = this.retries;
        long delay = this.config.getLong("connectionRetryDelay", Long.valueOf(10000L));
        this.vertx.setPeriodic(delay, id -> {
            int attempt = attempts.incrementAndGet();
            if (attempt == retries) {
                this.vertx.cancelTimer(id.longValue());
                log.info((Object)("Max number of connect attempts (" + retries + ") reached. Will not attempt to connect again"));
            } else {
                try {
                    log.debug((Object)("Reconnect attempt # " + attempt));
                    this.connect();
                    this.vertx.cancelTimer(id.longValue());
                    log.info((Object)("Successfully reconnected to rabbitmq (attempt # " + attempt + ")"));
                }
                catch (IOException | TimeoutException e) {
                    log.debug((Object)("Failed to connect attempt # " + attempt), (Throwable)e);
                }
            }
        });
    }

    public void shutdownCompleted(ShutdownSignalException cause) {
        if (cause.isInitiatedByApplication()) {
            return;
        }
        log.info((Object)"RabbitMQ connection shutdown! The client will attempt to reconnect automatically", (Throwable)cause);
    }

    private static interface ChannelHandler<T> {
        public T handle(Channel var1) throws Exception;
    }
}

