/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.utils;

import com.google.common.base.Predicate;
import io.netty.buffer.ByteBuf;
import io.streamnative.pulsar.handlers.kop.exceptions.MetadataCorruptedException;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for reading offsets and publish times out of managed-ledger entries.
 *
 * <p>Offsets are taken from the {@code index} field of each entry's {@link BrokerEntryMetadata};
 * entries without that metadata are reported via
 * {@link MetadataCorruptedException.NoBrokerEntryMetadata}.
 */
public class MessageMetadataUtils {
    private static final Logger log = LoggerFactory.getLogger(MessageMetadataUtils.class);

    /**
     * Returns the index currently reported by the managed ledger's interceptor.
     *
     * <p>The interceptor is cast to {@link ManagedLedgerInterceptorImpl} without a check, so this
     * fails with an unchecked exception if the ledger has no interceptor or a different
     * implementation.
     *
     * @param managedLedger the ledger whose interceptor is queried
     * @return the interceptor's current index
     */
    public static long getCurrentOffset(ManagedLedger managedLedger) {
        ManagedLedgerInterceptorImpl interceptor =
                (ManagedLedgerInterceptorImpl) managedLedger.getManagedLedgerInterceptor();
        return interceptor.getIndex();
    }

    /**
     * Returns the high watermark, computed as {@link #getCurrentOffset(ManagedLedger)} plus one.
     *
     * @param managedLedger the ledger to inspect
     * @return current offset + 1
     */
    public static long getHighWatermark(ManagedLedger managedLedger) {
        return getCurrentOffset(managedLedger) + 1L;
    }

    /**
     * Returns the log end offset, computed as {@link #getCurrentOffset(ManagedLedger)} plus one.
     *
     * @param managedLedger the ledger to inspect
     * @return current offset + 1
     */
    public static long getLogEndOffset(ManagedLedger managedLedger) {
        return getCurrentOffset(managedLedger) + 1L;
    }

    /**
     * Reads the {@code publish_time} field from the message metadata stored in {@code byteBuf}.
     *
     * <p>The buffer's reader index is restored to its value on entry whether parsing succeeds or
     * fails, so the caller's view of the buffer is never left half-consumed.
     *
     * @param byteBuf buffer positioned at the start of an entry payload
     * @return the publish time recorded in the metadata
     * @throws MetadataCorruptedException if the metadata cannot be parsed or has no publish time
     */
    public static long getPublishTime(ByteBuf byteBuf) throws MetadataCorruptedException {
        final int readerIndex = byteBuf.readerIndex();
        MessageMetadata metadata;
        try {
            metadata = parseMessageMetadata(byteBuf);
        } finally {
            // Restore on the failure path too; parsing advances the reader index.
            byteBuf.readerIndex(readerIndex);
        }
        if (!metadata.hasPublishTime()) {
            throw new MetadataCorruptedException("Field 'publish_time' is not set");
        }
        return metadata.getPublishTime();
    }

