/*
 * Decompiled with CFR 0.152.
 */
package net.osomahe.esk.eventstore.boundary;

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PreDestroy;
import javax.ejb.Stateless;
import javax.inject.Inject;
import net.osomahe.esk.eventstore.control.TopicService;
import net.osomahe.esk.eventstore.entity.EventGroupKey;
import net.osomahe.esk.eventstore.entity.EventKey;
import net.osomahe.esk.eventstore.entity.EventNotPublishedException;
import net.osomahe.esk.eventstore.entity.EventStoreEvent;
import net.osomahe.esk.eventstore.entity.EventStoreException;
import net.osomahe.esk.eventstore.entity.LoggableEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Stateless bean that publishes {@link EventStoreEvent}s through the injected Kafka producer.
 *
 * <p>The target topic and its partition count are obtained from {@link TopicService}. The record
 * key is read from a field or method annotated with {@link EventKey}; the partition is derived
 * from a field or method annotated with {@link EventGroupKey}. Both lookups walk up the class
 * hierarchy of the event.
 */
@Stateless
public class EventStorePublisher {
    private static final Logger logger = Logger.getLogger(EventStorePublisher.class.getName());
    @Inject
    private KafkaProducer<String, EventStoreEvent> kafkaProducer;
    @Inject
    private TopicService topicService;

    /**
     * Publishes the event and blocks until the outcome is known.
     *
     * @param event the event to publish
     * @param <T> concrete event type
     * @return metadata of the written record, never {@code null}
     * @throws EventNotPublishedException if the asynchronous publication completed without metadata
     * @throws EventStoreException if waiting for the result was interrupted or failed
     */
    public <T extends EventStoreEvent> RecordMetadata publish(T event) {
        try {
            RecordMetadata metadata = this.publishAsync(event).get();
            if (metadata == null) {
                // publishAsync maps every failure to a null result; surface it as an exception here.
                throw new EventNotPublishedException(event);
            }
            return metadata;
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new EventStoreException("Cannot publish the event: " + event, e);
        }
        catch (ExecutionException e) {
            throw new EventStoreException("Cannot publish the event: " + event, e);
        }
    }

    /**
     * Publishes the event asynchronously.
     *
     * <p>The blocking send runs on the default executor of
     * {@link CompletableFuture#supplyAsync(java.util.function.Supplier)}. Failures are logged at
     * {@link Level#SEVERE} and the returned future then completes normally with {@code null};
     * it does not complete exceptionally because of a failed send.
     *
     * @param event the event to publish
     * @param <T> concrete event type
     * @return future holding the record metadata, or {@code null} if the event was not published
     */
    public <T extends EventStoreEvent> CompletableFuture<RecordMetadata> publishAsync(T event) {
        ProducerRecord<String, EventStoreEvent> record = new ProducerRecord<>(
                this.topicService.getTopicName(event.getClass()),
                this.getPartition(event),
                this.getEventKey(event),
                event);
        if (event.getClass().isAnnotationPresent(LoggableEvent.class)) {
            logger.fine(String.format("EventStoreEvent (%s) publishing %s", event.getClass().getSimpleName(), record));
        }
        return CompletableFuture.supplyAsync(() -> {
            try {
                return this.kafkaProducer.send(record).get();
            }
            catch (InterruptedException e) {
                // Keep the worker thread's interrupt status intact before translating the failure.
                Thread.currentThread().interrupt();
                throw new EventStoreException("Cannot publish the event: " + event, e);
            }
            catch (ExecutionException e) {
                throw new EventStoreException("Cannot publish the event: " + event, e);
            }
        }).exceptionally(throwable -> {
            logger.log(Level.SEVERE, "EventStoreEvent was NOT published", throwable);
            return null;
        });
    }

