/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.format;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.streamnative.pulsar.handlers.kop.format.AbstractEntryFormatter;
import io.streamnative.pulsar.handlers.kop.format.DecodeResult;
import io.streamnative.pulsar.handlers.kop.format.EncodeRequest;
import io.streamnative.pulsar.handlers.kop.format.EncodeResult;
import io.streamnative.pulsar.handlers.kop.utils.PulsarMessageBuilder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.StreamSupport;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarEntryFormatter
extends AbstractEntryFormatter {
    private static final Logger log = LoggerFactory.getLogger(PulsarEntryFormatter.class);
    private static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
    private static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 131072;

    protected PulsarEntryFormatter(ImmutableList<EntryFilterWithClassLoader> entryfilters) {
        super(entryfilters);
    }

    @Override
    public EncodeResult encode(EncodeRequest encodeRequest) {
        MemoryRecords records = encodeRequest.getRecords();
        int numMessages = encodeRequest.getAppendInfo().numMessages();
        long currentBatchSizeBytes = 0L;
        int numMessagesInBatch = 0;
        long startConversionNanos = MathUtils.nowInNano();
        long sequenceId = -1L;
        ByteBuf batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT.buffer(Math.min(1024, 131072));
        ArrayList messages = Lists.newArrayListWithExpectedSize((int)numMessages);
        MessageMetadata msgMetadata = new MessageMetadata();
        records.batches().forEach(recordBatch -> {
            boolean controlBatch = recordBatch.isControlBatch();
            StreamSupport.stream(recordBatch.spliterator(), true).forEachOrdered(record -> {
                MessageImpl<byte[]> message = PulsarEntryFormatter.recordToEntry(record);
                messages.add(message);
                if (recordBatch.isTransactional()) {
                    msgMetadata.setTxnidMostBits(recordBatch.producerId());
                    msgMetadata.setTxnidLeastBits((long)recordBatch.producerEpoch());
                }
                if (controlBatch) {
                    ControlRecordType controlRecordType = ControlRecordType.parse((ByteBuffer)record.key());
                    switch (controlRecordType) {
                        case ABORT: {
                            msgMetadata.setMarkerType(22);
                            break;
                        }
                        case COMMIT: {
                            msgMetadata.setMarkerType(21);
                            break;
                        }
                        default: {
                            msgMetadata.setMarkerType(0);
                        }
                    }
                }
            });
        });
        for (MessageImpl message : messages) {
            if (++numMessagesInBatch == 1) {
                sequenceId = Commands.initBatchMessageMetadata((MessageMetadata)msgMetadata, (MessageMetadata)message.getMessageBuilder());
            }
            currentBatchSizeBytes += (long)message.getDataBuffer().readableBytes();
            if (log.isTraceEnabled()) {
                log.trace("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ", new Object[]{sequenceId, numMessagesInBatch, currentBatchSizeBytes});
            }
            MessageMetadata msgBuilder = message.getMessageBuilder();
            batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload((MessageMetadata)msgBuilder, (ByteBuf)message.getDataBuffer(), (ByteBuf)batchedMessageMetadataAndPayload);
        }
        msgMetadata.setNumMessagesInBatch(numMessagesInBatch);
        ByteBuf buf = Commands.serializeMetadataAndPayload((Commands.ChecksumType)Commands.ChecksumType.Crc32c, (MessageMetadata)msgMetadata, (ByteBuf)batchedMessageMetadataAndPayload);
        batchedMessageMetadataAndPayload.release();
        return EncodeResult.get(records, buf, numMessages, numMessagesInBatch, MathUtils.elapsedNanos((long)startConversionNanos));
    }

    @Override
    public DecodeResult decode(List<Entry> entries, byte magic) {
        return super.decode(entries, magic);
    }

    private static MessageImpl<byte[]> recordToEntry(Record record) {
        PulsarMessageBuilder builder = PulsarMessageBuilder.newBuilder();
        if (record.hasKey()) {
            byte[] key = new byte[record.keySize()];
            record.key().get(key);
            builder.keyBytes(key);
            builder.orderingKey(key);
        }
        if (record.hasValue()) {
            byte[] value = new byte[record.valueSize()];
            record.value().get(value);
            builder.value(value);
        } else {
            builder.value(null);
        }
        builder.getMetadataBuilder().setProducerName("");
        if (record.sequence() >= 0) {
            builder.sequenceId(record.sequence());
        } else {
            builder.sequenceId(0L);
        }
        if (record.timestamp() >= 0L) {
            builder.eventTime(record.timestamp());
            builder.getMetadataBuilder().setPublishTime(record.timestamp());
        } else {
            builder.getMetadataBuilder().setPublishTime(System.currentTimeMillis());
        }
        for (Header h : record.headers()) {
            builder.property(h.key(), new String(h.value(), StandardCharsets.UTF_8));
        }
        return (MessageImpl)builder.getMessage();
    }
}

