/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.runner.memory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.Either;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MemoryQueue<T>
implements QueueInterface<T> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(MemoryQueue.class);
    private static final ObjectMapper mapper = JacksonMapper.ofJson();
    private final ExecutorService executorService;
    private final QueueService queueService;
    private final Class<T> cls;
    private final Map<String, List<Consumer<Either<T, DeserializationException>>>> queues = new ConcurrentHashMap<String, List<Consumer<Either<T, DeserializationException>>>>();

    public MemoryQueue(Class<T> cls, QueueService queueService, ExecutorService executorService) {
        this.executorService = executorService;
        this.queueService = queueService;
        this.cls = cls;
    }

    private static int selectConsumer(String key, int size) {
        if (key == null) {
            return new Random().nextInt(size);
        }
        return Hashing.consistentHash((HashCode)Hashing.crc32().hashString((CharSequence)key, StandardCharsets.UTF_8), (int)size);
    }

    private void produce(String key, T message) {
        if (log.isTraceEnabled()) {
            log.trace("New message: topic '{}', value {}", (Object)this.cls.getName(), message);
        }
        this.queues.forEach((consumerGroup, consumers) -> this.executorService.execute(() -> {
            Consumer consumer;
            MemoryQueue memoryQueue = this;
            synchronized (memoryQueue) {
                if (consumers.isEmpty()) {
                    log.debug("No consumer connected on queue '" + this.cls.getName() + "'");
                    return;
                }
                int index = MemoryQueue.selectConsumer(key, consumers.size());
                consumer = (Consumer)consumers.get(index);
            }
            String source = null;
            try {
                source = mapper.writeValueAsString(message);
                Object serialized = message == null ? null : mapper.readValue(source, this.cls);
                consumer.accept(Either.left((Object)serialized));
            }
            catch (JsonProcessingException e) {
                consumer.accept(Either.right((Object)new DeserializationException((IOException)((Object)e), source)));
            }
        }));
    }

    public void emit(String consumerGroup, T message) {
        this.produce(this.queueService.key(message), message);
    }

    public void emitAsync(String consumerGroup, T message) throws QueueException {
        this.emit(message);
    }

    public void delete(String consumerGroup, T message) throws QueueException {
        this.produce(this.queueService.key(message), null);
    }

    public Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer) {
        return this.receive(consumerGroup, null, consumer);
    }

    public synchronized Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer) {
        String queueName = queueType == null ? UUID.randomUUID().toString() : queueType.getSimpleName();
        if (!this.queues.containsKey(queueName)) {
            this.queues.put(queueName, Collections.synchronizedList(new ArrayList()));
        }
        this.queues.get(queueName).add(consumer);
        int index = this.queues.get(queueName).size() - 1;
        return () -> {
            MemoryQueue memoryQueue = this;
            synchronized (memoryQueue) {
                this.queues.get(queueName).remove(index);
                if (this.queues.get(queueName).isEmpty()) {
                    this.queues.remove(queueName);
                }
            }
        };
    }

    public void pause() {
    }

    public int getSubscribersCount() {
        return this.queues.values().stream().map(List::size).reduce(0, Integer::sum);
    }

    public void close() throws IOException {
        if (!this.executorService.isShutdown()) {
            this.executorService.shutdown();
        }
    }
}