    /**
     * Resolves the record key: the string form of the {@link EventKey} value when one is present
     * and non-null, otherwise a generated "{@code <millis>-<uuid>}" key.
     */
    private <T extends EventStoreEvent> String getEventKey(T event) {
        Object eventKey = this.getValueForAnnotation(event, event.getClass(), EventKey.class);
        if (eventKey != null) {
            return eventKey.toString();
        }
        return System.currentTimeMillis() + "-" + UUID.randomUUID().toString();
    }

    /**
     * Maps the {@link EventGroupKey} value of the event onto a partition index.
     *
     * @return a partition in {@code [0, partitionCount)}, or {@code null} when the event has no
     *         group key value
     */
    private <T extends EventStoreEvent> Integer getPartition(T event) {
        Object groupKeyValue = this.getGroupKeyValue(event);
        if (groupKeyValue == null) {
            return null;
        }
        int partitionCount = this.topicService.getPartitionCount(event.getClass());
        // Take the remainder first and the absolute value second: Math.abs(Integer.MIN_VALUE) is
        // still negative, which would yield a negative partition. For every other hash the result
        // equals Math.abs(hash) % partitionCount, so existing key-to-partition mapping is kept.
        return Math.abs(groupKeyValue.hashCode() % partitionCount);
    }

    /** Returns the value of the {@link EventGroupKey} member of the event, or {@code null}. */
    private <T extends EventStoreEvent> Object getGroupKeyValue(T event) {
        return this.getValueForAnnotation(event, event.getClass(), EventGroupKey.class);
    }

    /**
     * Looks up the value of a member carrying the given annotation, checking the declared fields
     * of {@code eventClass} first, then its declared methods, then recursing into the superclass.
     *
     * @return the first non-null value found, or {@code null} if none exists in the hierarchy
     */
    private <T extends EventStoreEvent> Object getValueForAnnotation(T event, Class<?> eventClass, Class<? extends Annotation> annotation) {
        Object valueFromField = this.getValueFromField(event, eventClass, annotation);
        if (valueFromField != null) {
            return valueFromField;
        }
        Object valueFromMethod = this.getValueFromMethod(event, eventClass, annotation);
        if (valueFromMethod != null) {
            return valueFromMethod;
        }
        Class<?> parent = eventClass.getSuperclass();
        if (parent != null) {
            return this.getValueForAnnotation(event, parent, annotation);
        }
        return null;
    }

    /**
     * Returns the value of the first annotated field declared on {@code eventClass} that can be
     * read; a field that cannot be read is logged and the search continues with the next one.
     */
    private <T extends EventStoreEvent> Object getValueFromField(T event, Class<?> eventClass, Class<? extends Annotation> annotation) {
        for (Field f : eventClass.getDeclaredFields()) {
            if (!f.isAnnotationPresent(annotation)) continue;
            try {
                f.setAccessible(true);
                return f.get(event);
            }
            catch (IllegalAccessException e) {
                logger.log(Level.SEVERE, "Cannot get field value of " + annotation + " for event: " + event, e);
            }
        }
        return null;
    }

    /**
     * Returns the result of the first annotated method declared on {@code eventClass} that can be
     * invoked without arguments; a failing method is logged and the search continues.
     */
    private <T extends EventStoreEvent> Object getValueFromMethod(T event, Class<?> eventClass, Class<? extends Annotation> annotation) {
        for (Method method : eventClass.getDeclaredMethods()) {
            if (!method.isAnnotationPresent(annotation)) continue;
            try {
                // Same approach as for fields; avoids the deprecated Method.isAccessible() check.
                method.setAccessible(true);
                return method.invoke(event);
            }
            catch (IllegalAccessException | InvocationTargetException e) {
                logger.log(Level.SEVERE, "Cannot get method value of " + annotation + " for event: " + event, e);
            }
        }
        return null;
    }

    /** Closes the Kafka producer, waiting up to five seconds, when the bean is destroyed. */
    @PreDestroy
    public void destroy() {
        this.kafkaProducer.close(5L, TimeUnit.SECONDS);
    }
}

