/*
 * Decompiled with CFR 0.152.
 */
package io.eventuate.local.java.events;

import io.eventuate.SubscriberOptions;
import io.eventuate.common.eventuate.local.PublishedEvent;
import io.eventuate.common.id.Int128;
import io.eventuate.common.json.mapper.JSonMapper;
import io.eventuate.javaclient.commonimpl.common.SerializedEvent;
import io.eventuate.javaclient.commonimpl.events.AggregateEvents;
import io.eventuate.local.java.common.EtopEventContext;
import io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumer;
import io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumerConfigurationProperties;
import io.eventuate.messaging.kafka.basic.consumer.KafkaConsumerFactory;
import io.eventuate.messaging.kafka.common.AggregateTopicMapping;
import io.eventuate.messaging.kafka.common.EventuateKafkaConfigurationProperties;
import io.eventuate.messaging.kafka.common.EventuateKafkaMultiMessageConverter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link AggregateEvents} implementation that delivers events to subscribers by
 * creating one {@link EventuateKafkaConsumer} per call to
 * {@link #subscribe(String, Map, SubscriberOptions, Function)}.
 *
 * <p>Every consumer created here is remembered so that {@link #cleanUp()} can stop
 * it when the bean is destroyed. Access to the consumer list is guarded by the
 * list's own monitor.
 */
public class EventuateKafkaAggregateSubscriptions
implements AggregateEvents {
    /** How long {@link #cleanUp()} pauses after asking the consumers to stop. */
    private static final long CONSUMER_COMMIT_WAIT_SECONDS = 2L;

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final EventuateKafkaConfigurationProperties eventuateLocalAggregateStoreConfiguration;
    private final EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties;
    private final EventuateKafkaMultiMessageConverter eventuateKafkaMultiMessageConverter = new EventuateKafkaMultiMessageConverter();
    private final KafkaConsumerFactory kafkaConsumerFactory;
    private final List<EventuateKafkaConsumer> consumers = new ArrayList<>();

    /**
     * @param eventuateLocalAggregateStoreConfiguration supplies the bootstrap servers passed to each consumer
     * @param eventuateKafkaConsumerConfigurationProperties consumer properties passed to each consumer
     * @param kafkaConsumerFactory factory passed to each consumer
     */
    public EventuateKafkaAggregateSubscriptions(EventuateKafkaConfigurationProperties eventuateLocalAggregateStoreConfiguration, EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties, KafkaConsumerFactory kafkaConsumerFactory) {
        this.eventuateLocalAggregateStoreConfiguration = eventuateLocalAggregateStoreConfiguration;
        this.eventuateKafkaConsumerConfigurationProperties = eventuateKafkaConsumerConfigurationProperties;
        this.kafkaConsumerFactory = kafkaConsumerFactory;
    }

    /**
     * Stops every consumer registered so far, then pauses for
     * {@value #CONSUMER_COMMIT_WAIT_SECONDS} seconds to give them time to commit.
     *
     * <p>If the pause is interrupted the error is logged and the thread's interrupt
     * status is restored so callers further up the stack can observe it.
     */
    @PreDestroy
    public void cleanUp() {
        // Snapshot under the lock, stop outside it, so stop() never runs while the monitor is held.
        List<EventuateKafkaConsumer> toStop;
        synchronized (this.consumers) {
            toStop = new ArrayList<>(this.consumers);
        }
        toStop.forEach(EventuateKafkaConsumer::stop);
        this.logger.debug("Waiting for consumers to commit");
        try {
            TimeUnit.SECONDS.sleep(CONSUMER_COMMIT_WAIT_SECONDS);
        }
        catch (InterruptedException e) {
            this.logger.error("Error waiting", e);
            // Catching InterruptedException clears the flag; put it back.
            Thread.currentThread().interrupt();
        }
    }

    /** Registers a consumer so that {@link #cleanUp()} will stop it. */
    private void addConsumer(EventuateKafkaConsumer consumer) {
        synchronized (this.consumers) {
            this.consumers.add(consumer);
        }
    }

    /**
     * Creates, registers and starts a Kafka consumer for the given subscription.
     *
     * <p>Each consumed record is expanded into its {@link SerializedEvent}s. For
     * every event the consumer callback is invoked exactly once: after
     * {@code handler}'s future completes (passing along its throwable, if any)
     * when the event matches the subscription, or immediately with no error when
     * it does not.
     *
     * @param subscriberId identifier handed to the underlying consumer
     * @param aggregatesAndEvents maps an aggregate (entity) type to the event types of interest;
     *        its keys determine the topics that are consumed
     * @param subscriberOptions only logged by this implementation
     * @param handler invoked for each event that matches {@code aggregatesAndEvents}
     * @return an already-completed future
     */
    public CompletableFuture<?> subscribe(String subscriberId, Map<String, Set<String>> aggregatesAndEvents, SubscriberOptions subscriberOptions, Function<SerializedEvent, CompletableFuture<?>> handler) {
        this.logger.info("Subscribing: subscriberId = {}, aggregatesAndEvents = {}, options = {}", subscriberId, aggregatesAndEvents, subscriberOptions);
        List<String> topics = aggregatesAndEvents.keySet().stream()
                .map(AggregateTopicMapping::aggregateTypeToTopic)
                .collect(Collectors.toList());
        this.logger.info("Creating consumer: subscriberId = {}, aggregatesAndEvents = {}, options = {}", subscriberId, aggregatesAndEvents, subscriberOptions);
        EventuateKafkaConsumer consumer = new EventuateKafkaConsumer(subscriberId, (record, callback) -> {
            for (SerializedEvent se : this.toSerializedEvents((ConsumerRecord<String, byte[]>)record)) {
                if (isSubscribedTo(aggregatesAndEvents, se)) {
                    handler.apply(se).whenComplete((result, t) -> callback.accept(null, t));
                } else {
                    // Not of interest: acknowledge straight away without calling the handler.
                    callback.accept(null, null);
                }
            }
            return null;
        }, topics, this.eventuateLocalAggregateStoreConfiguration.getBootstrapServers(), this.eventuateKafkaConsumerConfigurationProperties, this.kafkaConsumerFactory);
        this.addConsumer(consumer);
        this.logger.info("Starting consumer: subscriberId = {}, aggregatesAndEvents = {}, options = {}", subscriberId, aggregatesAndEvents, subscriberOptions);
        consumer.start();
        this.logger.info("Subscribed: subscriberId = {}, aggregatesAndEvents = {}, options = {}", subscriberId, aggregatesAndEvents, subscriberOptions);
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Tells whether the subscription lists the event's type under the event's entity type.
     * An entity type that is absent from the map is treated as not subscribed rather
     * than causing a {@link NullPointerException}.
     */
    private static boolean isSubscribedTo(Map<String, Set<String>> aggregatesAndEvents, SerializedEvent se) {
        Set<String> eventTypes = aggregatesAndEvents.get(se.getEntityType());
        return eventTypes != null && eventTypes.contains(se.getEventType());
    }

    /** Decodes the record's payload into its individual values and converts each one to a {@link SerializedEvent}. */
    private List<SerializedEvent> toSerializedEvents(ConsumerRecord<String, byte[]> record) {
        return this.eventuateKafkaMultiMessageConverter.convertBytesToValues(record.value()).stream()
                .map(value -> this.jsonToSerializedEvent(value, record))
                .collect(Collectors.toList());
    }

    /**
     * Parses {@code value} as a JSON {@link PublishedEvent} and combines it with the
     * record's topic, partition and offset to build a {@link SerializedEvent}.
     */
    private SerializedEvent jsonToSerializedEvent(String value, ConsumerRecord<String, byte[]> record) {
        PublishedEvent pe = JSonMapper.fromJson(value, PublishedEvent.class);
        return new SerializedEvent(
                Int128.fromString(pe.getId()),
                pe.getEntityId(),
                pe.getEntityType(),
                pe.getEventData(),
                pe.getEventType(),
                Integer.valueOf(record.partition()),
                Long.valueOf(record.offset()),
                EtopEventContext.make(pe.getId(), record.topic(), record.partition(), record.offset()),
                pe.getMetadata());
    }
}

