/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.leases.dynamodb;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseTaker;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;

@KinesisClientInternalApi
public class DynamoDBLeaseTaker
implements LeaseTaker {
    private static final Logger log = LoggerFactory.getLogger(DynamoDBLeaseTaker.class);
    private static final int TAKE_RETRIES = 3;
    private static final int SCAN_RETRIES = 1;
    private long veryOldLeaseDurationNanosMultiplier = 3L;
    private static final Callable<Long> SYSTEM_CLOCK_CALLABLE = System::nanoTime;
    private static final String TAKE_LEASES_DIMENSION = "TakeLeases";
    private final LeaseRefresher leaseRefresher;
    private final String workerIdentifier;
    private final long leaseDurationNanos;
    private final MetricsFactory metricsFactory;
    private final Map<String, Lease> allLeases = new HashMap<String, Lease>();
    private int maxLeasesForWorker = Integer.MAX_VALUE;
    private int maxLeasesToStealAtOneTime = 1;
    private long lastScanTimeNanos = 0L;

    public DynamoDBLeaseTaker(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis, MetricsFactory metricsFactory) {
        this.leaseRefresher = leaseRefresher;
        this.workerIdentifier = workerIdentifier;
        this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(leaseDurationMillis);
        this.metricsFactory = metricsFactory;
    }

    public DynamoDBLeaseTaker withMaxLeasesForWorker(int maxLeasesForWorker) {
        if (maxLeasesForWorker <= 0) {
            throw new IllegalArgumentException("maxLeasesForWorker should be >= 1");
        }
        this.maxLeasesForWorker = maxLeasesForWorker;
        return this;
    }

    public DynamoDBLeaseTaker withVeryOldLeaseDurationNanosMultipler(long veryOldLeaseDurationNanosMultipler) {
        this.veryOldLeaseDurationNanosMultiplier = veryOldLeaseDurationNanosMultipler;
        return this;
    }

    public DynamoDBLeaseTaker withMaxLeasesToStealAtOneTime(int maxLeasesToStealAtOneTime) {
        if (maxLeasesToStealAtOneTime <= 0) {
            throw new IllegalArgumentException("maxLeasesToStealAtOneTime should be >= 1");
        }
        this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime;
        return this;
    }

    @Override
    public Map<String, Lease> takeLeases() throws DependencyException, InvalidStateException {
        return this.takeLeases(SYSTEM_CLOCK_CALLABLE);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    synchronized Map<String, Lease> takeLeases(Callable<Long> timeProvider) throws DependencyException, InvalidStateException {
        HashMap<String, Lease> takenLeases = new HashMap<String, Lease>();
        MetricsScope scope = MetricsUtil.createMetricsWithOperation(this.metricsFactory, TAKE_LEASES_DIMENSION);
        long startTime = System.currentTimeMillis();
        boolean success = false;
        ProvisionedThroughputException lastException = null;
        try {
            try {
                for (int i = 1; i <= 1; ++i) {
                    try {
                        this.updateAllLeases(timeProvider);
                        success = true;
                        continue;
                    }
                    catch (ProvisionedThroughputException e) {
                        log.info("Worker {} could not find expired leases on try {} out of {}", new Object[]{this.workerIdentifier, i, 3});
                        lastException = e;
                    }
                }
            }
            finally {
                MetricsUtil.addWorkerIdentifier(scope, this.workerIdentifier);
                MetricsUtil.addSuccessAndLatency(scope, "ListLeases", success, startTime, MetricsLevel.DETAILED);
            }
            if (lastException != null) {
                log.error("Worker {} could not scan leases table, aborting TAKE_LEASES_DIMENSION. Exception caught by last retry:", (Object)this.workerIdentifier, lastException);
                HashMap<String, Lease> i = takenLeases;
                return i;
            }
            List<Lease> expiredLeases = this.getExpiredLeases();
            Set<Lease> leasesToTake = this.computeLeasesToTake(expiredLeases);
            HashSet<String> untakenLeaseKeys = new HashSet<String>();
            block15: for (Lease lease : leasesToTake) {
                String leaseKey = lease.leaseKey();
                startTime = System.currentTimeMillis();
                success = false;
                try {
                    for (int i = 1; i <= 3; ++i) {
                        try {
                            if (this.leaseRefresher.takeLease(lease, this.workerIdentifier)) {
                                lease.lastCounterIncrementNanos(System.nanoTime());
                                takenLeases.put(leaseKey, lease);
                            } else {
                                untakenLeaseKeys.add(leaseKey);
                            }
                            success = true;
                            continue block15;
                        }
                        catch (ProvisionedThroughputException e) {
                            log.info("Could not take lease with key {} for worker {} on try {} out of {} due to capacity", new Object[]{leaseKey, this.workerIdentifier, i, 3});
                            continue;
                        }
                    }
                }
                finally {
                    MetricsUtil.addSuccessAndLatency(scope, "TakeLease", success, startTime, MetricsLevel.DETAILED);
                }
            }
            if (takenLeases.size() > 0) {
                log.info("Worker {} successfully took {} leases: {}", new Object[]{this.workerIdentifier, takenLeases.size(), DynamoDBLeaseTaker.stringJoin(takenLeases.keySet(), ", ")});
            }
            if (untakenLeaseKeys.size() > 0) {
                log.info("Worker {} failed to take {} leases: {}", new Object[]{this.workerIdentifier, untakenLeaseKeys.size(), DynamoDBLeaseTaker.stringJoin(untakenLeaseKeys, ", ")});
            }
            scope.addData("TakenLeases", takenLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
        }
        finally {
            MetricsUtil.endScope(scope);
        }
        return takenLeases;
    }

    static String stringJoin(Collection<String> strings, String delimiter) {
        StringBuilder builder = new StringBuilder();
        boolean needDelimiter = false;
        for (String string : strings) {
            if (needDelimiter) {
                builder.append(delimiter);
            }
            builder.append(string);
            needDelimiter = true;
        }
        return builder.toString();
    }

    private void updateAllLeases(Callable<Long> timeProvider) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        List<Lease> freshList = this.leaseRefresher.listLeases();
        try {
            this.lastScanTimeNanos = timeProvider.call();
        }
        catch (Exception e) {
            throw new DependencyException("Exception caught from timeProvider", e);
        }
        HashSet<String> notUpdated = new HashSet<String>(this.allLeases.keySet());
        for (Lease lease : freshList) {
            String leaseKey = lease.leaseKey();
            Lease oldLease = this.allLeases.get(leaseKey);
            this.allLeases.put(leaseKey, lease);
            notUpdated.remove(leaseKey);
            if (oldLease != null) {
                if (oldLease.leaseCounter().equals(lease.leaseCounter())) {
                    lease.lastCounterIncrementNanos(oldLease.lastCounterIncrementNanos());
                    continue;
                }
                lease.lastCounterIncrementNanos(this.lastScanTimeNanos);
                continue;
            }
            if (lease.leaseOwner() == null) {
                lease.lastCounterIncrementNanos(0L);
                if (!log.isDebugEnabled()) continue;
                log.debug("Treating new lease with key {} as never renewed because it is new and unowned.", (Object)leaseKey);
                continue;
            }
            lease.lastCounterIncrementNanos(this.lastScanTimeNanos);
            if (!log.isDebugEnabled()) continue;
            log.debug("Treating new lease with key {} as recently renewed because it is new and owned.", (Object)leaseKey);
        }
        for (String key : notUpdated) {
            this.allLeases.remove(key);
        }
    }

    private List<Lease> getExpiredLeases() {
        ArrayList<Lease> expiredLeases = new ArrayList<Lease>();
        for (Lease lease : this.allLeases.values()) {
            if (!lease.isExpired(this.leaseDurationNanos, this.lastScanTimeNanos)) continue;
            expiredLeases.add(lease);
        }
        return expiredLeases;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
        Map<String, Integer> leaseCounts = this.computeLeaseCounts(expiredLeases);
        HashSet<Lease> leasesToTake = new HashSet<Lease>();
        MetricsScope scope = MetricsUtil.createMetricsWithOperation(this.metricsFactory, TAKE_LEASES_DIMENSION);
        MetricsUtil.addWorkerIdentifier(scope, this.workerIdentifier);
        List<Object> veryOldLeases = new ArrayList();
        int numLeases = 0;
        int numWorkers = 0;
        int numLeasesToReachTarget = 0;
        int leaseSpillover = 0;
        int veryOldLeaseCount = 0;
        try {
            int target;
            numLeases = this.allLeases.size();
            numWorkers = leaseCounts.size();
            if (numLeases == 0) {
                HashSet<Lease> hashSet = leasesToTake;
                return hashSet;
            }
            if (numWorkers >= numLeases) {
                target = 1;
            } else {
                target = numLeases / numWorkers + (numLeases % numWorkers == 0 ? 0 : 1);
                leaseSpillover = Math.max(0, target - this.maxLeasesForWorker);
                if (target > this.maxLeasesForWorker) {
                    log.warn("Worker {} target is {} leases and maxLeasesForWorker is {}. Resetting target to {}, lease spillover is {}. Note that some shards may not be processed if no other workers are able to pick them up.", new Object[]{this.workerIdentifier, target, this.maxLeasesForWorker, this.maxLeasesForWorker, leaseSpillover});
                    target = this.maxLeasesForWorker;
                }
            }
            int myCount = leaseCounts.get(this.workerIdentifier);
            int currentLeaseCount = leaseCounts.get(this.workerIdentifier);
            veryOldLeases = this.allLeases.values().stream().filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos() > this.veryOldLeaseDurationNanosMultiplier * this.leaseDurationNanos).collect(Collectors.toList());
            if (!veryOldLeases.isEmpty()) {
                Collections.shuffle(veryOldLeases);
                veryOldLeaseCount = Math.max(0, Math.min(this.maxLeasesForWorker - currentLeaseCount, veryOldLeases.size()));
                HashSet<Object> result = new HashSet<Object>(veryOldLeases.subList(0, veryOldLeaseCount));
                if (veryOldLeaseCount > 0) {
                    log.info("Taking leases that have been expired for a long time: {}", result);
                }
                HashSet<Object> hashSet = result;
                return hashSet;
            }
            if (numLeasesToReachTarget <= 0) {
                HashSet<Lease> result = leasesToTake;
                return result;
            }
            Collections.shuffle(expiredLeases);
            if (expiredLeases.size() > 0) {
                for (numLeasesToReachTarget = target - myCount; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; --numLeasesToReachTarget) {
                    leasesToTake.add(expiredLeases.remove(0));
                }
            } else {
                List<Lease> leasesToSteal = this.chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target);
                for (Lease leaseToSteal : leasesToSteal) {
                    log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}", new Object[]{this.workerIdentifier, numLeasesToReachTarget, leaseToSteal.leaseKey(), leaseToSteal.leaseOwner()});
                    leasesToTake.add(leaseToSteal);
                }
            }
            if (!leasesToTake.isEmpty()) {
                log.info("Worker {} saw {} total leases, {} available leases, {} workers. Target is {} leases, I have {} leases, I will take {} leases", new Object[]{this.workerIdentifier, numLeases, expiredLeases.size(), numWorkers, target, myCount, leasesToTake.size()});
            }
        }
        finally {
            scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
            scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
            scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
            scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED);
            scope.addData("NumWorkers", numWorkers, StandardUnit.COUNT, MetricsLevel.SUMMARY);
            scope.addData("TotalLeases", numLeases, StandardUnit.COUNT, MetricsLevel.DETAILED);
            scope.addData("VeryOldLeases", veryOldLeaseCount, StandardUnit.COUNT, MetricsLevel.SUMMARY);
            MetricsUtil.endScope(scope);
        }
        return leasesToTake;
    }

    private List<Lease> chooseLeasesToSteal(Map<String, Integer> leaseCounts, int needed, int target) {
        ArrayList<Lease> leasesToSteal = new ArrayList<Lease>();
        Map.Entry<String, Integer> mostLoadedWorker = null;
        for (Map.Entry<String, Integer> worker : leaseCounts.entrySet()) {
            if (mostLoadedWorker != null && (Integer)mostLoadedWorker.getValue() >= worker.getValue()) continue;
            mostLoadedWorker = worker;
        }
        int numLeasesToSteal = 0;
        if ((Integer)mostLoadedWorker.getValue() >= target && needed > 0) {
            int leasesOverTarget = mostLoadedWorker.getValue() - target;
            numLeasesToSteal = Math.min(needed, leasesOverTarget);
            if (needed > 1 && numLeasesToSteal == 0) {
                numLeasesToSteal = 1;
            }
            numLeasesToSteal = Math.min(numLeasesToSteal, this.maxLeasesToStealAtOneTime);
        }
        if (numLeasesToSteal <= 0) {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Worker %s not stealing from most loaded worker %s.  He has %d, target is %d, and I need %d", this.workerIdentifier, mostLoadedWorker.getKey(), mostLoadedWorker.getValue(), target, needed));
            }
            return leasesToSteal;
        }
        if (log.isDebugEnabled()) {
            log.debug("Worker {} will attempt to steal {} leases from most loaded worker {}.  He has {} leases, target is {}, I need {}, maxLeasesToSteatAtOneTime is {}.", new Object[]{this.workerIdentifier, numLeasesToSteal, mostLoadedWorker.getKey(), mostLoadedWorker.getValue(), target, needed, this.maxLeasesToStealAtOneTime});
        }
        String mostLoadedWorkerIdentifier = mostLoadedWorker.getKey();
        ArrayList<Lease> candidates = new ArrayList<Lease>();
        for (Lease lease : this.allLeases.values()) {
            if (!mostLoadedWorkerIdentifier.equals(lease.leaseOwner())) continue;
            candidates.add(lease);
        }
        Collections.shuffle(candidates);
        int toIndex = Math.min(candidates.size(), numLeasesToSteal);
        leasesToSteal.addAll(candidates.subList(0, toIndex));
        return leasesToSteal;
    }

    private Map<String, Integer> computeLeaseCounts(List<Lease> expiredLeases) {
        HashMap<String, Integer> leaseCounts = new HashMap<String, Integer>();
        HashSet<Lease> expiredLeasesSet = new HashSet<Lease>(expiredLeases);
        for (Lease lease : this.allLeases.values()) {
            if (expiredLeasesSet.contains(lease)) continue;
            String leaseOwner = lease.leaseOwner();
            Integer oldCount = (Integer)leaseCounts.get(leaseOwner);
            if (oldCount == null) {
                leaseCounts.put(leaseOwner, 1);
                continue;
            }
            leaseCounts.put(leaseOwner, oldCount + 1);
        }
        Integer myCount = (Integer)leaseCounts.get(this.workerIdentifier);
        if (myCount == null) {
            myCount = 0;
            leaseCounts.put(this.workerIdentifier, myCount);
        }
        return leaseCounts;
    }

    @Override
    public String getWorkerIdentifier() {
        return this.workerIdentifier;
    }

    @Override
    public synchronized List<Lease> allLeases() {
        return new ArrayList<Lease>(this.allLeases.values());
    }
}

