/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop;

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.streamnative.pulsar.handlers.kop.exceptions.MetadataCorruptedException;
import io.streamnative.pulsar.handlers.kop.utils.MessageMetadataUtils;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MessagePublishContext
implements Topic.PublishContext {
    private static final Logger log = LoggerFactory.getLogger(MessagePublishContext.class);
    public static final long DEFAULT_OFFSET = -1L;
    private CompletableFuture<Long> offsetFuture;
    private Topic topic;
    private long startTimeNs;
    private int numberOfMessages;
    private long baseOffset;
    private long sequenceId;
    private long highestSequenceId;
    private String producerName;
    private boolean enableDeduplication;
    private MetadataCorruptedException peekOffsetError;
    private final Recycler.Handle<MessagePublishContext> recyclerHandle;
    private static final Recycler<MessagePublishContext> RECYCLER = new Recycler<MessagePublishContext>(){

        protected MessagePublishContext newObject(Recycler.Handle<MessagePublishContext> handle) {
            return new MessagePublishContext(handle);
        }
    };

    public boolean isMarkerMessage() {
        return !this.enableDeduplication;
    }

    public long getSequenceId() {
        return this.sequenceId;
    }

    public long getHighestSequenceId() {
        return this.highestSequenceId;
    }

    public void setMetadataFromEntryData(ByteBuf entryData) {
        try {
            this.baseOffset = MessageMetadataUtils.peekBaseOffset(entryData, this.numberOfMessages);
        }
        catch (MetadataCorruptedException e) {
            this.peekOffsetError = e;
        }
    }

    public void completed(Exception exception, long ledgerId, long entryId) {
        if (exception != null) {
            if (exception instanceof BrokerServiceException.TopicClosedException || exception instanceof BrokerServiceException.TopicTerminatedException || exception instanceof BrokerServiceException.TopicFencedException) {
                log.warn("[{}] Failed to publish message: {}", (Object)this.topic.getName(), (Object)exception.getMessage());
                this.offsetFuture.completeExceptionally((Throwable)new NotLeaderOrFollowerException());
            } else if (exception instanceof MessageDeduplication.MessageDupUnknownException) {
                log.warn("[{}] Failed to publish message: {}", (Object)this.topic.getName(), (Object)exception.getMessage());
                this.offsetFuture.completeExceptionally(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER.exception());
            } else {
                log.error("[{}] Failed to publish message", (Object)this.topic.getName(), (Object)exception);
                this.offsetFuture.completeExceptionally((Throwable)new KafkaStorageException((Throwable)exception));
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Success write topic: {}, producerName {} ledgerId: {}, entryId: {} And triggered send callback.", new Object[]{this.topic.getName(), this.producerName, ledgerId, entryId});
            }
            this.topic.recordAddLatency(System.nanoTime() - this.startTimeNs, TimeUnit.NANOSECONDS);
            if (ledgerId == -1L && entryId == -1L) {
                if (log.isDebugEnabled()) {
                    log.debug("Failed to write topic: {}, producerName {}, ledgerId: {}, entryId: {} with duplicated message.", new Object[]{this.topic.getName(), this.producerName, ledgerId, entryId});
                }
                this.offsetFuture.completeExceptionally(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER.exception());
                return;
            }
            if (this.baseOffset == -1L) {
                log.error("[{}] Failed to get offset for ({}, {}): {}", new Object[]{this.topic, ledgerId, entryId, this.peekOffsetError.getMessage()});
            }
            this.offsetFuture.complete(this.baseOffset);
        }
        this.recycle();
    }

    public static MessagePublishContext get(CompletableFuture<Long> offsetFuture, Topic topic, String producerName, boolean enableDeduplication, long sequenceId, long highestSequenceId, int numberOfMessages, long startTimeNs) {
        MessagePublishContext callback = (MessagePublishContext)RECYCLER.get();
        callback.offsetFuture = offsetFuture;
        callback.topic = topic;
        callback.producerName = producerName;
        callback.numberOfMessages = numberOfMessages;
        callback.startTimeNs = startTimeNs;
        callback.baseOffset = -1L;
        callback.sequenceId = sequenceId;
        callback.highestSequenceId = highestSequenceId;
        callback.peekOffsetError = null;
        callback.enableDeduplication = enableDeduplication;
        return callback;
    }

    private MessagePublishContext(Recycler.Handle<MessagePublishContext> recyclerHandle) {
        this.recyclerHandle = recyclerHandle;
    }

    public String getProducerName() {
        return this.producerName;
    }

    public long getNumberOfMessages() {
        return this.numberOfMessages;
    }

    public void recycle() {
        this.offsetFuture = null;
        this.producerName = null;
        this.topic = null;
        this.startTimeNs = -1L;
        this.numberOfMessages = 0;
        this.baseOffset = -1L;
        this.sequenceId = -1L;
        this.highestSequenceId = -1L;
        this.peekOffsetError = null;
        this.enableDeduplication = false;
        this.recyclerHandle.recycle((Object)this);
    }
}