    /**
     * Asynchronously reads the entry at {@code position} and resolves the offset it maps to.
     *
     * <ul>
     *   <li>If the read fails with a {@code NonRecoverableLedgerException}, the future completes
     *       with {@link #getLogEndOffset(ManagedLedger)}; any other read failure completes it
     *       exceptionally.</li>
     *   <li>If {@code needCheckMore} is true, the entry's offset is returned when its publish time
     *       is {@code >= timestamp}, otherwise offset + 1.</li>
     *   <li>If {@code needCheckMore} is false, the entry's base offset is returned.</li>
     *   <li>If the entry has no broker entry metadata, the future completes with {@code 0} when
     *       {@code skipMessagesWithoutIndex} is true, otherwise exceptionally.</li>
     * </ul>
     *
     * <p>The entry is released before the callback returns, and the future is always completed,
     * including when an unexpected unchecked exception is raised while decoding the entry.
     *
     * @param managedLedger ledger to read from
     * @param position position of the entry to read
     * @param needCheckMore whether to compare the entry's publish time against {@code timestamp}
     * @param timestamp timestamp used when {@code needCheckMore} is true
     * @param skipMessagesWithoutIndex whether entries lacking broker entry metadata map to offset 0
     * @return a future holding the resolved offset
     */
    public static CompletableFuture<Long> getOffsetOfPosition(final ManagedLedgerImpl managedLedger, final PositionImpl position, final boolean needCheckMore, final long timestamp, final boolean skipMessagesWithoutIndex) {
        final CompletableFuture<Long> future = new CompletableFuture<>();
        managedLedger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {

            @Override
            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                if (exception instanceof ManagedLedgerException.NonRecoverableLedgerException) {
                    future.complete(getLogEndOffset(managedLedger));
                } else {
                    future.completeExceptionally(exception);
                }
            }

            @Override
            public void readEntryComplete(Entry entry, Object ctx) {
                try {
                    if (needCheckMore) {
                        long offset = peekOffsetFromEntry(entry);
                        long publishTime = getPublishTime(entry.getDataBuffer());
                        future.complete(publishTime >= timestamp ? offset : offset + 1L);
                    } else {
                        future.complete(peekBaseOffsetFromEntry(entry));
                    }
                } catch (MetadataCorruptedException.NoBrokerEntryMetadata e) {
                    if (skipMessagesWithoutIndex) {
                        log.warn("The entry {} doesn't have BrokerEntryMetadata, return 0 as the offset", position);
                        future.complete(0L);
                    } else {
                        future.completeExceptionally(e);
                    }
                } catch (MetadataCorruptedException e) {
                    future.completeExceptionally(e);
                } catch (RuntimeException e) {
                    // Without this, an unchecked failure (null entry, truncated buffer, ...) would
                    // escape the callback and leave the future incomplete forever.
                    future.completeExceptionally(e);
                } finally {
                    if (entry != null) {
                        entry.release();
                    }
                }
            }
        }, null);
        return future;
    }

    /**
     * Reads the offset (broker entry metadata index) of {@code entry} without consuming its buffer
     * contents beyond what the underlying peek does.
     *
     * @param entry the entry to inspect
     * @return the index stored in the entry's broker entry metadata
     * @throws MetadataCorruptedException if the metadata is missing or cannot be decoded
     */
    public static long peekOffsetFromEntry(Entry entry) throws MetadataCorruptedException {
        return peekOffset(entry.getDataBuffer(), entry.getPosition());
    }

    /**
     * Peeks the broker entry metadata in {@code buf} and returns its index.
     *
     * @param buf entry payload
     * @param position position used only for the error message; may be null
     * @return the broker entry metadata index
     * @throws MetadataCorruptedException.NoBrokerEntryMetadata if no broker entry metadata exists
     * @throws MetadataCorruptedException if decoding fails with an illegal argument/state error
     */
    private static long peekOffset(ByteBuf buf, @Nullable Position position) throws MetadataCorruptedException {
        try {
            BrokerEntryMetadata brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(buf);
            if (brokerEntryMetadata == null) {
                throw new MetadataCorruptedException.NoBrokerEntryMetadata();
            }
            return brokerEntryMetadata.getIndex();
        } catch (IllegalArgumentException | IllegalStateException e) {
            throw new MetadataCorruptedException("Failed to peekOffsetFromEntry for " + position + ": " + e.getMessage());
        }
    }

    /**
     * Returns the base offset of {@code entry}: its offset minus
     * ({@code numMessagesInBatch - 1}) as recorded in the entry's message metadata.
     *
     * @param entry the entry to inspect
     * @return the offset of the first message in the entry
     * @throws MetadataCorruptedException if either metadata section is missing or undecodable
     */
    public static long peekBaseOffsetFromEntry(Entry entry) throws MetadataCorruptedException {
        return peekBaseOffset(entry.getDataBuffer(), entry.getPosition());
    }

    /**
     * Peeks the message metadata to learn the batch size, then derives the base offset.
     *
     * @param buf entry payload
     * @param position position used only for error messages; may be null
     * @return the base offset
     * @throws MetadataCorruptedException if the message metadata cannot be peeked, or the broker
     *         entry metadata is missing or undecodable
     */
    private static long peekBaseOffset(ByteBuf buf, @Nullable Position position) throws MetadataCorruptedException {
        MessageMetadata metadata = Commands.peekMessageMetadata(buf, null, 0L);
        if (metadata == null) {
            throw new MetadataCorruptedException("Failed to peekMessageMetadata for " + position);
        }
        return peekBaseOffset(buf, position, metadata.getNumMessagesInBatch());
    }

    /**
     * Derives the base offset from the entry's offset and an already-known message count.
     *
     * @param buf entry payload
     * @param position position used only for error messages; may be null
     * @param numMessages number of messages in the entry
     * @return entry offset minus ({@code numMessages - 1})
     * @throws MetadataCorruptedException if the broker entry metadata is missing or undecodable
     */
    private static long peekBaseOffset(ByteBuf buf, @Nullable Position position, int numMessages) throws MetadataCorruptedException {
        return peekOffset(buf, position) - (numMessages - 1);
    }

    /**
     * Derives the base offset of the entry held in {@code buf} given its message count.
     *
     * @param buf entry payload
     * @param numMessages number of messages in the entry
     * @return entry offset minus ({@code numMessages - 1})
     * @throws MetadataCorruptedException if the broker entry metadata is missing or undecodable
     */
    public static long peekBaseOffset(ByteBuf buf, int numMessages) throws MetadataCorruptedException {
        return peekBaseOffset(buf, null, numMessages);
    }

    /**
     * Parses the message metadata from {@code buf}, translating decode failures into
     * {@link MetadataCorruptedException}.
     *
     * <p>Parsing advances the buffer's reader index; callers that need the buffer unchanged must
     * save and restore it themselves.
     *
     * @param buf entry payload
     * @return the parsed metadata
     * @throws MetadataCorruptedException if the parser rejects the buffer contents
     */
    public static MessageMetadata parseMessageMetadata(ByteBuf buf) throws MetadataCorruptedException {
        try {
            return Commands.parseMessageMetadata(buf);
        } catch (IllegalArgumentException | IllegalStateException e) {
            // IllegalStateException is translated as well, matching peekOffset.
            // NOTE(review): presumably raised for missing required fields; confirm against the
            // Pulsar version in use.
            throw new MetadataCorruptedException(e.getMessage());
        }
    }

    /**
     * Asynchronously searches {@code managedLedger} using a predicate that matches entries whose
     * offset is less than {@code offset}.
     *
     * @param managedLedger ledger to search
     * @param offset target offset
     * @param skipMessagesWithoutIndex value the predicate returns for entries that have no broker
     *        entry metadata
     * @return a future holding the position returned by the managed ledger's search
     */
    public static CompletableFuture<Position> asyncFindPosition(ManagedLedger managedLedger, long offset, boolean skipMessagesWithoutIndex) {
        return managedLedger.asyncFindPosition(new FindEntryByOffset(managedLedger, offset, skipMessagesWithoutIndex));
    }

    /**
     * Predicate that is true for entries whose offset is strictly less than a target offset.
     * Every non-null entry passed to {@link #apply(Entry)} is released before it returns.
     */
    private static class FindEntryByOffset
    implements Predicate<Entry> {
        private final ManagedLedger managedLedger;
        private final long offset;
        private final boolean skipMessagesWithoutIndex;

        public FindEntryByOffset(ManagedLedger managedLedger, long offset, boolean skipMessagesWithoutIndex) {
            this.managedLedger = managedLedger;
            this.offset = offset;
            this.skipMessagesWithoutIndex = skipMessagesWithoutIndex;
        }

        /**
         * @param entry entry to test; {@code null} yields {@code false}
         * @return {@code true} if the entry's offset is below the target; for an entry without
         *         broker entry metadata, the configured {@code skipMessagesWithoutIndex} value;
         *         {@code false} (after logging) for any other metadata corruption
         */
        @Override
        public boolean apply(Entry entry) {
            if (entry == null) {
                return false;
            }
            try {
                return peekOffsetFromEntry(entry) < this.offset;
            } catch (MetadataCorruptedException.NoBrokerEntryMetadata ignored) {
                // Entries without an index match only when the caller asked to skip them.
                return this.skipMessagesWithoutIndex;
            } catch (MetadataCorruptedException e) {
                log.error("[{}] Entry {} is corrupted: {}", this.managedLedger.getName(), entry.getPosition(), e.getMessage());
                return false;
            } finally {
                entry.release();
            }
        }

        @Override
        public String toString() {
            return "FindEntryByOffset{ " + this.offset + "}";
        }
    }
}

