/*
 * Decompiled with CFR 0.152.
 */
package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection;

import com.exactpro.th2.common.metrics.HealthMetrics;
import com.exactpro.th2.common.schema.message.SubscriberMonitor;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ExceptionHandler;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
import com.rabbitmq.client.ShutdownNotifier;
import com.rabbitmq.client.TopologyRecoveryException;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionManager
implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class);

    /** Connection opened by the constructor; every channel of this manager is created from it. */
    private final Connection connection;

    /** Channel holders keyed by the routing key or queue they serve; filled on first use. */
    private final Map<PinId, ChannelHolder> channelsByPin = new ConcurrentHashMap<>();

    /** Incremented by the recovery delay handler and reset to 0 once a recovery completes. */
    private final AtomicInteger connectionRecoveryAttempts = new AtomicInteger(0);

    /** Flipped to {@code true} by {@link #close()}; never reset. */
    private final AtomicBoolean connectionIsClosed = new AtomicBoolean(false);

    private final ConnectionManagerConfiguration configuration;

    /** Prefix of every consumer tag generated by {@code basicConsume}. */
    private final String subscriberName;

    /** Source of the numeric suffix appended to {@link #subscriberName} for each consumer. */
    private final AtomicInteger nextSubscriberId = new AtomicInteger(1);

    /** Single-threaded executor handed to the connection factory as its shared executor. */
    private final ExecutorService sharedExecutor = Executors.newSingleThreadExecutor(
            new ThreadFactoryBuilder().setNameFormat("rabbitmq-shared-pool-%d").build());

    /** Health metrics whose readiness monitor is toggled as the connection fails and recovers. */
    private final HealthMetrics metrics = new HealthMetrics(this);

    /** Resets the attempt counter and re-enables readiness after a successful recovery. */
    private final RecoveryListener recoveryListener = new RecoveryListener() {

        @Override
        public void handleRecovery(Recoverable recoverable) {
            LOGGER.debug("Count tries to recovery connection reset to 0");
            connectionRecoveryAttempts.set(0);
            metrics.getReadinessMonitor().enable();
            LOGGER.debug("Set RabbitMQ readiness to true");
        }

        @Override
        public void handleRecoveryStarted(Recoverable recoverable) {
            // Nothing to do when recovery starts; only its completion matters here.
        }
    };

    /**
     * Returns the connection manager configuration supplied to the constructor.
     *
     * @return the configuration this manager was created with
     */
    public ConnectionManagerConfiguration getConfiguration() {
        return configuration;
    }

    /**
     * Creates the manager and immediately opens a RabbitMQ connection with automatic recovery enabled.
     *
     * <p>The subscriber name is taken from {@code connectionManagerConfiguration}, falling back to
     * {@code rabbitMQConfiguration}; a blank name is replaced by a default one. In every case the
     * current time in milliseconds is appended to it.
     *
     * <p>If construction fails, the shared executor is shut down (and an already opened connection
     * is aborted) because the caller never receives an instance on which {@link #close()} could be called.
     *
     * @param rabbitMQConfiguration          host, port, virtual host, credentials and fallback subscriber name
     * @param connectionManagerConfiguration timeouts, recovery settings, prefetch count and subscriber name
     * @param onFailedRecoveryConnection     invoked when recovery is refused because the attempt limit is
     *                                       reached; when {@code null} an {@link IllegalStateException} is
     *                                       thrown from the recovery triggering condition instead
     * @throws NullPointerException  if either configuration is {@code null}
     * @throws IllegalStateException if the connection cannot be created or does not implement {@link Recoverable}
     */
    public ConnectionManager(@NotNull RabbitMQConfiguration rabbitMQConfiguration, @NotNull ConnectionManagerConfiguration connectionManagerConfiguration, Runnable onFailedRecoveryConnection) {
        Objects.requireNonNull(rabbitMQConfiguration, "RabbitMQ configuration cannot be null");
        this.configuration = Objects.requireNonNull(connectionManagerConfiguration, "Connection manager configuration can not be null");

        String subscriberNameTmp = ObjectUtils.defaultIfNull(connectionManagerConfiguration.getSubscriberName(), rabbitMQConfiguration.getSubscriberName());
        if (StringUtils.isBlank(subscriberNameTmp)) {
            this.subscriberName = "rabbit_mq_subscriber." + System.currentTimeMillis();
            LOGGER.info("Subscribers will use default name: {}", this.subscriberName);
        } else {
            this.subscriberName = subscriberNameTmp + "." + System.currentTimeMillis();
        }

        ConnectionFactory factory = new ConnectionFactory();
        String virtualHost = rabbitMQConfiguration.getVHost();
        String username = rabbitMQConfiguration.getUsername();
        String password = rabbitMQConfiguration.getPassword();
        factory.setHost(rabbitMQConfiguration.getHost());
        factory.setPort(rabbitMQConfiguration.getPort());
        // Blank optional settings are skipped so the factory keeps its own defaults for them.
        if (StringUtils.isNotBlank(virtualHost)) {
            factory.setVirtualHost(virtualHost);
        }
        if (StringUtils.isNotBlank(username)) {
            factory.setUsername(username);
        }
        if (StringUtils.isNotBlank(password)) {
            factory.setPassword(password);
        }
        if (connectionManagerConfiguration.getConnectionTimeout() > 0) {
            factory.setConnectionTimeout(connectionManagerConfiguration.getConnectionTimeout());
        }

        // Any error reported by the client library marks the component as not ready.
        factory.setExceptionHandler(new ExceptionHandler() {

            @Override
            public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) {
                turnOffReadiness(exception);
            }

            @Override
            public void handleReturnListenerException(Channel channel, Throwable exception) {
                turnOffReadiness(exception);
            }

            @Override
            public void handleConfirmListenerException(Channel channel, Throwable exception) {
                turnOffReadiness(exception);
            }

            @Override
            public void handleBlockedListenerException(Connection connection, Throwable exception) {
                turnOffReadiness(exception);
            }

            @Override
            public void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName) {
                turnOffReadiness(exception);
            }

            @Override
            public void handleConnectionRecoveryException(Connection conn, Throwable exception) {
                turnOffReadiness(exception);
            }

            @Override
            public void handleChannelRecoveryException(Channel ch, Throwable exception) {
                turnOffReadiness(exception);
            }

            @Override
            public void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception) {
                turnOffReadiness(exception);
            }

            private void turnOffReadiness(Throwable exception) {
                ConnectionManager.this.metrics.getReadinessMonitor().disable();
                LOGGER.debug("Set RabbitMQ readiness to false. RabbitMQ error", exception);
            }
        });

        factory.setAutomaticRecoveryEnabled(true);
        factory.setConnectionRecoveryTriggeringCondition(shutdownSignal -> {
            // A manual close must never start a recovery.
            if (this.connectionIsClosed.get()) {
                return false;
            }
            int tmpCountTriesToRecovery = this.connectionRecoveryAttempts.get();
            if (tmpCountTriesToRecovery < connectionManagerConfiguration.getMaxRecoveryAttempts()) {
                LOGGER.info("Try to recovery connection to RabbitMQ. Count tries = {}", tmpCountTriesToRecovery + 1);
                return true;
            }
            LOGGER.error("Can not connect to RabbitMQ. Count tries = {}", tmpCountTriesToRecovery);
            if (onFailedRecoveryConnection == null) {
                throw new IllegalStateException("Cannot recover connection to RabbitMQ");
            }
            onFailedRecoveryConnection.run();
            return false;
        });
        factory.setRecoveryDelayHandler(recoveryAttempts -> {
            int tmpCountTriesToRecovery = this.connectionRecoveryAttempts.getAndIncrement();
            int minTimeout = connectionManagerConfiguration.getMinConnectionRecoveryTimeout();
            int maxTimeout = connectionManagerConfiguration.getMaxConnectionRecoveryTimeout();
            int maxAttempts = connectionManagerConfiguration.getMaxRecoveryAttempts();
            int recoveryDelay = minTimeout;
            if (maxAttempts > 1) {
                int step = (maxTimeout - minTimeout) / (maxAttempts - 1);
                // The counter keeps growing for as long as the outage lasts, so the multiplier is
                // capped at the last configured attempt; otherwise the delay would grow without bound.
                recoveryDelay += step * Math.min(tmpCountTriesToRecovery, maxAttempts - 1);
            }
            LOGGER.info("Recovery delay for '{}' try = {}", tmpCountTriesToRecovery, recoveryDelay);
            return recoveryDelay;
        });
        factory.setSharedExecutor(this.sharedExecutor);

        try {
            this.connection = factory.newConnection();
            this.metrics.getReadinessMonitor().enable();
            LOGGER.debug("Set RabbitMQ readiness to true");
        } catch (IOException | TimeoutException e) {
            this.metrics.getReadinessMonitor().disable();
            LOGGER.debug("Set RabbitMQ readiness to false. Can not create connection", e);
            // Nobody will be able to call close() on this instance, so release the executor here.
            this.sharedExecutor.shutdownNow();
            throw new IllegalStateException("Failed to create RabbitMQ connection using configuration", e);
        }

        this.connection.addBlockedListener(new BlockedListener() {

            @Override
            public void handleBlocked(String reason) throws IOException {
                LOGGER.warn("RabbitMQ blocked connection: {}", reason);
            }

            @Override
            public void handleUnblocked() throws IOException {
                LOGGER.warn("RabbitMQ unblocked connection");
            }
        });

        if (!(this.connection instanceof Recoverable)) {
            // Construction is about to fail: do not leave the open connection and executor behind.
            this.connection.abort();
            this.sharedExecutor.shutdownNow();
            throw new IllegalStateException("Connection does not implement Recoverable. Can not add RecoveryListener to it");
        }
        Recoverable recoverableConnection = (Recoverable) this.connection;
        recoverableConnection.addRecoveryListener(this.recoveryListener);
        LOGGER.debug("Recovery listener was added to connection.");
    }

    /**
     * Tells whether the manager can currently be used.
     *
     * @return {@code true} when the underlying connection reports itself open and
     *         {@link #close()} has not been called on this manager
     */
    public boolean isOpen() {
        if (!connection.isOpen()) {
            return false;
        }
        return !connectionIsClosed.get();
    }

    /**
     * Closes the connection (if it is open) and shuts down the shared executor.
     *
     * <p>Only the first call does any work; later calls return immediately. The configured
     * connection close timeout is used both for closing the connection and for waiting on the
     * executor. An {@link IOException} from the connection is logged and not propagated.
     */
    @Override
    public void close() {
        // getAndSet makes the method idempotent even when called concurrently.
        if (this.connectionIsClosed.getAndSet(true)) {
            return;
        }
        int closeTimeout = this.configuration.getConnectionCloseTimeout();
        try {
            if (this.connection.isOpen()) {
                try {
                    this.connection.close(closeTimeout);
                } catch (IOException e) {
                    LOGGER.error("Cannot close connection", e);
                }
            }
        } finally {
            // Runs even if closing throws an unchecked exception (for instance when the connection
            // shuts down between the isOpen() check and close()); otherwise the executor would
            // never be shut down.
            this.shutdownSharedExecutor(closeTimeout);
        }
    }

    /**
     * Publishes a message through the channel dedicated to {@code routingKey}.
     *
     * @param exchange   exchange to publish to
     * @param routingKey routing key of the message; also selects the channel holder
     * @param props      message properties
     * @param body       message payload
     * @throws IOException if the channel fails to publish
     */
    public void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException {
        PinId pin = PinId.forRoutingKey(routingKey);
        getChannelFor(pin).withLock(channel -> channel.basicPublish(exchange, routingKey, props, body));
    }

    /**
     * Subscribes to {@code queue} with manual acknowledgement on the channel dedicated to that queue.
     *
     * <p>Every delivery is acknowledged after {@code deliverCallback} returns or throws. An
     * {@link IOException} or {@link RuntimeException} raised while handling or acknowledging a
     * delivery is logged and not rethrown.
     *
     * @param queue           queue to consume from
     * @param deliverCallback handler invoked for each delivery
     * @param cancelCallback  handler passed to the channel for consumer cancellation
     * @return a monitor that cancels this subscription when {@code unsubscribe()} is called
     * @throws IOException if the channel fails to register the consumer
     */
    public SubscriberMonitor basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
        ChannelHolder holder = getChannelFor(PinId.forQueue(queue));

        DeliverCallback ackingCallback = (consumerTag, delivery) -> {
            try {
                try {
                    deliverCallback.handle(consumerTag, delivery);
                } finally {
                    // Acknowledge regardless of the handler outcome.
                    holder.withLock(ch -> basicAck(ch, delivery.getEnvelope().getDeliveryTag()));
                }
            } catch (IOException | RuntimeException e) {
                LOGGER.error("Cannot handle delivery for tag {}: {}", consumerTag, e.getMessage(), e);
            }
        };

        String tag = holder.mapWithLock(channel -> {
            String subscriptionTag = subscriberName + "_" + nextSubscriberId.getAndIncrement();
            return channel.basicConsume(queue, false, subscriptionTag, ackingCallback, cancelCallback);
        });
        return new RabbitMqSubscriberMonitor(holder, tag, this::basicCancel);
    }

    /**
     * Cancels the consumer identified by {@code consumerTag} on the given channel.
     *
     * @throws IOException if the channel fails to cancel the consumer
     */
    private void basicCancel(Channel channel, String consumerTag) throws IOException {
        channel.basicCancel(consumerTag);
    }

    /**
     * Shuts the shared executor down, forcing termination if it does not finish in time.
     *
     * <p>If the waiting thread is interrupted, its interrupt flag is restored and no forced
     * shutdown is attempted.
     *
     * @param closeTimeout how long to wait for termination, in milliseconds
     */
    private void shutdownSharedExecutor(int closeTimeout) {
        sharedExecutor.shutdown();

        boolean terminated;
        try {
            terminated = sharedExecutor.awaitTermination(closeTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        if (terminated) {
            return;
        }

        LOGGER.error("Executor is not terminated during {} millis", closeTimeout);
        List<Runnable> pending = sharedExecutor.shutdownNow();
        LOGGER.error("{} task(s) was(were) not finished", pending.size());
    }

    /**
     * Returns the channel holder registered for {@code pinId}, creating and registering one on first use.
     *
     * <p>The holder is created without opening a channel; it receives {@code createChannel} as its
     * channel supplier and {@code waitForConnectionRecovery} as its reconnection check.
     */
    private ChannelHolder getChannelFor(PinId pinId) {
        return channelsByPin.computeIfAbsent(pinId, key -> {
            LOGGER.trace("Creating channel holder for {}", key);
            Supplier<Channel> channelFactory = this::createChannel;
            BiConsumer<ShutdownNotifier, Boolean> recoveryCheck = this::waitForConnectionRecovery;
            return new ChannelHolder(channelFactory, recoveryCheck);
        });
    }

    /**
     * Opens a new channel on the connection, applying the configured prefetch count and a
     * listener that logs returned (unroutable) messages.
     *
     * <p>Waits for a connection recovery in progress before creating the channel.
     *
     * @return the newly created channel
     * @throws NullPointerException  if the connection has no channel available
     * @throws IllegalStateException if the manager is closed or the channel cannot be created
     */
    private Channel createChannel() {
        waitForConnectionRecovery(connection);
        try {
            Channel created = connection.createChannel();
            if (created == null) {
                throw new NullPointerException("No channels are available in the connection. Max channel number: " + connection.getChannelMax());
            }
            created.basicQos(configuration.getPrefetchCount());
            created.addReturnListener(returned -> LOGGER.warn(
                    "Can not router message to exchange '{}', routing key '{}'. Reply code '{}' and text = {}",
                    returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode(), returned.getReplyText()));
            return created;
        } catch (IOException e) {
            throw new IllegalStateException("Can not create channel", e);
        }
    }

    /**
     * Same as {@link #waitForConnectionRecovery(ShutdownNotifier, boolean)} with waiting enabled.
     */
    private void waitForConnectionRecovery(ShutdownNotifier notifier) {
        waitForConnectionRecovery(notifier, true);
    }

    /**
     * Checks that {@code notifier} can be used, optionally blocking while it is being recovered.
     *
     * @param notifier        connection or channel to check
     * @param waitForRecovery whether to block until the recovery finishes; when {@code false}
     *                        a warning is logged instead of waiting
     * @throws IllegalStateException if this manager has already been closed
     */
    private void waitForConnectionRecovery(ShutdownNotifier notifier, boolean waitForRecovery) {
        boolean recovering = isConnectionRecovery(notifier);
        if (recovering && waitForRecovery) {
            this.waitForRecovery(notifier);
        } else if (recovering) {
            LOGGER.warn("Skip waiting for connection recovery");
        }

        if (connectionIsClosed.get()) {
            throw new IllegalStateException("Connection is already closed");
        }
    }

    /**
     * Blocks the calling thread until {@code notifier} is open again or this manager is closed,
     * polling once per millisecond.
     *
     * <p>If the thread is interrupted while waiting, the wait ends early and the thread's
     * interrupt flag is restored so that callers can still observe the interruption.
     *
     * @param notifier connection or channel whose recovery is awaited
     */
    private void waitForRecovery(ShutdownNotifier notifier) {
        LOGGER.warn("Start waiting for connection recovery");
        while (this.isConnectionRecovery(notifier)) {
            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
                LOGGER.error("Wait for connection recovery was interrupted", e);
                // Thread.sleep clears the flag when it throws; put it back instead of swallowing it.
                Thread.currentThread().interrupt();
                break;
            }
        }
        LOGGER.info("Stop waiting for connection recovery");
    }

    /**
     * Tells whether {@code notifier} is treated as being in recovery: it is not open
     * and this manager has not been closed.
     */
    private boolean isConnectionRecovery(ShutdownNotifier notifier) {
        if (notifier.isOpen()) {
            return false;
        }
        return !connectionIsClosed.get();
    }

    /**
     * Acknowledges the single delivery identified by {@code deliveryTag} on the given channel.
     *
     * @throws IOException if the channel fails to send the acknowledgement
     */
    private void basicAck(Channel channel, long deliveryTag) throws IOException {
        final boolean multiple = false;
        channel.basicAck(deliveryTag, multiple);
    }

    private static interface ChannelConsumer {
        public void consume(Channel var1) throws IOException;
    }

    private static interface ChannelMapper<T> {
        public T map(Channel var1) throws IOException;
    }

    /**
     * Owns one lazily created channel and runs every operation on it while holding a lock.
     */
    private static class ChannelHolder {
        private final Lock lock = new ReentrantLock();
        private final Supplier<Channel> supplier;
        private final BiConsumer<ShutdownNotifier, Boolean> reconnectionChecker;
        /** Created on first use; only touched from methods that are called with the lock held. */
        private Channel channel;

        /**
         * @param supplier            creates the channel on first use
         * @param reconnectionChecker invoked with the channel and the "wait for recovery" flag
         *                            before each operation
         * @throws NullPointerException if either argument is {@code null}
         */
        public ChannelHolder(Supplier<Channel> supplier, BiConsumer<ShutdownNotifier, Boolean> reconnectionChecker) {
            this.supplier = Objects.requireNonNull(supplier, "'Supplier' parameter");
            this.reconnectionChecker = Objects.requireNonNull(reconnectionChecker, "'Reconnection checker' parameter");
        }

        /** Runs {@code consumer} on the channel under the lock, waiting for recovery if needed. */
        public void withLock(ChannelConsumer consumer) throws IOException {
            withLock(true, consumer);
        }

        /**
         * Runs {@code consumer} on the channel under the lock.
         *
         * @param waitForRecovery flag forwarded to the reconnection checker
         */
        public void withLock(boolean waitForRecovery, ChannelConsumer consumer) throws IOException {
            lock.lock();
            try {
                Channel current = getChannel(waitForRecovery);
                consumer.consume(current);
            } finally {
                lock.unlock();
            }
        }

        /** Runs {@code mapper} on the channel under the lock and returns its result. */
        public <T> T mapWithLock(ChannelMapper<T> mapper) throws IOException {
            lock.lock();
            try {
                return mapper.map(getChannel());
            } finally {
                lock.unlock();
            }
        }

        private Channel getChannel() {
            return getChannel(true);
        }

        /** Creates the channel if it does not exist yet, then runs the reconnection check on it. */
        private Channel getChannel(boolean waitForRecovery) {
            if (channel == null) {
                channel = supplier.get();
            }
            reconnectionChecker.accept(channel, waitForRecovery);
            return channel;
        }
    }

    /**
     * Map key identifying a channel holder by a routing key or by a queue name.
     * The two factory methods each set one of the fields and leave the other {@code null}.
     */
    private static class PinId {
        private final String routingKey;
        private final String queue;

        /** Creates an identifier for a publishing pin. */
        public static PinId forRoutingKey(String routingKey) {
            return new PinId(routingKey, null);
        }

        /** Creates an identifier for a subscribing pin. */
        public static PinId forQueue(String queue) {
            return new PinId(null, queue);
        }

        /**
         * @throws NullPointerException if both arguments are {@code null}
         */
        private PinId(String routingKey, String queue) {
            boolean nothingSet = routingKey == null && queue == null;
            if (nothingSet) {
                throw new NullPointerException("Either routingKey or queue must be set");
            }
            this.routingKey = routingKey;
            this.queue = queue;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            PinId other = (PinId) o;
            return new EqualsBuilder()
                    .append(routingKey, other.routingKey)
                    .append(queue, other.queue)
                    .isEquals();
        }

        @Override
        public int hashCode() {
            return new HashCodeBuilder(17, 37)
                    .append(routingKey)
                    .append(queue)
                    .toHashCode();
        }

        @Override
        public String toString() {
            return new ToStringBuilder(this, ToStringStyle.JSON_STYLE)
                    .append("routingKey", routingKey)
                    .append("queue", queue)
                    .toString();
        }
    }

    private static interface CancelAction {
        public void execute(Channel var1, String var2) throws IOException;
    }

    /**
     * {@link SubscriberMonitor} that cancels one consumer on the channel it was registered on.
     */
    private static class RabbitMqSubscriberMonitor implements SubscriberMonitor {
        private final ChannelHolder holder;
        private final String tag;
        private final CancelAction action;

        /**
         * @param holder holder of the channel the consumer was registered on
         * @param tag    consumer tag to cancel
         * @param action how to cancel the consumer on the channel
         */
        public RabbitMqSubscriberMonitor(ChannelHolder holder, String tag, CancelAction action) {
            this.holder = holder;
            this.tag = tag;
            this.action = action;
        }

        /**
         * Runs the cancel action under the holder's lock without waiting for a connection recovery.
         */
        @Override
        public void unsubscribe() throws Exception {
            final boolean waitForRecovery = false;
            holder.withLock(waitForRecovery, channel -> action.execute(channel, tag));
        }
    }
}

