/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.persistence.kafka;

import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.kie.kogito.persistence.kafka.KafkaPersistenceUtils;
import org.kie.kogito.process.MutableProcessInstances;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstanceDuplicatedException;
import org.kie.kogito.process.ProcessInstanceReadMode;
import org.kie.kogito.process.impl.AbstractProcessInstance;
import org.kie.kogito.serialization.process.ProcessInstanceMarshallerService;

public class KafkaProcessInstances
implements MutableProcessInstances {
    private Process<?> process;
    private KafkaProducer<String, byte[]> producer;
    private String topic;
    private ReadOnlyKeyValueStore<String, byte[]> store;
    private ProcessInstanceMarshallerService marshaller;
    private CountDownLatch latch = new CountDownLatch(1);

    public KafkaProcessInstances(Process<?> process, KafkaProducer<String, byte[]> producer) {
        this.process = process;
        this.topic = KafkaPersistenceUtils.topicName();
        this.producer = producer;
        this.setMarshaller(ProcessInstanceMarshallerService.newBuilder().withDefaultObjectMarshallerStrategies().build());
    }

    protected Process<?> getProcess() {
        return this.process;
    }

    protected ReadOnlyKeyValueStore<String, byte[]> getStore() {
        if (this.store != null) {
            return this.store;
        }
        return this.getStoreAwait();
    }

    protected void setStore(ReadOnlyKeyValueStore<String, byte[]> store) {
        this.store = store;
        this.latch.countDown();
    }

    private ReadOnlyKeyValueStore<String, byte[]> getStoreAwait() {
        try {
            if (this.latch.await(1L, TimeUnit.MINUTES)) {
                if (this.store == null) {
                    throw new RuntimeException("Failed to obtain Kafka Store for process: " + this.process.id());
                }
                return this.store;
            }
            throw new RuntimeException("Timeout waiting to obtain Kafka Store for process: " + this.process.id());
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to obtain Kafka Store for process: " + this.process.id(), e);
        }
    }

    protected void setMarshaller(ProcessInstanceMarshallerService marshaller) {
        this.marshaller = marshaller;
    }

    public boolean exists(String id) {
        return this.getProcessInstanceById(id).isPresent();
    }

    protected Optional<byte[]> getProcessInstanceById(String id) {
        return Optional.ofNullable((byte[])this.getStore().get((Object)this.getKeyForProcessInstance(id)));
    }

    protected String getKeyForProcessInstance(String id) {
        return String.format("%s-%s", this.getProcess().id(), id);
    }

    protected void sendKafkaRecord(String id, byte[] data) throws ExecutionException, InterruptedException {
        this.producer.send(new ProducerRecord(this.topic, (Object)this.getKeyForProcessInstance(id), (Object)data)).get();
    }

    public void create(String id, ProcessInstance instance) {
        if (this.isActive(instance)) {
            if (this.getProcessInstanceById(id).isPresent()) {
                throw new ProcessInstanceDuplicatedException(id);
            }
            try {
                this.sendKafkaRecord(id, this.marshaller.marshallProcessInstance(instance));
            }
            catch (Exception e) {
                throw new RuntimeException("Unable to persist process instance id: " + id, e);
            }
        }
    }

    public void update(String id, ProcessInstance instance) {
        if (this.isActive(instance)) {
            byte[] data = this.marshaller.marshallProcessInstance(instance);
            try {
                this.sendKafkaRecord(id, data);
                this.disconnect(instance);
            }
            catch (Exception e) {
                throw new RuntimeException("Unable to update process instance id: " + id, e);
            }
        }
    }

    public void remove(String id) {
        try {
            this.sendKafkaRecord(id, null);
        }
        catch (Exception e) {
            throw new RuntimeException("Unable to remove process instance id: " + id, e);
        }
    }

    public Optional<ProcessInstance<?>> findById(String id, ProcessInstanceReadMode mode) {
        return this.getProcessInstanceById(id).map(this.marshaller.createUnmarshallFunction(this.process, mode));
    }

    public Stream<ProcessInstance<?>> stream(ProcessInstanceReadMode mode) {
        KeyValueIterator iterator = this.getStore().prefixScan((Object)this.getProcess().id(), Serdes.String().serializer());
        return (Stream)StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 16), false).map(k -> (byte[])k.value).map(this.marshaller.createUnmarshallFunction(this.process, mode)).onClose(() -> ((KeyValueIterator)iterator).close());
    }

    protected void disconnect(ProcessInstance<?> instance) {
        ((AbstractProcessInstance)instance).internalRemoveProcessInstance(this.marshaller.createdReloadFunction(() -> this.getProcessInstanceById(instance.id()).orElseThrow()));
    }
}

