/*
 * Decompiled with CFR 0.152.
 */
package net.osomahe.esk.eventstore.control;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.Schedule;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.enterprise.concurrent.ManagedScheduledExecutorService;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.json.JsonObject;
import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;
import net.osomahe.esk.config.boundary.ConfigurationBoundary;
import net.osomahe.esk.eventstore.control.EventDeserializer;
import net.osomahe.esk.eventstore.control.EventSubscriptionDataStore;
import net.osomahe.esk.eventstore.control.TopicService;
import net.osomahe.esk.eventstore.entity.AsyncEvent;
import net.osomahe.esk.eventstore.entity.EventExpirationSecs;
import net.osomahe.esk.eventstore.entity.EventName;
import net.osomahe.esk.eventstore.entity.EventStoreEvent;
import net.osomahe.esk.eventstore.entity.LoggableEvent;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Startup singleton that subscribes a {@link KafkaConsumer} to the topics of all event classes
 * reported by the {@link EventSubscriptionDataStore} and re-publishes every matching record as a
 * CDI event.
 *
 * <p>Polling runs as a repeating task on the injected {@link ManagedScheduledExecutorService}.
 * Every access to the consumer ({@link #pollMessages()} and {@link #destroy()}) happens while
 * holding the consumer's monitor, so polling and closing never overlap.
 */
@Singleton
@Startup
public class EventStoreSubscriber {
    private static final Logger logger = Logger.getLogger(EventStoreSubscriber.class.getName());

    /** Delay before the first poll after (re)scheduling the polling task. */
    private static final long POLL_INITIAL_DELAY_MILLIS = 1000L;
    /** Period between the starts of two consecutive poll runs. */
    private static final long POLL_PERIOD_MILLIS = 200L;
    /** Maximum time a single {@code poll} call may block waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1L);
    /** Maximum time granted to the consumer for a graceful close. */
    private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(5L);

    @Inject
    private ConfigurationBoundary config;
    @Inject
    private EventSubscriptionDataStore eventDataStore;
    @Inject
    private TopicService topicService;
    @Inject
    private Event<EventStoreEvent> events;
    @Resource
    private ManagedScheduledExecutorService mses;
    /** Topic name -> (event name -> event class) for every subscribed event class. */
    private final Map<String, Map<String, Class<? extends EventStoreEvent>>> mapTopics = new ConcurrentHashMap<>();
    /** Expiration in seconds for event classes annotated with {@link EventExpirationSecs}. */
    private final Map<Class<? extends EventStoreEvent>, Long> mapExpiration = new ConcurrentHashMap<>();
    private Jsonb jsonb;
    private KafkaConsumer<String, JsonObject> consumer;
    private ScheduledFuture<?> sfConsumerPoll;
    private String applicationName;

    /**
     * Registers all known event classes, and, if at least one topic results from that, creates the
     * Kafka consumer, subscribes it to those topics and starts the periodic polling task.
     */
    @PostConstruct
    public void init() {
        this.jsonb = JsonbBuilder.create();
        this.eventDataStore.getEventClasses().forEach(this::subscribeForTopic);
        Properties consumerConfig = this.config.getKafkaConsumerConfig();
        this.applicationName = consumerConfig.getProperty("group.id");
        logger.info(String.format("Subscribing as %s for topics %s", this.applicationName, this.mapTopics));
        if (!this.mapTopics.isEmpty()) {
            this.consumer = this.createConsumer(consumerConfig);
            this.consumer.subscribe(this.mapTopics.keySet());
            this.sfConsumerPoll = this.schedulePolling();
        }
    }

    /**
     * Timer callback that restarts the polling task when it has stopped (finished, failed or been
     * cancelled) although a consumer exists.
     */
    @Schedule(hour="*", minute="*", persistent=false)
    public void checkLiveness() {
        if (this.consumer == null || this.sfConsumerPoll == null) {
            return;
        }
        // Future#isDone() is also true for cancelled tasks, so it covers both conditions.
        if (this.sfConsumerPoll.isDone()) {
            logger.warning(String.format("KafkaConsumer polling has to be restarted for %s ", this.applicationName));
            this.sfConsumerPoll = this.schedulePolling();
        }
    }

    /**
     * Creates the Kafka consumer with a string key deserializer and the project's event
     * deserializer for values.
     *
     * @param consumerConfig Kafka consumer properties
     * @return a new, not yet subscribed consumer
     */
    // Raw types are kept on purpose: the type parameters of EventDeserializer are not visible here.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private KafkaConsumer<String, JsonObject> createConsumer(Properties consumerConfig) {
        return new KafkaConsumer(consumerConfig, (Deserializer)new StringDeserializer(), (Deserializer)new EventDeserializer());
    }

    /** Schedules {@link #pollMessages()} as a fixed-rate task and returns its handle. */
    private ScheduledFuture<?> schedulePolling() {
        return this.mses.scheduleAtFixedRate(this::pollMessages, POLL_INITIAL_DELAY_MILLIS, POLL_PERIOD_MILLIS, TimeUnit.MILLISECONDS);
    }

    /**
     * Records the topic and event name under which {@code eventClass} is received and, when the
     * class declares one, its expiration.
     *
     * @param eventClass event class to register
     */
    private void subscribeForTopic(Class<? extends EventStoreEvent> eventClass) {
        String topicName = this.topicService.getTopicName(eventClass);
        this.mapTopics.computeIfAbsent(topicName, key -> new ConcurrentHashMap<>())
                .put(this.getEventName(eventClass), eventClass);
        Long eventExpiration = this.getEventExpiration(eventClass);
        if (eventExpiration != null) {
            this.mapExpiration.putIfAbsent(eventClass, eventExpiration);
        }
    }

    /**
     * @param eventClass event class to inspect
     * @return the value of its {@link EventExpirationSecs} annotation, or {@code null} when the
     *         class is not annotated
     */
    private Long getEventExpiration(Class<? extends EventStoreEvent> eventClass) {
        EventExpirationSecs expirationSecs = eventClass.getAnnotation(EventExpirationSecs.class);
        if (expirationSecs != null) {
            return expirationSecs.value();
        }
        return null;
    }

    /**
     * @param eventClass event class to inspect
     * @return the value of its {@link EventName} annotation, or the simple class name when the
     *         class is not annotated
     */
    private String getEventName(Class<? extends EventStoreEvent> eventClass) {
        EventName eventName = eventClass.getAnnotation(EventName.class);
        if (eventName != null) {
            return eventName.value();
        }
        return eventClass.getSimpleName();
    }

    /**
     * Polls the consumer once and dispatches every returned record.
     *
     * <p>Each record is handled in isolation: a record that cannot be decoded or fired is logged
     * and skipped, so it cannot cause the remaining records of the same batch to be dropped.
     * Nothing is ever thrown to the executor, which would otherwise stop the repeating task.
     */
    private void pollMessages() {
        KafkaConsumer<String, JsonObject> kafkaConsumer = this.consumer;
        synchronized (kafkaConsumer) {
            try {
                ConsumerRecords<String, JsonObject> records = kafkaConsumer.poll(POLL_TIMEOUT);
                for (ConsumerRecord<String, JsonObject> rcd : records) {
                    try {
                        this.processRecord(rcd);
                    }
                    catch (Exception e) {
                        logger.log(Level.SEVERE, String.format("Error in processing kafka message %s-%d@%d",
                                rcd.topic(), rcd.partition(), rcd.offset()), e);
                    }
                }
            }
            catch (Exception e) {
                logger.log(Level.SEVERE, "Error in polling kafka messages", e);
            }
        }
    }

    /**
     * Decodes a single record and fires it as a CDI event when it maps to a subscribed,
     * non-expired event class. Records without a value, without a string {@code "name"}, with an
     * unknown name or without a {@code "data"} object are skipped.
     *
     * @param rcd record returned by the consumer
     */
    private void processRecord(ConsumerRecord<String, JsonObject> rcd) {
        JsonObject event = rcd.value();
        if (event == null) {
            logger.warning(String.format("Skipping kafka message without value %s-%d@%d",
                    rcd.topic(), rcd.partition(), rcd.offset()));
            return;
        }
        // The defaulting overload returns null instead of throwing when "name" is absent or not a string.
        String eventName = event.getString("name", null);
        Map<String, Class<? extends EventStoreEvent>> mapEvents = this.mapTopics.get(rcd.topic());
        if (eventName == null || mapEvents == null) {
            return;
        }
        Class<? extends EventStoreEvent> eventClass = mapEvents.get(eventName);
        if (eventClass == null) {
            // Event type nobody in this application subscribed to.
            return;
        }
        if (this.isEventExpired(rcd, eventClass)) {
            logger.fine(() -> String.format("Skipping expired event: %s", event));
            return;
        }
        JsonObject payload = event.getJsonObject("data");
        if (payload == null) {
            logger.warning(String.format("Skipping event %s without data %s-%d@%d",
                    eventName, rcd.topic(), rcd.partition(), rcd.offset()));
            return;
        }
        EventStoreEvent data = this.jsonb.fromJson(payload.toString(), eventClass);
        if (data.getClass().isAnnotationPresent(LoggableEvent.class)) {
            logger.fine(() -> String.format("EventStoreEvent (%s) firing for %s %s", data.getClass().getSimpleName(), this.applicationName, data));
        }
        this.fireEvent(eventClass, data);
    }

    /**
     * Fires {@code data} asynchronously when {@code eventClass} is annotated with
     * {@link AsyncEvent}, synchronously otherwise. Failures on either path are logged, never
     * propagated.
     *
     * @param eventClass class the record was mapped to
     * @param data deserialized event payload
     */
    private void fireEvent(Class<? extends EventStoreEvent> eventClass, EventStoreEvent data) {
        try {
            if (eventClass.isAnnotationPresent(AsyncEvent.class)) {
                // Without this callback a failing async observer would go completely unnoticed.
                this.events.fireAsync(data).whenComplete((fired, error) -> {
                    if (error != null) {
                        logger.log(Level.SEVERE, "Error in firing polled kafka messages", error);
                    }
                });
                return;
            }
            this.events.fire(data);
        }
        catch (Exception e) {
            logger.log(Level.SEVERE, "Error in firing polled kafka messages", e);
        }
    }

    /**
     * Tells whether a record is older than the expiration configured for its event class. Only
     * records whose timestamp type is {@link TimestampType#CREATE_TIME} can expire.
     *
     * @param consumerRecord record to check
     * @param eventClass event class the record maps to
     * @return {@code true} when the record's age exceeds the configured expiration
     */
    private boolean isEventExpired(ConsumerRecord<String, JsonObject> consumerRecord, Class<? extends EventStoreEvent> eventClass) {
        Long expirationSecs = this.mapExpiration.get(eventClass);
        if (expirationSecs == null || consumerRecord.timestampType() != TimestampType.CREATE_TIME) {
            return false;
        }
        long ageMillis = System.currentTimeMillis() - consumerRecord.timestamp();
        // TimeUnit#toMillis saturates instead of overflowing for very large expirations.
        return ageMillis > TimeUnit.SECONDS.toMillis(expirationSecs);
    }

    /**
     * Stops the polling task, closes the consumer while holding its monitor (so a poll in
     * progress finishes first) and releases the JSON-B instance.
     */
    @PreDestroy
    public void destroy() {
        KafkaConsumer<String, JsonObject> kafkaConsumer = this.consumer;
        if (kafkaConsumer != null) {
            synchronized (kafkaConsumer) {
                if (this.sfConsumerPoll != null) {
                    this.sfConsumerPoll.cancel(false);
                }
                kafkaConsumer.close(CLOSE_TIMEOUT);
            }
        }
        if (this.jsonb != null) {
            try {
                this.jsonb.close();
            }
            catch (Exception e) {
                logger.log(Level.WARNING, "Error in closing Jsonb", e);
            }
        }
    }
}

