/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.snapshot;

import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.RawSnapshotWriter;

public final class SnapshotWriter<T>
implements AutoCloseable {
    private final RawSnapshotWriter snapshot;
    private final BatchAccumulator<T> accumulator;
    private final Time time;
    private final long lastContainedLogTimestamp;

    private SnapshotWriter(RawSnapshotWriter snapshot, int maxBatchSize, MemoryPool memoryPool, Time time, long lastContainedLogTimestamp, CompressionType compressionType, RecordSerde<T> serde) {
        this.snapshot = snapshot;
        this.time = time;
        this.lastContainedLogTimestamp = lastContainedLogTimestamp;
        this.accumulator = new BatchAccumulator<T>(snapshot.snapshotId().epoch, 0L, Integer.MAX_VALUE, maxBatchSize, memoryPool, time, compressionType, serde);
    }

    private void initializeSnapshotWithHeader() {
        if (this.snapshot.sizeInBytes() != 0L) {
            String message = String.format("Initializing writer with a non-empty snapshot: id = '%s'.", this.snapshot.snapshotId());
            throw new IllegalStateException(message);
        }
        SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord().setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION).setLastContainedLogTimestamp(this.lastContainedLogTimestamp);
        this.accumulator.appendSnapshotHeaderMessage(headerRecord, this.time.milliseconds());
        this.accumulator.forceDrain();
    }

    private void finalizeSnapshotWithFooter() {
        SnapshotFooterRecord footerRecord = new SnapshotFooterRecord().setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
        this.accumulator.appendSnapshotFooterMessage(footerRecord, this.time.milliseconds());
        this.accumulator.forceDrain();
    }

    public static <T> Optional<SnapshotWriter<T>> createWithHeader(Supplier<Optional<RawSnapshotWriter>> supplier, int maxBatchSize, MemoryPool memoryPool, Time snapshotTime, long lastContainedLogTimestamp, CompressionType compressionType, RecordSerde<T> serde) {
        Optional<SnapshotWriter<SnapshotWriter>> writer = supplier.get().map(snapshot -> new SnapshotWriter((RawSnapshotWriter)snapshot, maxBatchSize, memoryPool, snapshotTime, lastContainedLogTimestamp, CompressionType.NONE, serde));
        writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader);
        return writer;
    }

    public OffsetAndEpoch snapshotId() {
        return this.snapshot.snapshotId();
    }

    public long lastContainedLogOffset() {
        return this.snapshot.snapshotId().offset - 1L;
    }

    public int lastContainedLogEpoch() {
        return this.snapshot.snapshotId().epoch;
    }

    public boolean isFrozen() {
        return this.snapshot.isFrozen();
    }

    public void append(List<T> records) {
        if (this.snapshot.isFrozen()) {
            String message = String.format("Append not supported. Snapshot is already frozen: id = '%s'.", this.snapshot.snapshotId());
            throw new IllegalStateException(message);
        }
        this.accumulator.append(this.snapshot.snapshotId().epoch, records);
        if (this.accumulator.needsDrain(this.time.milliseconds())) {
            this.appendBatches(this.accumulator.drain());
        }
    }

    public void freeze() {
        this.finalizeSnapshotWithFooter();
        this.appendBatches(this.accumulator.drain());
        this.snapshot.freeze();
        this.accumulator.close();
    }

    @Override
    public void close() {
        this.snapshot.close();
        this.accumulator.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void appendBatches(List<BatchAccumulator.CompletedBatch<T>> batches) {
        try {
            for (BatchAccumulator.CompletedBatch<BatchAccumulator.CompletedBatch> completedBatch : batches) {
                this.snapshot.append(completedBatch.data);
            }
        }
        finally {
            batches.forEach(BatchAccumulator.CompletedBatch::release);
        }
    }
}

