/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.zeebe.runtime.devmode.store;

import io.camunda.zeebe.protocol.record.ImmutableRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.ErrorRecordValue;
import io.camunda.zeebe.protocol.record.value.EscalationRecordValue;
import io.camunda.zeebe.protocol.record.value.IncidentRecordValue;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.protocol.record.value.MessageRecordValue;
import io.camunda.zeebe.protocol.record.value.MessageStartEventSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessMessageSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.SignalRecordValue;
import io.camunda.zeebe.protocol.record.value.SignalSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.TimerRecordValue;
import io.camunda.zeebe.protocol.record.value.VariableRecordValue;
import io.camunda.zeebe.protocol.record.value.deployment.ImmutableProcess;
import io.camunda.zeebe.protocol.record.value.deployment.Process;
import io.quarkiverse.zeebe.runtime.devmode.store.RecordStoreItem;
import io.quarkiverse.zeebe.runtime.devmode.store.Store;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class RecordStore {
    public static final BroadcastProcessor<Notification> NOTIFICATIONS = BroadcastProcessor.create();
    public static final Store<ProcessInstanceRecordValue> ELEMENT_INSTANCES = Store.create();
    public static final Store<ProcessInstanceRecordValue> INSTANCES = Store.create();
    public static final Store<Process> PROCESS_DEFINITIONS = Store.create();
    public static final Store<Process> PROCESS_DEFINITIONS_XML = Store.create();
    public static final Store<JobRecordValue> JOBS = Store.create();
    public static final Store<JobRecordValue> USER_TASKS = Store.create();
    public static final Store<VariableRecordValue> VARIABLES = Store.create();
    public static final Store<ErrorRecordValue> ERRORS = Store.create();
    public static final Store<TimerRecordValue> TIMERS = Store.create();
    public static final Store<SignalRecordValue> SIGNALS = Store.create();
    public static final Store<MessageRecordValue> MESSAGES = Store.create();
    public static final Store<IncidentRecordValue> INCIDENTS = Store.create();
    public static final Store<SignalSubscriptionRecordValue> SIGNAL_SUBSCRIPTIONS = Store.create();
    public static final Store<EscalationRecordValue> ESCALATIONS = Store.create();
    public static final Store<ProcessMessageSubscriptionRecordValue> PROCESS_MESSAGE_SUBSCRIPTIONS = Store.create();
    public static final Store<MessageStartEventSubscriptionRecordValue> START_EVENT_SUBSCRIPTIONS = Store.create();
    static Set<BpmnElementType> PROCESS_ELEMENTS_TYPES = Set.of(BpmnElementType.PROCESS, BpmnElementType.MULTI_INSTANCE_BODY);
    static Set<Intent> PROCESS_ELEMENT_INTENTS = Set.of(ProcessInstanceIntent.ELEMENT_ACTIVATED, ProcessInstanceIntent.ELEMENT_COMPLETED);

    public static void importProcessInstance(Record<ProcessInstanceRecordValue> record) {
        ELEMENT_INSTANCES.putIfAbsent(record, r -> r.getPartitionId() + "-" + r.getPosition());
        NotificationType type = NotificationType.UPDATED;
        ProcessInstanceIntent intent = (ProcessInstanceIntent)record.getIntent();
        if (((ProcessInstanceRecordValue)record.getValue()).getProcessInstanceKey() == record.getKey()) {
            RecordStoreItem<ProcessInstanceRecordValue> item = INSTANCES.put(record, r -> ((ProcessInstanceRecordValue)r.getValue()).getProcessInstanceKey());
            switch (intent) {
                case ELEMENT_ACTIVATED: {
                    item.data().put("start", RecordStore.localDateTime(record.getTimestamp()));
                    item.data().put("end", "");
                    item.data().put("state", "ACTIVE");
                    type = NotificationType.CREATED;
                    RecordStoreItem<Process> pd = PROCESS_DEFINITIONS.get(((ProcessInstanceRecordValue)record.getValue()).getProcessDefinitionKey());
                    if (pd == null) break;
                    pd.data().merge("active", 0, (v, n) -> (Integer)v + 1);
                    pd.data().putIfAbsent("ended", 0);
                    break;
                }
                case ELEMENT_TERMINATED: 
                case ELEMENT_COMPLETED: {
                    if (intent == ProcessInstanceIntent.ELEMENT_COMPLETED) {
                        item.data().put("state", "COMPLETED");
                    } else {
                        item.data().put("state", "TERMINATED");
                    }
                    item.data().put("end", RecordStore.localDateTime(record.getTimestamp()));
                    RecordStoreItem<Process> pd = PROCESS_DEFINITIONS.get(((ProcessInstanceRecordValue)record.getValue()).getProcessDefinitionKey());
                    if (pd != null) {
                        pd.data().merge("active", 0, (v, n) -> (Integer)v - 1);
                        pd.data().merge("ended", 0, (v, n) -> (Integer)v + 1);
                    }
                    type = NotificationType.ENDED;
                }
            }
        }
        if (intent == ProcessInstanceIntent.ELEMENT_ACTIVATED || intent == ProcessInstanceIntent.ELEMENT_TERMINATED || intent == ProcessInstanceIntent.ELEMENT_COMPLETED) {
            ProcessInstanceRecordValue value = (ProcessInstanceRecordValue)record.getValue();
            RecordStore.sendEvent(ValueType.PROCESS_INSTANCE, type, Map.of("processInstanceKey", value.getProcessInstanceKey(), "processDefinitionKey", value.getProcessDefinitionKey()));
        }
    }

    public static void importProcess(Record<Process> record) {
        if (record.getPartitionId() != 1) {
            return;
        }
        RecordStoreItem<Process> e = PROCESS_DEFINITIONS.get(((Process)record.getValue()).getProcessDefinitionKey());
        if (e != null) {
            return;
        }
        ImmutableRecord ic = (ImmutableRecord)record;
        ImmutableProcess p = (ImmutableProcess)record.getValue();
        RecordStoreItem<Process> item = PROCESS_DEFINITIONS.put((Record<Process>)ic.withValue((RecordValue)p.withResource(new byte[0])), r -> ((Process)r.getValue()).getProcessDefinitionKey());
        item.data().put("time", RecordStore.localDateTime(record.getTimestamp()));
        item.data().put("active", 0);
        item.data().put("ended", 0);
        PROCESS_DEFINITIONS_XML.put(record, r -> ((Process)r.getValue()).getProcessDefinitionKey());
        RecordStore.sendEvent(ValueType.PROCESS, NotificationType.DEPLOYED);
    }

    public static void importJob(Record<JobRecordValue> record) {
        JobRecordValue value = (JobRecordValue)record.getValue();
        if ("io.camunda.zeebe:userTask".equals(value.getType())) {
            RecordStoreItem<JobRecordValue> ut = USER_TASKS.put(record, Record::getKey);
            Map headers = value.getCustomHeaders();
            ut.data().put("users", headers.get("io.camunda.zeebe:candidateUsers"));
            ut.data().put("groups", headers.get("io.camunda.zeebe:candidateGroups"));
            ut.data().put("assignee", headers.get("io.camunda.zeebe:assignee"));
            ut.data().put("dueDate", headers.get("io.camunda.zeebe:dueDate"));
            ut.data().put("followUpDate", headers.get("io.camunda.zeebe:followUpDate"));
            ut.data().put("time", RecordStore.localDateTime(record.getTimestamp()));
            JobIntent uti = (JobIntent)record.getIntent();
            if (uti == JobIntent.CREATED) {
                ut.data().put("created", RecordStore.localDateTime(record.getTimestamp()));
            }
            RecordStore.sendEvent(ValueType.USER_TASK, NotificationType.UPDATED);
            return;
        }
        RecordStoreItem<JobRecordValue> job = JOBS.put(record, Record::getKey);
        job.data().put("time", RecordStore.localDateTime(record.getTimestamp()));
        RecordStore.sendEvent(ValueType.JOB, NotificationType.UPDATED);
    }

    public static void importVariable(Record<VariableRecordValue> record) {
        RecordStoreItem<VariableRecordValue> variable = VARIABLES.putIfAbsent(record, r -> r.getPartitionId() + "#" + r.getPosition());
        if (variable != null) {
            variable.data().put("time", RecordStore.localDateTime(record.getTimestamp()));
            VariableRecordValue value = (VariableRecordValue)record.getValue();
            RecordStore.sendEvent(ValueType.VARIABLE, NotificationType.UPDATED, Map.of("name", value.getName(), "processInstanceKey", value.getProcessInstanceKey(), "processDefinitionKey", value.getProcessDefinitionKey()));
        }
    }

    public static void importError(Record<ErrorRecordValue> record) {
        RecordStoreItem<ErrorRecordValue> error = ERRORS.putIfAbsent(record, Record::getPosition);
        if (error != null) {
            error.data().put("created", RecordStore.localDateTime(record.getTimestamp()));
            RecordStore.sendEvent(ValueType.ERROR, NotificationType.UPDATED);
        }
    }

    public static void importTimer(Record<TimerRecordValue> record) {
        RecordStoreItem<TimerRecordValue> timer = TIMERS.put(record, Record::getKey);
        timer.data().put("dueDate", RecordStore.localDateTime(((TimerRecordValue)record.getValue()).getDueDate()));
        timer.data().put("time", RecordStore.localDateTime(record.getTimestamp()));
    }

    public static void importSignal(Record<SignalRecordValue> record) {
        RecordStoreItem<SignalRecordValue> signal = SIGNALS.putIfAbsent(record, Record::getKey);
        signal.data().put("time", RecordStore.localDateTime(record.getTimestamp()));
        RecordStore.sendEvent(ValueType.SIGNAL, NotificationType.UPDATED, Map.of("signalName", ((SignalRecordValue)record.getValue()).getSignalName()));
    }

    public static void importMessage(Record<MessageRecordValue> record) {
        RecordStoreItem<MessageRecordValue> msg = MESSAGES.put(record, Record::getKey);
        if (msg != null) {
            MessageIntent intent = (MessageIntent)record.getIntent();
            if (MessageIntent.PUBLISHED == intent) {
                msg.data().put("name", ((MessageRecordValue)msg.record().getValue()).getName());
                msg.data().put("messageId", ((MessageRecordValue)msg.record().getValue()).getMessageId());
                msg.data().put("correlationKey", ((MessageRecordValue)msg.record().getValue()).getCorrelationKey());
            }
            msg.data().put("time", RecordStore.localDateTime(record.getTimestamp()));
            MessageRecordValue value = (MessageRecordValue)record.getValue();
            RecordStore.sendEvent(ValueType.MESSAGE, NotificationType.UPDATED, Map.of("name", value.getName(), "messageId", value.getMessageId(), "correlationKey", value.getCorrelationKey()));
        }
    }

    public static void importIncident(Record<IncidentRecordValue> record) {
        RecordStoreItem<IncidentRecordValue> incident = INCIDENTS.put(record, Record::getKey);
        IncidentIntent intent = (IncidentIntent)record.getIntent();
        NotificationType type = NotificationType.UPDATED;
        if (intent == IncidentIntent.CREATED) {
            incident.data().put("created", RecordStore.localDateTime(record.getTimestamp()));
            incident.data().put("resolved", "");
            type = NotificationType.CREATED;
        }
        if (intent == IncidentIntent.RESOLVED) {
            incident.data().put("resolved", RecordStore.localDateTime(record.getTimestamp()));
        }
        RecordStore.sendEvent(ValueType.INCIDENT, type, Map.of("processInstanceKey", ((IncidentRecordValue)record.getValue()).getProcessInstanceKey(), "processDefinitionKey", ((IncidentRecordValue)record.getValue()).getProcessDefinitionKey()));
    }

    public static void importSignalSubscription(Record<SignalSubscriptionRecordValue> record) {
        RecordStoreItem<SignalSubscriptionRecordValue> signal = SIGNAL_SUBSCRIPTIONS.putIfAbsent(record, Record::getKey);
        if (signal != null) {
            signal.data().put("time", RecordStore.localDateTime(record.getTimestamp()));
        }
    }

    public static void importEscalation(Record<EscalationRecordValue> record) {
        RecordStoreItem<EscalationRecordValue> escalation = ESCALATIONS.putIfAbsent(record, Record::getKey);
        if (escalation != null) {
            escalation.data().put("time", RecordStore.localDateTime(record.getTimestamp()));
        }
    }

    public static void importMessageSubscription(Record<ProcessMessageSubscriptionRecordValue> record) {
        RecordStoreItem<ProcessMessageSubscriptionRecordValue> item = PROCESS_MESSAGE_SUBSCRIPTIONS.put(record, r -> ((ProcessMessageSubscriptionRecordValue)r.getValue()).getElementInstanceKey() + "#" + ((ProcessMessageSubscriptionRecordValue)r.getValue()).getMessageName());
        item.data().put("time", RecordStore.localDateTime(record.getTimestamp()));
    }

    public static void importMessageStartEventSubscription(Record<MessageStartEventSubscriptionRecordValue> record) {
        RecordStoreItem<MessageStartEventSubscriptionRecordValue> item = START_EVENT_SUBSCRIPTIONS.put(record, r -> ((MessageStartEventSubscriptionRecordValue)r.getValue()).getProcessDefinitionKey() + "#" + ((MessageStartEventSubscriptionRecordValue)r.getValue()).getMessageName());
        item.data().put("time", RecordStore.localDateTime(record.getTimestamp()));
    }

    private static void sendEvent(ValueType event, NotificationType type) {
        NOTIFICATIONS.onNext((Object)new Notification(event, type, Map.of()));
    }

    private static void sendEvent(ValueType event, NotificationType type, Map<String, Object> data) {
        NOTIFICATIONS.onNext((Object)new Notification(event, type, data));
    }

    private static String localDateTime(long timestamp) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.systemDefault()).toString();
    }

    public static Map<String, Map<String, Long>> findProcessElements(Long pdk) {
        return ELEMENT_INSTANCES.findBy(record -> ((ProcessInstanceRecordValue)record.getValue()).getProcessDefinitionKey() == pdk.longValue() && !PROCESS_ELEMENTS_TYPES.contains(((ProcessInstanceRecordValue)record.getValue()).getBpmnElementType()) && PROCESS_ELEMENT_INTENTS.contains(record.getIntent())).collect(Collectors.groupingBy(x -> ((ProcessInstanceRecordValue)x.record().getValue()).getElementId(), Collectors.mapping(x -> x.record().getIntent(), Collectors.groupingBy(Intent::name, Collectors.counting()))));
    }

    public static enum NotificationType {
        ERROR,
        DEPLOYED,
        UPDATED,
        CREATED,
        ENDED;

    }

    public record Notification(ValueType event, NotificationType type, Map<String, Object> data) {
    }
}

