/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.utils.delayed;

import com.google.common.base.Preconditions;
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
import io.streamnative.pulsar.handlers.kop.utils.ShutdownableThread;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;
import io.streamnative.pulsar.handlers.kop.utils.timer.Timer;
import io.streamnative.pulsar.handlers.kop.utils.timer.TimerTask;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds {@link DelayedOperation}s that could not be completed immediately.
 *
 * <p>Each operation is registered under one or more watch keys and, when the timer is enabled,
 * is also added to a {@link Timer}. Callers trigger completion attempts for a key through
 * {@link #checkAndComplete(Object)}; completed operations that are still referenced by watch
 * lists are removed lazily, either while a key is being checked or by the periodic purge
 * performed in {@link #advanceClock(long)}.
 *
 * <p>Locking: {@code removeWatchersLock} is taken in read mode for lookups of and insertions
 * into the key map, and in write mode whenever a key is removed from it.
 *
 * @param <T> the concrete type of delayed operation kept in this purgatory
 */
public class DelayedOperationPurgatory<T extends DelayedOperation> {
    private static final Logger log = LoggerFactory.getLogger(DelayedOperationPurgatory.class);

    /** Timeout, in milliseconds, passed to {@link #advanceClock(long)} by the reaper thread. */
    private static final long REAPER_ADVANCE_TIMEOUT_MS = 200L;

    private final String purgatoryName;
    /** Whether this purgatory created the timer itself and must shut it down in {@link #shutdown()}. */
    private final boolean ownTimer;
    private final Timer timeoutTimer;
    /** Threshold of (estimated operations - timer size) above which watch lists are purged. */
    private final int purgeInterval;
    private final boolean reaperEnabled;
    private final boolean timerEnabled;
    /** Watch lists, one per watch key. */
    private final ConcurrentMap<Object, Watchers> watchersForKey;
    private final ReentrantReadWriteLock removeWatchersLock = new ReentrantReadWriteLock();
    /**
     * Incremented once for every operation that gets watched; reset to {@link #delayed()} each
     * time a purge is triggered in {@link #advanceClock(long)}.
     */
    private final AtomicInteger estimatedTotalOperations = new AtomicInteger(0);
    private final ShutdownableThread expirationReaper;

    /**
     * Creates a new builder with default settings.
     *
     * @param <T> the type of delayed operation the built purgatory will hold
     * @return a fresh builder
     */
    public static <T extends DelayedOperation> Builder<T> builder() {
        return new Builder<>();
    }

    /**
     * Creates the purgatory and, if {@code reaperEnabled} is set, starts the reaper thread.
     *
     * @param purgatoryName name used in log messages and in the reaper thread name
     * @param timeoutTimer  timer that watched operations are added to
     * @param ownTimer      {@code true} if {@link #shutdown()} should also shut down the timer
     * @param purgeInterval threshold used by {@link #advanceClock(long)} to decide when to purge
     * @param reaperEnabled whether to start the background reaper thread
     * @param timerEnabled  whether watched operations are added to {@code timeoutTimer}
     */
    public DelayedOperationPurgatory(String purgatoryName,
                                     Timer timeoutTimer,
                                     boolean ownTimer,
                                     int purgeInterval,
                                     boolean reaperEnabled,
                                     boolean timerEnabled) {
        this.purgatoryName = purgatoryName;
        this.timeoutTimer = timeoutTimer;
        this.ownTimer = ownTimer;
        this.purgeInterval = purgeInterval;
        this.reaperEnabled = reaperEnabled;
        this.timerEnabled = timerEnabled;
        this.watchersForKey = new ConcurrentHashMap<>();
        // The reaper object is always created, but only started when the reaper is enabled.
        this.expirationReaper = new ShutdownableThread(String.format("ExpirationReaper-%s", purgatoryName)) {
            @Override
            protected void doWork() {
                DelayedOperationPurgatory.this.advanceClock(REAPER_ADVANCE_TIMEOUT_MS);
            }
        };
        if (reaperEnabled) {
            this.expirationReaper.start();
        }
    }

    /**
     * Tries to complete the operation right away; if that fails, registers it under every given
     * key, tries once more, and finally adds it to the timer when it is still incomplete.
     *
     * @param operation the operation to complete or watch
     * @param watchKeys the keys to watch the operation under; must not be empty
     * @return {@code true} if this call completed the operation, {@code false} otherwise
     *         (including when the operation turned out to be completed by someone else)
     * @throws IllegalArgumentException if {@code watchKeys} is empty
     */
    public boolean tryCompleteElseWatch(T operation, List<Object> watchKeys) {
        Preconditions.checkArgument(!watchKeys.isEmpty(), "The watch key list can't be empty");

        if (operation.tryComplete()) {
            return true;
        }

        boolean watchCreated = false;
        for (Object key : watchKeys) {
            // Once the operation is completed there is no point adding it to further watch lists.
            if (operation.isCompleted()) {
                return false;
            }
            watchForOperation(key, operation);
            // Count the operation only once, no matter how many keys it is watched under.
            if (!watchCreated) {
                watchCreated = true;
                estimatedTotalOperations.incrementAndGet();
            }
        }

        // Second attempt, now that the operation is visible to checkAndComplete callers.
        if (operation.maybeTryComplete()) {
            return true;
        }

        if (!operation.isCompleted()) {
            if (timerEnabled) {
                timeoutTimer.add(operation);
            }
            // The operation may have completed while it was being added to the timer.
            if (operation.isCompleted()) {
                operation.cancel();
            }
        }
        return false;
    }

    /**
     * Attempts to complete every operation watched under the given key.
     *
     * @param key the watch key to check
     * @return the number of operations completed by this call; {@code 0} if the key is not watched
     */
    public int checkAndComplete(Object key) {
        Watchers watchers = CoreUtils.inReadLock(removeWatchersLock, () -> watchersForKey.get(key));
        return watchers == null ? 0 : watchers.tryCompleteWatched();
    }

    /**
     * Returns the total size of all watch lists. An operation watched under several keys is
     * counted once per key.
     *
     * @return the summed size of all watch lists
     */
    public int watched() {
        return allWatchers().stream().mapToInt(Watchers::countWatched).sum();
    }

    /**
     * Returns the size reported by the timeout timer.
     *
     * @return {@code timeoutTimer.size()}
     */
    public int delayed() {
        return timeoutTimer.size();
    }

    /**
     * Removes the given key and cancels every operation that was watched under it.
     *
     * @param key the watch key to cancel
     * @return the cancelled operations, or an empty list if the key was not watched
     */
    public List<T> cancelForKey(Object key) {
        return CoreUtils.inWriteLock(removeWatchersLock, () -> {
            Watchers watchers = watchersForKey.remove(key);
            if (watchers == null) {
                return Collections.emptyList();
            }
            return watchers.cancel();
        });
    }

    /** Returns the map's view of all watch lists, obtained while holding the read lock. */
    private Collection<Watchers> allWatchers() {
        return CoreUtils.inReadLock(removeWatchersLock, () -> watchersForKey.values());
    }

    /** Adds the operation to the watch list of the key, creating the list if necessary. */
    private void watchForOperation(Object key, T operation) {
        CoreUtils.inReadLock(removeWatchersLock, () -> {
            watchersForKey.computeIfAbsent(key, k -> new Watchers(k)).watch(operation);
            return null;
        });
    }

    /**
     * Removes the key from the map if it is still mapped to the given (now empty) watch list.
     * Runs under the write lock so it cannot interleave with {@link #watchForOperation}.
     */
    private void removeKeyIfEmpty(Object key, Watchers watchers) {
        CoreUtils.inWriteLock(removeWatchersLock, () -> {
            // Identity check: leave the entry alone if the key has been re-mapped in the meantime.
            if (watchersForKey.get(key) != watchers) {
                return null;
            }
            if (watchers != null && watchers.isEmpty()) {
                watchersForKey.remove(key);
            }
            return null;
        });
    }

    /**
     * Shuts down the reaper thread (if it was enabled) and the timer (if owned by this purgatory).
     * If the calling thread is interrupted while stopping the reaper, the interrupt flag is
     * restored and shutdown continues with the timer.
     */
    public void shutdown() {
        if (reaperEnabled) {
            try {
                expirationReaper.shutdown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // Pass the exception as the last argument so the cause is not lost.
                log.error("Interrupted at shutting down expiration reaper for {}", purgatoryName, e);
            }
        }
        if (ownTimer) {
            timeoutTimer.shutdown();
        }
    }

    /**
     * Advances the timer's clock and, when the estimated number of operations exceeds the
     * timer's size by more than {@code purgeInterval}, purges completed operations from all
     * watch lists.
     *
     * @param timeoutMs timeout, in milliseconds, forwarded to {@link Timer#advanceClock(long)}
     */
    public void advanceClock(long timeoutMs) {
        timeoutTimer.advanceClock(timeoutMs);

        if (estimatedTotalOperations.get() - delayed() <= purgeInterval) {
            return;
        }

        // Reset the estimate first so operations added during the purge are still counted.
        estimatedTotalOperations.getAndSet(delayed());
        if (log.isDebugEnabled()) {
            log.debug("{} Begin purging watch lists", purgatoryName);
        }
        int purged = allWatchers().stream().mapToInt(Watchers::purgeCompleted).sum();
        if (log.isDebugEnabled()) {
            log.debug("{} Purged {} elements from watch lists.", purgatoryName, purged);
        }
    }

    /**
     * Builder for {@link DelayedOperationPurgatory}. Defaults: purge interval {@code 1000},
     * reaper enabled, timer enabled, and no timer (one is created on {@link #build()}).
     *
     * @param <T> the type of delayed operation the built purgatory will hold
     */
    public static class Builder<T extends DelayedOperation> {
        private String purgatoryName;
        private Timer timer;
        private int purgeInterval = 1000;
        private boolean reaperEnabled = true;
        private boolean timerEnabled = true;

        private Builder() {
        }

        /** Sets the purgatory name. */
        public Builder<T> purgatoryName(String purgatoryName) {
            this.purgatoryName = purgatoryName;
            return this;
        }

        /** Sets an externally managed timer; the built purgatory will not shut it down. */
        public Builder<T> timeoutTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        /** Sets the purge threshold used by {@link DelayedOperationPurgatory#advanceClock(long)}. */
        public Builder<T> purgeInterval(int purgeInterval) {
            this.purgeInterval = purgeInterval;
            return this;
        }

        /** Sets whether the background reaper thread is started. */
        public Builder<T> reaperEnabled(boolean reaperEnabled) {
            this.reaperEnabled = reaperEnabled;
            return this;
        }

        /** Sets whether watched operations are added to the timer. */
        public Builder<T> timerEnabled(boolean timerEnabled) {
            this.timerEnabled = timerEnabled;
            return this;
        }

        /**
         * Builds the purgatory. When no timer was supplied, a {@link SystemTimer} named after
         * the purgatory is created and the purgatory is marked as its owner.
         *
         * <p>The created timer is deliberately NOT stored back into this builder: otherwise a
         * second {@code build()} would reuse the first purgatory's timer without owning it.
         *
         * @return a new purgatory configured from this builder
         */
        public DelayedOperationPurgatory<T> build() {
            boolean ownTimer = (timer == null);
            Timer effectiveTimer = ownTimer
                ? SystemTimer.builder().executorName(purgatoryName).build()
                : timer;
            return new DelayedOperationPurgatory<>(
                purgatoryName, effectiveTimer, ownTimer, purgeInterval, reaperEnabled, timerEnabled);
        }
    }

    /** The list of operations watched under a single key. */
    private class Watchers {
        private final Object key;
        private final ConcurrentLinkedQueue<T> operations = new ConcurrentLinkedQueue<>();

        Watchers(Object key) {
            this.key = key;
        }

        /** Returns the number of operations currently in this watch list. */
        public int countWatched() {
            return operations.size();
        }

        /** Returns whether this watch list holds no operations. */
        public boolean isEmpty() {
            return operations.isEmpty();
        }

        /** Appends the operation to this watch list. */
        public void watch(T t) {
            operations.add(t);
        }

        /**
         * Walks the list, dropping operations that are already completed and trying to complete
         * the rest. Removes the key from the purgatory if the list ends up empty.
         *
         * @return the number of operations completed by this call
         */
        public int tryCompleteWatched() {
            int completed = 0;
            Iterator<T> iter = operations.iterator();
            while (iter.hasNext()) {
                T curr = iter.next();
                if (curr.isCompleted()) {
                    // Already completed elsewhere: just drop it, do not count it.
                    iter.remove();
                } else if (curr.maybeTryComplete()) {
                    iter.remove();
                    completed++;
                }
            }
            if (operations.isEmpty()) {
                removeKeyIfEmpty(key, this);
            }
            return completed;
        }

        /**
         * Cancels and removes every operation in this watch list.
         *
         * @return the operations that were cancelled, in iteration order
         */
        public List<T> cancel() {
            List<T> cancelled = new ArrayList<>();
            Iterator<T> iter = operations.iterator();
            while (iter.hasNext()) {
                T curr = iter.next();
                curr.cancel();
                iter.remove();
                cancelled.add(curr);
            }
            return cancelled;
        }

        /**
         * Removes every completed operation from this watch list and removes the key from the
         * purgatory if the list ends up empty.
         *
         * @return the number of operations removed
         */
        int purgeCompleted() {
            int purged = 0;
            Iterator<T> iter = operations.iterator();
            while (iter.hasNext()) {
                if (iter.next().isCompleted()) {
                    iter.remove();
                    purged++;
                }
            }
            if (operations.isEmpty()) {
                removeKeyIfEmpty(key, this);
            }
            return purged;
        }
    }
}

