/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.process.test.testengine;

import io.camunda.zeebe.engine.processing.streamprocessor.TypedEventRegistry;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.process.test.testengine.RecordStreamSource;
import io.camunda.zeebe.protocol.impl.record.CopiedRecord;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.value.DeploymentRecordValue;
import io.camunda.zeebe.protocol.record.value.IncidentRecordValue;
import io.camunda.zeebe.protocol.record.value.JobBatchRecordValue;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.protocol.record.value.MessageRecordValue;
import io.camunda.zeebe.protocol.record.value.MessageStartEventSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.MessageSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessMessageSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.TimerRecordValue;
import io.camunda.zeebe.protocol.record.value.VariableDocumentRecordValue;
import io.camunda.zeebe.protocol.record.value.VariableRecordValue;
import io.camunda.zeebe.protocol.record.value.deployment.Process;
import io.camunda.zeebe.test.util.record.CompactRecordLogger;
import io.camunda.zeebe.util.buffer.BufferReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class RecordStreamSourceImpl
implements RecordStreamSource {
    private final LogStreamReader logStreamReader;
    private final int partitionId;
    private final List<Record<?>> records = new ArrayList();
    private volatile long lastPosition = -1L;

    public RecordStreamSourceImpl(LogStreamReader logStreamReader, int partitionId) {
        this.logStreamReader = logStreamReader;
        this.partitionId = partitionId;
    }

    @Override
    public Iterable<Record<?>> records() {
        this.updateWithNewRecords();
        return Collections.unmodifiableList(this.records);
    }

    <T extends RecordValue> Iterable<Record<T>> recordsOfValueType(ValueType valueType) {
        Stream<Record> stream = StreamSupport.stream(this.records().spliterator(), false).filter(record -> record.getValueType() == valueType);
        return stream::iterator;
    }

    @Override
    public Iterable<Record<ProcessInstanceRecordValue>> processInstanceRecords() {
        return this.recordsOfValueType(ValueType.PROCESS_INSTANCE);
    }

    @Override
    public Iterable<Record<JobRecordValue>> jobRecords() {
        return this.recordsOfValueType(ValueType.JOB);
    }

    @Override
    public Iterable<Record<JobBatchRecordValue>> jobBatchRecords() {
        return this.recordsOfValueType(ValueType.JOB_BATCH);
    }

    @Override
    public Iterable<Record<DeploymentRecordValue>> deploymentRecords() {
        return this.recordsOfValueType(ValueType.DEPLOYMENT);
    }

    @Override
    public Iterable<Record<Process>> processRecords() {
        return this.recordsOfValueType(ValueType.PROCESS);
    }

    @Override
    public Iterable<Record<VariableRecordValue>> variableRecords() {
        return this.recordsOfValueType(ValueType.VARIABLE);
    }

    @Override
    public Iterable<Record<VariableDocumentRecordValue>> variableDocumentRecords() {
        return this.recordsOfValueType(ValueType.VARIABLE_DOCUMENT);
    }

    @Override
    public Iterable<Record<IncidentRecordValue>> incidentRecords() {
        return this.recordsOfValueType(ValueType.INCIDENT);
    }

    @Override
    public Iterable<Record<TimerRecordValue>> timerRecords() {
        return this.recordsOfValueType(ValueType.TIMER);
    }

    @Override
    public Iterable<Record<MessageRecordValue>> messageRecords() {
        return this.recordsOfValueType(ValueType.MESSAGE);
    }

    @Override
    public Iterable<Record<MessageSubscriptionRecordValue>> messageSubscriptionRecords() {
        return this.recordsOfValueType(ValueType.MESSAGE_SUBSCRIPTION);
    }

    @Override
    public Iterable<Record<MessageStartEventSubscriptionRecordValue>> messageStartEventSubscriptionRecords() {
        return this.recordsOfValueType(ValueType.MESSAGE_START_EVENT_SUBSCRIPTION);
    }

    @Override
    public Iterable<Record<ProcessMessageSubscriptionRecordValue>> processMessageSubscriptionRecords() {
        return this.recordsOfValueType(ValueType.PROCESS_MESSAGE_SUBSCRIPTION);
    }

    @Override
    public void print(boolean compact) {
        ArrayList recordsList = new ArrayList();
        this.records().forEach(recordsList::add);
        if (compact) {
            new CompactRecordLogger(recordsList).log();
        } else {
            System.out.println("===== records (count: ${count()}) =====");
            recordsList.forEach(record -> System.out.println(record.toJson()));
            System.out.println("---------------------------");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateWithNewRecords() {
        LogStreamReader logStreamReader = this.logStreamReader;
        synchronized (logStreamReader) {
            if (this.lastPosition < 0L) {
                this.logStreamReader.seekToFirstEvent();
            } else {
                this.logStreamReader.seekToNextEvent(this.lastPosition);
            }
            while (this.logStreamReader.hasNext()) {
                LoggedEvent event = (LoggedEvent)this.logStreamReader.next();
                CopiedRecord<UnifiedRecordValue> record = this.mapToRecord(event);
                this.records.add((Record<?>)record);
                this.lastPosition = event.getPosition();
            }
        }
    }

    private CopiedRecord<UnifiedRecordValue> mapToRecord(LoggedEvent event) {
        UnifiedRecordValue value;
        RecordMetadata metadata = new RecordMetadata();
        event.readMetadata((BufferReader)metadata);
        try {
            value = (UnifiedRecordValue)((Class)TypedEventRegistry.EVENT_REGISTRY.get(metadata.getValueType())).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        event.readValue((BufferReader)value);
        return new CopiedRecord(value, metadata, event.getKey(), this.partitionId, event.getPosition(), event.getSourceEventPosition(), event.getTimestamp());
    }
}

