/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.client.stream.impl;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.impl.Controller;
import io.pravega.common.Exceptions;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodically sends ping requests to the {@link Controller} for every transaction that has been
 * registered through {@link #startPing(UUID)} and not yet removed through {@link #stopPing(UUID)}.
 *
 * <p>The periodic task is scheduled lazily on the supplied executor the first time a transaction is
 * registered. Transactions for which the controller reports {@code ABORTED} or {@code COMMITTED}
 * are recorded in {@code completedTxns} and dropped from the ping list at the start of the next
 * ping round.
 *
 * <p>Both {@code txnList} and {@code completedTxns} are plain {@link HashSet}s; every access made
 * by this class happens while holding {@code lock}.
 */
public class Pinger implements AutoCloseable {
    @SuppressFBWarnings(justification="generated code")
    private static final Logger log = LoggerFactory.getLogger(Pinger.class);
    /** Fraction of the transaction lease used as the ping period. */
    private static final double PING_INTERVAL_FACTOR = 0.5;
    /** Lower bound applied to the computed ping period. */
    private static final long MINIMUM_PING_INTERVAL_MS = TimeUnit.SECONDS.toMillis(10L);
    private final Stream stream;
    private final Controller controller;
    private final long txnLeaseMillis;
    private final long pingIntervalMillis;
    private final ScheduledExecutorService executor;
    private final Object lock = new Object();
    @GuardedBy(value="lock")
    private final Set<UUID> txnList = new HashSet<>();
    @VisibleForTesting
    @GuardedBy(value="lock")
    private final Set<UUID> completedTxns = new HashSet<>();
    /** Set to {@code true} by the first call that schedules the periodic task; never reset. */
    private final AtomicBoolean isStarted = new AtomicBoolean();
    /** Handle of the scheduled periodic task, or {@code null} if not started or already closed. */
    private final AtomicReference<ScheduledFuture<?>> scheduledFuture = new AtomicReference<>();

    /**
     * Creates a pinger for transactions on the given stream.
     *
     * @param config     writer configuration; its transaction timeout is used as the lease value
     *                   sent with each ping and to derive the ping period
     * @param stream     stream passed to the controller with every ping
     * @param controller controller used to send ping requests
     * @param executor   executor on which the periodic ping task is scheduled
     */
    Pinger(EventWriterConfig config, Stream stream, Controller controller, ScheduledExecutorService executor) {
        this.txnLeaseMillis = config.getTransactionTimeoutTime();
        this.pingIntervalMillis = getPingInterval(this.txnLeaseMillis);
        this.stream = stream;
        this.controller = controller;
        this.executor = executor;
    }

    /**
     * Registers a transaction for periodic pinging and schedules the periodic task if it has not
     * been scheduled yet.
     *
     * @param txnID id of the transaction to ping
     */
    void startPing(UUID txnID) {
        synchronized (lock) {
            txnList.add(txnID);
        }
        startPeriodicPingTxn();
    }

    /**
     * Removes a transaction from the set of transactions being pinged.
     *
     * @param txnID id of the transaction to stop pinging
     */
    void stopPing(UUID txnID) {
        synchronized (lock) {
            txnList.remove(txnID);
        }
    }

    /**
     * Computes the ping period as {@link #PING_INTERVAL_FACTOR} times the lease, but never less
     * than {@link #MINIMUM_PING_INTERVAL_MS}. A warning is logged when the lower bound applies.
     *
     * @param txnLeaseMillis transaction lease in milliseconds
     * @return ping period in milliseconds
     */
    private long getPingInterval(long txnLeaseMillis) {
        double pingInterval = (double) txnLeaseMillis * PING_INTERVAL_FACTOR;
        if (pingInterval < (double) MINIMUM_PING_INTERVAL_MS) {
            log.warn("Transaction ping interval is less than 10 seconds(lower bound)");
        }
        return Math.max(MINIMUM_PING_INTERVAL_MS, (long) pingInterval);
    }

    /**
     * Schedules {@link #pingTransactions()} at a fixed rate. Only the first caller to flip
     * {@code isStarted} performs the scheduling; later calls are no-ops.
     */
    private void startPeriodicPingTxn() {
        if (!isStarted.getAndSet(true)) {
            log.info("Starting Pinger at an interval of {}ms ", pingIntervalMillis);
            // First run after 10 ms, then every pingIntervalMillis.
            scheduledFuture.set(executor.scheduleAtFixedRate(this::pingTransactions, 10L, pingIntervalMillis, TimeUnit.MILLISECONDS));
        }
    }

    /**
     * Runs one ping round: drops transactions recorded as completed since the previous round, then
     * sends a ping request for every remaining registered transaction.
     */
    private void pingTransactions() {
        log.info("Start sending transaction pings.");
        synchronized (lock) {
            txnList.removeAll(completedTxns);
            completedTxns.clear();
            txnList.forEach(uuid -> {
                try {
                    log.debug("Sending ping request for txn ID: {} with lease: {}", uuid, txnLeaseMillis);
                    controller.pingTransaction(stream, uuid, txnLeaseMillis).whenComplete((status, e) -> {
                        if (e != null) {
                            log.warn("Ping Transaction for txn ID:{} failed", uuid, Exceptions.unwrap(e));
                        } else if (Transaction.PingStatus.ABORTED.equals(status) || Transaction.PingStatus.COMMITTED.equals(status)) {
                            // This callback may run on a different thread after the enclosing
                            // synchronized block has exited, so the lock must be taken again
                            // before touching the guarded set. The monitor is reentrant, so a
                            // synchronously completed future is handled as well.
                            synchronized (lock) {
                                completedTxns.add(uuid);
                            }
                        }
                    });
                } catch (Exception e2) {
                    // A failure for one transaction must not prevent pinging the others.
                    log.warn("Encountered exception when attempting to ping transactions", e2);
                }
            });
        }
        log.trace("Completed sending transaction pings.");
    }

    /**
     * Cancels the periodic ping task if one was scheduled, without interrupting a round that is
     * already running. Calling this more than once is harmless. {@code isStarted} is not reset, so
     * once a task has been scheduled, later {@link #startPing(UUID)} calls do not schedule a new one.
     */
    @Override
    public void close() {
        log.info("Closing Pinger periodic task");
        ScheduledFuture<?> future = scheduledFuture.getAndSet(null);
        if (future != null) {
            future.cancel(false);
        }
    }

    /**
     * Returns the live internal set of transactions recorded as completed (not a copy). The
     * backing field is marked {@code @VisibleForTesting}.
     *
     * @return the internal completed-transaction set
     */
    @SuppressFBWarnings(justification="generated code")
    Set<UUID> getCompletedTxns() {
        // NOTE(review): the returned set is read by the caller without holding the lock; it may
        // be modified concurrently by ping callbacks.
        return completedTxns;
    }
}

