/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.leases.dynamodb;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseRenewer;
import software.amazon.kinesis.leases.LeaseTaker;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRenewer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;

/**
 * {@link LeaseCoordinator} implementation that delegates lease taking to a
 * {@link DynamoDBLeaseTaker} and lease renewal to a {@link DynamoDBLeaseRenewer}.
 *
 * <p>After {@link #start()} the taker and the renewer are each run periodically on a
 * two-thread scheduled pool; {@link #stop()} shuts that pool down, shuts down the renewal
 * worker pool and clears the leases currently tracked by the renewer.
 */
@KinesisClientInternalApi
public class DynamoDBLeaseCoordinator
implements LeaseCoordinator {
    private static final Logger log = LoggerFactory.getLogger(DynamoDBLeaseCoordinator.class);

    /** How long {@link #stop()} waits for the scheduled pool to terminate before forcing it. */
    private static final long STOP_WAIT_TIME_MILLIS = 2000L;
    /** Read/write capacity used by the deprecated constructor that does not take capacities. */
    private static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
    private static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L;
    /** Arguments passed to {@code waitUntilLeaseTableExists} in {@link #initialize()}. */
    private static final long LEASE_TABLE_POLL_INTERVAL_SECONDS = 10L;
    private static final long LEASE_TABLE_WAIT_TIMEOUT_SECONDS = 600L;

    private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build();
    private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("LeaseRenewer-%04d").setDaemon(true).build();

    private final LeaseRenewer leaseRenewer;
    private final LeaseTaker leaseTaker;
    private final long renewerIntervalMillis;
    private final long takerIntervalMillis;
    private final ExecutorService leaseRenewalThreadpool;
    private final LeaseRefresher leaseRefresher;
    private long initialLeaseTableReadCapacity;
    private long initialLeaseTableWriteCapacity;
    protected final MetricsFactory metricsFactory;
    /** Guards the hand-off of leases to the renewer against a concurrent {@link #stop()}. */
    private final Object shutdownLock = new Object();
    private ScheduledExecutorService leaseCoordinatorThreadPool;
    private ScheduledFuture<?> takerFuture;
    private volatile boolean running = false;

    /**
     * Creates a coordinator with initial lease table read and write capacities of 10.
     *
     * @deprecated use the constructor that takes the initial lease table capacities explicitly
     */
    @Deprecated
    public DynamoDBLeaseCoordinator(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis, long epsilonMillis, int maxLeasesForWorker, int maxLeasesToStealAtOneTime, int maxLeaseRenewerThreadCount, MetricsFactory metricsFactory) {
        this(leaseRefresher, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
    }

    /**
     * Creates a coordinator.
     *
     * <p>The renewer interval is {@code leaseDurationMillis / 3 - epsilonMillis} and the taker
     * interval is {@code (leaseDurationMillis + epsilonMillis) * 2}.
     *
     * @param leaseRefresher refresher shared by the taker, the renewer and table initialization
     * @param workerIdentifier identifier passed to the taker and the renewer
     * @param leaseDurationMillis lease duration, used to derive both scheduling intervals
     * @param epsilonMillis slack applied when deriving the scheduling intervals
     * @param maxLeasesForWorker forwarded to the taker
     * @param maxLeasesToStealAtOneTime forwarded to the taker
     * @param maxLeaseRenewerThreadCount maximum pool size of the renewal executor
     * @param initialLeaseTableReadCapacity read capacity used if the lease table is created; must be >= 1
     * @param initialLeaseTableWriteCapacity write capacity used if the lease table is created; must be >= 1
     * @param metricsFactory factory used for taker/renewer metrics
     * @throws IllegalArgumentException if either capacity is not positive
     */
    public DynamoDBLeaseCoordinator(LeaseRefresher leaseRefresher, String workerIdentifier, long leaseDurationMillis, long epsilonMillis, int maxLeasesForWorker, int maxLeasesToStealAtOneTime, int maxLeaseRenewerThreadCount, long initialLeaseTableReadCapacity, long initialLeaseTableWriteCapacity, MetricsFactory metricsFactory) {
        // Validate before allocating the executor and collaborators so a bad argument fails fast.
        requirePositiveCapacity(initialLeaseTableReadCapacity, "readCapacity");
        requirePositiveCapacity(initialLeaseTableWriteCapacity, "writeCapacity");

        this.leaseRefresher = leaseRefresher;
        this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount);
        this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory)
                .withMaxLeasesForWorker(maxLeasesForWorker)
                .withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime);
        this.leaseRenewer = new DynamoDBLeaseRenewer(leaseRefresher, workerIdentifier, leaseDurationMillis, this.leaseRenewalThreadpool, metricsFactory);
        this.renewerIntervalMillis = leaseDurationMillis / 3L - epsilonMillis;
        this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2L;
        this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
        this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
        this.metricsFactory = metricsFactory;
        log.info("With failover time {} ms and epsilon {} ms, LeaseCoordinator will renew leases every {} ms, take leases every {} ms, process maximum of {} leases and steal {} lease(s) at a time.",
                leaseDurationMillis, epsilonMillis, this.renewerIntervalMillis, this.takerIntervalMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime);
    }

    /**
     * Creates the lease table if it does not exist and waits for it to exist, polling every
     * 10 seconds for up to 600 seconds.
     *
     * @throws DependencyException if the wait for the table returns {@code false}, or as raised
     *         by the refresher
     * @throws ProvisionedThroughputException as raised by the refresher
     */
    @Override
    public void initialize() throws ProvisionedThroughputException, DependencyException, IllegalStateException {
        boolean newTableCreated = leaseRefresher.createLeaseTableIfNotExists(initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity);
        if (newTableCreated) {
            log.info("Created new lease table for coordinator with initial read capacity of {} and write capacity of {}.", initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity);
        }
        boolean isTableActive = leaseRefresher.waitUntilLeaseTableExists(LEASE_TABLE_POLL_INTERVAL_SECONDS, LEASE_TABLE_WAIT_TIMEOUT_SECONDS);
        if (!isTableActive) {
            throw new DependencyException(new IllegalStateException("Creating table timeout"));
        }
    }

    /**
     * Initializes the renewer and schedules the taker and renewer tasks, both with no initial
     * delay, then marks the coordinator as running.
     */
    @Override
    public void start() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        leaseRenewer.initialize();
        // Two threads: one periodic taker task and one periodic renewer task.
        leaseCoordinatorThreadPool = Executors.newScheduledThreadPool(2, LEASE_COORDINATOR_THREAD_FACTORY);
        // Taker: fixed delay between the end of one run and the start of the next.
        takerFuture = leaseCoordinatorThreadPool.scheduleWithFixedDelay(new TakerRunnable(), 0L, takerIntervalMillis, TimeUnit.MILLISECONDS);
        // Renewer: fixed rate.
        leaseCoordinatorThreadPool.scheduleAtFixedRate(new RenewerRunnable(), 0L, renewerIntervalMillis, TimeUnit.MILLISECONDS);
        running = true;
    }

    /**
     * Runs one lease-taking pass and, if the coordinator is still running, hands the taken
     * leases to the renewer. Success and latency are recorded under the "TakeLeases" operation
     * whether or not the pass succeeds.
     */
    @Override
    public void runLeaseTaker() throws DependencyException, InvalidStateException {
        MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, "TakeLeases");
        long startTime = System.currentTimeMillis();
        boolean success = false;
        try {
            Map<String, Lease> takenLeases = leaseTaker.takeLeases();
            // Checked under the same lock stop() holds while clearing leases, so leases are
            // not added to the renewer after stop() has cleared it.
            synchronized (shutdownLock) {
                if (running) {
                    leaseRenewer.addLeasesToRenew(takenLeases.values());
                }
            }
            success = true;
        } finally {
            MetricsUtil.addWorkerIdentifier(scope, workerIdentifier());
            MetricsUtil.addSuccessAndLatency(scope, success, startTime, MetricsLevel.SUMMARY);
            MetricsUtil.endScope(scope);
        }
    }

    /** Runs one lease-renewal pass by delegating to the renewer. */
    @Override
    public void runLeaseRenewer() throws DependencyException, InvalidStateException {
        leaseRenewer.renewLeases();
    }

    /** Returns the leases the renewer currently reports as held. */
    @Override
    public Collection<Lease> getAssignments() {
        return leaseRenewer.getCurrentlyHeldLeases().values();
    }

    /** Returns all leases as reported by the taker. */
    @Override
    public List<Lease> allLeases() {
        return leaseTaker.allLeases();
    }

    /** Returns the renewer's currently held lease for {@code leaseKey}. */
    @Override
    public Lease getCurrentlyHeldLease(String leaseKey) {
        return leaseRenewer.getCurrentlyHeldLease(leaseKey);
    }

    /** Returns the worker identifier reported by the taker. */
    @Override
    public String workerIdentifier() {
        return leaseTaker.getWorkerIdentifier();
    }

    /** Returns the refresher this coordinator was constructed with. */
    @Override
    public LeaseRefresher leaseRefresher() {
        return leaseRefresher;
    }

    /**
     * Stops the scheduled taker/renewer tasks, waiting up to {@value #STOP_WAIT_TIME_MILLIS} ms
     * before forcing shutdown, then shuts down the renewal worker pool, clears the renewer's
     * held leases and marks the coordinator as not running.
     *
     * <p>If the calling thread is interrupted while waiting, cleanup still completes and the
     * thread's interrupt status is restored before returning.
     */
    @Override
    public void stop() {
        boolean interrupted = false;
        if (leaseCoordinatorThreadPool != null) {
            leaseCoordinatorThreadPool.shutdown();
            try {
                if (leaseCoordinatorThreadPool.awaitTermination(STOP_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)) {
                    log.info("Worker {} has successfully stopped lease-tracking threads", leaseTaker.getWorkerIdentifier());
                } else {
                    leaseCoordinatorThreadPool.shutdownNow();
                    log.info("Worker {} stopped lease-tracking threads {} ms after stop", leaseTaker.getWorkerIdentifier(), STOP_WAIT_TIME_MILLIS);
                }
            } catch (InterruptedException e) {
                // Remember the interrupt; it is re-asserted once the remaining cleanup is done.
                interrupted = true;
                log.debug("Encountered InterruptedException when awaiting threadpool termination");
            }
        } else {
            log.debug("Threadpool was null, no need to shutdown/terminate threadpool.");
        }

        leaseRenewalThreadpool.shutdownNow();
        synchronized (shutdownLock) {
            leaseRenewer.clearCurrentlyHeldLeases();
            running = false;
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Cancels future runs of the scheduled taker task without interrupting a run in progress.
     * Does nothing if the taker was never scheduled.
     */
    @Override
    public void stopLeaseTaker() {
        if (takerFuture != null) {
            takerFuture.cancel(false);
        }
    }

    /** Asks the renewer to drop {@code lease}; a {@code null} lease is ignored. */
    @Override
    public void dropLease(Lease lease) {
        synchronized (shutdownLock) {
            if (lease != null) {
                leaseRenewer.dropLease(lease);
            }
        }
    }

    /** Returns {@code true} between a completed {@link #start()} and {@link #stop()}. */
    @Override
    public boolean isRunning() {
        return running;
    }

    /** Delegates the lease update to the renewer and returns its result. */
    @Override
    public boolean updateLease(Lease lease, UUID concurrencyToken, String operation, String shardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        return leaseRenewer.updateLease(lease, concurrencyToken, operation, shardId);
    }

    /**
     * Builds the executor used by the renewer.
     *
     * <p>The core size is {@code max(maximumPoolSize / 4, 2)}, clamped so it never exceeds
     * {@code maximumPoolSize}; without the clamp a maximum of 1 would be rejected by
     * {@link ThreadPoolExecutor} because the core size would exceed the maximum. A
     * non-positive {@code maximumPoolSize} is still rejected by the executor's constructor.
     *
     * <p>NOTE(review): the work queue is an unbounded {@link LinkedTransferQueue}; per the
     * {@link ThreadPoolExecutor} documentation, a pool with an unbounded queue does not grow
     * past its core size, so {@code maximumPoolSize} may never be reached - confirm intent.
     */
    private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) {
        int coreLeaseCount = Math.min(maximumPoolSize, Math.max(maximumPoolSize / 4, 2));
        return new ThreadPoolExecutor(coreLeaseCount, maximumPoolSize, 60L, TimeUnit.SECONDS, new LinkedTransferQueue<Runnable>(), LEASE_RENEWAL_THREAD_FACTORY);
    }

    /** Returns the currently held leases converted to {@link ShardInfo} assignments. */
    @Override
    public List<ShardInfo> getCurrentAssignments() {
        return convertLeasesToAssignments(getAssignments());
    }

    /** Converts each lease to a {@link ShardInfo}; a {@code null} collection yields an empty list. */
    private static List<ShardInfo> convertLeasesToAssignments(Collection<Lease> leases) {
        if (leases == null) {
            return Collections.emptyList();
        }
        return leases.stream().map(DynamoDBLeaseCoordinator::convertLeaseToAssignment).collect(Collectors.toList());
    }

    /**
     * Builds a {@link ShardInfo} from the lease's key, concurrency token (as a string), parent
     * shard ids and checkpoint.
     *
     * @param lease lease to convert; its concurrency token is dereferenced and must not be null
     */
    public static ShardInfo convertLeaseToAssignment(Lease lease) {
        return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(), lease.checkpoint());
    }

    /**
     * Sets the read capacity used if {@link #initialize()} creates the lease table.
     *
     * @throws IllegalArgumentException if {@code readCapacity} is not positive
     */
    @Override
    @Deprecated
    public DynamoDBLeaseCoordinator initialLeaseTableReadCapacity(long readCapacity) {
        requirePositiveCapacity(readCapacity, "readCapacity");
        this.initialLeaseTableReadCapacity = readCapacity;
        return this;
    }

    /**
     * Sets the write capacity used if {@link #initialize()} creates the lease table.
     *
     * @throws IllegalArgumentException if {@code writeCapacity} is not positive
     */
    @Override
    @Deprecated
    public DynamoDBLeaseCoordinator initialLeaseTableWriteCapacity(long writeCapacity) {
        requirePositiveCapacity(writeCapacity, "writeCapacity");
        this.initialLeaseTableWriteCapacity = writeCapacity;
        return this;
    }

    /** Throws {@link IllegalArgumentException} unless {@code capacity} is at least 1. */
    private static void requirePositiveCapacity(long capacity, String name) {
        if (capacity <= 0L) {
            throw new IllegalArgumentException(name + " should be >= 1");
        }
    }

    /** Scheduled task that runs one renewal pass and logs, rather than propagates, any failure. */
    private class RenewerRunnable
    implements Runnable {
        private RenewerRunnable() {
        }

        @Override
        public void run() {
            try {
                runLeaseRenewer();
            } catch (LeasingException e) {
                log.error("LeasingException encountered in lease renewing thread", e);
            } catch (Throwable t) {
                // An exception escaping a periodic task would suppress its subsequent runs.
                log.error("Throwable encountered in lease renewing thread", t);
            }
        }
    }

    /** Scheduled task that runs one taking pass and logs, rather than propagates, any failure. */
    private class TakerRunnable
    implements Runnable {
        private TakerRunnable() {
        }

        @Override
        public void run() {
            try {
                runLeaseTaker();
            } catch (LeasingException e) {
                log.error("LeasingException encountered in lease taking thread", e);
            } catch (Throwable t) {
                // An exception escaping a periodic task would suppress its subsequent runs.
                log.error("Throwable encountered in lease taking thread", t);
            }
        }
    }
}

