/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.format;

import com.google.common.collect.ImmutableList;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.streamnative.pulsar.handlers.kop.exceptions.MetadataCorruptedException;
import io.streamnative.pulsar.handlers.kop.format.DecodeResult;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory;
import io.streamnative.pulsar.handlers.kop.utils.ByteBufUtils;
import io.streamnative.pulsar.handlers.kop.utils.MessageMetadataUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.record.ConvertedRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link EntryFormatter} that implements the read path: it turns a list of managed-ledger
 * entries into a single Kafka {@link MemoryRecords} payload, applying the configured entry
 * filters and down-converting record batches when the requested magic is lower than the magic
 * stored in the entry.
 */
public abstract class AbstractEntryFormatter implements EntryFormatter {
    private static final Logger log = LoggerFactory.getLogger(AbstractEntryFormatter.class);

    /** Message-metadata property key that {@link #isKafkaEntryFormat} looks up. */
    public static final String IDENTITY_KEY = "entry.format";

    /** Property value that marks an entry as written in the Kafka entry format. */
    public static final String IDENTITY_VALUE = EntryFormatterFactory.EntryFormat.KAFKA.name().toLowerCase();

    /**
     * Position of the 8-byte base offset inside a record batch, relative to the reader index
     * (same value as Kafka's {@code Records.OFFSET_OFFSET}).
     */
    private static final int OFFSET_OFFSET = 0;

    /**
     * Position of the magic byte inside a record batch, relative to the reader index
     * (same value as Kafka's {@code Records.MAGIC_OFFSET}).
     */
    private static final int MAGIC_OFFSET = 16;

    private final Time time = Time.SYSTEM;
    private final ImmutableList<EntryFilterWithClassLoader> entryfilters;

    /**
     * @param entryfilters filters consulted for every entry in {@link #decode}; a {@code null}
     *                     or empty list means every entry is accepted
     */
    protected AbstractEntryFormatter(ImmutableList<EntryFilterWithClassLoader> entryfilters) {
        this.entryfilters = entryfilters;
    }

