/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.format;

import com.google.common.collect.ImmutableList;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.streamnative.pulsar.handlers.kop.format.AbstractEntryFormatter;
import io.streamnative.pulsar.handlers.kop.format.DecodeResult;
import io.streamnative.pulsar.handlers.kop.format.EncodeRequest;
import io.streamnative.pulsar.handlers.kop.format.EncodeResult;
import io.streamnative.pulsar.handlers.kop.format.ValidationAndOffsetAssignResult;
import io.streamnative.pulsar.handlers.kop.storage.PartitionLog;
import io.streamnative.pulsar.handlers.kop.utils.KopLogValidator;
import io.streamnative.pulsar.handlers.kop.utils.LongRef;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.TimestampType;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Entry formatter that validates incoming Kafka records, assigns their offsets, and
 * serializes the resulting records behind a Pulsar {@link MessageMetadata} header.
 *
 * <p>Decoding is inherited unchanged from {@link AbstractEntryFormatter}.
 */
public class KafkaMixedEntryFormatter
extends AbstractEntryFormatter {
    private static final Logger log = LoggerFactory.getLogger(KafkaMixedEntryFormatter.class);

    /**
     * Magic value handed to the validator.
     * NOTE(review): presumably Kafka's record-batch format v2 - confirm against KopLogValidator.
     */
    private static final byte VALIDATION_MAGIC = (byte) 2;

    /**
     * Creates the formatter.
     *
     * @param entryfilters entry filters forwarded to the {@link AbstractEntryFormatter} constructor
     */
    protected KafkaMixedEntryFormatter(ImmutableList<EntryFilterWithClassLoader> entryfilters) {
        super(entryfilters);
    }

    /**
     * Validates the request's records, assigns offsets starting at the append info's first
     * offset ({@code -1} when none is present), and serializes the validated records together
     * with a freshly built {@link MessageMetadata} using no checksum.
     *
     * <p>The validation result is recycled and the temporary wrapper buffer is released even
     * when serialization throws.
     *
     * @param encodeRequest request supplying the records and their append info
     * @return result holding the validated records, the serialized buffer, the message count
     *         reported by the append info, and the validator's conversion count and time
     */
    @Override
    public EncodeResult encode(EncodeRequest encodeRequest) {
        final MemoryRecords records = encodeRequest.getRecords();
        final PartitionLog.LogAppendInfo appendInfo = encodeRequest.getAppendInfo();
        final LongRef offset = new LongRef(appendInfo.firstOffset().orElse(-1L));
        final int numMessages = appendInfo.numMessages();

        final ValidationAndOffsetAssignResult validation =
                KopLogValidator.validateMessagesAndAssignOffsets(
                        records,
                        offset,
                        System.currentTimeMillis(),
                        appendInfo.sourceCodec(),
                        appendInfo.targetCodec(),
                        false,
                        VALIDATION_MAGIC,
                        TimestampType.CREATE_TIME,
                        Long.MAX_VALUE);

        final MemoryRecords validRecords;
        final int conversionCount;
        final long conversionTimeNanos;
        final ByteBuf buf;
        try {
            // Copy everything needed out of the result before it is recycled.
            validRecords = validation.getRecords();
            conversionCount = validation.getConversionCount();
            conversionTimeNanos = validation.getConversionTimeNanos();
            buf = serializeWithMetadata(validRecords, numMessages);
        } finally {
            validation.recycle();
        }
        return EncodeResult.get(validRecords, buf, numMessages, conversionCount, conversionTimeNanos);
    }

    /**
     * Delegates to {@link AbstractEntryFormatter#decode(List, byte)} without modification.
     *
     * @param entries entries to decode
     * @param magic magic value forwarded to the superclass
     * @return whatever the superclass implementation returns
     */
    @Override
    public DecodeResult decode(List<Entry> entries, byte magic) {
        return super.decode(entries, magic);
    }

    /**
     * Wraps the records' backing buffer and serializes it after the metadata header.
     * The wrapper is released in {@code finally} so that a failure while building the metadata
     * or serializing does not leave its reference count dangling.
     */
    private static ByteBuf serializeWithMetadata(MemoryRecords validRecords, int numMessages) {
        final ByteBuf recordsWrapper = Unpooled.wrappedBuffer(validRecords.buffer());
        try {
            return Commands.serializeMetadataAndPayload(
                    Commands.ChecksumType.None,
                    getMessageMetadataWithNumberMessages(numMessages),
                    recordsWrapper);
        } finally {
            recordsWrapper.release();
        }
    }

    /**
     * Builds the metadata attached to every encoded entry: an {@code entry.format} property set
     * to {@code IDENTITY_VALUE}, an empty producer name, sequence id {@code 0}, the current
     * wall-clock time as publish time, and the given batch size.
     *
     * @param numMessages value stored as the number of messages in the batch
     * @return the populated metadata
     */
    private static MessageMetadata getMessageMetadataWithNumberMessages(int numMessages) {
        MessageMetadata metadata = new MessageMetadata();
        metadata.addProperty().setKey("entry.format").setValue(IDENTITY_VALUE);
        metadata.setProducerName("");
        metadata.setSequenceId(0L);
        metadata.setPublishTime(System.currentTimeMillis());
        metadata.setNumMessagesInBatch(numMessages);
        return metadata;
    }
}

