/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Recycler;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager;
import io.streamnative.pulsar.handlers.kop.KafkaTopicManager;
import io.streamnative.pulsar.handlers.kop.MessageFetchContext;
import io.streamnative.pulsar.handlers.kop.MessagePublishContext;
import io.streamnative.pulsar.handlers.kop.PendingTopicFutures;
import io.streamnative.pulsar.handlers.kop.RequestStats;
import io.streamnative.pulsar.handlers.kop.exceptions.MetadataCorruptedException;
import io.streamnative.pulsar.handlers.kop.format.DecodeResult;
import io.streamnative.pulsar.handlers.kop.format.EncodeRequest;
import io.streamnative.pulsar.handlers.kop.format.EncodeResult;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory;
import io.streamnative.pulsar.handlers.kop.format.KafkaMixedEntryFormatter;
import io.streamnative.pulsar.handlers.kop.storage.AnalyzeResult;
import io.streamnative.pulsar.handlers.kop.storage.AppendRecordsContext;
import io.streamnative.pulsar.handlers.kop.storage.CompletedTxn;
import io.streamnative.pulsar.handlers.kop.storage.ProducerAppendInfo;
import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManager;
import io.streamnative.pulsar.handlers.kop.utils.KopLogValidator;
import io.streamnative.pulsar.handlers.kop.utils.MessageMetadataUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionLog {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(PartitionLog.class);
    /** First component of the producer name assembled in {@code publishMessage}. */
    private static final String PID_PREFIX = "KOP-PID-PREFIX";
    /** Codec descriptor built from {@link CompressionType#NONE}; used as the initial source codec when analyzing records. */
    private static final KopLogValidator.CompressionCodec DEFAULT_COMPRESSION = new KopLogValidator.CompressionCodec(CompressionType.NONE.name, CompressionType.NONE.id);
    /** Handler configuration; supplies the default entry format and the group-metrics switch. */
    private final KafkaServiceConfiguration kafkaConfig;
    /** Sink for the latency/throughput statistics recorded by the produce and fetch paths. */
    private final RequestStats requestStats;
    /** Clock used to take nanosecond timestamps on the produce path. */
    private final Time time;
    /** Kafka-side identity of this partition (used in logs and stats). */
    private final TopicPartition topicPartition;
    /** Name handed to {@link KafkaTopicManager} when looking up the topic or its consumer manager. */
    private final String fullPartitionName;
    /** Cached future of the entry formatter; populated on first use by {@code getEntryFormatter} and cleared there on metadata failure. */
    private final AtomicReference<CompletableFuture<EntryFormatter>> entryFormatter = new AtomicReference();
    /** Producer/transaction state tracker consulted for producer updates, the first undecided offset and aborted transactions. */
    private final ProducerStateManager producerStateManager;
    /** Entry filters forwarded to {@link EntryFormatterFactory} when the formatter is created. */
    private final ImmutableMap<String, EntryFilterWithClassLoader> entryfilterMap;
    /** Value of {@code kafkaConfig.isPreciseTopicPublishRateLimiterEnable()} captured at construction time. */
    private final boolean preciseTopicPublishRateLimitingEnable;

    /**
     * Creates the log facade for a single topic partition.
     *
     * @param kafkaConfig          handler configuration; also read once here for the precise
     *                             topic publish rate limiter flag
     * @param requestStats         statistics sink for produce/fetch measurements
     * @param time                 clock used for nanosecond timestamps
     * @param topicPartition       Kafka identity of the partition
     * @param fullPartitionName    name used for topic lookups
     * @param entryfilterMap       entry filters handed to the entry formatter factory
     * @param producerStateManager producer/transaction state tracker for this partition
     */
    public PartitionLog(KafkaServiceConfiguration kafkaConfig, RequestStats requestStats, Time time, TopicPartition topicPartition, String fullPartitionName, ImmutableMap<String, EntryFilterWithClassLoader> entryfilterMap, ProducerStateManager producerStateManager) {
        // Identity of the partition.
        this.topicPartition = topicPartition;
        this.fullPartitionName = fullPartitionName;

        // Collaborators.
        this.requestStats = requestStats;
        this.time = time;
        this.producerStateManager = producerStateManager;
        this.entryfilterMap = entryfilterMap;

        // Configuration and the flag derived from it.
        this.kafkaConfig = kafkaConfig;
        this.preciseTopicPublishRateLimitingEnable = kafkaConfig.isPreciseTopicPublishRateLimiterEnable();
    }

    /**
     * Returns the cached entry-formatter future, creating it from {@code topicFuture} when the
     * cache is still empty.
     *
     * <p>The formatter is built from the partitioned-topic metadata properties when the metadata
     * reports at least one partition, otherwise from the managed ledger's properties. When the
     * metadata lookup (or the build that follows it) fails, the error is logged and the cache is
     * reset to {@code null}.
     *
     * <p>NOTE(review): failures raised before {@code result} exists (the topic future itself
     * failing, or the {@link IllegalStateException} thrown for an absent topic) do not pass
     * through the {@code exceptionally} handler below, so the failed future appears to stay
     * cached. Confirm whether that is intended.
     *
     * @param topicFuture future of the (optional) persistent topic backing this partition
     * @return the shared future of the entry formatter
     */
    private CompletableFuture<EntryFormatter> getEntryFormatter(CompletableFuture<Optional<PersistentTopic>> topicFuture) {
        return this.entryFormatter.accumulateAndGet(null, (current, ___) -> {
            // Already initialised (or initialising): reuse the cached future.
            if (current != null) {
                return current;
            }
            return topicFuture.thenCompose(persistentTopic -> {
                if (!persistentTopic.isPresent()) {
                    throw new IllegalStateException("Topic " + this.fullPartitionName + " is not ready");
                }
                // Metadata is looked up under the partitioned-topic name when the topic name carries a partition suffix.
                TopicName logicalName = TopicName.get((String)((PersistentTopic)persistentTopic.get()).getName());
                TopicName actualName = logicalName.isPartitioned() ? TopicName.getPartitionedTopicName((String)((PersistentTopic)persistentTopic.get()).getName()) : logicalName;
                CompletionStage result = ((PersistentTopic)persistentTopic.get()).getBrokerService().fetchPartitionedTopicMetadataAsync(actualName).thenApply(metadata -> {
                    if (metadata.partitions > 0) {
                        return this.buildEntryFormatter(metadata.properties);
                    }
                    // No partitions reported: fall back to the managed ledger properties.
                    return this.buildEntryFormatter(((PersistentTopic)persistentTopic.get()).getManagedLedger().getProperties());
                });
                // Side branch only: log and clear the cache; the returned stage still carries the failure.
                ((CompletableFuture)result).exceptionally(ex -> {
                    log.error("Cannot create the EntryFormatter for {}", (Object)this.fullPartitionName, ex);
                    this.entryFormatter.set(null);
                    return null;
                });
                return result;
            });
        });
    }

    /**
     * Creates the entry formatter for this partition.
     *
     * <p>The format name is taken from the {@code kafkaEntryFormat} topic property when
     * present; otherwise the format configured in {@code kafkaConfig} is used.
     *
     * @param topicProperties topic-level properties, may be {@code null}
     * @return the formatter produced by {@link EntryFormatterFactory}
     */
    private EntryFormatter buildEntryFormatter(Map<String, String> topicProperties) {
        final String configuredFormat = this.kafkaConfig.getEntryFormat();
        final String entryFormat;
        if (topicProperties == null) {
            entryFormat = configuredFormat;
        } else {
            entryFormat = topicProperties.getOrDefault("kafkaEntryFormat", configuredFormat);
        }
        if (log.isDebugEnabled()) {
            log.debug("entryFormat for {} is {} (topicProperties {})", this.fullPartitionName, entryFormat, topicProperties);
        }
        return EntryFormatterFactory.create(this.kafkaConfig, this.entryfilterMap, entryFormat);
    }

    /**
     * Runs every batch that carries a producer id through the producer state machinery and
     * collects the outcome.
     *
     * @param records     batches to analyze; batches without a producer id are ignored
     * @param firstOffset offset forwarded to each producer's append bookkeeping
     * @param origin      origin forwarded to the producer state manager
     * @return the per-producer append infos touched by {@code records} together with the
     *         transactions those batches completed
     */
    public AnalyzeResult analyzeAndValidateProducerState(MemoryRecords records, Optional<Long> firstOffset, AppendOrigin origin) {
        // Typed collections instead of raw ones, so the calls below are checked by the compiler.
        Map<Long, ProducerAppendInfo> updatedProducers = new HashMap<>();
        List<CompletedTxn> completedTxns = new ArrayList<>();
        for (RecordBatch batch : records.batches()) {
            if (!batch.hasProducerId()) {
                continue;
            }
            this.updateProducers(batch, updatedProducers, firstOffset, origin).ifPresent(completedTxns::add);
        }
        return new AnalyzeResult(updatedProducers, completedTxns);
    }

    /**
     * Appends {@code batch} to the append info of its producer, creating that info through the
     * producer state manager the first time the producer is seen in {@code producers}.
     *
     * @param batch       batch to account for
     * @param producers   append infos keyed by producer id; may gain an entry
     * @param firstOffset offset forwarded to the append info
     * @param origin      origin used when a new append info has to be prepared
     * @return whatever the producer's append info reports for this batch
     */
    private Optional<CompletedTxn> updateProducers(RecordBatch batch, Map<Long, ProducerAppendInfo> producers, Optional<Long> firstOffset, AppendOrigin origin) {
        final Long batchProducerId = batch.producerId();
        final ProducerAppendInfo producerInfo = producers.computeIfAbsent(
                batchProducerId,
                key -> this.producerStateManager.prepareUpdate(batchProducerId, origin));
        return producerInfo.append(batch, firstOffset);
    }

    /**
     * Exposes the first undecided offset tracked by the producer state manager.
     *
     * @return the value reported by {@code producerStateManager.firstUndecidedOffset()}
     */
    public Optional<Long> firstUndecidedOffset() {
        final Optional<Long> undecided = this.producerStateManager.firstUndecidedOffset();
        return undecided;
    }

    /**
     * Looks up the aborted transactions for a fetch starting at {@code fetchOffset}.
     *
     * @param fetchOffset offset passed through to the producer state manager
     * @return the list returned by {@code producerStateManager.getAbortedIndexList(fetchOffset)}
     */
    public List<FetchResponse.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
        final List<FetchResponse.AbortedTransaction> aborted = this.producerStateManager.getAbortedIndexList(fetchOffset);
        return aborted;
    }

    /**
     * Validates, encodes and publishes {@code records} to the topic backing this partition.
     *
     * <p>The returned future completes with the offset reported by the publish step, or with
     * {@code appendInfo.firstOffset().orElse(-1L)} when the records contain no batches. It
     * completes exceptionally when validation fails, when the topic lookup fails, when the topic
     * is absent or the entry formatter is unavailable (both mapped to
     * {@code NOT_LEADER_OR_FOLLOWER}), or when encoding/publishing throws.
     *
     * @param records              records received from the producer
     * @param origin               origin of the append; not consulted by this method
     * @param appendRecordsContext per-request context (topic manager, pending-topic futures,
     *                             throttling hooks)
     * @return future of the append result
     */
    public CompletableFuture<Long> appendRecords(MemoryRecords records, AppendOrigin origin, AppendRecordsContext appendRecordsContext) {
        CompletableFuture<Long> appendFuture = new CompletableFuture<>();
        KafkaTopicManager topicManager = appendRecordsContext.getTopicManager();
        long beforeRecordsProcess = this.time.nanoseconds();
        try {
            LogAppendInfo appendInfo = this.analyzeAndValidateRecords(records);
            if (appendInfo.shallowCount() == 0) {
                // Nothing to append.
                appendFuture.complete(appendInfo.firstOffset().orElse(-1L));
                return appendFuture;
            }
            MemoryRecords validRecords = this.trimInvalidBytes(records, appendInfo);
            CompletableFuture<Optional<PersistentTopic>> topicFuture = topicManager.getTopic(this.fullPartitionName);
            if (topicFuture.isCompletedExceptionally()) {
                // Already failed: forward the original failure to the caller.
                topicFuture.exceptionally(e -> {
                    appendFuture.completeExceptionally(e);
                    return Optional.empty();
                });
                return appendFuture;
            }
            if (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent()) {
                appendFuture.completeExceptionally(Errors.NOT_LEADER_OR_FOLLOWER.exception());
                return appendFuture;
            }
            CompletableFuture<EntryFormatter> entryFormatterHandle = this.getEntryFormatter(topicFuture);
            Consumer<Optional<PersistentTopic>> persistentTopicConsumer = persistentTopicOpt -> {
                if (!persistentTopicOpt.isPresent()) {
                    appendFuture.completeExceptionally(Errors.NOT_LEADER_OR_FOLLOWER.exception());
                    return;
                }
                ManagedLedger managedLedger = persistentTopicOpt.get().getManagedLedger();
                entryFormatterHandle.whenComplete((entryFormatter, ee) -> {
                    if (ee != null) {
                        appendFuture.completeExceptionally(Errors.NOT_LEADER_OR_FOLLOWER.exception());
                        return;
                    }
                    // Nobody observes the future returned by whenComplete, so anything thrown
                    // below would otherwise vanish and leave appendFuture pending forever.
                    try {
                        if (entryFormatter instanceof KafkaMixedEntryFormatter) {
                            long logEndOffset = MessageMetadataUtils.getLogEndOffset(managedLedger);
                            appendInfo.firstOffset(Optional.of(logEndOffset));
                        }
                        EncodeRequest encodeRequest = EncodeRequest.get(validRecords, appendInfo);
                        this.requestStats.getPendingTopicLatencyStats().registerSuccessfulEvent(this.time.nanoseconds() - beforeRecordsProcess, TimeUnit.NANOSECONDS);
                        long beforeEncodingStarts = this.time.nanoseconds();
                        EncodeResult encodeResult = entryFormatter.encode(encodeRequest);
                        encodeRequest.recycle();
                        this.requestStats.getProduceEncodeStats().registerSuccessfulEvent(this.time.nanoseconds() - beforeEncodingStarts, TimeUnit.NANOSECONDS);
                        appendRecordsContext.getStartSendOperationForThrottling().accept(encodeResult.getEncodedByteBuf().readableBytes());
                        this.publishMessages(persistentTopicOpt, appendFuture, appendInfo, encodeResult, appendRecordsContext);
                    } catch (Exception encodeOrPublishFailure) {
                        log.error("Failed to handle produce request for {}", this.topicPartition, encodeOrPublishFailure);
                        appendFuture.completeExceptionally(encodeOrPublishFailure);
                    }
                });
            };
            appendRecordsContext.getPendingTopicFuturesMap().computeIfAbsent(this.topicPartition, ignored -> new PendingTopicFutures(this.requestStats)).addListener(topicFuture, persistentTopicConsumer, appendFuture::completeExceptionally);
        } catch (Exception exception) {
            log.error("Failed to handle produce request for {}", this.topicPartition, exception);
            appendFuture.completeExceptionally(exception);
        }
        return appendFuture;
    }

    /**
     * Returns the last position of the topic behind this partition without waiting for the
     * topic lookup.
     *
     * @param topicManager manager used to look up the topic
     * @return the topic's last position when the lookup has already completed with a topic;
     *         {@code PositionImpl.EARLIEST} when it failed, is still pending, or yielded no topic
     */
    public Position getLastPosition(KafkaTopicManager topicManager) {
        final CompletableFuture<Optional<PersistentTopic>> lookup = topicManager.getTopic(this.fullPartitionName);
        // getNow would rethrow a failed lookup, so rule that case out first.
        if (lookup.isCompletedExceptionally()) {
            return PositionImpl.EARLIEST;
        }
        // A pending lookup is treated exactly like an absent topic.
        final Optional<PersistentTopic> maybeTopic = lookup.getNow(Optional.empty());
        return maybeTopic.isPresent() ? this.getLastPosition(maybeTopic.get()) : PositionImpl.EARLIEST;
    }

    /**
     * Returns the last position reported by {@code persistentTopic}.
     *
     * @param persistentTopic topic to query
     * @return {@code persistentTopic.getLastPosition()}
     */
    private Position getLastPosition(PersistentTopic persistentTopic) {
        final Position last = persistentTopic.getLastPosition();
        return last;
    }

    /**
     * Reads records for one fetch partition.
     *
     * <p>The steps are: resolve the topic consumer manager, reject offsets beyond the log end,
     * take the cursor registered for the fetch offset, read entries from it, then hand them to
     * {@code handleEntries}. The returned future is always completed normally; failures are
     * reported as error results.
     *
     * @param partitionData     fetch parameters ({@code fetchOffset}, {@code maxBytes})
     * @param readCommitted     whether only committed (transactional) data may be returned
     * @param limitBytes        remaining byte budget of the whole fetch; decreased by the size
     *                          of the entries read here
     * @param maxReadEntriesNum maximum number of entries to read from the cursor
     * @param context           fetch context (topic manager, shared state, decode executor)
     * @return future of the read result
     */
    public CompletableFuture<ReadRecordsResult> readRecords(FetchRequest.PartitionData partitionData, boolean readCommitted, AtomicLong limitBytes, int maxReadEntriesNum, MessageFetchContext context) {
        long startPrepareMetadataNanos = MathUtils.nowInNano();
        CompletableFuture<ReadRecordsResult> future = new CompletableFuture<>();
        long offset = partitionData.fetchOffset;
        KafkaTopicManager topicManager = context.getTopicManager();
        topicManager.getTopicConsumerManager(this.fullPartitionName).thenAccept(tcm -> {
            if (tcm == null) {
                this.registerPrepareMetadataFailedEvent(startPrepareMetadataNanos);
                context.getSharedState().getKafkaTopicConsumerManagerCache().removeAndCloseByTopic(this.fullPartitionName);
                if (log.isDebugEnabled()) {
                    log.debug("Fetch for {}: no tcm for topic {} return NOT_LEADER_FOR_PARTITION.", this.topicPartition, this.fullPartitionName);
                }
                future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER));
                return;
            }
            if (this.checkOffsetOutOfRange(tcm, offset, this.topicPartition, startPrepareMetadataNanos)) {
                future.complete(ReadRecordsResult.error(Errors.OFFSET_OUT_OF_RANGE));
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("Fetch for {}: remove tcm to get cursor for fetch offset: {} .", this.topicPartition, offset);
            }
            CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture = tcm.removeCursorFuture(offset);
            if (cursorFuture == null) {
                // The consumer manager is closed; the request is answered with Errors.NONE.
                log.warn("KafkaTopicConsumerManager is closed, remove TCM of {}", this.fullPartitionName);
                this.registerPrepareMetadataFailedEvent(startPrepareMetadataNanos);
                context.getSharedState().getKafkaTopicConsumerManagerCache().removeAndCloseByTopic(this.fullPartitionName);
                future.complete(ReadRecordsResult.error(Errors.NONE));
                return;
            }
            cursorFuture.thenAccept(cursorLongPair -> {
                if (cursorLongPair == null) {
                    log.warn("KafkaTopicConsumerManager.remove({}) return null for topic {}. Fetch for topic return error.", offset, this.topicPartition);
                    this.registerPrepareMetadataFailedEvent(startPrepareMetadataNanos);
                    future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER));
                    return;
                }
                ManagedCursor cursor = cursorLongPair.getLeft();
                AtomicLong cursorOffset = new AtomicLong(cursorLongPair.getRight());
                this.requestStats.getPrepareMetadataStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS);
                // Never read more than either the partition limit or what is left of the request budget.
                long adjustedMaxBytes = Math.min((long) partitionData.maxBytes, limitBytes.get());
                this.readEntries(cursor, this.topicPartition, cursorOffset, maxReadEntriesNum, adjustedMaxBytes, topicManager).whenComplete((entries, throwable) -> {
                    if (throwable != null) {
                        tcm.deleteOneCursorAsync(cursor, "cursor.readEntry fail. deleteCursor");
                        if (throwable instanceof ManagedLedgerException.CursorAlreadyClosedException || throwable instanceof ManagedLedgerException.ManagedLedgerFencedException) {
                            future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER));
                            return;
                        }
                        log.error("Read entry error on {}", partitionData, throwable);
                        future.complete(ReadRecordsResult.error(Errors.UNKNOWN_SERVER_ERROR));
                        return;
                    }
                    long readSize = entries.stream().mapToLong(Entry::getLength).sum();
                    limitBytes.addAndGet(-1L * readSize);
                    // Hand the cursor back, keyed by the offset the next fetch is expected to use.
                    tcm.add(cursorOffset.get(), Pair.of(cursor, cursorOffset.get()));
                    this.handleEntries(future, entries, partitionData, tcm, cursor, readCommitted, context);
                });
            }).exceptionally(ex -> {
                this.registerPrepareMetadataFailedEvent(startPrepareMetadataNanos);
                context.getSharedState().getKafkaTopicConsumerManagerCache().removeAndCloseByTopic(this.fullPartitionName);
                future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER));
                return null;
            });
        }).exceptionally(ex -> {
            // Without this handler a failed consumer-manager lookup (or an exception thrown by
            // the callback above) would leave `future` pending forever. complete() is a no-op
            // when a result was already delivered.
            log.error("Failed to prepare the fetch for {}", this.fullPartitionName, ex);
            this.registerPrepareMetadataFailedEvent(startPrepareMetadataNanos);
            future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER));
            return null;
        });
        return future;
    }

    /**
     * Tells whether {@code offset} lies beyond the log end offset of the managed ledger behind
     * {@code tcm}. When it does, the condition is logged and a failed prepare-metadata event is
     * recorded.
     *
     * @param tcm                       consumer manager whose managed ledger is inspected
     * @param offset                    requested fetch offset
     * @param topicPartition            partition named in the log message
     * @param startPrepareMetadataNanos start timestamp for the failed-event latency
     * @return {@code true} when {@code offset} is greater than the log end offset
     */
    private boolean checkOffsetOutOfRange(KafkaTopicConsumerManager tcm, long offset, TopicPartition topicPartition, long startPrepareMetadataNanos) {
        final ManagedLedgerImpl ledger = (ManagedLedgerImpl) tcm.getManagedLedger();
        final long endOffset = MessageMetadataUtils.getLogEndOffset((ManagedLedger) ledger);
        if (offset <= endOffset) {
            return false;
        }
        log.error("Received request for offset {} for partition {}, but we only have entries less than {}.", offset, topicPartition, endOffset);
        this.registerPrepareMetadataFailedEvent(startPrepareMetadataNanos);
        return true;
    }

    /**
     * Records a failed prepare-metadata event whose latency is the time elapsed since
     * {@code startPrepareMetadataNanos}.
     *
     * @param startPrepareMetadataNanos start timestamp, as obtained from {@code MathUtils.nowInNano()}
     */
    private void registerPrepareMetadataFailedEvent(long startPrepareMetadataNanos) {
        final OpStatsLogger prepareMetadataStats = this.requestStats.getPrepareMetadataStats();
        final long elapsedNanos = MathUtils.elapsedNanos(startPrepareMetadataNanos);
        prepareMetadataStats.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
    }

    /**
     * Turns the entries read from the cursor into a fetch result and completes {@code future}.
     *
     * <p>For read-committed fetches only entries up to the last stable offset are kept. When
     * nothing remains, an {@code Errors.NONE} result carrying the ledger's last confirmed entry
     * is returned. Otherwise the entries are decoded on the context's decode executor.
     *
     * @param future        future to complete with the outcome
     * @param entries       entries read from {@code cursor}
     * @param partitionData fetch parameters; {@code fetchOffset} selects the aborted transactions
     * @param tcm           consumer manager of the topic
     * @param cursor        cursor the entries were read from
     * @param readCommitted whether the fetch uses read-committed isolation
     * @param context       fetch context (request header, topic manager, decode executor)
     */
    private void handleEntries(CompletableFuture<ReadRecordsResult> future, List<Entry> entries, FetchRequest.PartitionData partitionData, KafkaTopicConsumerManager tcm, ManagedCursor cursor, boolean readCommitted, MessageFetchContext context) {
        long highWatermark = MessageMetadataUtils.getHighWatermark(cursor.getManagedLedger());
        long lso = readCommitted ? this.firstUndecidedOffset().orElse(highWatermark) : highWatermark;
        List<Entry> committedEntries = readCommitted ? this.getCommittedEntries(entries, lso) : entries;
        if (log.isDebugEnabled()) {
            log.debug("Read {} entries but only {} entries are committed", entries.size(), committedEntries.size());
        }
        if (committedEntries.isEmpty()) {
            future.complete(ReadRecordsResult.error(tcm.getManagedLedger().getLastConfirmedEntry(), Errors.NONE));
            return;
        }
        byte magic = PartitionLog.getCompatibleMagic(context.getHeader().apiVersion());
        CompletableFuture<String> groupNameFuture = this.kafkaConfig.isKopEnableGroupLevelConsumerMetrics() ? context.getCurrentConnectedGroupNameAsync() : CompletableFuture.completedFuture(null);
        CompletableFuture<EntryFormatter> entryFormatterHandle = this.getEntryFormatter(context.getTopicManager().getTopic(this.fullPartitionName));
        entryFormatterHandle.whenComplete((entryFormatter, ee) -> {
            if (ee != null) {
                // The entries never reach the decoder on this path, so nothing else will
                // release them; do it here to avoid leaking their buffers.
                committedEntries.forEach(Entry::release);
                future.complete(ReadRecordsResult.error(Errors.KAFKA_STORAGE_ERROR));
                return;
            }
            groupNameFuture.whenCompleteAsync((groupName, ex) -> {
                String consumerGroup = groupName;
                if (ex != null) {
                    log.error("Get groupId failed.", ex);
                    consumerGroup = "";
                }
                long startDecodingEntriesNanos = MathUtils.nowInNano();
                // Read the last position before decoding.
                Position lastPosition = this.getLastPositionFromEntries(committedEntries);
                DecodeResult decodeResult = entryFormatter.decode(committedEntries, magic);
                this.requestStats.getFetchDecodeStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startDecodingEntriesNanos), TimeUnit.NANOSECONDS);
                decodeResult.updateConsumerStats(this.topicPartition, committedEntries.size(), consumerGroup, this.requestStats);
                List<FetchResponse.AbortedTransaction> abortedTransactions = null;
                if (readCommitted) {
                    abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset);
                }
                if (log.isDebugEnabled()) {
                    log.debug("Partition {} read entry completed in {} ns", this.topicPartition, MathUtils.nowInNano() - startDecodingEntriesNanos);
                }
                future.complete(ReadRecordsResult.get(decodeResult, abortedTransactions, highWatermark, lso, lastPosition));
            }, context.getDecodeExecutor()).exceptionally(ex -> {
                log.error("Partition {} read entry exceptionally. ", this.topicPartition, ex);
                future.complete(ReadRecordsResult.error(Errors.KAFKA_STORAGE_ERROR));
                return null;
            });
        });
    }

    /**
     * Maps the API version of a request to a record-batch magic value.
     *
     * @param apiVersion API version taken from the request header
     * @return {@code 0} for versions up to 1, {@code 1} for versions 2 and 3, {@code 2} otherwise
     */
    private static byte getCompatibleMagic(short apiVersion) {
        if (apiVersion <= 1) {
            return (byte) 0;
        }
        if (apiVersion <= 3) {
            return (byte) 1;
        }
        return (byte) 2;
    }

    /**
     * Returns the position of the last entry in {@code entries}.
     *
     * @param entries entries to inspect, may be {@code null} or empty
     * @return the last entry's position, or {@code PositionImpl.EARLIEST} when there is none
     */
    private Position getLastPositionFromEntries(List<Entry> entries) {
        final boolean hasEntries = entries != null && !entries.isEmpty();
        if (hasEntries) {
            final Entry tail = entries.get(entries.size() - 1);
            return tail.getPosition();
        }
        return PositionImpl.EARLIEST;
    }

    /**
     * Splits {@code entries} at the last stable offset: entries whose base offset does not
     * exceed {@code lso} are returned, every other entry is released.
     *
     * <p>Scanning stops at the first entry whose base offset is greater than {@code lso}; that
     * entry and all entries after it are released. An entry whose metadata cannot be read is
     * logged, released and skipped, and scanning continues with the next one.
     *
     * <p>Each entry is released individually. The previous index-based release
     * ({@code entries[committed.size()..]}) assumed the committed entries were a prefix of the
     * input, so a skipped corrupted entry in the middle caused a returned entry to be released
     * and the corrupted one to leak.
     *
     * @param entries entries in read order
     * @param lso     last stable offset
     * @return the entries that may be handed to a read-committed consumer
     */
    private List<Entry> getCommittedEntries(List<Entry> entries, long lso) {
        List<Entry> committedEntries = new ArrayList<>();
        boolean beyondLso = false;
        for (Entry entry : entries) {
            if (beyondLso) {
                entry.release();
                continue;
            }
            try {
                if (lso < MessageMetadataUtils.peekBaseOffsetFromEntry(entry)) {
                    beyondLso = true;
                    entry.release();
                    continue;
                }
                committedEntries.add(entry);
            } catch (MetadataCorruptedException e) {
                log.error("[{}:{}] Failed to peek base offset from entry.", entry.getLedgerId(), entry.getEntryId(), e);
                entry.release();
            }
        }
        return committedEntries;
    }

    /**
     * Reads up to {@code maxReadEntriesNum} entries / {@code adjustedMaxBytes} bytes from
     * {@code cursor}.
     *
     * <p>On success the cursor is mark-deleted at the last entry read and {@code cursorOffset}
     * is advanced to the offset following that entry. When {@code adjustedMaxBytes} is not
     * positive, no read is issued and an empty list is returned.
     *
     * @param cursor            cursor to read from
     * @param topicPartition    partition named in log messages
     * @param cursorOffset      offset tracked for the cursor; updated after a non-empty read
     * @param maxReadEntriesNum maximum number of entries to read
     * @param adjustedMaxBytes  maximum number of bytes to read
     * @param topicManager      used to invalidate the topic cache when the ledger is fenced
     * @return future of the entries read; fails with the managed-ledger error, or with
     *         {@link MetadataCorruptedException} when the last entry's offset cannot be read
     */
    private CompletableFuture<List<Entry>> readEntries(final ManagedCursor cursor, final TopicPartition topicPartition, final AtomicLong cursorOffset, int maxReadEntriesNum, long adjustedMaxBytes, final KafkaTopicManager topicManager) {
        final OpStatsLogger messageReadStats = this.requestStats.getMessageReadStats();
        final long startReadingMessagesNanos = MathUtils.nowInNano();
        final CompletableFuture<List<Entry>> readFuture = new CompletableFuture<>();
        if (adjustedMaxBytes <= 0L) {
            readFuture.complete(Collections.emptyList());
            return readFuture;
        }
        final long originalOffset = cursorOffset.get();
        cursor.asyncReadEntries(maxReadEntriesNum, adjustedMaxBytes, new AsyncCallbacks.ReadEntriesCallback() {

            @Override
            public void readEntriesComplete(List<Entry> entries, Object ctx) {
                if (!entries.isEmpty()) {
                    Entry lastEntry = entries.get(entries.size() - 1);
                    PositionImpl currentPosition = PositionImpl.get(lastEntry.getLedgerId(), lastEntry.getEntryId());
                    try {
                        long lastOffset = MessageMetadataUtils.peekOffsetFromEntry(lastEntry);
                        PartitionLog.commitOffset((NonDurableCursorImpl) cursor, currentPosition);
                        cursorOffset.set(lastOffset + 1L);
                        if (log.isDebugEnabled()) {
                            log.debug("Topic {} success read entry: ledgerId: {}, entryId: {}, size: {}, ConsumerManager original offset: {}, lastEntryPosition: {}, nextOffset: {}", topicPartition, lastEntry.getLedgerId(), lastEntry.getEntryId(), lastEntry.getLength(), originalOffset, currentPosition, cursorOffset.get());
                        }
                    } catch (MetadataCorruptedException e) {
                        log.error("[{}] Failed to peekOffsetFromEntry from position {}: {}", topicPartition, currentPosition, e.getMessage());
                        messageReadStats.registerFailedEvent(MathUtils.elapsedNanos(startReadingMessagesNanos), TimeUnit.NANOSECONDS);
                        // The caller only receives the exception, so the entries would
                        // otherwise never be released.
                        entries.forEach(Entry::release);
                        readFuture.completeExceptionally(e);
                        return;
                    }
                }
                messageReadStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startReadingMessagesNanos), TimeUnit.NANOSECONDS);
                readFuture.complete(entries);
            }

            @Override
            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                // Pass the exception to the logger so the cause is not lost.
                log.error("Error read entry for topic: {}", PartitionLog.this.fullPartitionName, exception);
                if (exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
                    topicManager.invalidateCacheForFencedManagerLedgerOnTopic(PartitionLog.this.fullPartitionName);
                }
                messageReadStats.registerFailedEvent(MathUtils.elapsedNanos(startReadingMessagesNanos), TimeUnit.NANOSECONDS);
                readFuture.completeExceptionally(exception);
            }
        }, null, PositionImpl.LATEST);
        return readFuture;
    }

    /**
     * Asynchronously mark-deletes {@code cursor} at {@code currentPosition}. The outcome is only
     * logged: success at debug level, failure as a warning.
     *
     * @param cursor          cursor to advance
     * @param currentPosition position to mark as deleted
     */
    private static void commitOffset(NonDurableCursorImpl cursor, final PositionImpl currentPosition) {
        final AsyncCallbacks.MarkDeleteCallback loggingCallback = new AsyncCallbacks.MarkDeleteCallback() {

            @Override
            public void markDeleteFailed(ManagedLedgerException e, Object ctx) {
                log.warn("Mark delete failed for position: {} with error:", currentPosition, e);
            }

            @Override
            public void markDeleteComplete(Object ctx) {
                if (!log.isDebugEnabled()) {
                    return;
                }
                log.debug("Mark delete success for position: {}", currentPosition);
            }
        };
        cursor.asyncMarkDelete((Position) currentPosition, loggingCallback, null);
    }

    /**
     * Publishes an encoded batch to the persistent topic and completes {@code appendFuture}
     * with the outcome.
     *
     * <p>On success the producer state is updated from the encoded records, completed
     * transactions are finalized, and {@code appendFuture} completes with the offset returned by
     * the publish. On failure {@code appendFuture} completes exceptionally. {@code encodeResult}
     * is recycled on every path shown here.
     *
     * <p>NOTE(review): if anything in the success branch of the callback throws, neither
     * {@code appendFuture} is completed nor {@code encodeResult} recycled. Confirm whether a
     * try/finally is wanted.
     *
     * @param persistentTopicOpt   topic to publish to; absent means not leader
     * @param appendFuture         future completed with the base offset or the failure
     * @param appendInfo           validation summary of the records
     * @param encodeResult         encoded payload; owned (and recycled) by this method
     * @param appendRecordsContext per-request context (topic manager, throttling hooks, channel)
     */
    private void publishMessages(Optional<PersistentTopic> persistentTopicOpt, CompletableFuture<Long> appendFuture, LogAppendInfo appendInfo, EncodeResult encodeResult, AppendRecordsContext appendRecordsContext) {
        if (!persistentTopicOpt.isPresent()) {
            encodeResult.recycle();
            appendFuture.completeExceptionally((Throwable)Errors.NOT_LEADER_OR_FOLLOWER.exception());
            return;
        }
        PersistentTopic persistentTopic = persistentTopicOpt.get();
        // Rate-limit accounting happens before the system-topic check below.
        this.checkAndRecordPublishQuota((Topic)persistentTopic, appendInfo.validBytes(), appendInfo.numMessages(), appendRecordsContext);
        if (persistentTopic.isSystemTopic()) {
            encodeResult.recycle();
            log.error("Not support producing message to system topic: {}", (Object)persistentTopic);
            appendFuture.completeExceptionally((Throwable)Errors.INVALID_TOPIC_EXCEPTION.exception());
            return;
        }
        // Producer stats are only updated when the topic manager returns a registered producer.
        appendRecordsContext.getTopicManager().registerProducerInPersistentTopic(this.fullPartitionName, persistentTopic).ifPresent(producer -> encodeResult.updateProducerStats(this.topicPartition, this.requestStats, (Producer)producer));
        int numMessages = encodeResult.getNumMessages();
        ByteBuf byteBuf = encodeResult.getEncodedByteBuf();
        long beforePublish = this.time.nanoseconds();
        this.publishMessage(persistentTopic, byteBuf, appendInfo).whenComplete((offset, e) -> {
            // Counterpart of the start-send throttling hook invoked by the caller before publishing.
            appendRecordsContext.getCompleteSendOperationForThrottling().accept(byteBuf.readableBytes());
            if (e == null) {
                this.requestStats.getMessagePublishStats().registerSuccessfulEvent(this.time.nanoseconds() - beforePublish, TimeUnit.NANOSECONDS);
                // offset is the offset of the first message of the batch.
                long lastOffset = offset + (long)numMessages - 1L;
                AnalyzeResult analyzeResult = this.analyzeAndValidateProducerState(encodeResult.getRecords(), Optional.of(offset), AppendOrigin.Client);
                analyzeResult.updatedProducers().forEach((pid, producerAppendInfo) -> {
                    if (log.isDebugEnabled()) {
                        log.debug("Append pid: [{}], appendInfo: [{}], lastOffset: [{}]", new Object[]{pid, producerAppendInfo, lastOffset});
                    }
                    this.producerStateManager.update((ProducerAppendInfo)producerAppendInfo);
                });
                analyzeResult.completedTxns().forEach(completedTxn -> {
                    // NOTE(review): the transaction's last offset is set to lastOffset - 1; confirm the intended offset.
                    completedTxn.lastOffset(lastOffset - 1L);
                    long lastStableOffset = this.producerStateManager.lastStableOffset((CompletedTxn)completedTxn);
                    this.producerStateManager.updateTxnIndex((CompletedTxn)completedTxn, lastStableOffset);
                    this.producerStateManager.completeTxn((CompletedTxn)completedTxn);
                });
                appendFuture.complete((Long)offset);
            } else {
                log.error("publishMessages for topic partition: {} failed when write.", (Object)this.fullPartitionName, e);
                this.requestStats.getMessagePublishStats().registerFailedEvent(this.time.nanoseconds() - beforePublish, TimeUnit.NANOSECONDS);
                appendFuture.completeExceptionally((Throwable)e);
            }
            // Recycled only after the records and the buffer have been used above.
            encodeResult.recycle();
        });
    }

    /**
     * Applies publish rate limiting for a batch about to be written to {@code topic}.
     *
     * <p>With precise topic rate limiting enabled, the topic-level limit is checked first and,
     * if exceeded, connection auto-read is disabled on the topic; otherwise the broker-level
     * limit decides. Without it, the resource-group limit (when enabled) is checked first and
     * handled the same way; otherwise the topic's general publish-rate flag decides. When the
     * deciding flag is set, auto-read is switched off on the request's channel.
     *
     * @param topic                topic being published to
     * @param msgSize              size of the batch in bytes
     * @param numMessages          number of messages in the batch
     * @param appendRecordsContext context providing the channel handler context (may be absent)
     */
    private void checkAndRecordPublishQuota(Topic topic, int msgSize, int numMessages, AppendRecordsContext appendRecordsContext) {
        final boolean publishRateExceeded;
        if (this.preciseTopicPublishRateLimitingEnable) {
            if (topic.isTopicPublishRateExceeded(numMessages, msgSize)) {
                topic.disableCnxAutoRead();
                return;
            }
            publishRateExceeded = topic.isBrokerPublishRateExceeded();
        } else {
            if (topic.isResourceGroupRateLimitingEnabled() && topic.isResourceGroupPublishRateExceeded(numMessages, msgSize)) {
                topic.disableCnxAutoRead();
                return;
            }
            publishRateExceeded = topic.isPublishRateExceeded();
        }
        if (!publishRateExceeded) {
            return;
        }
        final ChannelHandlerContext channelContext = appendRecordsContext.getCtx();
        if (channelContext == null) {
            return;
        }
        if (channelContext.channel().config().isAutoRead()) {
            channelContext.channel().config().setAutoRead(false);
        }
    }

    /**
     * Hands an encoded buffer to the persistent topic for publishing.
     *
     * <p>The producer name is built as {@code PID_PREFIX-<producerId>-<producerEpoch>}, using
     * {@code -1} when the append carries no producer id. The boolean flag passed to
     * {@link MessagePublishContext} is set only when a producer id is present and the batch is
     * not a control batch.
     *
     * @param persistentTopic topic that receives the buffer
     * @param byteBuf         payload to publish
     * @param appendInfo      summary of the batches contained in {@code byteBuf}
     * @return the future given to the publish context; this method does not complete it itself
     */
    private CompletableFuture<Long> publishMessage(PersistentTopic persistentTopic, ByteBuf byteBuf, LogAppendInfo appendInfo) {
        String producerIdPart = String.valueOf(appendInfo.producerId().orElse(-1L));
        String producerEpochPart = String.valueOf(appendInfo.producerEpoch());
        String producerName = String.join("-", PID_PREFIX, producerIdPart, producerEpochPart);

        boolean hasProducerAndIsData = appendInfo.producerId().isPresent() && !appendInfo.isControlBatch();

        CompletableFuture<Long> offsetFuture = new CompletableFuture<>();
        MessagePublishContext publishContext = MessagePublishContext.get(
                offsetFuture,
                persistentTopic,
                producerName,
                hasProducerAndIsData,
                appendInfo.firstSequence(),
                appendInfo.lastSequence(),
                appendInfo.numMessages(),
                this.time.nanoseconds());
        persistentTopic.publishMessage(byteBuf, publishContext);
        return offsetFuture;
    }

    /**
     * Walks every batch in {@code records}, validates it and summarizes the whole set.
     *
     * <p>For each batch this method rejects a non-zero base offset when the magic value is at least
     * 2, rejects a batch larger than the configured maximum message size, and calls
     * {@code ensureValid()} on it. While iterating it accumulates the message count, batch count
     * and byte count, tracks the smallest base sequence and largest last sequence, and remembers
     * the producer id/epoch and compression codec of the most recent batch that carries them.
     * The transactional and control-batch flags reflect the last batch iterated.
     *
     * @param records the record batches of a produce request
     * @return a {@link LogAppendInfo} describing the validated batches
     * @throws InvalidRecordException  if a batch with magic &gt;= 2 has a base offset other than 0
     * @throws RecordTooLargeException if a batch exceeds the configured maximum message size
     * @throws CorruptRecordException  if the accumulated byte count is negative
     */
    @VisibleForTesting
    public LogAppendInfo analyzeAndValidateRecords(MemoryRecords records) {
        Optional<Long> firstOffset = Optional.empty();
        Optional<Long> producerId = Optional.empty();
        short producerEpoch = -1;
        boolean firstBatchSeen = false;
        boolean transactional = false;
        boolean controlBatch = false;
        int messageCount = 0;
        int batchCount = 0;
        int totalBytes = 0;
        int lowestSequence = Integer.MAX_VALUE;
        int highestSequence = -1;
        KopLogValidator.CompressionCodec sourceCodec = DEFAULT_COMPRESSION;

        for (RecordBatch batch : records.batches()) {
            boolean magicAtLeastTwo = batch.magic() >= 2;
            if (magicAtLeastTwo && batch.baseOffset() != 0L) {
                throw new InvalidRecordException("The baseOffset of the record batch in the append to " + this.topicPartition + " should be 0, but it is " + batch.baseOffset());
            }

            // Only the very first batch can contribute the first offset.
            if (!firstBatchSeen) {
                firstBatchSeen = true;
                if (magicAtLeastTwo) {
                    firstOffset = Optional.of(batch.baseOffset());
                }
            }

            int batchSize = batch.sizeInBytes();
            if (batchSize > this.kafkaConfig.getMaxMessageSize()) {
                throw new RecordTooLargeException(String.format("Message batch size is %s in append to partition %s which exceeds the maximum configured size of %s .", batchSize, this.topicPartition, this.kafkaConfig.getMaxMessageSize()));
            }
            batch.ensureValid();

            batchCount++;
            totalBytes += batchSize;
            // The sum is formed in long arithmetic and then narrowed back to int.
            long recordsInBatch = batch.lastOffset() - batch.baseOffset() + 1L;
            messageCount = (int) (messageCount + recordsInBatch);

            transactional = batch.isTransactional();
            controlBatch = batch.isControlBatch();

            if (batch.hasProducerId()) {
                producerId = Optional.of(batch.producerId());
                producerEpoch = batch.producerEpoch();
            }

            if (batch.compressionType().id != CompressionType.NONE.id) {
                CompressionType batchCompression = CompressionType.forId((int) batch.compressionType().id);
                sourceCodec = new KopLogValidator.CompressionCodec(batchCompression.name, batchCompression.id);
            }

            lowestSequence = Math.min(lowestSequence, batch.baseSequence());
            highestSequence = Math.max(highestSequence, batch.lastSequence());
        }

        if (totalBytes < 0) {
            throw new CorruptRecordException("Cannot append record batch with illegal length " + totalBytes + " to log for " + this.topicPartition + ". A possible cause is corrupted produce request.");
        }

        KopLogValidator.CompressionCodec targetCodec =
                KopLogValidator.getTargetCodec(sourceCodec, this.kafkaConfig.getKafkaCompressionType());
        return new LogAppendInfo(
                firstOffset,
                producerId,
                producerEpoch,
                messageCount,
                batchCount,
                transactional,
                controlBatch,
                totalBytes,
                lowestSequence,
                highestSequence,
                sourceCodec,
                targetCodec);
    }

    /**
     * Returns a view of {@code records} limited to the valid byte count reported by {@code info}.
     *
     * <p>When the valid byte count already equals the size of {@code records}, the same instance is
     * returned. Otherwise the backing buffer is duplicated, its limit is set to the valid byte
     * count, and new readable records are created over that duplicate.
     *
     * @param records the records as received
     * @param info    analysis result supplying the number of valid bytes
     * @return {@code records} itself or a trimmed view of it
     * @throws CorruptRecordException if the valid byte count is negative
     */
    private MemoryRecords trimInvalidBytes(MemoryRecords records, LogAppendInfo info) {
        final int usableBytes = info.validBytes();
        if (usableBytes < 0) {
            throw new CorruptRecordException(String.format("Cannot append record batch with illegal length %s to log for %s. A possible cause is a corrupted produce request.", usableBytes, this.topicPartition));
        }
        if (usableBytes != records.sizeInBytes()) {
            // duplicate() gives this view its own limit, so the original buffer's limit is untouched.
            ByteBuffer trimmedView = records.buffer().duplicate();
            trimmedView.limit(usableBytes);
            return MemoryRecords.readableRecords(trimmedView);
        }
        return records;
    }

    /**
     * Names the possible origins of an append.
     *
     * <p>NOTE(review): the constant names match Kafka's {@code AppendOrigin}; presumably the
     * meanings match as well - confirm against the callers.
     */
    public static enum AppendOrigin {
        /** Append attributed to a coordinator. */
        Coordinator,
        /** Append attributed to a client. */
        Client,
        /** Append attributed to the log itself. */
        Log
    }

    /**
     * Mutable summary of a set of record batches, as produced by
     * {@code analyzeAndValidateRecords}.
     *
     * <p>Accessors and mutators use fluent naming: {@code x()} reads a property and {@code x(value)}
     * writes it and returns this instance. Equality, hash code and string form cover every field.
     */
    public static class LogAppendInfo {
        private Optional<Long> firstOffset;
        private Optional<Long> producerId;
        private short producerEpoch;
        private int numMessages;
        private int shallowCount;
        private boolean isTransaction;
        private boolean isControlBatch;
        private int validBytes;
        private int firstSequence;
        private int lastSequence;
        private KopLogValidator.CompressionCodec sourceCodec;
        private KopLogValidator.CompressionCodec targetCodec;

        /** Creates an instance with every property supplied explicitly. */
        public LogAppendInfo(Optional<Long> firstOffset, Optional<Long> producerId, short producerEpoch, int numMessages, int shallowCount, boolean isTransaction, boolean isControlBatch, int validBytes, int firstSequence, int lastSequence, KopLogValidator.CompressionCodec sourceCodec, KopLogValidator.CompressionCodec targetCodec) {
            this.firstOffset = firstOffset;
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
            this.numMessages = numMessages;
            this.shallowCount = shallowCount;
            this.isTransaction = isTransaction;
            this.isControlBatch = isControlBatch;
            this.validBytes = validBytes;
            this.firstSequence = firstSequence;
            this.lastSequence = lastSequence;
            this.sourceCodec = sourceCodec;
            this.targetCodec = targetCodec;
        }

        // ---- accessors ----

        public Optional<Long> firstOffset() {
            return this.firstOffset;
        }

        public Optional<Long> producerId() {
            return this.producerId;
        }

        public short producerEpoch() {
            return this.producerEpoch;
        }

        public int numMessages() {
            return this.numMessages;
        }

        public int shallowCount() {
            return this.shallowCount;
        }

        public boolean isTransaction() {
            return this.isTransaction;
        }

        public boolean isControlBatch() {
            return this.isControlBatch;
        }

        public int validBytes() {
            return this.validBytes;
        }

        public int firstSequence() {
            return this.firstSequence;
        }

        public int lastSequence() {
            return this.lastSequence;
        }

        public KopLogValidator.CompressionCodec sourceCodec() {
            return this.sourceCodec;
        }

        public KopLogValidator.CompressionCodec targetCodec() {
            return this.targetCodec;
        }

        // ---- fluent mutators: each one stores the value and returns this instance ----

        public LogAppendInfo firstOffset(Optional<Long> firstOffset) {
            this.firstOffset = firstOffset;
            return this;
        }

        public LogAppendInfo producerId(Optional<Long> producerId) {
            this.producerId = producerId;
            return this;
        }

        public LogAppendInfo producerEpoch(short producerEpoch) {
            this.producerEpoch = producerEpoch;
            return this;
        }

        public LogAppendInfo numMessages(int numMessages) {
            this.numMessages = numMessages;
            return this;
        }

        public LogAppendInfo shallowCount(int shallowCount) {
            this.shallowCount = shallowCount;
            return this;
        }

        public LogAppendInfo isTransaction(boolean isTransaction) {
            this.isTransaction = isTransaction;
            return this;
        }

        public LogAppendInfo isControlBatch(boolean isControlBatch) {
            this.isControlBatch = isControlBatch;
            return this;
        }

        public LogAppendInfo validBytes(int validBytes) {
            this.validBytes = validBytes;
            return this;
        }

        public LogAppendInfo firstSequence(int firstSequence) {
            this.firstSequence = firstSequence;
            return this;
        }

        public LogAppendInfo lastSequence(int lastSequence) {
            this.lastSequence = lastSequence;
            return this;
        }

        public LogAppendInfo sourceCodec(KopLogValidator.CompressionCodec sourceCodec) {
            this.sourceCodec = sourceCodec;
            return this;
        }

        public LogAppendInfo targetCodec(KopLogValidator.CompressionCodec targetCodec) {
            this.targetCodec = targetCodec;
            return this;
        }

        // ---- value semantics ----

        /**
         * Two instances are equal when both accept each other via {@link #canEqual(Object)} and
         * every property matches; properties are read through the accessors.
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof LogAppendInfo)) {
                return false;
            }
            LogAppendInfo that = (LogAppendInfo) o;
            return that.canEqual(this)
                    && this.producerEpoch() == that.producerEpoch()
                    && this.numMessages() == that.numMessages()
                    && this.shallowCount() == that.shallowCount()
                    && this.isTransaction() == that.isTransaction()
                    && this.isControlBatch() == that.isControlBatch()
                    && this.validBytes() == that.validBytes()
                    && this.firstSequence() == that.firstSequence()
                    && this.lastSequence() == that.lastSequence()
                    && sameOrBothNull(this.firstOffset(), that.firstOffset())
                    && sameOrBothNull(this.producerId(), that.producerId())
                    && sameOrBothNull(this.sourceCodec(), that.sourceCodec())
                    && sameOrBothNull(this.targetCodec(), that.targetCodec());
        }

        /** Hook that lets a subclass refuse equality with instances of other types. */
        protected boolean canEqual(Object other) {
            return other instanceof LogAppendInfo;
        }

        @Override
        public int hashCode() {
            final int prime = 59;
            int hash = 1;
            hash = hash * prime + this.producerEpoch();
            hash = hash * prime + this.numMessages();
            hash = hash * prime + this.shallowCount();
            hash = hash * prime + (this.isTransaction() ? 79 : 97);
            hash = hash * prime + (this.isControlBatch() ? 79 : 97);
            hash = hash * prime + this.validBytes();
            hash = hash * prime + this.firstSequence();
            hash = hash * prime + this.lastSequence();
            hash = hash * prime + hashOrNullMarker(this.firstOffset());
            hash = hash * prime + hashOrNullMarker(this.producerId());
            hash = hash * prime + hashOrNullMarker(this.sourceCodec());
            hash = hash * prime + hashOrNullMarker(this.targetCodec());
            return hash;
        }

        @Override
        public String toString() {
            return "PartitionLog.LogAppendInfo(firstOffset=" + this.firstOffset()
                    + ", producerId=" + this.producerId()
                    + ", producerEpoch=" + this.producerEpoch()
                    + ", numMessages=" + this.numMessages()
                    + ", shallowCount=" + this.shallowCount()
                    + ", isTransaction=" + this.isTransaction()
                    + ", isControlBatch=" + this.isControlBatch()
                    + ", validBytes=" + this.validBytes()
                    + ", firstSequence=" + this.firstSequence()
                    + ", lastSequence=" + this.lastSequence()
                    + ", sourceCodec=" + this.sourceCodec()
                    + ", targetCodec=" + this.targetCodec()
                    + ")";
        }

        /** Null-tolerant equality: two nulls match, otherwise defers to {@code left.equals(right)}. */
        private static boolean sameOrBothNull(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash contribution of a reference property; {@code null} contributes the fixed value 43. */
        private static int hashOrNullMarker(Object value) {
            return value == null ? 43 : value.hashCode();
        }
    }

    /**
     * Result of reading records from a partition, obtained from a Netty {@link Recycler}.
     *
     * <p>Instances are normally created through {@code get(...)} or {@code error(...)}, which take
     * an object from the recycler and populate it. Accessors and mutators use fluent naming:
     * {@code x()} reads a property and {@code x(value)} writes it and returns this instance.
     */
    public static class ReadRecordsResult {
        private static final Recycler<ReadRecordsResult> RECYCLER = new Recycler<ReadRecordsResult>() {
            @Override
            protected ReadRecordsResult newObject(Recycler.Handle<ReadRecordsResult> handle) {
                return new ReadRecordsResult(handle);
            }
        };

        private final Recycler.Handle<ReadRecordsResult> recyclerHandle;
        private DecodeResult decodeResult;
        private List<FetchResponse.AbortedTransaction> abortedTransactions;
        private long highWatermark;
        private long lastStableOffset;
        private Position lastPosition;
        private Errors errors;

        /** Used by the recycler to create a blank instance bound to {@code recyclerHandle}. */
        private ReadRecordsResult(Recycler.Handle<ReadRecordsResult> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }

        /** Creates an instance with every field supplied explicitly. */
        public ReadRecordsResult(Recycler.Handle<ReadRecordsResult> recyclerHandle, DecodeResult decodeResult, List<FetchResponse.AbortedTransaction> abortedTransactions, long highWatermark, long lastStableOffset, Position lastPosition, Errors errors) {
            this.recyclerHandle = recyclerHandle;
            this.decodeResult = decodeResult;
            this.abortedTransactions = abortedTransactions;
            this.highWatermark = highWatermark;
            this.lastStableOffset = lastStableOffset;
            this.lastPosition = lastPosition;
            this.errors = errors;
        }

        // ---- factories ----

        /** Obtains a populated instance with no error set. */
        public static ReadRecordsResult get(DecodeResult decodeResult, List<FetchResponse.AbortedTransaction> abortedTransactions, long highWatermark, long lastStableOffset, Position lastPosition) {
            return ReadRecordsResult.get(decodeResult, abortedTransactions, highWatermark, lastStableOffset, lastPosition, null);
        }

        /** Takes an instance from the recycler and fills in every mutable field. */
        public static ReadRecordsResult get(DecodeResult decodeResult, List<FetchResponse.AbortedTransaction> abortedTransactions, long highWatermark, long lastStableOffset, Position lastPosition, Errors errors) {
            return RECYCLER.get()
                    .decodeResult(decodeResult)
                    .abortedTransactions(abortedTransactions)
                    .highWatermark(highWatermark)
                    .lastStableOffset(lastStableOffset)
                    .lastPosition(lastPosition)
                    .errors(errors);
        }

        /** Obtains an error result positioned at {@code PositionImpl.EARLIEST}. */
        public static ReadRecordsResult error(Errors errors) {
            return ReadRecordsResult.error((Position)PositionImpl.EARLIEST, errors);
        }

        /** Obtains an error result with no decode result, no aborted transactions and -1 offsets. */
        public static ReadRecordsResult error(Position position, Errors errors) {
            return ReadRecordsResult.get(null, null, -1L, -1L, position, errors);
        }

        // ---- behaviour ----

        /**
         * Converts this result to fetch-response partition data.
         *
         * <p>When an error is set, the data carries that error, -1 for the three offset arguments,
         * no aborted transactions and empty records. Otherwise it carries {@code Errors.NONE}, the
         * stored offsets (the high watermark is passed as both the first and third offset
         * argument), the aborted transactions and the decoded records.
         */
        public FetchResponse.PartitionData<Records> toPartitionData() {
            if (this.errors == null) {
                return new FetchResponse.PartitionData(Errors.NONE, this.highWatermark, this.lastStableOffset, this.highWatermark, this.abortedTransactions, (BaseRecords)this.decodeResult.getRecords());
            }
            return new FetchResponse.PartitionData(this.errors, -1L, -1L, -1L, null, (BaseRecords)MemoryRecords.EMPTY);
        }

        /**
         * Clears every mutable field and recycles the decode result, if any.
         *
         * <p>NOTE(review): this method does not itself hand the instance back through
         * {@code recyclerHandle}; whether a caller does so is not visible here - confirm.
         */
        public void recycle() {
            this.errors = null;
            this.lastPosition = null;
            this.lastStableOffset = -1L;
            this.highWatermark = -1L;
            this.abortedTransactions = null;
            DecodeResult pending = this.decodeResult;
            if (pending != null) {
                pending.recycle();
                this.decodeResult = null;
            }
        }

        // ---- accessors ----

        /** Returns the stored error, or {@code Errors.NONE} when none is set. */
        public Errors errors() {
            return this.errors == null ? Errors.NONE : this.errors;
        }

        public Recycler.Handle<ReadRecordsResult> recyclerHandle() {
            return this.recyclerHandle;
        }

        public DecodeResult decodeResult() {
            return this.decodeResult;
        }

        public List<FetchResponse.AbortedTransaction> abortedTransactions() {
            return this.abortedTransactions;
        }

        public long highWatermark() {
            return this.highWatermark;
        }

        public long lastStableOffset() {
            return this.lastStableOffset;
        }

        public Position lastPosition() {
            return this.lastPosition;
        }

        // ---- fluent mutators: each one stores the value and returns this instance ----

        public ReadRecordsResult decodeResult(DecodeResult decodeResult) {
            this.decodeResult = decodeResult;
            return this;
        }

        public ReadRecordsResult abortedTransactions(List<FetchResponse.AbortedTransaction> abortedTransactions) {
            this.abortedTransactions = abortedTransactions;
            return this;
        }

        public ReadRecordsResult highWatermark(long highWatermark) {
            this.highWatermark = highWatermark;
            return this;
        }

        public ReadRecordsResult lastStableOffset(long lastStableOffset) {
            this.lastStableOffset = lastStableOffset;
            return this;
        }

        public ReadRecordsResult lastPosition(Position lastPosition) {
            this.lastPosition = lastPosition;
            return this;
        }

        public ReadRecordsResult errors(Errors errors) {
            this.errors = errors;
            return this;
        }

        // ---- value semantics ----

        /**
         * Two instances are equal when both accept each other via {@link #canEqual(Object)} and
         * every property, including the recycler handle, matches. Properties are read through the
         * accessors, so an unset error compares as {@code Errors.NONE}.
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof ReadRecordsResult)) {
                return false;
            }
            ReadRecordsResult that = (ReadRecordsResult) o;
            return that.canEqual(this)
                    && this.highWatermark() == that.highWatermark()
                    && this.lastStableOffset() == that.lastStableOffset()
                    && sameOrBothNull(this.recyclerHandle(), that.recyclerHandle())
                    && sameOrBothNull(this.decodeResult(), that.decodeResult())
                    && sameOrBothNull(this.abortedTransactions(), that.abortedTransactions())
                    && sameOrBothNull(this.lastPosition(), that.lastPosition())
                    && sameOrBothNull(this.errors(), that.errors());
        }

        /** Hook that lets a subclass refuse equality with instances of other types. */
        protected boolean canEqual(Object other) {
            return other instanceof ReadRecordsResult;
        }

        @Override
        public int hashCode() {
            final int prime = 59;
            int hash = 1;
            // Long.hashCode folds the upper and lower 32 bits together with XOR.
            hash = hash * prime + Long.hashCode(this.highWatermark());
            hash = hash * prime + Long.hashCode(this.lastStableOffset());
            hash = hash * prime + hashOrNullMarker(this.recyclerHandle());
            hash = hash * prime + hashOrNullMarker(this.decodeResult());
            hash = hash * prime + hashOrNullMarker(this.abortedTransactions());
            hash = hash * prime + hashOrNullMarker(this.lastPosition());
            hash = hash * prime + hashOrNullMarker(this.errors());
            return hash;
        }

        @Override
        public String toString() {
            return "PartitionLog.ReadRecordsResult(recyclerHandle=" + this.recyclerHandle()
                    + ", decodeResult=" + this.decodeResult()
                    + ", abortedTransactions=" + this.abortedTransactions()
                    + ", highWatermark=" + this.highWatermark()
                    + ", lastStableOffset=" + this.lastStableOffset()
                    + ", lastPosition=" + this.lastPosition()
                    + ", errors=" + this.errors()
                    + ")";
        }

        /** Null-tolerant equality: two nulls match, otherwise defers to {@code left.equals(right)}. */
        private static boolean sameOrBothNull(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash contribution of a reference property; {@code null} contributes the fixed value 43. */
        private static int hashOrNullMarker(Object value) {
            return value == null ? 43 : value.hashCode();
        }
    }
}

