/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.utils;

import com.google.common.annotations.VisibleForTesting;
import io.streamnative.pulsar.handlers.kop.format.ValidationAndOffsetAssignResult;
import io.streamnative.pulsar.handlers.kop.utils.LongRef;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Locale;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.errors.InvalidTimestampException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

public class KopLogValidator {
    public static ValidationAndOffsetAssignResult validateMessagesAndAssignOffsets(MemoryRecords records, LongRef offsetCounter, long now, CompressionCodec sourceCodec, CompressionCodec targetCodec, boolean compactedTopic, byte magic, TimestampType timestampType, long timestampDiffMaxMs) {
        if (sourceCodec.codec() == CompressionType.NONE.id && targetCodec.codec() == CompressionType.NONE.id) {
            if (!records.hasMatchingMagic(magic)) {
                return KopLogValidator.convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, timestampType, timestampDiffMaxMs, magic);
            }
            return KopLogValidator.assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs, magic);
        }
        return KopLogValidator.validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic, magic, timestampType, timestampDiffMaxMs);
    }

    private static ValidationAndOffsetAssignResult convertAndAssignOffsetsNonCompressed(MemoryRecords records, LongRef offsetCounter, boolean compactedTopic, long now, TimestampType timestampType, long timestampDiffMaxMs, byte toMagicValue) {
        long startConversionNanos = MathUtils.nowInNano();
        int sizeInBytesAfterConversion = AbstractRecords.estimateSizeInBytes((byte)toMagicValue, (long)offsetCounter.value(), (CompressionType)CompressionType.NONE, (Iterable)records.records());
        Iterator batchIterator = records.batches().iterator();
        MutableRecordBatch first = (MutableRecordBatch)batchIterator.next();
        long producerId = first.producerId();
        short producerEpoch = first.producerEpoch();
        int sequence = first.baseSequence();
        boolean isTransactional = first.isTransactional();
        ByteBuffer newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion);
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)newBuffer, (byte)toMagicValue, (CompressionType)CompressionType.NONE, (TimestampType)timestampType, (long)offsetCounter.value(), (long)now, (long)producerId, (short)producerEpoch, (int)sequence, (boolean)isTransactional, (int)-1);
        records.batches().forEach(batch -> {
            KopLogValidator.validateBatch((RecordBatch)batch, toMagicValue);
            for (Record record : batch) {
                KopLogValidator.validateRecord((RecordBatch)batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic);
                builder.appendWithOffset(offsetCounter.getAndIncrement(), record);
            }
        });
        MemoryRecords memoryRecords = builder.build();
        int conversionCount = builder.numRecords();
        return ValidationAndOffsetAssignResult.get(memoryRecords, conversionCount, MathUtils.elapsedNanos((long)startConversionNanos));
    }

    private static ValidationAndOffsetAssignResult assignOffsetsNonCompressed(MemoryRecords records, LongRef offsetCounter, long now, boolean compactedTopic, TimestampType timestampType, long timestampDiffMaxMs, byte magic) {
        long maxTimestamp = -1L;
        for (MutableRecordBatch batch : records.batches()) {
            KopLogValidator.validateBatch((RecordBatch)batch, magic);
            long maxBatchTimestamp = -1L;
            for (Record record : batch) {
                KopLogValidator.validateRecord((RecordBatch)batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic);
                if (batch.magic() <= 0 || record.timestamp() <= maxBatchTimestamp) continue;
                maxBatchTimestamp = record.timestamp();
            }
            if (batch.magic() > 0 && maxBatchTimestamp > maxTimestamp) {
                maxTimestamp = maxBatchTimestamp;
            }
            batch.setLastOffset(offsetCounter.value() - 1L);
            if (batch.magic() >= 2) {
                batch.setPartitionLeaderEpoch(-1);
            }
            if (batch.magic() <= 0) continue;
            if (timestampType == TimestampType.LOG_APPEND_TIME) {
                batch.setMaxTimestamp(TimestampType.LOG_APPEND_TIME, now);
                continue;
            }
            batch.setMaxTimestamp(timestampType, maxBatchTimestamp);
        }
        return ValidationAndOffsetAssignResult.get(records, 0, 0L);
    }

    private static ValidationAndOffsetAssignResult validateMessagesAndAssignOffsetsCompressed(MemoryRecords records, LongRef offsetCounter, long now, CompressionCodec sourceCodec, CompressionCodec targetCodec, boolean compactedTopic, byte toMagic, TimestampType timestampType, long timestampDiffMaxMs) {
        boolean inPlaceAssignment = sourceCodec == targetCodec && toMagic > 0;
        long maxTimestamp = -1L;
        LongRef expectedInnerOffset = new LongRef(0L);
        ArrayList<Record> validatedRecords = new ArrayList<Record>();
        for (MutableRecordBatch batch : records.batches()) {
            KopLogValidator.validateBatch((RecordBatch)batch, toMagic);
            if (sourceCodec.codec() == CompressionType.NONE.id && batch.isControlBatch()) {
                inPlaceAssignment = true;
            }
            for (Record record : batch) {
                KopLogValidator.validateRecord((RecordBatch)batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic);
                if (sourceCodec.codec() != CompressionType.NONE.id && record.isCompressed()) {
                    throw new InvalidRecordException(String.format("Compressed outer record should not have an inner record with a compression attribute set: %s", record));
                }
                if (batch.magic() > 0 && toMagic > 0) {
                    if (record.offset() != expectedInnerOffset.getAndIncrement()) {
                        inPlaceAssignment = false;
                    }
                    if (record.timestamp() > maxTimestamp) {
                        maxTimestamp = record.timestamp();
                    }
                }
                if (!record.hasMagic(toMagic)) {
                    inPlaceAssignment = false;
                }
                validatedRecords.add(record);
            }
        }
        return KopLogValidator.buildIfPlaceAssignment(inPlaceAssignment, records, validatedRecords, offsetCounter, now, toMagic, timestampType, maxTimestamp, targetCodec);
    }

    private static ValidationAndOffsetAssignResult buildIfPlaceAssignment(boolean inPlaceAssignment, MemoryRecords records, ArrayList<Record> validatedRecords, LongRef offsetCounter, long now, byte toMagic, TimestampType timestampType, long maxTimestamp, CompressionCodec targetCodec) {
        if (inPlaceAssignment) {
            return KopLogValidator.buildInPlaceAssignment(records, validatedRecords, offsetCounter, now, toMagic, timestampType, maxTimestamp);
        }
        return KopLogValidator.buildNoInPlaceAssignment(records, validatedRecords, offsetCounter, now, targetCodec, toMagic, timestampType);
    }

    private static ValidationAndOffsetAssignResult buildNoInPlaceAssignment(MemoryRecords records, ArrayList<Record> validatedRecords, LongRef offsetCounter, long now, CompressionCodec targetCodec, byte toMagic, TimestampType timestampType) {
        MutableRecordBatch first = (MutableRecordBatch)records.batches().iterator().next();
        return KopLogValidator.buildRecordsAndAssignOffsets(toMagic, offsetCounter, timestampType, CompressionType.forId((int)targetCodec.codec()), now, validatedRecords, first);
    }

    private static ValidationAndOffsetAssignResult buildInPlaceAssignment(MemoryRecords records, ArrayList<Record> validatedRecords, LongRef offsetCounter, long now, byte toMagic, TimestampType timestampType, long maxTimestamp) {
        long currentMaxTimestamp = maxTimestamp;
        MutableRecordBatch batch = (MutableRecordBatch)records.batches().iterator().next();
        long lastOffset = offsetCounter.addAndGet(validatedRecords.size()) - 1L;
        batch.setLastOffset(lastOffset);
        if (timestampType == TimestampType.LOG_APPEND_TIME) {
            currentMaxTimestamp = now;
        }
        if (toMagic >= 1) {
            batch.setMaxTimestamp(timestampType, currentMaxTimestamp);
        }
        if (toMagic >= 2) {
            batch.setPartitionLeaderEpoch(-1);
        }
        return ValidationAndOffsetAssignResult.get(records, 0, 0L);
    }

    private static ValidationAndOffsetAssignResult buildRecordsAndAssignOffsets(byte magic, LongRef offsetCounter, TimestampType timestampType, CompressionType compressionType, long logAppendTime, ArrayList<Record> validatedRecords, MutableRecordBatch first) {
        long startConversionNanos = MathUtils.nowInNano();
        long producerId = first.producerId();
        short producerEpoch = first.producerEpoch();
        int baseSequence = first.baseSequence();
        boolean isTransactional = first.isTransactional();
        int estimatedSize = AbstractRecords.estimateSizeInBytes((byte)magic, (long)offsetCounter.value(), (CompressionType)compressionType, validatedRecords);
        ByteBuffer buffer = ByteBuffer.allocate(estimatedSize);
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)buffer, (byte)magic, (CompressionType)compressionType, (TimestampType)timestampType, (long)offsetCounter.value(), (long)logAppendTime, (long)producerId, (short)producerEpoch, (int)baseSequence, (boolean)isTransactional, (int)-1);
        validatedRecords.forEach(record -> builder.appendWithOffset(offsetCounter.getAndIncrement(), record));
        MemoryRecords memoryRecords = builder.build();
        int conversionCount = builder.numRecords();
        return ValidationAndOffsetAssignResult.get(memoryRecords, conversionCount, MathUtils.elapsedNanos((long)startConversionNanos));
    }

    private static void validateBatch(RecordBatch batch, byte toMagic) {
        if (batch.magic() >= 2) {
            long countFromOffsets = batch.lastOffset() - batch.baseOffset() + 1L;
            if (countFromOffsets <= 0L) {
                String exceptionMsg = String.format("Batch has an invalid offset range: [%d, %d]", batch.baseOffset(), batch.lastOffset());
                throw new InvalidRecordException(exceptionMsg);
            }
            int count = batch.countOrNull();
            if (count <= 0) {
                throw new InvalidRecordException(String.format("Invalid reported count for record batch: %d", count));
            }
            if (countFromOffsets != (long)batch.countOrNull().intValue()) {
                String exceptionMsg = String.format("Inconsistent batch offset range [%d, %d] and count of records %d", batch.baseOffset(), batch.lastOffset(), count);
                throw new InvalidRecordException(exceptionMsg);
            }
        }
        if (batch.hasProducerId() && batch.baseSequence() < 0) {
            String exceptionMsg = String.format("Invalid sequence number %d in record batch with producerId %d", batch.baseSequence(), batch.producerId());
            throw new InvalidRecordException(exceptionMsg);
        }
        if (batch.isControlBatch()) {
            throw new InvalidRecordException("Clients are not allowed to write control records");
        }
        KopLogValidator.checkUnsupportedForMessageFormat(batch, toMagic);
    }

    private static void checkUnsupportedForMessageFormat(RecordBatch batch, byte toMagic) {
        if (batch.isTransactional() && toMagic < 2) {
            throw new UnsupportedForMessageFormatException(String.format("Transactional records cannot be used with magic version %s", toMagic));
        }
        if (batch.hasProducerId() && toMagic < 2) {
            throw new UnsupportedForMessageFormatException(String.format("Idempotent records cannot be used with magic version %s", toMagic));
        }
    }

    private static void validateRecord(RecordBatch batch, Record record, long now, TimestampType timestampType, long timestampDiffMaxMs, boolean compactedTopic) {
        if (!record.hasMagic(batch.magic())) {
            throw new InvalidRecordException(String.format("Log record magic does not match outer magic %s", batch.magic()));
        }
        if (batch.magic() <= 1 && batch.isCompressed()) {
            record.ensureValid();
        }
        KopLogValidator.validateKey(record, compactedTopic);
        KopLogValidator.validateTimestamp(batch, record, now, timestampType, timestampDiffMaxMs);
    }

    private static void validateKey(Record record, boolean compactedTopic) {
        if (compactedTopic && !record.hasKey()) {
            throw new InvalidRecordException("Compacted topic cannot accept message without key.");
        }
    }

    private static void validateTimestamp(RecordBatch batch, Record record, long now, TimestampType timestampType, long timestampDiffMaxMs) {
        if (timestampType == TimestampType.CREATE_TIME && record.timestamp() != -1L && Math.abs(record.timestamp() - now) > timestampDiffMaxMs) {
            String exceptionMsg = String.format("Timestamp %d of message with offset %d is out of range. The timestamp should be within [%d, %d]", record.timestamp(), record.offset(), now - timestampDiffMaxMs, now + timestampDiffMaxMs);
            throw new InvalidTimestampException(exceptionMsg);
        }
        if (batch.timestampType() == TimestampType.LOG_APPEND_TIME) {
            String exceptionMsg = String.format("Invalid timestamp type in message %s. Producer should not set timestamp type to LogAppendTime.", record);
            throw new InvalidTimestampException(exceptionMsg);
        }
    }

    @VisibleForTesting
    public static CompressionCodec getSourceCodec(MemoryRecords records) {
        CompressionCodec sourceCodec = new CompressionCodec(CompressionType.NONE.name, CompressionType.NONE.id);
        for (RecordBatch batch : records.batches()) {
            CompressionType compressionType = CompressionType.forId((int)batch.compressionType().id);
            CompressionCodec messageCodec = new CompressionCodec(compressionType.name, compressionType.id);
            if (messageCodec.codec() == CompressionType.NONE.id) continue;
            sourceCodec = messageCodec;
        }
        return sourceCodec;
    }

    @VisibleForTesting
    public static CompressionCodec getTargetCodec(CompressionCodec sourceCodec, String brokerCompressionType) {
        String lowerCaseBrokerCompressionType = brokerCompressionType.toLowerCase(Locale.ROOT);
        if (lowerCaseBrokerCompressionType.equals(CompressionType.NONE.name)) {
            return sourceCodec;
        }
        CompressionType compressionType = CompressionType.forName((String)lowerCaseBrokerCompressionType);
        return new CompressionCodec(compressionType.name, compressionType.id);
    }

    public static class CompressionCodec {
        private final String name;
        private final int codec;

        public CompressionCodec(String name, int codec) {
            this.name = name;
            this.codec = codec;
        }

        public String name() {
            return this.name;
        }

        public int codec() {
            return this.codec;
        }
    }
}

