/*
 * Decompiled with CFR 0.152.
 */
package com.exactpro.th2.common.schema.message.impl.rabbitmq;

import com.exactpro.th2.common.metrics.HealthMetrics;
import com.exactpro.th2.common.schema.message.FilterFunction;
import com.exactpro.th2.common.schema.message.MessageListener;
import com.exactpro.th2.common.schema.message.MessageSubscriber;
import com.exactpro.th2.common.schema.message.SubscriberMonitor;
import com.exactpro.th2.common.schema.message.configuration.RouterFilter;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.SubscribeTarget;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
import com.google.protobuf.Message;
import com.rabbitmq.client.Delivery;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for subscribers that consume deliveries through a {@link ConnectionManager},
 * decode each delivery body into a list of values, update delivery/content counters,
 * filter every value and pass the values that survive filtering to the registered
 * {@link MessageListener}s.
 *
 * <p>Life cycle: call one of the {@code init} methods exactly once, then {@link #start()},
 * and finally {@link #close()}.
 *
 * @param <T> type of the values decoded from a delivery body
 */
public abstract class AbstractRabbitSubscriber<T>
implements MessageSubscriber<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRabbitSubscriber.class);
    private final List<MessageListener<T>> listeners = new CopyOnWriteArrayList<>();
    private final AtomicReference<SubscribeTarget> subscribeTarget = new AtomicReference<>();
    private final AtomicReference<ConnectionManager> connectionManager = new AtomicReference<>();
    private final AtomicReference<SubscriberMonitor> consumerMonitor = new AtomicReference<>();
    private final AtomicReference<FilterFunction> filterFunc = new AtomicReference<>();
    private final HealthMetrics healthMetrics = new HealthMetrics(this);

    /**
     * Initializes the subscriber with the default filter function. The queue and routing key
     * are taken from {@code subscribeTarget}; the exchange is taken from {@code exchangeName}.
     *
     * @param connectionManager connection manager used to consume deliveries
     * @param exchangeName      exchange name stored in the effective subscribe target
     * @param subscribeTarget   source of the queue and routing key
     * @throws IllegalStateException if the subscriber is already initialized
     */
    @Override
    public void init(@NotNull ConnectionManager connectionManager, @NotNull String exchangeName, @NotNull SubscribeTarget subscribeTarget) {
        this.init(connectionManager, new SubscribeTarget(subscribeTarget.getQueue(), subscribeTarget.getRoutingKey(), exchangeName), FilterFunction.DEFAULT_FILTER_FUNCTION);
    }

    /**
     * Initializes the subscriber.
     *
     * @param connectionManager connection manager used to consume deliveries
     * @param subscribeTarget   queue, routing key and exchange to listen to
     * @param filterFunc        function used by {@link #callFilterFunction(Message, List)}
     * @throws NullPointerException  if any argument is {@code null}
     * @throws IllegalStateException if any of the three values has already been set
     */
    @Override
    public void init(@NotNull ConnectionManager connectionManager, @NotNull SubscribeTarget subscribeTarget, @NotNull FilterFunction filterFunc) {
        Objects.requireNonNull(connectionManager, "Connection can not be null");
        Objects.requireNonNull(subscribeTarget, "Subscriber target can not be null");
        Objects.requireNonNull(filterFunc, "Filter function can not be null");
        if (this.connectionManager.get() != null || this.subscribeTarget.get() != null || this.filterFunc.get() != null) {
            throw new IllegalStateException("Subscriber is already initialize");
        }
        this.connectionManager.set(connectionManager);
        this.subscribeTarget.set(subscribeTarget);
        this.filterFunc.set(filterFunc);
    }

    /**
     * Starts consuming from the configured queue unless a consumer monitor is already stored.
     *
     * @throws IllegalStateException if the subscriber is not initialized, or if subscribing
     *                               fails (the original failure is attached as the cause)
     */
    @Override
    public void start() throws Exception {
        ConnectionManager connectionManager = this.connectionManager.get();
        SubscribeTarget target = this.subscribeTarget.get();
        if (connectionManager == null || target == null) {
            throw new IllegalStateException("Subscriber is not initialized");
        }
        try {
            String queue = target.getQueue();
            String routingKey = target.getRoutingKey();
            String exchangeName = target.getExchange();
            // NOTE(review): AtomicReference.updateAndGet may re-apply the function when threads
            // contend, and this function has a side effect (it subscribes) - confirm that
            // start()/resubscribe() are never executed concurrently.
            this.consumerMonitor.updateAndGet(monitor -> {
                if (monitor != null) {
                    // Already subscribed: keep the existing monitor.
                    return monitor;
                }
                try {
                    SubscriberMonitor created = connectionManager.basicConsume(queue, this::handle, this::canceled);
                    LOGGER.info("Start listening exchangeName='{}', routing key='{}', queue name='{}'", exchangeName, routingKey, queue);
                    return created;
                }
                catch (IOException e) {
                    throw new IllegalStateException("Can not start subscribe to queue = " + queue, e);
                }
            });
        }
        catch (Exception e) {
            throw new IllegalStateException("Can not start listening", e);
        }
    }

    /**
     * Registers a listener that receives every value which passes {@link #filter(Object)}.
     *
     * @param messageListener listener to add
     */
    @Override
    public void addListener(MessageListener<T> messageListener) {
        this.listeners.add(messageListener);
    }

    /**
     * Unsubscribes the current consumer (if any), notifies all listeners via
     * {@link MessageListener#onClose()} and removes them.
     *
     * @throws IllegalStateException if the subscriber is not initialized
     * @throws Exception             if unsubscribing fails; listeners are still notified and cleared
     */
    @Override
    public void close() throws Exception {
        ConnectionManager connectionManager = this.connectionManager.get();
        if (connectionManager == null) {
            throw new IllegalStateException("Subscriber is not initialized");
        }
        SubscriberMonitor monitor = this.consumerMonitor.getAndSet(null);
        try {
            if (monitor != null) {
                monitor.unsubscribe();
            }
        }
        finally {
            // Listeners must be released even when unsubscribe() fails.
            this.listeners.forEach(MessageListener::onClose);
            this.listeners.clear();
        }
    }

    /**
     * Applies the filter function supplied at initialization.
     *
     * @param message message to check
     * @param filters filters passed to the filter function
     * @return the result of the filter function
     * @throws IllegalStateException if the subscriber is not initialized
     */
    protected boolean callFilterFunction(Message message, List<? extends RouterFilter> filters) {
        FilterFunction filterFunction = this.filterFunc.get();
        if (filterFunction == null) {
            throw new IllegalStateException("Subscriber is not initialized");
        }
        return (Boolean)filterFunction.apply(message, filters);
    }

    /** Decodes a delivery body into the values to process. */
    protected abstract List<T> valueFromBytes(byte[] var1) throws Exception;

    /** Builds the representation of a value that is logged at TRACE level. */
    protected abstract String toShortTraceString(T var1);

    /** Builds the representation of a value that is logged at DEBUG level. */
    protected abstract String toShortDebugString(T var1);

    /**
     * Filters a received value.
     *
     * @return the value to hand to listeners, or {@code null} if the value must be skipped
     */
    @Nullable
    protected abstract T filter(T var1) throws Exception;

    /** Returns the counter incremented once per received value. */
    protected abstract Counter getDeliveryCounter();

    /** Returns the counter incremented by {@link #extractCountFrom(Object)} per received value. */
    protected abstract Counter getContentCounter();

    /** Returns the histogram used to time the handling of one delivery. */
    protected abstract Histogram getProcessingTimer();

    /** Returns the metric label values for a value; must not be {@code null}, may be empty. */
    protected abstract String[] extractLabels(T var1);

    /** Returns the amount added to the content counter for a value. */
    protected abstract int extractCountFrom(T var1);

    /**
     * Handles one delivery: decodes its body, updates the counters, filters each value and
     * dispatches the values that pass the filter to every listener. Exceptions thrown by a
     * listener are logged and do not stop the other listeners; any other exception is logged
     * and ends the handling of this delivery. The processing timer is always observed.
     *
     * @param consumeTag consumer tag passed on to the listeners
     * @param delivery   delivery whose body is decoded
     */
    private void handle(String consumeTag, Delivery delivery) {
        Histogram.Timer processTimer = this.getProcessingTimer().startTimer();
        try {
            List<T> values = this.valueFromBytes(delivery.getBody());
            for (T value : values) {
                Objects.requireNonNull(value, "Received value is null");
                String[] labels = this.extractLabels(value);
                Objects.requireNonNull(labels, "Labels list extracted from received value is null");
                Counter counter = this.getDeliveryCounter();
                Counter contentCounter = this.getContentCounter();
                if (labels.length == 0) {
                    counter.inc();
                    contentCounter.inc(this.extractCountFrom(value));
                } else {
                    counter.labels(labels).inc();
                    contentCounter.labels(labels).inc(this.extractCountFrom(value));
                }
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("Received message: {}", this.toShortTraceString(value));
                } else if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Received message: {}", this.toShortDebugString(value));
                }
                T filteredValue = this.filter(value);
                if (filteredValue == null) {
                    LOGGER.debug("Message is filtered");
                    // Skip only this value; the remaining values of the delivery must still be processed.
                    continue;
                }
                for (MessageListener<T> listener : this.listeners) {
                    try {
                        listener.handler(consumeTag, filteredValue);
                    }
                    catch (Exception listenerExc) {
                        LOGGER.warn("Message listener from class '{}' threw exception", listener.getClass(), listenerExc);
                    }
                }
            }
        }
        catch (Exception e) {
            LOGGER.error("Can not parse value from delivery for: {}", consumeTag, e);
        }
        finally {
            processTimer.observeDuration();
        }
    }

    /**
     * Drops the current consumer monitor (unsubscribing on a best-effort basis) and calls
     * {@link #start()} again. If the restart fails, the health metrics are disabled.
     */
    private void resubscribe() {
        SubscribeTarget target = this.subscribeTarget.get();
        String queue = target.getQueue();
        String routingKey = target.getRoutingKey();
        String exchangeName = target.getExchange();
        LOGGER.info("Try to resubscribe subscriber for exchangeName='{}', routing key='{}', queue name='{}'", exchangeName, routingKey, queue);
        SubscriberMonitor monitor = this.consumerMonitor.getAndSet(null);
        if (monitor != null) {
            try {
                monitor.unsubscribe();
            }
            catch (Exception e) {
                // Best effort: a failed unsubscribe must not prevent the resubscribe attempt.
                LOGGER.info("Can not unsubscribe on resubscribe for exchangeName='{}', routing key='{}', queue name='{}'", exchangeName, routingKey, queue, e);
            }
        }
        try {
            this.start();
        }
        catch (Exception e) {
            LOGGER.error("Can not resubscribe subscriber for exchangeName='{}', routing key='{}', queue name='{}'", exchangeName, routingKey, queue, e);
            this.healthMetrics.disable();
        }
    }

    /**
     * Cancellation callback passed to {@code basicConsume}: disables the readiness monitor
     * and tries to resubscribe.
     *
     * @param consumerTag tag of the cancelled consumer
     */
    private void canceled(String consumerTag) {
        LOGGER.warn("Consuming cancelled for: '{}'", consumerTag);
        this.healthMetrics.getReadinessMonitor().disable();
        this.resubscribe();
    }
}

