/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.utils;

import io.netty.buffer.ByteBuf;
import io.streamnative.pulsar.handlers.kop.format.DecodeResult;
import io.streamnative.pulsar.handlers.kop.format.DirectBufferOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import lombok.NonNull;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ByteBufUtils {
    private static final Logger log = LoggerFactory.getLogger(ByteBufUtils.class);
    private static final int DEFAULT_BUFFER_SIZE = 0x100000;
    private static final int MAX_RECORDS_BUFFER_SIZE = 0xA00000;

    public static ByteBuffer getKeyByteBuffer(SingleMessageMetadata messageMetadata) {
        if (messageMetadata.hasOrderingKey()) {
            return ByteBuffer.wrap(messageMetadata.getOrderingKey()).asReadOnlyBuffer();
        }
        if (!messageMetadata.hasPartitionKey()) {
            return null;
        }
        String key = messageMetadata.getPartitionKey();
        if (messageMetadata.isPartitionKeyB64Encoded()) {
            return ByteBuffer.wrap(Base64.getDecoder().decode(key)).asReadOnlyBuffer();
        }
        return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
    }

    public static ByteBuffer getKeyByteBuffer(MessageMetadata messageMetadata) {
        if (messageMetadata.hasOrderingKey()) {
            return ByteBuffer.wrap(messageMetadata.getOrderingKey()).asReadOnlyBuffer();
        }
        if (!messageMetadata.hasPartitionKey()) {
            return null;
        }
        String key = messageMetadata.getPartitionKey();
        if (messageMetadata.isPartitionKeyB64Encoded()) {
            return ByteBuffer.wrap(Base64.getDecoder().decode(key));
        }
        return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
    }

    public static ByteBuffer getNioBuffer(ByteBuf buffer) {
        if (buffer.isDirect()) {
            return buffer.nioBuffer();
        }
        byte[] bytes = new byte[buffer.readableBytes()];
        buffer.getBytes(buffer.readerIndex(), bytes);
        return ByteBuffer.wrap(bytes);
    }

    public static DecodeResult decodePulsarEntryToKafkaRecords(MessageMetadata metadata, ByteBuf payload, long baseOffset, byte magic) throws IOException {
        if (metadata.hasMarkerType()) {
            ControlRecordType controlRecordType = switch (metadata.getMarkerType()) {
                case 21 -> ControlRecordType.COMMIT;
                case 22 -> ControlRecordType.ABORT;
                default -> ControlRecordType.UNKNOWN;
            };
            return DecodeResult.get(MemoryRecords.withEndTransactionMarker((long)baseOffset, (long)metadata.getPublishTime(), (int)0, (long)(metadata.hasTxnidMostBits() ? metadata.getTxnidMostBits() : Long.MAX_VALUE), (short)(metadata.hasTxnidLeastBits() ? (short)metadata.getTxnidLeastBits() : (short)0), (EndTransactionMarker)new EndTransactionMarker(controlRecordType == ControlRecordType.UNKNOWN ? ControlRecordType.ABORT : controlRecordType, 0)));
        }
        long startConversionNanos = MathUtils.nowInNano();
        int uncompressedSize = metadata.getUncompressedSize();
        CompressionCodec codec = CompressionCodecProvider.getCompressionCodec((org.apache.pulsar.common.api.proto.CompressionType)metadata.getCompression());
        ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
        DirectBufferOutputStream directBufferOutputStream = new DirectBufferOutputStream(0x100000);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder((ByteBufferOutputStream)directBufferOutputStream, magic, CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset, metadata.getPublishTime(), -1L, -1, -1, metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits(), false, -1, 0xA00000);
        if (metadata.hasTxnidMostBits()) {
            builder.setProducerState(metadata.getTxnidMostBits(), (short)metadata.getTxnidLeastBits(), 0, true);
        }
        int conversionCount = 0;
        if (metadata.hasNumMessagesInBatch()) {
            int numMessages = metadata.getNumMessagesInBatch();
            conversionCount += numMessages;
            for (int i = 0; i < numMessages; ++i) {
                ByteBuffer value;
                SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
                ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch((ByteBuf)uncompressedPayload, (SingleMessageMetadata)singleMessageMetadata, (int)i, (int)numMessages);
                long timestamp = metadata.getEventTime() > 0L ? metadata.getEventTime() : metadata.getPublishTime();
                ByteBuffer byteBuffer = value = singleMessageMetadata.isNullValue() ? null : ByteBufUtils.getNioBuffer(singleMessagePayload);
                if (magic >= 2) {
                    Header[] headers = ByteBufUtils.getHeadersFromMetadata(singleMessageMetadata.getPropertiesList());
                    builder.appendWithOffset(baseOffset + (long)i, timestamp, ByteBufUtils.getKeyByteBuffer(singleMessageMetadata), value, headers);
                } else {
                    builder.appendWithOffset(baseOffset + (long)i, timestamp, ByteBufUtils.getKeyByteBuffer(singleMessageMetadata), value);
                }
                singleMessagePayload.release();
            }
        } else {
            ByteBuffer value;
            ++conversionCount;
            long timestamp = metadata.getEventTime() > 0L ? metadata.getEventTime() : metadata.getPublishTime();
            ByteBuffer byteBuffer = value = metadata.isNullValue() ? null : ByteBufUtils.getNioBuffer(uncompressedPayload);
            if (magic >= 2) {
                Header[] headers = ByteBufUtils.getHeadersFromMetadata(metadata.getPropertiesList());
                builder.appendWithOffset(baseOffset, timestamp, ByteBufUtils.getKeyByteBuffer(metadata), value, headers);
            } else {
                builder.appendWithOffset(baseOffset, timestamp, ByteBufUtils.getKeyByteBuffer(metadata), value);
            }
        }
        MemoryRecords records = builder.build();
        uncompressedPayload.release();
        return DecodeResult.get(records, directBufferOutputStream.getByteBuf(), conversionCount, MathUtils.elapsedNanos((long)startConversionNanos));
    }

    @NonNull
    private static Header[] getHeadersFromMetadata(List<KeyValue> properties) {
        return (Header[])properties.stream().map(property -> new RecordHeader(property.getKey(), property.getValue().getBytes(StandardCharsets.UTF_8))).toArray(Header[]::new);
    }
}