    /**
     * Decodes {@code entries} into one contiguous buffer of Kafka records.
     *
     * <p>For each entry, in order:
     * <ul>
     *   <li>entries rejected by the entry filters are skipped;</li>
     *   <li>Kafka-format entries get their base offset rewritten in place and are appended
     *       as-is, or down-converted first when the stored magic is greater than
     *       {@code magic};</li>
     *   <li>all other entries are converted through
     *       {@code ByteBufUtils.decodePulsarEntryToKafkaRecords}.</li>
     * </ul>
     * An entry that fails with {@link MetadataCorruptedException}, {@link IOException} or
     * {@link KafkaException} is logged and skipped. Each entry is released once it has been
     * processed, whether or not it contributed to the result.
     *
     * @param entries entries to decode; each processed entry is released by this method
     * @param magic   record-batch magic value requested by the client
     * @return the decoded records together with the backing buffer and the accumulated
     *         conversion count and conversion time in nanoseconds
     */
    @Override
    public DecodeResult decode(List<Entry> entries, byte magic) {
        int conversionCount = 0;
        long conversionTimeNanos = 0L;
        // Starts empty and grows as entries are appended.
        ByteBuf batchedByteBuf = PulsarByteBufAllocator.DEFAULT.directBuffer(0);
        try {
            for (Entry entry : entries) {
                try {
                    long startOffset = MessageMetadataUtils.peekBaseOffsetFromEntry(entry);
                    ByteBuf byteBuf = entry.getDataBuffer();
                    MessageMetadata metadata = MessageMetadataUtils.parseMessageMetadata(byteBuf);
                    EntryFilter.FilterResult filterResult =
                            filterOnlyByMsgMetadata(metadata, entry, entryfilters);
                    if (filterResult == EntryFilter.FilterResult.REJECT) {
                        continue;
                    }

                    if (!isKafkaEntryFormat(metadata)) {
                        // Not a Kafka-format entry: convert the Pulsar message(s) to Kafka records.
                        DecodeResult decodeResult =
                                ByteBufUtils.decodePulsarEntryToKafkaRecords(metadata, byteBuf, startOffset, magic);
                        conversionCount += decodeResult.getConversionCount();
                        conversionTimeNanos += decodeResult.getConversionTimeNanos();
                        batchedByteBuf.writeBytes(decodeResult.getOrCreateByteBuf());
                        decodeResult.recycle();
                        continue;
                    }

                    byte batchMagic = byteBuf.getByte(byteBuf.readerIndex() + MAGIC_OFFSET);
                    // Stamp the batch with the offset taken from the entry before handing it out.
                    byteBuf.setLong(byteBuf.readerIndex() + OFFSET_OFFSET, startOffset);

                    if (batchMagic <= magic) {
                        // The client understands the stored format: copy the batch unchanged.
                        // A slice is used so the entry buffer's reader index is left untouched.
                        batchedByteBuf.writeBytes(byteBuf.slice(byteBuf.readerIndex(), byteBuf.readableBytes()));
                        continue;
                    }

                    // Stored magic is newer than what the client asked for: down-convert.
                    long startConversionNanos = MathUtils.nowInNano();
                    MemoryRecords memoryRecords = MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(byteBuf));
                    ConvertedRecords<MemoryRecords> convertedRecords =
                            memoryRecords.downConvert(magic, startOffset, time);
                    conversionCount += convertedRecords.recordConversionStats().numRecordsConverted();
                    conversionTimeNanos += MathUtils.elapsedNanos(startConversionNanos);

                    ByteBuf kafkaBuffer = Unpooled.wrappedBuffer(convertedRecords.records().buffer());
                    batchedByteBuf.writeBytes(kafkaBuffer);
                    kafkaBuffer.release();
                    if (log.isTraceEnabled()) {
                        log.trace("[{}:{}] MemoryRecords down converted, start offset {}, entry magic: {}, client magic: {}",
                                entry.getLedgerId(), entry.getEntryId(), startOffset, batchMagic, magic);
                    }
                } catch (MetadataCorruptedException | IOException | KafkaException e) {
                    // Best effort: a single undecodable entry is logged and dropped from the result.
                    log.error("[{}:{}] Failed to decode entry. ", entry.getLedgerId(), entry.getEntryId(), e);
                } finally {
                    entry.release();
                }
            }
            return DecodeResult.get(
                    MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(batchedByteBuf)),
                    batchedByteBuf,
                    conversionCount,
                    conversionTimeNanos);
        } catch (RuntimeException | Error e) {
            // The buffer never reaches the caller on this path, so release it here to avoid
            // leaking direct memory, then propagate the original failure unchanged.
            batchedByteBuf.release();
            throw e;
        }
    }

    /**
     * Tells whether the metadata carries the property {@link #IDENTITY_KEY} with the value
     * {@link #IDENTITY_VALUE}.
     *
     * @param messageMetadata metadata parsed from an entry
     * @return {@code true} if a matching property is present, {@code false} otherwise
     */
    protected static boolean isKafkaEntryFormat(MessageMetadata messageMetadata) {
        List<KeyValue> keyValues = messageMetadata.getPropertiesList();
        for (KeyValue keyValue : keyValues) {
            if (keyValue.hasKey()
                    && keyValue.getKey().equals(IDENTITY_KEY)
                    && keyValue.getValue().equals(IDENTITY_VALUE)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Runs the entry filters against an entry using only its message metadata as context.
     *
     * <p>Filters are consulted in list order and evaluation stops at the first
     * {@code REJECT}. Any other outcome ({@code ACCEPT}, {@code RESCHEDULE} or {@code null})
     * does not reject the entry.
     *
     * @param msgMetadata  metadata placed into the {@link FilterContext} given to each filter
     * @param entry        entry being evaluated
     * @param entryFilters filters to apply; {@code null} or empty accepts the entry
     * @return {@code REJECT} if any filter rejects the entry, otherwise {@code ACCEPT}
     */
    protected EntryFilter.FilterResult filterOnlyByMsgMetadata(MessageMetadata msgMetadata, Entry entry, List<EntryFilterWithClassLoader> entryFilters) {
        if (entryFilters == null || entryFilters.isEmpty()) {
            return EntryFilter.FilterResult.ACCEPT;
        }
        FilterContext filterContext = new FilterContext();
        filterContext.setMsgMetadata(msgMetadata);
        for (EntryFilter entryFilter : entryFilters) {
            if (entryFilter.filterEntry(entry, filterContext) == EntryFilter.FilterResult.REJECT) {
                return EntryFilter.FilterResult.REJECT;
            }
        }
        return EntryFilter.FilterResult.ACCEPT;
    }
}

