/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.runner.memory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.context.ApplicationContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MemoryQueue<T>
implements QueueInterface<T> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(MemoryQueue.class);
    private static final ObjectMapper mapper = JacksonMapper.ofJson();
    private static ExecutorService poolExecutor;
    private final QueueService queueService;
    private final Class<T> cls;
    private final Map<String, List<Consumer<T>>> consumers = new ConcurrentHashMap<String, List<Consumer<T>>>();

    public MemoryQueue(Class<T> cls, ApplicationContext applicationContext) {
        if (poolExecutor == null) {
            ExecutorsUtils executorsUtils = (ExecutorsUtils)applicationContext.getBean(ExecutorsUtils.class);
            poolExecutor = executorsUtils.cachedThreadPool("memory-queue");
        }
        this.queueService = (QueueService)applicationContext.getBean(QueueService.class);
        this.cls = cls;
    }

    private static int selectConsumer(String key, int size) {
        if (key == null) {
            return new Random().nextInt(size);
        }
        return Hashing.consistentHash((HashCode)Hashing.crc32().hashString((CharSequence)key, StandardCharsets.UTF_8), (int)size);
    }

    private void produce(String key, T message) {
        if (log.isTraceEnabled()) {
            log.trace("New message: topic '{}', value {}", (Object)this.cls.getName(), message);
        }
        this.consumers.forEach((consumerGroup, consumers) -> poolExecutor.execute(() -> {
            Consumer consumer;
            MemoryQueue memoryQueue = this;
            synchronized (memoryQueue) {
                if (consumers.size() == 0) {
                    log.debug("No consumer connected on queue '" + this.cls.getName() + "'");
                    return;
                }
                int index = MemoryQueue.selectConsumer(key, consumers.size());
                consumer = (Consumer)consumers.get(index);
            }
            try {
                Object serialized = message == null ? null : mapper.readValue(mapper.writeValueAsString(message), this.cls);
                consumer.accept(serialized);
            }
            catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }));
    }

    public void emit(T message) {
        this.produce(this.queueService.key(message), message);
    }

    public void emitAsync(T message) throws QueueException {
        this.emit(message);
    }

    public void delete(T message) throws QueueException {
        this.produce(this.queueService.key(message), null);
    }

    public Runnable receive(Consumer<T> consumer) {
        return this.receive(null, consumer);
    }

    public synchronized Runnable receive(Class<?> consumerGroup, Consumer<T> consumer) {
        String consumerGroupName = consumerGroup == null ? UUID.randomUUID().toString() : consumerGroup.getSimpleName();
        if (!this.consumers.containsKey(consumerGroupName)) {
            this.consumers.put(consumerGroupName, Collections.synchronizedList(new ArrayList()));
        }
        this.consumers.get(consumerGroupName).add(consumer);
        int index = this.consumers.get(consumerGroupName).size() - 1;
        return () -> {
            MemoryQueue memoryQueue = this;
            synchronized (memoryQueue) {
                this.consumers.get(consumerGroupName).remove(index);
                if (this.consumers.get(consumerGroupName).size() == 0) {
                    this.consumers.remove(consumerGroupName);
                }
            }
        };
    }

    public void pause() {
    }

    public int getSubscribersCount() {
        return this.consumers.values().stream().map(List::size).reduce(0, Integer::sum);
    }

    public void close() throws IOException {
        if (!poolExecutor.isShutdown()) {
            poolExecutor.shutdown();
        }
    }
}

