/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.hazelcast.connect.java;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.hazelcast.client.HazelcastClientNotActiveException;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.ringbuffer.StaleSequenceException;
import io.zeebe.exporter.proto.Schema;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZeebeHazelcast
implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZeebeHazelcast.class);
    /** Record payload types that are probed, in list order, when a record is dispatched. */
    private static final List<Class<? extends Message>> RECORD_MESSAGE_TYPES = new ArrayList<>();
    /** Source of the serialized records. */
    private final Ringbuffer<byte[]> ringbuffer;
    /** Listeners grouped by the record payload type they were registered for. */
    private final Map<Class<?>, List<Consumer<?>>> listeners;
    /** Called with the updated sequence after a record has been handled. */
    private final Consumer<Long> postProcessListener;
    /*
     * Next ringbuffer sequence to read. It is only written by the reader task, but it is
     * read through the public getSequence() (and by close()), which may run on a different
     * thread. 'volatile' makes those reads see the latest value; the increments stay safe
     * because the reader task is the only writer.
     */
    private volatile long sequence;
    /** Handle of the submitted reader task; {@code null} until {@code start()} has run. */
    private Future<?> future;
    /** Executor running the reader task; {@code null} until {@code start()} has run. */
    private ExecutorService executorService;
    /** Set by {@code close()}; tells the reader loop to stop. */
    private volatile boolean isClosed = false;

    /**
     * Creates a reader bound to the given ringbuffer. Nothing is read until {@code start()}
     * is called.
     *
     * @param ringbuffer ringbuffer holding the serialized records
     * @param sequence first sequence to read
     * @param listeners listeners keyed by the record payload type they handle
     * @param postProcessListener callback that receives the updated sequence after a record was handled
     */
    private ZeebeHazelcast(Ringbuffer<byte[]> ringbuffer, long sequence, Map<Class<?>, List<Consumer<?>>> listeners, Consumer<Long> postProcessListener) {
        // Plain field wiring; the assignments are independent of each other.
        this.postProcessListener = postProcessListener;
        this.listeners = listeners;
        this.sequence = sequence;
        this.ringbuffer = ringbuffer;
    }

    /**
     * Entry point for configuring a {@link ZeebeHazelcast} instance.
     *
     * @param hazelcastInstance Hazelcast instance the builder will obtain the ringbuffer from
     * @return a fresh builder with default settings
     */
    public static Builder newBuilder(HazelcastInstance hazelcastInstance) {
        final Builder builder = new Builder(hazelcastInstance);
        return builder;
    }

    /**
     * Creates a single-threaded executor and submits the read loop to it, remembering both
     * the executor and the task handle so that {@code close()} can stop them.
     */
    private void start() {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        this.executorService = executor;
        this.future = executor.submit(this::readFromBuffer);
    }

    /**
     * Tells whether this reader has been closed.
     *
     * @return {@code true} once {@code close()} has set the closed flag
     */
    public boolean isClosed() {
        return isClosed;
    }

    /**
     * Stops reading from the ringbuffer: logs the current sequence, marks this instance as
     * closed, cancels the reader task (interrupting it if it is running) and shuts the
     * executor down. The task and executor are skipped when they were never created.
     *
     * @throws Exception declared by {@link AutoCloseable#close()}
     */
    @Override
    public void close() throws Exception {
        LOGGER.info("Closing. Stop reading from ringbuffer. Current sequence: '{}'", (Object) Long.valueOf(this.getSequence()));
        this.isClosed = true;

        final Future<?> readerTask = this.future;
        if (readerTask != null) {
            readerTask.cancel(true);
        }

        final ExecutorService executor = this.executorService;
        if (executor != null) {
            executor.shutdown();
        }
    }

    /**
     * Returns the reader's current position.
     *
     * @return the sequence that will be read next
     */
    public long getSequence() {
        return sequence;
    }

    /**
     * Read loop executed by the reader task: processes one record after another until the
     * closed flag is observed.
     */
    private void readFromBuffer() {
        for (;;) {
            if (this.isClosed) {
                return;
            }
            this.readNext();
        }
    }

    /**
     * Reads the item at the current sequence, parses it as a {@code Schema.Record}, hands it
     * to the listeners, advances the sequence by one and reports the new sequence to the
     * post-process listener.
     *
     * <p>Failure handling:
     * <ul>
     *   <li>undecodable payload: logged and skipped (the sequence still advances);</li>
     *   <li>stale sequence: continue from the head sequence carried by the exception;</li>
     *   <li>{@link IllegalArgumentException}: continue from the ringbuffer's current head;</li>
     *   <li>Hazelcast client no longer active: this instance is closed;</li>
     *   <li>interruption: the interrupt flag is restored and a {@link RuntimeException} ends the reader task;</li>
     *   <li>anything else: logged unless already closed; the same sequence is read again on the next call.</li>
     * </ul>
     *
     * @throws RuntimeException if the thread is interrupted while reading
     */
    private void readNext() {
        LOGGER.trace("Read from ring-buffer with sequence '{}'", this.sequence);
        try {
            final byte[] item = this.ringbuffer.readOne(this.sequence);
            final Schema.Record genericRecord = Schema.Record.parseFrom(item);
            this.handleRecord(genericRecord);
            // Advance only after the listeners ran, so a listener failure re-reads the same record.
            ++this.sequence;
            this.postProcessListener.accept(this.sequence);
        }
        catch (InvalidProtocolBufferException e) {
            LOGGER.error("Failed to deserialize Protobuf message at sequence '{}'", this.sequence, e);
            // Skip the broken item; retrying would fail the same way.
            ++this.sequence;
        }
        catch (StaleSequenceException e) {
            final long headSequence = e.getHeadSeq();
            LOGGER.warn("Fail to read from ring-buffer at sequence '{}'. The sequence is reported as stale. Continue with new head sequence at '{}'", this.sequence, headSequence, e);
            this.sequence = headSequence;
        }
        catch (IllegalArgumentException e) {
            final long headSequence = this.ringbuffer.headSequence();
            LOGGER.warn("Fail to read from ring-buffer at sequence '{}'. Continue with head sequence at '{}'", this.sequence, headSequence, e);
            this.sequence = headSequence;
        }
        catch (HazelcastClientNotActiveException e) {
            LOGGER.warn("Lost connection to the Hazelcast server", (Throwable) e);
            try {
                this.close();
            }
            catch (Exception closingFailure) {
                LOGGER.debug("Failure while closing the client", (Throwable) closingFailure);
            }
        }
        catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt status; restore it so code
            // further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.debug("Interrupted while reading from ring-buffer with sequence '{}'", this.sequence);
            throw new RuntimeException("Interrupted while reading from ring-buffer", e);
        }
        catch (Exception e) {
            // Failures after close() are expected noise and are not reported.
            if (!this.isClosed) {
                LOGGER.error("Fail to read from ring-buffer at sequence '{}'. Will try again.", this.sequence, e);
            }
        }
    }

    /**
     * Dispatches a generic record by probing the known payload types in registration order
     * and stopping at the first type that accepts the record. Records matching none of the
     * known types are ignored.
     *
     * @param genericRecord the record wrapper read from the ringbuffer
     * @throws InvalidProtocolBufferException if unpacking the payload fails
     */
    private void handleRecord(Schema.Record genericRecord) throws InvalidProtocolBufferException {
        for (Class<? extends Message> candidateType : RECORD_MESSAGE_TYPES) {
            if (this.handleRecord(genericRecord, candidateType)) {
                return;
            }
        }
    }

    /**
     * Delivers the record's payload to the listeners registered for type {@code t}, provided
     * the payload is of that type.
     *
     * @param genericRecord the record wrapper whose payload is inspected
     * @param t the payload type to test for and unpack as
     * @param <T> the payload message type
     * @return {@code true} if the payload is of type {@code t} (whether or not any listener
     *     is registered for it), {@code false} otherwise
     * @throws InvalidProtocolBufferException if the payload cannot be unpacked as {@code t}
     */
    private <T extends Message> boolean handleRecord(Schema.Record genericRecord, Class<T> t) throws InvalidProtocolBufferException {
        if (!genericRecord.getRecord().is(t)) {
            return false;
        }
        final T record = genericRecord.getRecord().unpack(t);
        for (Consumer<?> listener : this.listeners.getOrDefault(t, List.of())) {
            // The map only stores wildcard consumers, so the element type has to be restored
            // here. The cast is safe as long as every consumer stored under key 't' was
            // registered as a Consumer<T>, which is how Builder.addListener pairs them.
            @SuppressWarnings("unchecked")
            final Consumer<T> typedListener = (Consumer<T>) listener;
            typedListener.accept(record);
        }
        return true;
    }

    // Registers every record payload type that can be dispatched to listeners. The list
    // order is the order in which handleRecord probes the types.
    static {
        RECORD_MESSAGE_TYPES.addAll(List.of(
                Schema.DeploymentRecord.class,
                Schema.DeploymentDistributionRecord.class,
                Schema.ErrorRecord.class,
                Schema.IncidentRecord.class,
                Schema.JobRecord.class,
                Schema.JobBatchRecord.class,
                Schema.MessageStartEventSubscriptionRecord.class,
                Schema.MessageSubscriptionRecord.class,
                Schema.MessageRecord.class,
                Schema.ProcessRecord.class,
                Schema.ProcessEventRecord.class,
                Schema.ProcessInstanceRecord.class,
                Schema.ProcessInstanceCreationRecord.class,
                Schema.ProcessMessageSubscriptionRecord.class,
                Schema.TimerRecord.class,
                Schema.VariableRecord.class,
                Schema.VariableDocumentRecord.class));
    }

    /**
     * Fluent builder for {@link ZeebeHazelcast}. Configure the ringbuffer name, the start
     * position and the listeners, then call {@link #build()} to create and start the reader.
     */
    public static class Builder {
        private final HazelcastInstance hazelcastInstance;
        /** Listeners grouped by the record payload type they were registered for. */
        private final Map<Class<?>, List<Consumer<?>>> listeners = new HashMap<>();
        private String name = "zeebe";
        /** Explicit start sequence; {@code -1} means "not set". */
        private long readFromSequence = -1L;
        private boolean readFromHead = false;
        private Consumer<Long> postProcessListener = sequence -> {};

        private Builder(HazelcastInstance hazelcastInstance) {
            this.hazelcastInstance = hazelcastInstance;
        }

        /**
         * Sets the name of the ringbuffer to read from. Defaults to {@code "zeebe"}.
         *
         * @param name the ringbuffer name
         * @return this builder
         */
        public Builder name(String name) {
            this.name = name;
            return this;
        }

        /**
         * Starts reading at the given sequence. If the sequence lies more than one position
         * past the ringbuffer's tail when {@link #build()} runs, the head sequence is used
         * instead. A negative value counts as "not set" and results in reading from the tail.
         *
         * @param sequence the sequence to start reading from
         * @return this builder
         */
        public Builder readFrom(long sequence) {
            this.readFromSequence = sequence;
            this.readFromHead = false;
            return this;
        }

        /**
         * Starts reading at the ringbuffer's head sequence.
         *
         * @return this builder
         */
        public Builder readFromHead() {
            this.readFromSequence = -1L;
            this.readFromHead = true;
            return this;
        }

        /**
         * Starts reading at the larger of the ringbuffer's head and tail sequences. This is
         * the default.
         *
         * @return this builder
         */
        public Builder readFromTail() {
            this.readFromSequence = -1L;
            this.readFromHead = false;
            return this;
        }

        /**
         * Registers a callback that receives the updated sequence each time a record has
         * been handled. Defaults to a no-op.
         *
         * @param listener the callback
         * @return this builder
         */
        public Builder postProcessListener(Consumer<Long> listener) {
            this.postProcessListener = listener;
            return this;
        }

        /** Appends {@code listener} to the listeners registered for {@code recordType}. */
        private <T extends Message> void addListener(Class<T> recordType, Consumer<T> listener) {
            this.listeners.computeIfAbsent(recordType, key -> new ArrayList<>()).add(listener);
        }

        /** Registers a listener for deployment records. */
        public Builder addDeploymentListener(Consumer<Schema.DeploymentRecord> listener) {
            this.addListener(Schema.DeploymentRecord.class, listener);
            return this;
        }

        /** Registers a listener for deployment distribution records. */
        public Builder addDeploymentDistributionListener(Consumer<Schema.DeploymentDistributionRecord> listener) {
            this.addListener(Schema.DeploymentDistributionRecord.class, listener);
            return this;
        }

        /** Registers a listener for process records. */
        public Builder addProcessListener(Consumer<Schema.ProcessRecord> listener) {
            this.addListener(Schema.ProcessRecord.class, listener);
            return this;
        }

        /** Registers a listener for process instance records. */
        public Builder addProcessInstanceListener(Consumer<Schema.ProcessInstanceRecord> listener) {
            this.addListener(Schema.ProcessInstanceRecord.class, listener);
            return this;
        }

        /** Registers a listener for process event records. */
        public Builder addProcessEventListener(Consumer<Schema.ProcessEventRecord> listener) {
            this.addListener(Schema.ProcessEventRecord.class, listener);
            return this;
        }

        /** Registers a listener for variable records. */
        public Builder addVariableListener(Consumer<Schema.VariableRecord> listener) {
            this.addListener(Schema.VariableRecord.class, listener);
            return this;
        }

        /** Registers a listener for variable document records. */
        public Builder addVariableDocumentListener(Consumer<Schema.VariableDocumentRecord> listener) {
            this.addListener(Schema.VariableDocumentRecord.class, listener);
            return this;
        }

        /** Registers a listener for job records. */
        public Builder addJobListener(Consumer<Schema.JobRecord> listener) {
            this.addListener(Schema.JobRecord.class, listener);
            return this;
        }

        /** Registers a listener for job batch records. */
        public Builder addJobBatchListener(Consumer<Schema.JobBatchRecord> listener) {
            this.addListener(Schema.JobBatchRecord.class, listener);
            return this;
        }

        /** Registers a listener for incident records. */
        public Builder addIncidentListener(Consumer<Schema.IncidentRecord> listener) {
            this.addListener(Schema.IncidentRecord.class, listener);
            return this;
        }

        /** Registers a listener for timer records. */
        public Builder addTimerListener(Consumer<Schema.TimerRecord> listener) {
            this.addListener(Schema.TimerRecord.class, listener);
            return this;
        }

        /** Registers a listener for message records. */
        public Builder addMessageListener(Consumer<Schema.MessageRecord> listener) {
            this.addListener(Schema.MessageRecord.class, listener);
            return this;
        }

        /** Registers a listener for message subscription records. */
        public Builder addMessageSubscriptionListener(Consumer<Schema.MessageSubscriptionRecord> listener) {
            this.addListener(Schema.MessageSubscriptionRecord.class, listener);
            return this;
        }

        /** Registers a listener for message start event subscription records. */
        public Builder addMessageStartEventSubscriptionListener(Consumer<Schema.MessageStartEventSubscriptionRecord> listener) {
            this.addListener(Schema.MessageStartEventSubscriptionRecord.class, listener);
            return this;
        }

        /** Registers a listener for process message subscription records. */
        public Builder addProcessMessageSubscriptionListener(Consumer<Schema.ProcessMessageSubscriptionRecord> listener) {
            this.addListener(Schema.ProcessMessageSubscriptionRecord.class, listener);
            return this;
        }

        /** Registers a listener for process instance creation records. */
        public Builder addProcessInstanceCreationListener(Consumer<Schema.ProcessInstanceCreationRecord> listener) {
            this.addListener(Schema.ProcessInstanceCreationRecord.class, listener);
            return this;
        }

        /** Registers a listener for error records. */
        public Builder addErrorListener(Consumer<Schema.ErrorRecord> listener) {
            this.addListener(Schema.ErrorRecord.class, listener);
            return this;
        }

        /**
         * Resolves the sequence to start reading from, based on the configured start
         * position and the ringbuffer's current head and tail.
         */
        private long getSequence(Ringbuffer<?> ringbuffer) {
            final long headSequence = ringbuffer.headSequence();
            final long tailSequence = ringbuffer.tailSequence();
            // -1 is the only "not set" marker this builder writes, so 0 has to count as an
            // explicitly requested sequence (a '> 0' test would silently turn readFrom(0)
            // into "read from tail").
            if (this.readFromSequence >= 0L) {
                if (this.readFromSequence > tailSequence + 1L) {
                    LOGGER.info("The given sequence '{}' is greater than the current tail-sequence '{}' of the ringbuffer. Using the head-sequence instead.", this.readFromSequence, tailSequence);
                    return headSequence;
                }
                return this.readFromSequence;
            }
            if (this.readFromHead) {
                return headSequence;
            }
            return Math.max(headSequence, tailSequence);
        }

        /**
         * Obtains the configured ringbuffer, resolves the start sequence, then creates and
         * starts a {@link ZeebeHazelcast} reader.
         *
         * @return the started reader
         * @throws IllegalArgumentException if the Hazelcast instance returns no ringbuffer for the configured name
         */
        public ZeebeHazelcast build() {
            LOGGER.debug("Read from ringbuffer with name '{}'", this.name);
            final Ringbuffer<byte[]> ringbuffer = this.hazelcastInstance.getRingbuffer(this.name);
            if (ringbuffer == null) {
                throw new IllegalArgumentException(String.format("No ring buffer found with name '%s'", this.name));
            }
            LOGGER.debug("Ringbuffer status: [head: {}, tail: {}, size: {}, capacity: {}]", ringbuffer.headSequence(), ringbuffer.tailSequence(), ringbuffer.size(), ringbuffer.capacity());
            final long sequence = this.getSequence(ringbuffer);
            LOGGER.info("Read from ringbuffer '{}' starting from sequence '{}'", this.name, sequence);

            // Hand the reader its own copy of the registrations: the reader thread iterates
            // these collections, so later changes to this builder must not touch them.
            final Map<Class<?>, List<Consumer<?>>> listenerSnapshot = new HashMap<>();
            this.listeners.forEach((type, consumers) -> listenerSnapshot.put(type, new ArrayList<>(consumers)));

            final ZeebeHazelcast zeebeHazelcast = new ZeebeHazelcast(ringbuffer, sequence, listenerSnapshot, this.postProcessListener);
            zeebeHazelcast.start();
            return zeebeHazelcast;
        }
    }
}

