/*
 * Decompiled with CFR 0.152.
 */
package com.exactpro.th2.common.schema.message.impl.rabbitmq;

import com.exactpro.th2.common.schema.message.FilterFunction;
import com.exactpro.th2.common.schema.message.MessageQueue;
import com.exactpro.th2.common.schema.message.MessageSender;
import com.exactpro.th2.common.schema.message.MessageSubscriber;
import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.jetbrains.annotations.NotNull;

public abstract class AbstractRabbitQueue<T>
implements MessageQueue<T> {
    private final AtomicReference<ConnectionManager> connectionManager = new AtomicReference();
    private final AtomicReference<QueueConfiguration> queueConfiguration = new AtomicReference();
    private final AtomicReference<FilterFunction> filterFunc = new AtomicReference();
    private final AtomicReference<MessageSender<T>> sender = new AtomicReference();
    private final AtomicReference<MessageSubscriber<T>> subscriber = new AtomicReference();

    @Override
    public void init(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration) {
        this.init(connectionManager, queueConfiguration, FilterFunction.DEFAULT_FILTER_FUNCTION);
    }

    @Override
    public void init(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration, @NotNull FilterFunction filterFunc) {
        if (this.isInit()) {
            throw new IllegalStateException("Queue is already initialize");
        }
        Objects.requireNonNull(connectionManager, "Connection can not be null");
        Objects.requireNonNull(queueConfiguration, "Queue configuration can not be null");
        Objects.requireNonNull(filterFunc, "Filter function can not be null");
        this.connectionManager.set(connectionManager);
        this.queueConfiguration.set(queueConfiguration);
        this.filterFunc.set(filterFunc);
    }

    @Override
    public MessageSubscriber<T> getSubscriber() {
        ConnectionManager connectionManger = this.connectionManager.get();
        QueueConfiguration queueConfiguration = this.queueConfiguration.get();
        FilterFunction filterFunction = this.filterFunc.get();
        if (connectionManger == null || queueConfiguration == null || filterFunction == null) {
            throw new IllegalStateException("Queue is not initialized");
        }
        if (!queueConfiguration.isReadable()) {
            throw new IllegalStateException("Queue can not read");
        }
        return this.subscriber.updateAndGet(subscriber -> {
            if (subscriber == null) {
                return this.createSubscriber(connectionManger, queueConfiguration, filterFunction);
            }
            return subscriber;
        });
    }

    @Override
    public MessageSender<T> getSender() {
        ConnectionManager connectionManager = this.connectionManager.get();
        QueueConfiguration queueConfiguration = this.queueConfiguration.get();
        FilterFunction filterFunction = this.filterFunc.get();
        if (connectionManager == null || queueConfiguration == null || filterFunction == null) {
            throw new IllegalStateException("Queue is not initialized");
        }
        if (!queueConfiguration.isWritable()) {
            throw new IllegalStateException("Queue can not write");
        }
        return this.sender.updateAndGet(sender -> {
            if (sender == null) {
                return this.createSender(connectionManager, queueConfiguration);
            }
            return sender;
        });
    }

    @Override
    public void close() throws Exception {
        ArrayList exceptions = new ArrayList();
        this.subscriber.updateAndGet(subscriber -> {
            if (subscriber != null) {
                try {
                    subscriber.close();
                }
                catch (Exception e) {
                    exceptions.add(e);
                }
            }
            return null;
        });
        if (!exceptions.isEmpty()) {
            Exception exception = new Exception("Can not close message queue");
            exceptions.forEach(exception::addSuppressed);
            throw exception;
        }
    }

    public boolean isInit() {
        return this.connectionManager.get() != null || this.queueConfiguration.get() != null || this.filterFunc.get() != null;
    }

    protected abstract MessageSender<T> createSender(@NotNull ConnectionManager var1, @NotNull QueueConfiguration var2);

    protected abstract MessageSubscriber<T> createSubscriber(@NotNull ConnectionManager var1, @NotNull QueueConfiguration var2, @NotNull FilterFunction var3);
}

