/*
 * Decompiled with CFR 0.152.
 */
package com.exactpro.th2.common.schema.message.impl.rabbitmq;

import com.exactpro.th2.common.schema.exception.RouterException;
import com.exactpro.th2.common.schema.filter.strategy.FilterStrategy;
import com.exactpro.th2.common.schema.message.FilterFunction;
import com.exactpro.th2.common.schema.message.MessageListener;
import com.exactpro.th2.common.schema.message.MessageQueue;
import com.exactpro.th2.common.schema.message.MessageRouter;
import com.exactpro.th2.common.schema.message.MessageRouterContext;
import com.exactpro.th2.common.schema.message.MessageRouterMonitor;
import com.exactpro.th2.common.schema.message.MessageSender;
import com.exactpro.th2.common.schema.message.MessageSubscriber;
import com.exactpro.th2.common.schema.message.SubscriberMonitor;
import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration;
import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration;
import com.exactpro.th2.common.schema.message.configuration.RouterFilter;
import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractRabbitMessageRouter<T>
implements MessageRouter<T> {
    /** Logger used by this router. */
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRabbitMessageRouter.class);
    /** Router context; holds {@code null} until one of the {@code init} methods has stored a context. */
    private final AtomicReference<MessageRouterContext> context = new AtomicReference<>();
    /** Strategy used by {@code filterMessage}; seeded from {@code getDefaultFilterStrategy()} while the fields are initialized. */
    private final AtomicReference<FilterStrategy<Message>> filterStrategy = new AtomicReference<FilterStrategy<Message>>(this.getDefaultFilterStrategy());
    /** Queues created so far, keyed by queue alias. */
    private final ConcurrentMap<String, MessageQueue<T>> queueConnections = new ConcurrentHashMap<String, MessageQueue<T>>();

    /**
     * Initializes the router from a connection manager and a router configuration.
     * <p>
     * Both arguments are wrapped, together with {@link MessageRouterMonitor#DEFAULT_MONITOR},
     * into a {@link DefaultMessageRouterContext} that is handed to the context-based {@code init}.
     *
     * @param connectionManager connection manager to use; must not be {@code null}
     * @param configuration     router configuration; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public void init(@NotNull ConnectionManager connectionManager, @NotNull MessageRouterConfiguration configuration) {
        Objects.requireNonNull(connectionManager, "Connection owner can not be null");
        Objects.requireNonNull(configuration, "Configuration cannot be null");
        DefaultMessageRouterContext routerContext = new DefaultMessageRouterContext(connectionManager, MessageRouterMonitor.DEFAULT_MONITOR, configuration);
        this.init(routerContext);
    }

    /**
     * Stores the context this router works with. Only the first call succeeds.
     *
     * @param context context to store; must not be {@code null}
     * @throws NullPointerException  if {@code context} is {@code null}
     * @throws IllegalStateException if a context has already been stored
     */
    @Override
    public void init(@NotNull MessageRouterContext context) {
        Objects.requireNonNull(context, "Context can not be null");
        // The swap succeeds only while no context is stored yet.
        boolean installed = this.context.compareAndSet(null, context);
        if (!installed) {
            throw new IllegalStateException("Router is already initialized");
        }
    }

    /**
     * Adds {@code callback} as a listener on the subscriber of the queue registered under
     * {@code queueAlias} and starts that subscriber.
     * <p>
     * This method either returns a new monitor or throws; it never returns {@code null}.
     *
     * @param queueAlias alias of the queue to subscribe to
     * @param callback   listener to add to the queue's subscriber
     * @return monitor that closes the subscriber, using the queue object as its lock
     * @throws RouterException if starting the subscriber fails
     */
    @NotNull
    private SubscriberMonitor subscribe(String queueAlias, MessageListener<T> callback) {
        MessageQueue<T> queue = this.getMessageQueue(queueAlias);
        MessageSubscriber<T> subscriber = queue.getSubscriber();
        subscriber.addListener(callback);
        try {
            subscriber.start();
        }
        catch (Exception e) {
            throw new RouterException("Can not start subscriber", e);
        }
        return new SubscriberMonitorImpl(subscriber, queue);
    }

    /**
     * Subscribes {@code callback} to the single queue matching the given attributes combined
     * with the required subscribe attributes.
     *
     * @param callback  listener to register
     * @param queueAttr additional attributes used to look the queue up
     * @return monitor for the created subscription
     * @throws IllegalStateException if the lookup does not match exactly one queue
     */
    @Override
    @NotNull
    public SubscriberMonitor subscribe(MessageListener<T> callback, String ... queueAttr) {
        Collection<String> attributes = this.addRequiredSubscribeAttributes(queueAttr);
        Map<String, QueueConfiguration> queues = this.getConfiguration().findQueuesByAttr(attributes);
        if (queues.size() != 1) {
            // The check rejects zero matches as well as several, so the message says "exactly 1".
            throw new IllegalStateException("Wrong amount of queues for subscribe. Found " + queues.size() + " queues, but must be exactly 1. Search was done by " + attributes + " attributes");
        }
        return this.subscribe(queues.keySet().iterator().next(), callback);
    }

    /**
     * Subscribes {@code callback} to every queue matching the required subscribe attributes.
     *
     * @param callback listener to register on each matching queue
     * @return monitor wrapping the monitors of all created subscriptions
     * @throws IllegalStateException if no queue matches
     */
    @Override
    public SubscriberMonitor subscribeAll(MessageListener<T> callback) {
        Map<String, QueueConfiguration> matchedQueues = this.getConfiguration().findQueuesByAttr(this.requiredSubscribeAttributes());
        List<SubscriberMonitor> monitors = new ArrayList<>();
        for (String alias : matchedQueues.keySet()) {
            monitors.add(this.subscribe(alias, callback));
        }
        if (!monitors.isEmpty()) {
            return new MultiplySubscribeMonitorImpl(monitors);
        }
        throw new IllegalStateException("Wrong amount of queues for subscribeAll. Should not be empty. Search was done by " + this.requiredSubscribeAttributes() + " attributes");
    }

    /**
     * Subscribes {@code callback} to every queue matching the given attributes combined with
     * the required subscribe attributes.
     *
     * @param callback  listener to register on each matching queue
     * @param queueAttr additional attributes used to look the queues up
     * @return monitor wrapping the monitors of all created subscriptions
     * @throws IllegalStateException if no queue matches
     */
    @Override
    public SubscriberMonitor subscribeAll(MessageListener<T> callback, String ... queueAttr) {
        Collection<String> attributes = this.addRequiredSubscribeAttributes(queueAttr);
        Map<String, QueueConfiguration> matchedQueues = this.getConfiguration().findQueuesByAttr(attributes);
        List<SubscriberMonitor> monitors = new ArrayList<>();
        for (String alias : matchedQueues.keySet()) {
            monitors.add(this.subscribe(alias, callback));
        }
        if (!monitors.isEmpty()) {
            return new MultiplySubscribeMonitorImpl(monitors);
        }
        throw new IllegalStateException("Wrong amount of queues for subscribeAll. Should not be empty. Search was done by " + attributes + " attributes");
    }

    /**
     * Sends {@code message} to the single queue selected by the required send attributes
     * (or from all configured queues when there are none) and by {@code findQueueByFilter}.
     *
     * @param message message to send
     * @throws IllegalStateException if the selection does not yield exactly one queue
     */
    @Override
    public void send(T message) throws IOException {
        Map<String, QueueConfiguration> candidates;
        if (this.requiredSendAttributes().isEmpty()) {
            // No required attributes: consider every configured queue.
            candidates = this.getConfiguration().getQueues();
        } else {
            candidates = this.getConfiguration().findQueuesByAttr(this.requiredSendAttributes());
        }
        Map<String, T> targets = this.findQueueByFilter(candidates, message);
        int found = targets.size();
        if (found != 1) {
            throw new IllegalStateException("Wrong count of queues for send. Should be equal to 1. Find queues = " + targets.size());
        }
        this.send(targets);
    }

    /**
     * Sends {@code message} to the single queue selected by the given attributes combined with
     * the required send attributes and by {@code findQueueByFilter}.
     *
     * @param message   message to send
     * @param queueAttr additional attributes used to look the queue up
     * @throws IllegalStateException if the selection does not yield exactly one queue
     */
    @Override
    public void send(T message, String ... queueAttr) throws IOException {
        Collection<String> attributes = this.addRequiredSendAttributes(queueAttr);
        Map<String, QueueConfiguration> candidates = this.getConfiguration().findQueuesByAttr(attributes);
        Map<String, T> targets = this.findQueueByFilter(candidates, message);
        if (targets.size() == 1) {
            this.send(targets);
            return;
        }
        throw new IllegalStateException("Wrong size of queues for send. Should be equal to 1");
    }

    /**
     * Sends {@code message} to every queue selected by the given attributes combined with the
     * required send attributes and by {@code findQueueByFilter}.
     *
     * @param message   message to send
     * @param queueAttr additional attributes used to look the queues up
     * @throws IllegalStateException if no queue is selected
     */
    @Override
    public void sendAll(T message, String ... queueAttr) throws IOException {
        Collection<String> attributes = this.addRequiredSendAttributes(queueAttr);
        Map<String, QueueConfiguration> candidates = this.getConfiguration().findQueuesByAttr(attributes);
        Map<String, T> targets = this.findQueueByFilter(candidates, message);
        if (!targets.isEmpty()) {
            this.send(targets);
            return;
        }
        throw new IllegalStateException("Wrong size of queues for send. Can't be equal to 0");
    }

    /**
     * Returns the filter strategy currently held by this router.
     *
     * @return the value stored in the {@code filterStrategy} reference
     */
    @NotNull
    public FilterStrategy getFilterStrategy() {
        FilterStrategy<Message> current = this.filterStrategy.get();
        return current;
    }

    /**
     * Closes every queue created by this router and clears the queue cache.
     * <p>
     * A failure to close one queue does not stop the others from being closed; all failures
     * are attached as suppressed exceptions to a single {@link RouterException}.
     *
     * @throws RouterException if at least one queue could not be closed
     */
    @Override
    public void close() {
        LOGGER.info("Closing message router");
        List<Exception> failures = new ArrayList<>();
        for (MessageQueue<T> queue : this.queueConnections.values()) {
            try {
                queue.close();
            }
            catch (Exception e) {
                // Remember the failure and keep closing the remaining queues.
                failures.add(e);
            }
        }
        this.queueConnections.clear();
        if (failures.isEmpty()) {
            LOGGER.info("Message router has been successfully closed");
            return;
        }
        RouterException closeFailure = new RouterException("Can not close message router");
        for (Exception failure : failures) {
            closeFailure.addSuppressed(failure);
        }
        throw closeFailure;
    }

    /**
     * Creates the queue object for one configured queue. {@code getMessageQueue} calls this
     * when it needs a queue for an alias that has no cached queue yet.
     *
     * @param connectionManager  connection manager taken from the router context
     * @param queueConfiguration configuration found for the requested alias
     * @param filterFunction     filter callback; {@code getMessageQueue} passes {@code this::filterMessage}
     * @return the queue for the given configuration
     */
    protected abstract MessageQueue<T> createQueue(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration, @NotNull FilterFunction filterFunction);

    /**
     * Selects, from the candidate queues, those the message should be sent to.
     *
     * @param queues  candidate queue configurations keyed by queue alias
     * @param message message passed to one of the {@code send}/{@code sendAll} methods
     * @return map from queue alias to the message to send to that queue; the public
     *         {@code send} methods require exactly one entry, {@code sendAll} at least one
     */
    protected abstract Map<String, T> findQueueByFilter(Map<String, QueueConfiguration> queues, T message);

    /**
     * Attributes that are always included when looking up queues to subscribe to.
     * The result is copied into a new set before caller-supplied attributes are added.
     *
     * @return the required subscribe attributes; callers in this class never pass {@code null} on
     */
    protected abstract Set<String> requiredSubscribeAttributes();

    /**
     * Attributes that are always included when looking up queues to send to.
     * When the set is empty, {@code send(T)} considers every configured queue instead.
     *
     * @return the required send attributes; this class calls {@code size()} on the result directly
     */
    protected abstract Set<String> requiredSendAttributes();

    /**
     * Supplies the initial filter strategy of the router.
     * <p>
     * This method is invoked from the initializer of the {@code filterStrategy} field, i.e.
     * before any subclass constructor body has run, so an override must not rely on
     * subclass instance state.
     *
     * @return {@link FilterStrategy#DEFAULT_FILTER_STRATEGY} in this base implementation
     */
    @NotNull
    protected FilterStrategy<Message> getDefaultFilterStrategy() {
        return FilterStrategy.DEFAULT_FILTER_STRATEGY;
    }

    /**
     * Returns the router monitor of the current context.
     *
     * @throws IllegalStateException if the router has not been initialized
     */
    protected MessageRouterMonitor getMonitor() {
        MessageRouterContext routerContext = this.getContext();
        return routerContext.getRouterMonitor();
    }

    /**
     * Returns the queue cached for {@code queueAlias}, creating and caching it through
     * {@code createQueue} when none is cached yet.
     *
     * @param queueAlias alias of the queue
     * @return the queue for the alias
     * @throws IllegalStateException if the configuration has no queue for the alias,
     *                               or the router has not been initialized
     */
    protected MessageQueue<T> getMessageQueue(String queueAlias) {
        return this.queueConnections.computeIfAbsent(queueAlias, alias -> {
            ConnectionManager manager = this.getConnectionManager();
            QueueConfiguration configuration = this.getConfiguration().getQueueByAlias(alias);
            if (configuration != null) {
                return this.createQueue(manager, configuration, this::filterMessage);
            }
            throw new IllegalStateException("Can not find queue for " + queueAlias);
        });
    }

    /**
     * Sends each message to the queue registered under its alias.
     * <p>
     * An {@link IOException} raised for one queue is recorded and the remaining queues are
     * still attempted; afterwards all recorded failures are reported together as suppressed
     * exceptions of one {@link RouterException}. Any other exception aborts the loop
     * immediately, wrapped in a {@link RouterException}.
     * <p>
     * This is a helper of this class, not an implementation of an interface method, so it
     * carries no {@code @Override}.
     *
     * @param aliasesAndMessagesToSend messages to send keyed by queue alias
     * @throws RouterException if sending to at least one queue fails
     */
    protected void send(Map<String, T> aliasesAndMessagesToSend) {
        Map<String, Throwable> exceptions = new HashMap<>();
        aliasesAndMessagesToSend.forEach((queueAlias, message) -> {
            try {
                this.getMessageQueue(queueAlias).getSender().send(message);
            }
            catch (IOException e) {
                // Keep going: one failing queue must not prevent delivery to the others.
                exceptions.put(queueAlias, e);
            }
            catch (Exception e) {
                throw new RouterException("Can not start sender to queue: " + queueAlias, e);
            }
        });
        if (!exceptions.isEmpty()) {
            RouterException exception = new RouterException("Can not send to queue(s): " + String.join(",", exceptions.keySet()));
            exceptions.values().forEach(exception::addSuppressed);
            throw exception;
        }
    }

    /**
     * Checks {@code msg} against {@code filters} using the router's current filter strategy.
     *
     * @param msg     message to check
     * @param filters filters to verify the message against
     * @return the result of the strategy's {@code verify} call
     */
    protected boolean filterMessage(Message msg, List<? extends RouterFilter> filters) {
        FilterStrategy<Message> strategy = this.filterStrategy.get();
        return strategy.verify(msg, filters);
    }

    /**
     * Returns the connection manager of the current context.
     *
     * @throws IllegalStateException if the router has not been initialized
     */
    @NotNull
    private ConnectionManager getConnectionManager() {
        MessageRouterContext routerContext = this.getContext();
        return routerContext.getConnectionManager();
    }

    /**
     * Returns the router configuration of the current context.
     *
     * @throws IllegalStateException if the router has not been initialized
     */
    @NotNull
    private MessageRouterConfiguration getConfiguration() {
        MessageRouterContext routerContext = this.getContext();
        return routerContext.getConfiguration();
    }

    /**
     * Returns the context stored by {@code init}.
     *
     * @throws IllegalStateException if no context has been stored yet
     */
    @NotNull
    private MessageRouterContext getContext() {
        MessageRouterContext current = this.context.get();
        if (current != null) {
            return current;
        }
        throw new IllegalStateException("Router is not initialized");
    }

    /**
     * Builds a new set containing the required subscribe attributes plus {@code queueAttr}.
     *
     * @param queueAttr caller-supplied attributes
     * @return a fresh set holding both groups of attributes
     */
    private Collection<String> addRequiredSubscribeAttributes(String[] queueAttr) {
        Set<String> combined = new HashSet<>(this.requiredSubscribeAttributes());
        for (String attribute : queueAttr) {
            combined.add(attribute);
        }
        return combined;
    }

    /**
     * Builds a new set containing the required send attributes plus {@code queueAttr}.
     *
     * @param queueAttr caller-supplied attributes
     * @return a fresh set holding both groups of attributes
     */
    private Collection<String> addRequiredSendAttributes(String[] queueAttr) {
        Set<String> combined = new HashSet<>(this.requiredSendAttributes());
        for (String attribute : queueAttr) {
            combined.add(attribute);
        }
        return combined;
    }

    /**
     * {@link SubscriberMonitor} that fans {@link #unsubscribe()} out to a list of monitors.
     */
    protected static class MultiplySubscribeMonitorImpl
    implements SubscriberMonitor {
        private final List<SubscriberMonitor> subscriberMonitors;

        /**
         * @param subscriberMonitors monitors to unsubscribe together; the list is stored as-is, not copied
         */
        public MultiplySubscribeMonitorImpl(List<SubscriberMonitor> subscriberMonitors) {
            this.subscriberMonitors = subscriberMonitors;
        }

        /**
         * Unsubscribes every wrapped monitor. A failure in one monitor does not stop the
         * others; all failures are attached as suppressed exceptions to one exception.
         *
         * @throws Exception if at least one wrapped monitor failed to unsubscribe
         */
        @Override
        public void unsubscribe() throws Exception {
            // Declared as Exception (not Throwable) so that throwing it matches the throws clause.
            Exception exception = null;
            for (SubscriberMonitor monitor : this.subscriberMonitors) {
                try {
                    monitor.unsubscribe();
                }
                catch (Exception e) {
                    if (exception == null) {
                        exception = new Exception("Can not unsubscribe from some subscribe monitors");
                    }
                    exception.addSuppressed(e);
                }
            }
            if (exception != null) {
                throw exception;
            }
        }
    }

    /**
     * {@link SubscriberMonitor} that closes a single subscriber while holding a lock object.
     */
    protected static class SubscriberMonitorImpl
    implements SubscriberMonitor {
        private final Object lock;
        private final MessageSubscriber<?> subscriber;

        /**
         * @param subscriber subscriber closed by {@link #unsubscribe()}
         * @param lock       object to synchronize on; the subscriber itself is used when {@code null}
         */
        public SubscriberMonitorImpl(@NotNull MessageSubscriber<?> subscriber, @Nullable Object lock) {
            this.subscriber = subscriber;
            this.lock = lock != null ? lock : subscriber;
        }

        /**
         * Closes the subscriber while synchronized on the lock object.
         *
         * @throws Exception if closing the subscriber fails
         */
        @Override
        public void unsubscribe() throws Exception {
            synchronized (this.lock) {
                this.subscriber.close();
            }
        }
    }
}

