/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.streamnative.pulsar.handlers.kop.KafkaRequestHandler;
import io.streamnative.pulsar.handlers.kop.utils.MessageMetadataUtils;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaTopicConsumerManager
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(KafkaTopicConsumerManager.class);
    private final PersistentTopic topic;
    private final KafkaRequestHandler requestHandler;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Map<Long, CompletableFuture<Pair<ManagedCursor, Long>>> cursors;
    private final Map<String, ManagedCursor> createdCursors;
    private final Map<Long, Long> lastAccessTimes;
    private final boolean skipMessagesWithoutIndex;

    KafkaTopicConsumerManager(KafkaRequestHandler requestHandler, PersistentTopic topic) {
        this.topic = topic;
        this.cursors = new ConcurrentHashMap<Long, CompletableFuture<Pair<ManagedCursor, Long>>>();
        this.createdCursors = new ConcurrentHashMap<String, ManagedCursor>();
        this.lastAccessTimes = new ConcurrentHashMap<Long, Long>();
        this.requestHandler = requestHandler;
        this.skipMessagesWithoutIndex = requestHandler.isSkipMessagesWithoutIndex();
    }

    void deleteExpiredCursor(long current, long expirePeriodMillis) {
        this.lastAccessTimes.forEach((offset, record) -> {
            if (current - record - expirePeriodMillis > 0L) {
                this.deleteOneExpiredCursor((long)offset);
            }
        });
    }

    void deleteOneExpiredCursor(long offset) {
        if (this.closed.get()) {
            return;
        }
        CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture = this.cursors.remove(offset);
        this.lastAccessTimes.remove(offset);
        if (cursorFuture != null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Cursor timed out for offset: {}, cursors cache size: {}", new Object[]{this.requestHandler.ctx.channel(), offset, this.cursors.size()});
            }
            cursorFuture.whenComplete((pair, e) -> {
                if (e != null || pair == null) {
                    return;
                }
                ManagedCursor managedCursor = (ManagedCursor)pair.getKey();
                this.deleteOneCursorAsync(managedCursor, "cursor expired");
            });
        }
    }

    public void deleteOneCursorAsync(final ManagedCursor cursor, final String reason) {
        if (cursor != null) {
            this.topic.getManagedLedger().asyncDeleteCursor(cursor.getName(), new AsyncCallbacks.DeleteCursorCallback(){

                public void deleteCursorComplete(Object ctx) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Cursor {} for topic {} deleted successfully for reason: {}.", new Object[]{KafkaTopicConsumerManager.this.requestHandler.ctx.channel(), cursor.getName(), KafkaTopicConsumerManager.this.topic.getName(), reason});
                    }
                }

                public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
                    log.warn("[{}] Error deleting cursor {} for topic {} for reason: {}.", new Object[]{KafkaTopicConsumerManager.this.requestHandler.ctx.channel(), cursor.getName(), KafkaTopicConsumerManager.this.topic.getName(), reason, exception});
                }
            }, null);
            this.createdCursors.remove(cursor.getName());
        }
    }

    public CompletableFuture<Pair<ManagedCursor, Long>> removeCursorFuture(long offset) {
        if (this.closed.get()) {
            return null;
        }
        this.lastAccessTimes.remove(offset);
        CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture = this.cursors.remove(offset);
        if (cursorFuture == null) {
            return this.asyncCreateCursorIfNotExists(offset);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Get cursor for offset: {} in cache. cache size: {}", new Object[]{this.requestHandler.ctx.channel(), offset, this.cursors.size()});
        }
        return cursorFuture;
    }

    private CompletableFuture<Pair<ManagedCursor, Long>> asyncCreateCursorIfNotExists(long offset) {
        if (this.closed.get()) {
            return null;
        }
        this.cursors.putIfAbsent(offset, this.asyncGetCursorByOffset(offset));
        this.lastAccessTimes.remove(offset);
        return this.cursors.remove(offset);
    }

    public void add(long offset, Pair<ManagedCursor, Long> pair) {
        Preconditions.checkArgument((offset == (Long)pair.getRight() ? 1 : 0) != 0, (Object)("offset not equal. key: " + offset + " value: " + pair.getRight()));
        if (this.closed.get()) {
            ManagedCursor managedCursor = (ManagedCursor)pair.getLeft();
            this.deleteOneCursorAsync(managedCursor, "A race - add cursor back but tcm already closed");
            return;
        }
        CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture = CompletableFuture.completedFuture(pair);
        if (this.cursors.putIfAbsent(offset, cursorFuture) != null) {
            this.deleteOneCursorAsync((ManagedCursor)pair.getLeft(), "reason: A race - same cursor already cached");
        }
        this.lastAccessTimes.put(offset, System.currentTimeMillis());
        if (log.isDebugEnabled()) {
            log.debug("[{}] Add cursor back {} for offset: {}", new Object[]{this.requestHandler.ctx.channel(), ((ManagedCursor)pair.getLeft()).getName(), offset});
        }
    }

    @Override
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Close TCM for topic {}.", (Object)this.requestHandler.ctx.channel(), (Object)this.topic.getName());
        }
        ArrayList cursorFuturesToClose = new ArrayList();
        this.cursors.forEach((ignored, cursorFuture) -> cursorFuturesToClose.add(cursorFuture));
        this.cursors.clear();
        this.lastAccessTimes.clear();
        ArrayList cursorsToClose = new ArrayList();
        this.createdCursors.forEach((ignored, cursor) -> cursorsToClose.add(cursor));
        this.createdCursors.clear();
        cursorFuturesToClose.forEach(cursorFuture -> cursorFuture.whenComplete((pair, e) -> {
            if (e != null || pair == null) {
                return;
            }
            ManagedCursor cursor = (ManagedCursor)pair.getLeft();
            this.deleteOneCursorAsync(cursor, "TopicConsumerManager close");
        }));
        cursorFuturesToClose.clear();
        cursorsToClose.forEach(cursor -> this.deleteOneCursorAsync((ManagedCursor)cursor, "TopicConsumerManager close but cursor is still outstanding"));
        cursorsToClose.clear();
    }

    private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long offset) {
        if (this.closed.get()) {
            return CompletableFuture.completedFuture(null);
        }
        ManagedLedger ledger = this.topic.getManagedLedger();
        if (((ManagedLedgerImpl)ledger).getState() == ManagedLedgerImpl.State.Closed) {
            log.error("[{}] Async get cursor for offset {} for topic {} failed, because current managedLedger has been closed", new Object[]{this.requestHandler.ctx.channel(), offset, this.topic.getName()});
            CompletableFuture<Pair<ManagedCursor, Long>> future = new CompletableFuture<Pair<ManagedCursor, Long>>();
            future.completeExceptionally(new Exception("Current managedLedger for " + this.topic.getName() + " has been closed."));
            return future;
        }
        return MessageMetadataUtils.asyncFindPosition(ledger, offset, this.skipMessagesWithoutIndex).thenApply(position -> {
            if (position == null) {
                return null;
            }
            String cursorName = "kop-consumer-cursor-" + this.topic.getName() + "-" + position.getLedgerId() + "-" + position.getEntryId() + "-" + DigestUtils.sha1Hex((String)UUID.randomUUID().toString()).substring(0, 10);
            PositionImpl previous = ((ManagedLedgerImpl)ledger).getPreviousPosition((PositionImpl)position);
            if (log.isDebugEnabled()) {
                log.debug("[{}] Create cursor {} for offset: {}. position: {}, previousPosition: {}", new Object[]{this.requestHandler.ctx.channel(), cursorName, offset, position, previous});
            }
            try {
                ManagedCursor newCursor = ledger.newNonDurableCursor((Position)previous, cursorName);
                this.createdCursors.putIfAbsent(newCursor.getName(), newCursor);
                this.lastAccessTimes.put(offset, System.currentTimeMillis());
                return Pair.of((Object)newCursor, (Object)offset);
            }
            catch (ManagedLedgerException e) {
                log.error("[{}] Error new cursor for topic {} at offset {} - {}. will cause fetch data error.", new Object[]{this.requestHandler.ctx.channel(), this.topic.getName(), offset, previous, e});
                return null;
            }
        });
    }

    public ManagedLedger getManagedLedger() {
        return this.topic.getManagedLedger();
    }

    @VisibleForTesting
    public int getNumCreatedCursors() {
        int numCreatedCursors = 0;
        for (ManagedCursor ignored : this.topic.getManagedLedger().getCursors()) {
            ++numCreatedCursors;
        }
        return numCreatedCursors;
    }

    @VisibleForTesting
    public boolean isClosed() {
        return this.closed.get();
    }

    public CompletableFuture<Position> findPositionForIndex(Long offset) {
        if (this.closed.get()) {
            return FutureUtil.failedFuture((Throwable)new IllegalStateException("This manager for " + this.topic.getName() + " is closed"));
        }
        ManagedLedger ledger = this.topic.getManagedLedger();
        return MessageMetadataUtils.asyncFindPosition(ledger, offset, this.skipMessagesWithoutIndex).thenApply(position -> {
            PositionImpl lastConfirmedEntry = (PositionImpl)ledger.getLastConfirmedEntry();
            log.info("Found position {} for offset {}, lastConfirmedEntry {}", new Object[]{position, offset, lastConfirmedEntry});
            if (position == null) {
                return null;
            }
            if (lastConfirmedEntry != null && Objects.equals(lastConfirmedEntry.getNext(), position)) {
                log.debug("Found position {} for offset {}, LAC {} -> RETURN LATEST", new Object[]{position, offset, lastConfirmedEntry});
                return PositionImpl.LATEST;
            }
            return position;
        });
    }

    public Map<Long, CompletableFuture<Pair<ManagedCursor, Long>>> getCursors() {
        return this.cursors;
    }

    public Map<String, ManagedCursor> getCreatedCursors() {
        return this.createdCursors;
    }

    public Map<Long, Long> getLastAccessTimes() {
        return this.lastAccessTimes;
    }
}

