/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.shaded.opensearch2.org.opensearch.index.seqno;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.graylog.shaded.opensearch2.org.opensearch.LegacyESVersion;
import org.graylog.shaded.opensearch2.org.opensearch.Version;
import org.graylog.shaded.opensearch2.org.opensearch.action.support.GroupedActionListener;
import org.graylog.shaded.opensearch2.org.opensearch.action.support.replication.ReplicationResponse;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.metadata.IndexMetadata;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.routing.AllocationId;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.routing.ShardRouting;
import org.graylog.shaded.opensearch2.org.opensearch.common.SuppressForbidden;
import org.graylog.shaded.opensearch2.org.opensearch.common.annotation.PublicApi;
import org.graylog.shaded.opensearch2.org.opensearch.common.collect.Tuple;
import org.graylog.shaded.opensearch2.org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.graylog.shaded.opensearch2.org.opensearch.core.action.ActionListener;
import org.graylog.shaded.opensearch2.org.opensearch.core.common.io.stream.StreamInput;
import org.graylog.shaded.opensearch2.org.opensearch.core.common.io.stream.StreamOutput;
import org.graylog.shaded.opensearch2.org.opensearch.core.common.io.stream.Writeable;
import org.graylog.shaded.opensearch2.org.opensearch.core.index.shard.ShardId;
import org.graylog.shaded.opensearch2.org.opensearch.core.xcontent.NamedXContentRegistry;
import org.graylog.shaded.opensearch2.org.opensearch.gateway.WriteStateException;
import org.graylog.shaded.opensearch2.org.opensearch.index.IndexSettings;
import org.graylog.shaded.opensearch2.org.opensearch.index.SegmentReplicationShardStats;
import org.graylog.shaded.opensearch2.org.opensearch.index.engine.SafeCommitInfo;
import org.graylog.shaded.opensearch2.org.opensearch.index.seqno.RetentionLease;
import org.graylog.shaded.opensearch2.org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException;
import org.graylog.shaded.opensearch2.org.opensearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException;
import org.graylog.shaded.opensearch2.org.opensearch.index.seqno.RetentionLeaseNotFoundException;
import org.graylog.shaded.opensearch2.org.opensearch.index.seqno.RetentionLeases;
import org.graylog.shaded.opensearch2.org.opensearch.index.shard.AbstractIndexShardComponent;
import org.graylog.shaded.opensearch2.org.opensearch.index.shard.ReplicationGroup;
import org.graylog.shaded.opensearch2.org.opensearch.index.store.Store;
import org.graylog.shaded.opensearch2.org.opensearch.index.store.StoreFileMetadata;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.common.ReplicationTimer;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.common.SegmentReplicationLagTimer;

public class ReplicationTracker
extends AbstractIndexShardComponent
implements LongSupplier {
    /** Allocation ID handed to the constructor; the invariants look up this tracker's own entry in {@link #checkpoints} under it. */
    final String shardAllocationId;
    /** Whether the tracker is currently in primary mode; initialised to {@code false} by the constructor. */
    volatile boolean primaryMode;
    /** Primary term stamped onto every {@link RetentionLeases} instance this tracker creates. */
    private volatile long operationPrimaryTerm;
    /** Handoff flag; the invariants require primary mode and an empty {@link #pendingInSync} whenever it is set. */
    boolean handoffInProgress;
    /** Relocation flag; the invariants forbid it being set together with {@link #primaryMode}. */
    volatile boolean relocated;
    /** Initialised to -1. NOTE(review): presumably the version of the last applied cluster state; its updates are outside this view. */
    long appliedClusterStateVersion;
    /** Routing table of the shard; {@code null} after construction and, per the invariants, null exactly when {@link #replicationGroup} is null. */
    IndexShardRoutingTable routingTable;
    /** Checkpoint state per allocation ID. */
    final Map<String, CheckpointState> checkpoints;
    /** Global checkpoint as currently known to this tracker. */
    volatile long globalCheckpoint;
    /** Callback invoked with the new value when the global checkpoint advances on a replica. */
    private final LongConsumer onGlobalCheckpointUpdated;
    /** Time source used for retention lease timestamps and expiry/renewal decisions. */
    private final LongSupplier currentTimeMillisSupplier;
    /** Callback invoked (outside the tracker's monitor) after a lease is added, cloned or removed. */
    private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases;
    /** Allocation IDs that, per the invariant messages, block global checkpoint advancement; must be empty outside primary mode. */
    final Set<String> pendingInSync;
    /** Cached replication group; the invariants require it to equal {@code calculateReplicationGroup()} whenever non-null. */
    volatile ReplicationGroup replicationGroup;
    /** Current retention leases; replaced wholesale (never mutated in place) on every change in the visible code. */
    private RetentionLeases retentionLeases = RetentionLeases.EMPTY;
    /** Primary term of the retention leases most recently written by {@code persistRetentionLeases}. */
    private long persistedRetentionLeasesPrimaryTerm;
    /** Version of the retention leases most recently written by {@code persistRetentionLeases}. */
    private long persistedRetentionLeasesVersion;
    /** Derived in the constructor from the index creation version, soft-deletes setting and index state. */
    private boolean hasAllPeerRecoveryRetentionLeases;
    /** Supplies the safe commit info used to compute the minimum reasonable retained sequence number. */
    private final Supplier<SafeCommitInfo> safeCommitInfoSupplier;
    /** Value of {@code IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING} read at construction time. */
    private final double fileBasedRecoveryThreshold;
    /** Callback invoked with the freshly calculated replication group whenever it is recomputed. */
    private final Consumer<ReplicationGroup> onReplicationGroupUpdated;
    /** Latest replication checkpoint; starts as an empty checkpoint when {@code isSegRepEnabledOrRemoteNode()} holds, otherwise {@code null}. */
    private volatile ReplicationCheckpoint latestReplicationCheckpoint;
    /** Predicate-like function applied to a node ID when calculating the replication group. */
    private final Function<String, Boolean> isShardOnRemoteEnabledNode;
    /** Read by {@code invariant()} to gate the peer-recovery lease checks. NOTE(review): where it is set is outside this view. */
    private boolean createdMissingRetentionLeases;
    /** Monitor serialising loading and persisting of retention leases on disk. */
    private final Object retentionLeasePersistenceLock = new Object();
    /** Source string that marks a retention lease as a peer-recovery lease. */
    public static final String PEER_RECOVERY_RETENTION_LEASE_SOURCE = "peer recovery";

    /**
     * Returns the current retention leases without expiring any of them.
     *
     * @return the leases component of {@link #getRetentionLeases(boolean)} invoked with {@code false}
     */
    public RetentionLeases getRetentionLeases() {
        final Tuple<Boolean, RetentionLeases> unexpired = this.getRetentionLeases(false);
        return unexpired.v2();
    }

    /**
     * Returns the current retention leases, optionally expiring stale ones first.
     * <p>
     * When {@code expireLeases} is {@code false} the current leases are returned untouched. Otherwise (primary mode
     * only) every lease is classified as follows:
     * <ul>
     *   <li>a peer-recovery lease whose ID belongs to a currently assigned shard copy is kept;</li>
     *   <li>a peer-recovery lease for any other copy expires at once when all shards are started, or when it retains
     *       history below {@link #getMinimumReasonableRetainedSeqNo()};</li>
     *   <li>any lease not decided by the rules above expires once its timestamp is older than the index's retention
     *       lease period.</li>
     * </ul>
     * If at least one lease expired, the tracker's leases are replaced by a new {@link RetentionLeases} holding the
     * survivors, with the version incremented by one and the current operation primary term.
     *
     * @param expireLeases whether expired leases should be computed and dropped
     * @return a tuple whose first component tells whether any lease was expired by this call and whose second
     *         component is the (possibly updated) set of retention leases
     */
    public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(boolean expireLeases) {
        if (!expireLeases) {
            return Tuple.tuple(false, this.retentionLeases);
        }
        assert (this.primaryMode);
        final long currentTimeMillis = this.currentTimeMillisSupplier.getAsLong();
        final long retentionLeaseMillis = this.indexSettings.getRetentionLeaseMillis();
        // Properly parameterised (the decompiler had emitted a raw Set here).
        final Set<String> leaseIdsForCurrentPeers = this.routingTable.assignedShards()
            .stream()
            .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId)
            .collect(Collectors.toSet());
        final boolean allShardsStarted = this.routingTable.allShardsStarted();
        // Only needed (and only computed) while some shard copy is still not started.
        final long minimumReasonableRetainedSeqNo = allShardsStarted ? 0L : this.getMinimumReasonableRetainedSeqNo();
        // groupingBy (not partitioningBy) on purpose: a missing "true" key signals that nothing expired.
        final Map<Boolean, List<RetentionLease>> partitionByExpiration = this.retentionLeases.leases().stream().collect(Collectors.groupingBy(lease -> {
            if (lease.source().equals(PEER_RECOVERY_RETENTION_LEASE_SOURCE)) {
                if (leaseIdsForCurrentPeers.contains(lease.id())) {
                    return false;
                }
                if (allShardsStarted) {
                    this.logger.trace("expiring unused [{}]", lease);
                    return true;
                }
                if (lease.retainingSequenceNumber() < minimumReasonableRetainedSeqNo) {
                    this.logger.trace("expiring unreasonable [{}] retaining history before [{}]", lease, (Object)minimumReasonableRetainedSeqNo);
                    return true;
                }
            }
            return currentTimeMillis - lease.timestamp() > retentionLeaseMillis;
        }));
        final Collection<RetentionLease> expiredLeases = partitionByExpiration.get(true);
        if (expiredLeases == null) {
            // Early out: nothing expired, so the leases (and their version) stay as they are.
            this.logger.debug("no retention leases are expired from current retention leases [{}]", (Object)this.retentionLeases);
            return Tuple.tuple(false, this.retentionLeases);
        }
        final List<RetentionLease> survivingLeases = partitionByExpiration.get(false);
        final Collection<RetentionLease> nonExpiredLeases = survivingLeases != null ? survivingLeases : Collections.<RetentionLease>emptyList();
        this.logger.debug("expiring retention leases [{}] from current retention leases [{}]", (Object)expiredLeases, (Object)this.retentionLeases);
        this.retentionLeases = new RetentionLeases(this.operationPrimaryTerm, this.retentionLeases.version() + 1L, nonExpiredLeases);
        return Tuple.tuple(true, this.retentionLeases);
    }

    /**
     * Computes the lowest retained sequence number still considered reasonable for a peer-recovery lease of an
     * unassigned shard copy: {@code localCheckpoint + 1 - ceil(docCount * fileBasedRecoveryThreshold)}, taken from the
     * safe commit info supplied at call time.
     *
     * @return the threshold below which {@link #getRetentionLeases(boolean)} expires such a lease
     */
    private long getMinimumReasonableRetainedSeqNo() {
        final SafeCommitInfo commit = this.safeCommitInfoSupplier.get();
        final double scaledDocCount = (double)commit.docCount * this.fileBasedRecoveryThreshold;
        final long tolerableHistory = Math.round(Math.ceil(scaledDocCount));
        return commit.localCheckpoint + 1L - tolerableHistory;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Adds a new retention lease and then hands the resulting leases to the sync callback.
     *
     * @param id                      identifier of the new lease
     * @param retainingSequenceNumber sequence number the lease retains from
     * @param source                  source of the lease
     * @param listener                passed to the sync callback together with the updated leases; must not be null
     * @return the newly created lease
     * @throws NullPointerException                 if {@code listener} is null
     * @throws RetentionLeaseAlreadyExistsException if a lease with {@code id} already exists
     */
    public RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source, ActionListener<ReplicationResponse> listener) {
        Objects.requireNonNull(listener);
        final RetentionLease added;
        final RetentionLeases snapshot;
        synchronized (this) {
            added = this.innerAddRetentionLease(id, retainingSequenceNumber, source);
            snapshot = this.retentionLeases;
        }
        // The sync callback runs after the monitor has been released.
        this.onSyncRetentionLeases.accept(snapshot, listener);
        return added;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates a new lease under {@code targetLeaseId} that copies the retaining sequence number and source of the
     * lease {@code sourceLeaseId}, then hands the resulting leases to the sync callback.
     *
     * @param sourceLeaseId identifier of the existing lease to copy from
     * @param targetLeaseId identifier of the lease to create
     * @param listener      passed to the sync callback together with the updated leases; must not be null
     * @return the newly created lease
     * @throws NullPointerException                 if {@code listener} is null
     * @throws RetentionLeaseNotFoundException      if no lease with {@code sourceLeaseId} exists
     * @throws RetentionLeaseAlreadyExistsException if a lease with {@code targetLeaseId} already exists
     */
    RetentionLease cloneRetentionLease(String sourceLeaseId, String targetLeaseId, ActionListener<ReplicationResponse> listener) {
        Objects.requireNonNull(listener);
        final RetentionLease cloned;
        final RetentionLeases snapshot;
        synchronized (this) {
            assert (this.primaryMode);
            final RetentionLeases existing = this.getRetentionLeases();
            if (!existing.contains(sourceLeaseId)) {
                throw new RetentionLeaseNotFoundException(sourceLeaseId);
            }
            final RetentionLease template = existing.get(sourceLeaseId);
            cloned = this.innerAddRetentionLease(targetLeaseId, template.retainingSequenceNumber(), template.source());
            snapshot = this.retentionLeases;
        }
        // The sync callback runs after the monitor has been released.
        this.onSyncRetentionLeases.accept(snapshot, listener);
        return cloned;
    }

    /**
     * Appends a new lease to the current retention leases. The caller must hold this tracker's monitor.
     * The new lease is timestamped with the current time and the leases' version is incremented by one.
     *
     * @param id                      identifier of the new lease
     * @param retainingSequenceNumber sequence number the lease retains from
     * @param source                  source of the lease
     * @return the newly created lease
     * @throws RetentionLeaseAlreadyExistsException if a lease with {@code id} already exists
     */
    private RetentionLease innerAddRetentionLease(String id, long retainingSequenceNumber, String source) {
        assert (Thread.holdsLock(this));
        assert (this.primaryMode) : id + "/" + retainingSequenceNumber + "/" + source;
        if (this.retentionLeases.contains(id)) {
            throw new RetentionLeaseAlreadyExistsException(id);
        }
        final long createdAtMillis = this.currentTimeMillisSupplier.getAsLong();
        final RetentionLease added = new RetentionLease(id, retainingSequenceNumber, createdAtMillis, source);
        this.logger.debug("adding new retention lease [{}] to current retention leases [{}]", (Object)added, (Object)this.retentionLeases);
        final long term = this.operationPrimaryTerm;
        final long nextVersion = this.retentionLeases.version() + 1L;
        // Existing leases first, the new one last.
        final List<RetentionLease> extended = Stream.concat(this.retentionLeases.leases().stream(), Stream.of(added))
            .collect(Collectors.toList());
        this.retentionLeases = new RetentionLeases(term, nextVersion, extended);
        return added;
    }

    /**
     * Replaces an existing lease with a freshly timestamped one carrying the given retaining sequence number and
     * source, incrementing the leases' version by one.
     *
     * @param id                      identifier of the lease to renew
     * @param retainingSequenceNumber new retaining sequence number; must not be lower than the existing one
     * @param source                  source recorded on the renewed lease
     * @return the renewed lease
     * @throws RetentionLeaseNotFoundException               if no lease with {@code id} exists
     * @throws RetentionLeaseInvalidRetainingSeqNoException if {@code retainingSequenceNumber} is lower than the
     *                                                       existing lease's retaining sequence number
     */
    public synchronized RetentionLease renewRetentionLease(String id, long retainingSequenceNumber, String source) {
        assert (this.primaryMode);
        final RetentionLease existingRetentionLease = this.retentionLeases.get(id);
        if (existingRetentionLease == null) {
            throw new RetentionLeaseNotFoundException(id);
        }
        final boolean movesBackwards = retainingSequenceNumber < existingRetentionLease.retainingSequenceNumber();
        if (movesBackwards) {
            assert (!PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(source)) : "renewing peer recovery retention lease [" + String.valueOf(existingRetentionLease) + "] with a lower retaining sequence number [" + retainingSequenceNumber + "]";
            throw new RetentionLeaseInvalidRetainingSeqNoException(id, source, retainingSequenceNumber, existingRetentionLease);
        }
        final long renewedAtMillis = this.currentTimeMillisSupplier.getAsLong();
        final RetentionLease renewed = new RetentionLease(id, retainingSequenceNumber, renewedAtMillis, source);
        final long term = this.operationPrimaryTerm;
        final long nextVersion = this.retentionLeases.version() + 1L;
        // Every other lease keeps its place; the renewed lease goes last.
        final Stream<RetentionLease> others = this.retentionLeases.leases().stream().filter(lease -> !lease.id().equals(id));
        final List<RetentionLease> updated = Stream.concat(others, Stream.of(renewed)).collect(Collectors.toList());
        this.retentionLeases = new RetentionLeases(term, nextVersion, updated);
        return renewed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the lease with the given identifier, incrementing the leases' version by one, and then hands the
     * resulting leases to the sync callback.
     *
     * @param id       identifier of the lease to remove
     * @param listener passed to the sync callback together with the updated leases; must not be null
     * @throws NullPointerException            if {@code listener} is null
     * @throws RetentionLeaseNotFoundException if no lease with {@code id} exists
     */
    public void removeRetentionLease(String id, ActionListener<ReplicationResponse> listener) {
        Objects.requireNonNull(listener);
        final RetentionLeases snapshot;
        synchronized (this) {
            assert (this.primaryMode);
            if (!this.retentionLeases.contains(id)) {
                throw new RetentionLeaseNotFoundException(id);
            }
            this.logger.debug("removing retention lease [{}] from current retention leases [{}]", (Object)id, (Object)this.retentionLeases);
            final long term = this.operationPrimaryTerm;
            final long nextVersion = this.retentionLeases.version() + 1L;
            final List<RetentionLease> remaining = this.retentionLeases.leases()
                .stream()
                .filter(lease -> !lease.id().equals(id))
                .collect(Collectors.toList());
            this.retentionLeases = new RetentionLeases(term, nextVersion, remaining);
            snapshot = this.retentionLeases;
        }
        // The sync callback runs after the monitor has been released.
        this.onSyncRetentionLeases.accept(snapshot, listener);
    }

    /**
     * Adopts the given retention leases on a non-primary tracker, but only if they supersede the ones currently held.
     *
     * @param retentionLeases the candidate retention leases
     */
    public synchronized void updateRetentionLeasesOnReplica(RetentionLeases retentionLeases) {
        assert (!this.primaryMode);
        if (!retentionLeases.supersedes(this.retentionLeases)) {
            // Not newer than what is already held: ignore.
            return;
        }
        this.retentionLeases = retentionLeases;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Loads the latest persisted retention leases from the given path while holding the persistence lock.
     *
     * @param path the location to load the state from
     * @return the loaded retention leases, or {@code RetentionLeases.EMPTY} when nothing could be loaded
     * @throws IOException if loading the state fails
     */
    public RetentionLeases loadRetentionLeases(Path path) throws IOException {
        final RetentionLeases loaded;
        synchronized (this.retentionLeasePersistenceLock) {
            loaded = RetentionLeases.FORMAT.loadLatestState(this.logger, NamedXContentRegistry.EMPTY, path);
        }
        assert (Version.CURRENT.major <= 8) : "throw an exception instead of returning EMPTY on null";
        return loaded != null ? loaded : RetentionLeases.EMPTY;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes the current retention leases to the given path unless the already persisted primary term and version
     * are up to date. The whole operation holds the persistence lock; the tracker's own monitor is held only while
     * the leases to write are selected.
     *
     * @param path the location to write the state to
     * @throws WriteStateException if writing the state fails
     */
    public void persistRetentionLeases(Path path) throws WriteStateException {
        synchronized (this.retentionLeasePersistenceLock) {
            final RetentionLeases snapshot;
            synchronized (this) {
                final boolean upToDate = !this.retentionLeases.supersedes(this.persistedRetentionLeasesPrimaryTerm, this.persistedRetentionLeasesVersion);
                if (upToDate) {
                    this.logger.trace("skipping persisting retention leases [{}], already persisted", (Object)this.retentionLeases);
                    return;
                }
                snapshot = this.retentionLeases;
            }
            this.logger.trace("persisting retention leases [{}]", (Object)snapshot);
            RetentionLeases.FORMAT.writeAndCleanup(snapshot, path);
            // Remember what was written so the next call can skip an identical write.
            this.persistedRetentionLeasesPrimaryTerm = snapshot.primaryTerm();
            this.persistedRetentionLeasesVersion = snapshot.version();
        }
    }

    /**
     * Assertion helper: when assertions are enabled, checks that some retention lease state can be loaded from the
     * given path. The load happens inside the {@code assert} expression, so it is not executed at all when
     * assertions are disabled.
     *
     * @param path the location expected to hold persisted retention leases
     * @return always {@code true}, so the method can itself be used inside an {@code assert}
     * @throws IOException if loading the state fails
     */
    public boolean assertRetentionLeasesPersisted(Path path) throws IOException {
        assert (RetentionLeases.FORMAT.loadLatestState(this.logger, NamedXContentRegistry.EMPTY, path) != null);
        return true;
    }

    /**
     * Adds a peer-recovery retention lease for the given node, retaining from {@code globalCheckpoint + 1}.
     *
     * @param nodeId           node the lease is created for; determines the lease identifier
     * @param globalCheckpoint global checkpoint the lease is based on
     * @param listener         forwarded to {@link #addRetentionLease}
     * @return the newly created lease
     */
    public RetentionLease addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener<ReplicationResponse> listener) {
        final String leaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeId);
        final long retainFrom = globalCheckpoint + 1L;
        return this.addRetentionLease(leaseId, retainFrom, PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener);
    }

    /**
     * Clones the peer-recovery retention lease of the routing table's primary shard into a new peer-recovery lease
     * for the given node.
     *
     * @param nodeId   node the cloned lease is created for
     * @param listener forwarded to {@link #cloneRetentionLease}
     * @return the newly created lease
     */
    public RetentionLease cloneLocalPeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
        final String primaryLeaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(this.routingTable.primaryShard());
        final String targetLeaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeId);
        return this.cloneRetentionLease(primaryLeaseId, targetLeaseId, listener);
    }

    /**
     * Removes the peer-recovery retention lease belonging to the given node.
     *
     * @param nodeId   node whose peer-recovery lease is removed
     * @param listener forwarded to {@link #removeRetentionLease}
     */
    public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
        final String leaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeId);
        this.removeRetentionLease(leaseId, listener);
    }

    /**
     * Builds the identifier of the peer-recovery retention lease for a node: the fixed prefix
     * {@code peer_recovery/} followed by the node ID.
     *
     * @param nodeId the node ID; a {@code null} value is rendered as the text {@code null}
     * @return the lease identifier
     */
    public static String getPeerRecoveryRetentionLeaseId(String nodeId) {
        final String prefix = "peer_recovery/";
        return prefix + nodeId;
    }

    /**
     * Builds the identifier of the peer-recovery retention lease for the node a shard copy currently resides on.
     *
     * @param shardRouting the shard copy whose current node ID is used
     * @return the lease identifier for that node
     */
    public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting) {
        final String nodeId = shardRouting.currentNodeId();
        return ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeId);
    }

    /**
     * Returns the subset of the current retention leases whose source marks them as peer-recovery leases.
     *
     * @return a list of all leases with source {@link #PEER_RECOVERY_RETENTION_LEASE_SOURCE}
     */
    public List<RetentionLease> getPeerRecoveryRetentionLeases() {
        final Collection<RetentionLease> allLeases = this.getRetentionLeases().leases();
        return allLeases.stream()
            .filter(lease -> PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(lease.source()))
            .collect(Collectors.toList());
    }

    /**
     * Renews the peer-recovery retention leases of all assigned shard copies when at least one of them needs it.
     * <p>
     * A renewal round is triggered when some assigned copy has a lease that is at least half a retention lease period
     * old, or whose retaining sequence number is not above the global checkpoint known for that copy. In a renewal
     * round every assigned copy that has a lease gets it renewed to {@code max(0, globalCheckpoint + 1)}, provided
     * that does not move the lease backwards.
     */
    public synchronized void renewPeerRecoveryRetentionLeases() {
        assert (this.primaryMode);
        assert (this.invariant());
        final long renewalTimeMillis = this.currentTimeMillisSupplier.getAsLong() - this.indexSettings.getRetentionLeaseMillis() / 2L;
        final boolean renewalNeeded = StreamSupport.stream(this.routingTable.spliterator(), false)
            .filter(ShardRouting::assignedToNode)
            .anyMatch(candidate -> {
                final RetentionLease lease = this.retentionLeases.get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(candidate));
                if (lease == null) {
                    assert (this.checkpoints.get(candidate.allocationId().getId()).tracked && !this.checkpoints.get(candidate.allocationId().getId()).replicated || !this.checkpoints.get(candidate.allocationId().getId()).tracked || !this.hasAllPeerRecoveryRetentionLeases);
                    return false;
                }
                if (lease.timestamp() <= renewalTimeMillis) {
                    return true;
                }
                return lease.retainingSequenceNumber() <= this.checkpoints.get(candidate.allocationId().getId()).globalCheckpoint;
            });
        if (renewalNeeded) {
            for (ShardRouting copy : this.routingTable) {
                if (!copy.assignedToNode()) {
                    continue;
                }
                final String leaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(copy);
                // Re-read on every iteration: a renewal below replaces this.retentionLeases.
                final RetentionLease currentLease = this.retentionLeases.get(leaseId);
                if (currentLease == null) {
                    continue;
                }
                final CheckpointState checkpointState = this.checkpoints.get(copy.allocationId().getId());
                final long newRetainedSequenceNumber = Math.max(0L, checkpointState.globalCheckpoint + 1L);
                if (currentLease.retainingSequenceNumber() <= newRetainedSequenceNumber) {
                    this.renewRetentionLease(leaseId, newRetainedSequenceNumber, PEER_RECOVERY_RETENTION_LEASE_SOURCE);
                } else {
                    assert (checkpointState.globalCheckpoint == -2L) : "cannot renew " + String.valueOf(currentLease) + " according to " + String.valueOf(checkpointState) + " for " + String.valueOf(copy);
                }
            }
        }
        assert (this.invariant());
    }

    /**
     * Collects the global checkpoint recorded for every shard copy that is both in-sync and replicated.
     *
     * @return a new map from allocation ID to the global checkpoint recorded for that copy
     */
    public synchronized Map<String, Long> getInSyncGlobalCheckpoints() {
        assert (this.primaryMode);
        assert (!this.handoffInProgress);
        final HashMap<String, Long> globalCheckpoints = new HashMap<>(this.checkpoints.size());
        for (Map.Entry<String, CheckpointState> entry : this.checkpoints.entrySet()) {
            final CheckpointState state = entry.getValue();
            if (state.inSync && state.replicated) {
                globalCheckpoints.put(entry.getKey(), state.globalCheckpoint);
            }
        }
        return globalCheckpoints;
    }

    /**
     * Tells whether this tracker is currently in primary mode.
     *
     * @return the current value of the volatile {@code primaryMode} flag
     */
    public boolean isPrimaryMode() {
        return primaryMode;
    }

    /**
     * Returns the operation primary term currently held by this tracker.
     *
     * @return the current value of the volatile {@code operationPrimaryTerm} field
     */
    public long getOperationPrimaryTerm() {
        return operationPrimaryTerm;
    }

    /**
     * Overwrites the operation primary term; later retention lease updates are stamped with this value.
     *
     * @param operationPrimaryTerm the new operation primary term
     */
    public void setOperationPrimaryTerm(long operationPrimaryTerm) {
        final long newTerm = operationPrimaryTerm;
        this.operationPrimaryTerm = newTerm;
    }

    /**
     * Tells whether this tracker has been marked as relocated.
     *
     * @return the current value of the volatile {@code relocated} flag
     */
    public boolean isRelocated() {
        return relocated;
    }

    /**
     * Checks the internal consistency of the tracker using {@code assert} statements only, so nothing is verified
     * when assertions are disabled.
     *
     * @return always {@code true}, so the method can itself be used as {@code assert invariant();}
     */
    private boolean invariant() {
        // Outside primary mode every tracked local and global checkpoint must still hold the sentinel -2
        // (NOTE(review): presumably the "unassigned" sequence number; its constant is not visible here).
        assert (this.primaryMode || this.checkpoints.values().stream().allMatch(lcps -> lcps.localCheckpoint == -2L));
        assert (this.primaryMode || this.checkpoints.values().stream().allMatch(cps -> cps.globalCheckpoint == -2L));
        // A handoff can only be in progress in primary mode; relocated and primary mode exclude each other.
        assert (!this.handoffInProgress || this.primaryMode);
        assert (!this.relocated || !this.primaryMode);
        // In primary mode this shard copy's own entry must be in-sync and tracked.
        assert (!this.primaryMode || this.checkpoints.get((Object)this.shardAllocationId).inSync);
        assert (!this.primaryMode || this.checkpoints.get((Object)this.shardAllocationId).tracked);
        // In primary mode the routing table and replication group must be set, and the routing table's primary
        // must be this shard copy (by allocation ID or relocation ID).
        assert (!this.primaryMode || this.routingTable != null && this.replicationGroup != null) : "primary mode but routing table is " + String.valueOf(this.routingTable) + " and replication group is " + String.valueOf(this.replicationGroup);
        assert (!this.primaryMode || this.routingTable.primaryShard().allocationId().getId().equals(this.shardAllocationId) || this.routingTable.primaryShard().allocationId().getRelocationId().equals(this.shardAllocationId));
        // pendingInSync may only be populated in primary mode while no handoff is in progress.
        assert (!this.handoffInProgress || this.pendingInSync.isEmpty()) : "entries blocking global checkpoint advancement during relocation handoff: " + String.valueOf(this.pendingInSync);
        assert (this.pendingInSync.isEmpty() || this.primaryMode && !this.handoffInProgress);
        // In primary mode the stored global checkpoint must match a fresh computation and must not exceed the
        // minimum local checkpoint over the in-sync, replicated copies.
        assert (!this.primaryMode || this.globalCheckpoint == ReplicationTracker.computeGlobalCheckpoint(this.pendingInSync, this.checkpoints.values(), this.globalCheckpoint)) : "global checkpoint is not up-to-date, expected: " + ReplicationTracker.computeGlobalCheckpoint(this.pendingInSync, this.checkpoints.values(), this.globalCheckpoint) + " but was: " + this.globalCheckpoint;
        assert (!this.primaryMode || this.globalCheckpoint <= ReplicationTracker.inSyncCheckpointStates(this.checkpoints, CheckpointState::getLocalCheckpoint, LongStream::min)) : "global checkpoint [" + this.globalCheckpoint + "] for primary mode allocation ID [" + this.shardAllocationId + "] more than in-sync local checkpoints [" + String.valueOf(this.checkpoints) + "]";
        // Routing table and replication group are null together, and the cached group must equal a recomputed one.
        assert (this.routingTable == null == (this.replicationGroup == null)) : "routing table is " + String.valueOf(this.routingTable) + " but replication group is " + String.valueOf(this.replicationGroup);
        assert (this.replicationGroup == null || this.replicationGroup.equals(this.calculateReplicationGroup())) : "cached replication group out of sync: expected: " + String.valueOf(this.calculateReplicationGroup()) + " but was: " + String.valueOf(this.replicationGroup);
        // Every allocation ID in the routing table must have a checkpoint entry.
        assert (this.routingTable == null || this.checkpoints.keySet().containsAll(this.routingTable.getAllAllocationIds())) : "local checkpoints " + String.valueOf(this.checkpoints) + " not in-sync with routing table " + String.valueOf(this.routingTable);
        for (Map.Entry<String, CheckpointState> entry : this.checkpoints.entrySet()) {
            // A copy cannot be pending in-sync and in-sync at the same time, and in-sync implies tracked.
            assert (!this.pendingInSync.contains(entry.getKey()) || !entry.getValue().inSync) : "shard copy " + entry.getKey() + " blocks global checkpoint advancement but is in-sync";
            assert (!entry.getValue().inSync || entry.getValue().tracked) : "shard copy " + entry.getKey() + " is in-sync but not tracked";
        }
        for (String aId : this.pendingInSync) {
            // Every pending allocation ID must have a checkpoint entry.
            assert (this.checkpoints.get(aId) != null) : "aId [" + aId + "] is pending in sync but isn't tracked";
        }
        // Once all peer-recovery leases are expected to exist, each tracked and replicated assigned copy must
        // hold a lease whose source is the peer-recovery source.
        if (this.primaryMode && this.indexSettings.isSoftDeleteEnabled() && this.hasAllPeerRecoveryRetentionLeases && this.createdMissingRetentionLeases) {
            for (ShardRouting shardRouting : this.routingTable.assignedShards()) {
                CheckpointState cps2 = this.checkpoints.get(shardRouting.allocationId().getId());
                if (!cps2.tracked || !cps2.replicated) continue;
                assert (this.retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shardRouting))) : "no retention lease for tracked shard [" + String.valueOf(shardRouting) + "] in " + String.valueOf(this.retentionLeases);
                assert (PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(this.retentionLeases.get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shardRouting)).source())) : "incorrect source [" + this.retentionLeases.get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shardRouting)).source() + "] for [" + String.valueOf(shardRouting) + "] in " + String.valueOf(this.retentionLeases);
            }
        }
        return true;
    }

    /**
     * Reduces one long-valued property over all checkpoint states that are both in-sync and replicated, ignoring
     * states whose property equals the sentinel {@code -2}.
     *
     * @param checkpoints the checkpoint states keyed by allocation ID
     * @param function    extracts the property to reduce from a checkpoint state
     * @param reducer     reduces the stream of extracted values (for example {@code LongStream::min})
     * @return the reduced value, or {@code -2} when the reducer yields no value
     */
    private static long inSyncCheckpointStates(Map<String, CheckpointState> checkpoints, ToLongFunction<CheckpointState> function, Function<LongStream, OptionalLong> reducer) {
        final LongStream candidates = checkpoints.values()
            .stream()
            .filter(cps -> cps.inSync && cps.replicated)
            .mapToLong(function)
            .filter(v -> v != -2L);
        return reducer.apply(candidates).orElse(-2L);
    }

    /**
     * Creates a tracker that does not react to replication group updates: delegates to the full constructor with a
     * no-op {@code onReplicationGroupUpdated} callback.
     *
     * @param shardId                    the shard this tracker belongs to
     * @param allocationId               allocation ID of this shard copy
     * @param indexSettings              settings of the index the shard belongs to
     * @param operationPrimaryTerm       initial operation primary term
     * @param globalCheckpoint           initial global checkpoint
     * @param onGlobalCheckpointUpdated  callback for global checkpoint updates
     * @param currentTimeMillisSupplier  time source
     * @param onSyncRetentionLeases      callback used to sync retention leases
     * @param safeCommitInfoSupplier     supplies safe commit info
     * @param isShardOnRemoteEnabledNode function applied to node IDs when calculating the replication group
     */
    public ReplicationTracker(ShardId shardId, String allocationId, IndexSettings indexSettings, long operationPrimaryTerm, long globalCheckpoint, LongConsumer onGlobalCheckpointUpdated, LongSupplier currentTimeMillisSupplier, BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases, Supplier<SafeCommitInfo> safeCommitInfoSupplier, Function<String, Boolean> isShardOnRemoteEnabledNode) {
        this(
            shardId,
            allocationId,
            indexSettings,
            operationPrimaryTerm,
            globalCheckpoint,
            onGlobalCheckpointUpdated,
            currentTimeMillisSupplier,
            onSyncRetentionLeases,
            safeCommitInfoSupplier,
            ignoredGroup -> {},
            isShardOnRemoteEnabledNode
        );
    }

    public ReplicationTracker(ShardId shardId, String allocationId, IndexSettings indexSettings, long operationPrimaryTerm, long globalCheckpoint, LongConsumer onGlobalCheckpointUpdated, LongSupplier currentTimeMillisSupplier, BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases, Supplier<SafeCommitInfo> safeCommitInfoSupplier, Consumer<ReplicationGroup> onReplicationGroupUpdated, Function<String, Boolean> isShardOnRemoteEnabledNode) {
        super(shardId, indexSettings);
        assert (globalCheckpoint >= -2L) : "illegal initial global checkpoint: " + globalCheckpoint;
        this.shardAllocationId = allocationId;
        this.primaryMode = false;
        this.operationPrimaryTerm = operationPrimaryTerm;
        this.handoffInProgress = false;
        this.appliedClusterStateVersion = -1L;
        this.globalCheckpoint = globalCheckpoint;
        this.checkpoints = new HashMap<String, CheckpointState>(1 + indexSettings.getNumberOfReplicas());
        this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
        this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier);
        this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases);
        this.pendingInSync = new HashSet<String>();
        this.routingTable = null;
        this.replicationGroup = null;
        this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(LegacyESVersion.V_7_6_0) || indexSettings.isSoftDeleteEnabled() && indexSettings.getIndexVersionCreated().onOrAfter(LegacyESVersion.V_7_4_0) && indexSettings.getIndexMetadata().getState() == IndexMetadata.State.OPEN;
        this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
        this.safeCommitInfoSupplier = safeCommitInfoSupplier;
        this.onReplicationGroupUpdated = onReplicationGroupUpdated;
        this.latestReplicationCheckpoint = indexSettings.isSegRepEnabledOrRemoteNode() ? ReplicationCheckpoint.empty(shardId) : null;
        this.isShardOnRemoteEnabledNode = isShardOnRemoteEnabledNode;
        assert (!Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()));
        assert (this.invariant());
    }

    /**
     * Returns the cached replication group. Asserts that the tracker is in primary mode.
     *
     * @return the current value of the volatile {@code replicationGroup} field
     */
    public ReplicationGroup getReplicationGroup() {
        assert (primaryMode);
        return replicationGroup;
    }

    /**
     * Recomputes the replication group, caches it and passes it to the {@code onReplicationGroupUpdated} callback.
     * The caller must hold this tracker's monitor.
     */
    private void updateReplicationGroupAndNotify() {
        assert (Thread.holdsLock(this));
        final ReplicationGroup recalculated = this.calculateReplicationGroup();
        this.replicationGroup = recalculated;
        this.onReplicationGroupUpdated.accept(recalculated);
    }

    /**
     * Builds a replication group from the current routing table and checkpoint states. The group's version is
     * {@code 0} when no group is cached yet and the cached version plus one otherwise.
     *
     * @return a new replication group holding the in-sync and the tracked allocation IDs
     */
    private ReplicationGroup calculateReplicationGroup() {
        final long newVersion;
        if (this.replicationGroup == null) {
            newVersion = 0L;
        } else {
            newVersion = this.replicationGroup.getVersion() + 1L;
        }
        assert (newVersion == 0L || this.indexSettings.isRemoteTranslogStoreEnabled() || this.replicationGroup.getReplicationTargets().stream().anyMatch(shardRouting -> this.isShardOnRemoteEnabledNode.apply(shardRouting.currentNodeId())) || this.checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).allMatch(e -> e.getValue().replicated)) : "In absence of remote translog store, all tracked shards must have replication mode as LOGICAL_REPLICATION";
        final Set<String> inSyncAllocationIds = this.checkpoints.entrySet()
            .stream()
            .filter(e -> e.getValue().inSync)
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
        final Set<String> trackedAllocationIds = this.checkpoints.entrySet()
            .stream()
            .filter(e -> e.getValue().tracked)
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
        return new ReplicationGroup(this.routingTable, inSyncAllocationIds, trackedAllocationIds, newVersion);
    }

    /**
     * Returns the global checkpoint as currently known to this tracker.
     *
     * @return the current value of the volatile {@code globalCheckpoint} field
     */
    public long getGlobalCheckpoint() {
        return globalCheckpoint;
    }

    /**
     * {@link LongSupplier} view of this tracker: supplies the current global checkpoint.
     *
     * @return the current value of the volatile {@code globalCheckpoint} field
     */
    @Override
    public long getAsLong() {
        return globalCheckpoint;
    }

    /**
     * Advances the global checkpoint on a non-primary tracker. Values that are not strictly greater than the current
     * global checkpoint are ignored; on an actual advance the {@code onGlobalCheckpointUpdated} callback is invoked.
     *
     * @param newGlobalCheckpoint the proposed global checkpoint
     * @param reason              reason for the update, used for trace logging only
     */
    public synchronized void updateGlobalCheckpointOnReplica(long newGlobalCheckpoint, String reason) {
        assert (this.invariant());
        assert (!this.primaryMode);
        final long before = this.globalCheckpoint;
        if (before < newGlobalCheckpoint) {
            this.globalCheckpoint = newGlobalCheckpoint;
            this.logger.trace("updated global checkpoint from [{}] to [{}] due to [{}]", (Object)before, (Object)this.globalCheckpoint, (Object)reason);
            this.onGlobalCheckpointUpdated.accept(this.globalCheckpoint);
        }
        assert (this.invariant());
    }

    /**
     * Records, on the primary, the global checkpoint known for the given shard copy. The update is ignored when the
     * allocation ID has no checkpoint entry or when the value does not exceed the one already recorded.
     *
     * @param allocationId     allocation ID of the shard copy
     * @param globalCheckpoint the global checkpoint reported for that copy
     */
    public synchronized void updateGlobalCheckpointForShard(String allocationId, long globalCheckpoint) {
        assert (this.primaryMode);
        assert (!this.handoffInProgress);
        assert (this.invariant());
        final CheckpointState state = this.checkpoints.get(allocationId);
        assert (!this.shardAllocationId.equals(allocationId) || state != null);
        final boolean advances = state != null && globalCheckpoint > state.globalCheckpoint;
        if (advances) {
            final long before = state.globalCheckpoint;
            state.globalCheckpoint = globalCheckpoint;
            this.logger.trace("updated local knowledge for [{}] on the primary of the global checkpoint from [{}] to [{}]", (Object)allocationId, (Object)before, (Object)globalCheckpoint);
        }
        assert (this.invariant());
    }

    /**
     * Records, on the primary, the replication checkpoint that has become visible on the given shard copy.
     * <p>
     * Every lag timer of that copy whose checkpoint is not ahead of {@code visibleCheckpoint} is stopped and removed;
     * the largest elapsed time among the timers stopped by this call (or {@code 0} if none was stopped) is stored as
     * the copy's last completed replication lag. Nothing happens, apart from a warning, when the allocation ID has
     * no checkpoint entry.
     *
     * @param allocationId      allocation ID of the shard copy; asserted not to be this tracker's own
     * @param visibleCheckpoint the replication checkpoint now visible on that copy
     */
    public synchronized void updateVisibleCheckpointForShard(String allocationId, ReplicationCheckpoint visibleCheckpoint) {
        assert (this.indexSettings.isSegRepEnabledOrRemoteNode());
        assert (this.primaryMode);
        assert (!this.handoffInProgress);
        assert (this.invariant());
        final CheckpointState cps = this.checkpoints.get(allocationId);
        assert (!this.shardAllocationId.equals(allocationId));
        if (cps == null) {
            this.logger.warn("Ignoring the checkpoint update for allocation ID {} as its not being tracked by primary", (Object)allocationId);
            return;
        }
        if (!cps.checkpointTimers.isEmpty()) {
            final AtomicLong longestFinished = new AtomicLong(0L);
            cps.checkpointTimers.entrySet().removeIf(entry -> {
                if (((ReplicationCheckpoint)entry.getKey()).isAheadOf(visibleCheckpoint)) {
                    // Still ahead of what the copy has made visible: keep timing.
                    return false;
                }
                final SegmentReplicationLagTimer timer = (SegmentReplicationLagTimer)entry.getValue();
                timer.stop();
                longestFinished.set(Math.max(longestFinished.get(), timer.totalElapsedTime()));
                return true;
            });
            cps.lastCompletedReplicationLag = longestFinished.get();
        }
        // Logged before the assignment below so that the "from" value is the previous checkpoint.
        this.logger.trace(() -> new ParameterizedMessage("updated local knowledge for [{}] on the primary of the visible checkpoint from [{}] to [{}], active timers {}", new Object[]{allocationId, cps.visibleReplicationCheckpoint, visibleCheckpoint, cps.checkpointTimers.keySet()}));
        cps.visibleReplicationCheckpoint = visibleCheckpoint;
        assert (this.invariant());
    }

    /**
     * Stores the given checkpoint as the latest replication checkpoint (when it differs from
     * the current one) and, in primary mode, creates lag timers for it.
     *
     * @param checkpoint the most recent replication checkpoint
     */
    public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint checkpoint) {
        assert (indexSettings.isSegRepEnabledOrRemoteNode());
        boolean differs = !checkpoint.equals(latestReplicationCheckpoint);
        if (differs) {
            latestReplicationCheckpoint = checkpoint;
        }
        if (!primaryMode) {
            return;
        }
        createReplicationLagTimers();
    }

    /**
     * @return the latest replication checkpoint currently stored by this tracker
     */
    public ReplicationCheckpoint getLatestReplicationCheckpoint() {
        return latestReplicationCheckpoint;
    }

    /**
     * Tells whether the current routing table holds a routing entry with the given
     * allocation ID that is flagged as primary.
     *
     * @param allocationId allocation ID to look up
     * @return {@code true} if a matching routing entry exists and it is a primary
     */
    private boolean isPrimaryRelocation(String allocationId) {
        return routingTable.shards()
            .stream()
            .filter(candidate -> candidate.allocationId().getId().equals(allocationId))
            .findAny()
            .map(ShardRouting::primary)
            .orElse(false);
    }

    /**
     * Ensures a lag timer keyed by the latest replication checkpoint exists for each
     * eligible replica. A copy is skipped when it is this shard itself, is not in-sync,
     * is listed as unavailable, is a primary routing entry, already sees the latest
     * checkpoint, or segment replication applies neither locally nor through a
     * remote-enabled node.
     */
    private void createReplicationLagTimers() {
        for (Map.Entry<String, CheckpointState> tracked : checkpoints.entrySet()) {
            String allocationId = tracked.getKey();
            if (allocationId.equals(shardAllocationId)) {
                continue;
            }
            CheckpointState cps = tracked.getValue();
            // Guards are evaluated in the same order as the original short-circuit chain.
            if (!cps.inSync) {
                continue;
            }
            if (replicationGroup.getUnavailableInSyncShards().contains(allocationId)) {
                continue;
            }
            if (isPrimaryRelocation(allocationId)) {
                continue;
            }
            if (!latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)) {
                continue;
            }
            if (!indexSettings.isSegRepLocalEnabled() && !isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(allocationId).currentNodeId())) {
                continue;
            }
            cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> new SegmentReplicationLagTimer());
            logger.trace(() -> new ParameterizedMessage("updated last published checkpoint for {} at visible cp {} to {} - timers [{}]", new Object[]{allocationId, cps.visibleReplicationCheckpoint, latestReplicationCheckpoint, cps.checkpointTimers.keySet()}));
        }
    }

    /**
     * Stores the given checkpoint as the latest one (when it differs) and, in primary mode,
     * starts the already-created lag timer for that checkpoint on every eligible replica.
     *
     * @param checkpoint the replication checkpoint whose timers should start
     */
    public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpoint) {
        assert (indexSettings.isSegRepEnabledOrRemoteNode());
        if (!checkpoint.equals(latestReplicationCheckpoint)) {
            latestReplicationCheckpoint = checkpoint;
        }
        if (!primaryMode) {
            return;
        }
        for (Map.Entry<String, CheckpointState> tracked : checkpoints.entrySet()) {
            String allocationId = tracked.getKey();
            if (allocationId.equals(shardAllocationId)) {
                continue;
            }
            CheckpointState cps = tracked.getValue();
            boolean eligible = cps.inSync
                && !replicationGroup.getUnavailableInSyncShards().contains(allocationId)
                && !isPrimaryRelocation(allocationId)
                && latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint);
            if (!eligible) {
                continue;
            }
            // Only start a timer that was created beforehand for this checkpoint.
            SegmentReplicationLagTimer timer = cps.checkpointTimers.get(latestReplicationCheckpoint);
            if (timer != null) {
                timer.start();
            }
        }
    }

    /**
     * Builds segment replication statistics for every eligible replica.
     *
     * @return an unmodifiable set with one entry per eligible replica while in primary mode,
     *         otherwise an empty set
     */
    public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats() {
        assert (indexSettings.isSegRepEnabledOrRemoteNode());
        if (!primaryMode) {
            return Collections.emptySet();
        }
        return checkpoints.entrySet().stream().filter(candidate -> {
            String allocationId = candidate.getKey();
            if (allocationId.equals(shardAllocationId) || !candidate.getValue().inSync) {
                return false;
            }
            if (replicationGroup.getUnavailableInSyncShards().contains(allocationId) || isPrimaryRelocation(allocationId)) {
                return false;
            }
            if (indexSettings.isRemoteStoreEnabled() || indexSettings.isSegRepLocalEnabled()) {
                return true;
            }
            // Otherwise the copy only counts when it sits on a remote-enabled node.
            return indexSettings.isAssignedOnRemoteNode() && isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(allocationId).currentNodeId());
        }).map(candidate -> buildShardStats(candidate.getKey(), candidate.getValue())).collect(Collectors.toUnmodifiableSet());
    }

    /**
     * Assembles the replication statistics of a single replica.
     * Bytes behind is the summed length of the files reported missing by the segment
     * replication diff between the latest checkpoint and the replica's visible checkpoint
     * (an empty metadata map is used when the replica has no visible checkpoint).
     * Both timer maxima are reported as 0 when no bytes are missing.
     *
     * @param allocationId allocation ID of the replica
     * @param cps          tracked state of the replica
     * @return the statistics for that replica
     */
    private SegmentReplicationShardStats buildShardStats(String allocationId, CheckpointState cps) {
        Store.RecoveryDiff diff = Store.segmentReplicationDiff(this.latestReplicationCheckpoint.getMetadataMap(), cps.visibleReplicationCheckpoint != null ? cps.visibleReplicationCheckpoint.getMetadataMap() : Collections.emptyMap());
        long bytesBehind = diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
        long longestTimerTime = 0L;
        long longestTotalElapsed = 0L;
        if (bytesBehind > 0L) {
            longestTimerTime = cps.checkpointTimers.values().stream().mapToLong(ReplicationTimer::time).max().orElse(0L);
            longestTotalElapsed = cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0L);
        }
        return new SegmentReplicationShardStats(allocationId, cps.checkpointTimers.size(), bytesBehind, longestTimerTime, longestTotalElapsed, cps.lastCompletedReplicationLag);
    }

    /**
     * Switches this tracker into primary mode: records the shard's own local checkpoint,
     * recomputes the global checkpoint and adds the sole-primary peer recovery retention
     * lease where applicable.
     *
     * @param localCheckpoint local checkpoint of this shard; asserted to be at least -1
     */
    public synchronized void activatePrimaryMode(long localCheckpoint) {
        assert (invariant());
        assert (!primaryMode);
        CheckpointState own = checkpoints.get(shardAllocationId);
        assert (own != null && own.inSync && own.localCheckpoint == -2L) : "expected " + shardAllocationId + " to have initialized entry in " + checkpoints + " when activating primary";
        assert (localCheckpoint >= -1L);
        primaryMode = true;
        updateLocalCheckpoint(shardAllocationId, own, localCheckpoint);
        updateGlobalCheckpointOnPrimary();
        addPeerRecoveryRetentionLeaseForSolePrimary();
        assert (invariant());
    }

    /**
     * Makes sure the primary has its own peer recovery retention lease.
     * When the lease is missing it is added only if the primary is the sole replication
     * target or the index is assigned on a remote node; otherwise the gap is just logged.
     * When the lease exists, {@code hasAllPeerRecoveryRetentionLeases} is set once every
     * assigned shard either has a lease or is untracked.
     * Must be called in primary mode while holding this object's monitor.
     */
    private void addPeerRecoveryRetentionLeaseForSolePrimary() {
        assert (primaryMode);
        assert (Thread.holdsLock(this));
        ShardRouting primaryShard = routingTable.primaryShard();
        String leaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(primaryShard);

        if (retentionLeases.get(leaseId) != null) {
            boolean everyAssignedCopyCovered = !hasAllPeerRecoveryRetentionLeases
                && routingTable.assignedShards().stream().allMatch(shardRouting -> retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shardRouting)) || !checkpoints.get(shardRouting.allocationId().getId()).tracked);
            if (everyAssignedCopyCovered) {
                hasAllPeerRecoveryRetentionLeases = true;
            }
            return;
        }

        boolean mayAddLease = replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard)) || indexSettings.isAssignedOnRemoteNode();
        if (!mayAddLease) {
            assert (!hasAllPeerRecoveryRetentionLeases) : routingTable + " vs " + retentionLeases;
            logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases);
            return;
        }

        assert (primaryShard.allocationId().getId().equals(shardAllocationId)) : routingTable.assignedShards() + " vs " + shardAllocationId;
        logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId);
        // Retain from the sequence number after the known global checkpoint, never below 0.
        long retainingSeqNo = Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1L);
        innerAddRetentionLease(leaseId, retainingSeqNo, PEER_RECOVERY_RETENTION_LEASE_SOURCE);
        hasAllPeerRecoveryRetentionLeases = true;
    }

    /**
     * Applies a newer cluster state to this tracker. Updates with a version that is not
     * greater than the last applied one are ignored.
     * <p>
     * Entries whose allocation ID is neither in-sync nor initializing are dropped. In primary
     * mode only missing initializing copies are added (existing state is kept); otherwise
     * every initializing and in-sync copy is (re)created with both checkpoints set to -2.
     * If entries were removed while in primary mode, the global checkpoint is recomputed and
     * waiters are notified.
     *
     * @param applyingClusterStateVersion version of the cluster state being applied
     * @param inSyncAllocationIds         allocation IDs the cluster manager considers in-sync
     * @param routingTable                routing table of the shard in that cluster state
     */
    public synchronized void updateFromClusterManager(long applyingClusterStateVersion, Set<String> inSyncAllocationIds, IndexShardRoutingTable routingTable) {
        assert (this.invariant());
        if (applyingClusterStateVersion > this.appliedClusterStateVersion) {
            assert (!this.primaryMode || inSyncAllocationIds.stream().allMatch(inSyncId -> this.checkpoints.containsKey(inSyncId) && this.checkpoints.get(inSyncId).inSync)) : "update from cluster-manager in primary mode contains in-sync ids " + inSyncAllocationIds + " that have no matching entries in " + this.checkpoints;
            Set<String> initializingAllocationIds = routingTable.getAllInitializingShards().stream().map(ShardRouting::allocationId).map(AllocationId::getId).collect(Collectors.toSet());
            boolean removedEntries = this.checkpoints.keySet().removeIf(aid -> !inSyncAllocationIds.contains(aid) && !initializingAllocationIds.contains(aid));
            ShardRouting primary = routingTable.primaryShard();
            String primaryAllocationId = primary.allocationId().getId();
            String primaryTargetAllocationId = primary.relocating() ? primary.getTargetRelocatingShard().allocationId().getId() : null;
            if (this.primaryMode) {
                // The primary keeps what it already knows and only adds copies it has not seen yet.
                for (String initializingId : initializingAllocationIds) {
                    if (this.checkpoints.containsKey(initializingId)) {
                        continue;
                    }
                    boolean inSync = inSyncAllocationIds.contains(initializingId);
                    assert (!inSync) : "update from cluster-manager in primary mode has " + initializingId + " as in-sync but it does not exist locally";
                    this.checkpoints.put(initializingId, new CheckpointState(-2L, -2L, inSync, inSync, this.isReplicated(initializingId, primaryAllocationId, primaryTargetAllocationId, this.assignedToRemoteStoreNode(routingTable, initializingId))));
                }
                if (removedEntries) {
                    this.pendingInSync.removeIf(aId -> !this.checkpoints.containsKey(aId));
                }
            } else {
                // In-sync entries are written last, so they win over initializing ones with the same ID.
                for (String initializingId : initializingAllocationIds) {
                    this.checkpoints.put(initializingId, new CheckpointState(-2L, -2L, false, false, this.isReplicated(initializingId, primaryAllocationId, primaryTargetAllocationId, this.assignedToRemoteStoreNode(routingTable, initializingId))));
                }
                for (String inSyncId : inSyncAllocationIds) {
                    this.checkpoints.put(inSyncId, new CheckpointState(-2L, -2L, true, true, this.isReplicated(inSyncId, primaryAllocationId, primaryTargetAllocationId, this.assignedToRemoteStoreNode(routingTable, inSyncId))));
                }
            }
            this.appliedClusterStateVersion = applyingClusterStateVersion;
            this.routingTable = routingTable;
            this.updateReplicationGroupAndNotify();
            if (this.primaryMode && removedEntries) {
                this.updateGlobalCheckpointOnPrimary();
                this.notifyAllWaiters();
            }
        }
        assert (this.invariant());
    }

    /**
     * Tells whether the given shard copy counts as living on a remote store node.
     *
     * @param routingTable routing table used to locate the copy
     * @param allocationId allocation ID of the copy
     * @return {@code true} when the index has remote store enabled, or when the copy is found
     *         in the routing table and its current node is remote-enabled
     */
    private boolean assignedToRemoteStoreNode(IndexShardRoutingTable routingTable, String allocationId) {
        if (indexSettings().isRemoteStoreEnabled()) {
            return true;
        }
        ShardRouting routing = routingTable.getByAllocationId(allocationId);
        return routing != null && isShardOnRemoteEnabledNode.apply(routing.currentNodeId());
    }

    /**
     * Decides the {@code replicated} flag of a shard copy. Copies that are not on a remote
     * store node are always replicated; on a remote store node only the primary and the
     * primary's relocation target are.
     *
     * @param allocationId              allocation ID of the copy
     * @param primaryAllocationId       allocation ID of the primary
     * @param primaryTargetAllocationId allocation ID of the primary's relocation target, or {@code null}
     * @param assignedToRemoteStoreNode whether the copy is assigned to a remote store node
     * @return whether the copy is replicated
     */
    private boolean isReplicated(String allocationId, String primaryAllocationId, String primaryTargetAllocationId, boolean assignedToRemoteStoreNode) {
        return !assignedToRemoteStoreNode
            || allocationId.equals(primaryAllocationId)
            || allocationId.equals(primaryTargetAllocationId);
    }

    /**
     * Legacy-named entry point that simply forwards to
     * {@link #updateFromClusterManager(long, Set, IndexShardRoutingTable)}.
     *
     * @deprecated use {@link #updateFromClusterManager(long, Set, IndexShardRoutingTable)}
     */
    @Deprecated
    public synchronized void updateFromMaster(long applyingClusterStateVersion, Set<String> inSyncAllocationIds, IndexShardRoutingTable routingTable) {
        updateFromClusterManager(applyingClusterStateVersion, inSyncAllocationIds, routingTable);
    }

    /**
     * Marks the given shard copy as tracked and refreshes the replication group.
     *
     * @param allocationId allocation ID of the copy to start tracking
     * @throws IllegalStateException if the tracker has no entry for that allocation ID
     */
    public synchronized void initiateTracking(String allocationId) {
        assert (invariant());
        assert (primaryMode);
        assert (!handoffInProgress);
        CheckpointState state = checkpoints.get(allocationId);
        if (state == null) {
            throw new IllegalStateException("no local checkpoint tracking information available");
        }
        state.tracked = true;
        updateReplicationGroupAndNotify();
        assert (invariant());
    }

    /**
     * Marks a tracked shard copy as in-sync. If the copy is replicated and its local
     * checkpoint is still below the global checkpoint, the call registers the copy as
     * pending and blocks until it is no longer pending; the pending entry is removed
     * even when the wait is interrupted.
     *
     * @param allocationId    allocation ID of the copy
     * @param localCheckpoint local checkpoint of the copy; asserted to be at least -1
     * @throws IllegalStateException if the tracker has no entry for that allocation ID
     * @throws InterruptedException  if the thread is interrupted while waiting
     */
    public synchronized void markAllocationIdAsInSync(String allocationId, long localCheckpoint) throws InterruptedException {
        assert (invariant());
        assert (primaryMode);
        assert (!handoffInProgress);
        CheckpointState cps = checkpoints.get(allocationId);
        if (cps == null) {
            throw new IllegalStateException("no local checkpoint tracking information available for " + allocationId);
        }
        assert (localCheckpoint >= -1L) : "expected known local checkpoint for " + allocationId + " but was " + localCheckpoint;
        assert (!pendingInSync.contains(allocationId)) : "shard copy " + allocationId + " is already marked as pending in-sync";
        assert (cps.tracked) : "shard copy " + allocationId + " cannot be marked as in-sync as it's not tracked";
        updateLocalCheckpoint(allocationId, cps, localCheckpoint);
        assert (!cps.inSync || cps.localCheckpoint >= getGlobalCheckpoint() || !cps.replicated) : "shard copy " + allocationId + " that's already in-sync should have a local checkpoint " + cps.localCheckpoint + " that's above the global checkpoint " + getGlobalCheckpoint() + " or it's not replicated";
        boolean mustCatchUp = cps.replicated && cps.localCheckpoint < getGlobalCheckpoint();
        if (!mustCatchUp) {
            cps.inSync = true;
            updateReplicationGroupAndNotify();
            logger.trace("marked [{}] as in-sync", allocationId);
            updateGlobalCheckpointOnPrimary();
        } else {
            pendingInSync.add(allocationId);
            try {
                // Another call removes the ID from pendingInSync once the copy has caught up.
                while (pendingInSync.contains(allocationId)) {
                    waitForLocalCheckpointToAdvance();
                }
            } finally {
                pendingInSync.remove(allocationId);
            }
        }
        assert (invariant());
    }

    /**
     * Raises the local checkpoint stored in {@code cps} when the supplied value is higher.
     *
     * @param allocationId    allocation ID of the copy, used for logging and assertion text only
     * @param cps             state object to update
     * @param localCheckpoint candidate local checkpoint; asserted to be at least -1
     * @return {@code true} if the stored checkpoint was increased, {@code false} otherwise
     */
    private boolean updateLocalCheckpoint(String allocationId, CheckpointState cps, long localCheckpoint) {
        assert (localCheckpoint >= -1L) : "invalid local checkpoint [" + localCheckpoint + "] for shard copy [" + allocationId + "]";
        if (localCheckpoint <= cps.localCheckpoint) {
            logger.trace("skipped updating local checkpoint of [{}] from [{}] to [{}], current checkpoint is higher", allocationId, cps.localCheckpoint, localCheckpoint);
            return false;
        }
        // Log first so the message still carries the previous value.
        logger.trace("updated local checkpoint of [{}] from [{}] to [{}]", allocationId, cps.localCheckpoint, localCheckpoint);
        cps.localCheckpoint = localCheckpoint;
        return true;
    }

    /**
     * Reports a local checkpoint for a shard copy to the primary. Unknown copies are ignored.
     * A copy that is pending in-sync and has reached the global checkpoint is promoted to
     * in-sync and waiters are notified. The global checkpoint is recomputed when a replicated,
     * non-pending copy advanced its local checkpoint.
     *
     * @param allocationId    allocation ID of the copy
     * @param localCheckpoint local checkpoint reported for that copy
     */
    public synchronized void updateLocalCheckpoint(String allocationId, long localCheckpoint) {
        assert (invariant());
        assert (primaryMode);
        assert (!handoffInProgress);
        CheckpointState cps = checkpoints.get(allocationId);
        if (cps == null) {
            return;
        }
        boolean advanced = updateLocalCheckpoint(allocationId, cps, localCheckpoint);
        boolean stillPending = pendingInSync.contains(allocationId);
        if (stillPending && cps.localCheckpoint >= getGlobalCheckpoint()) {
            pendingInSync.remove(allocationId);
            stillPending = false;
            cps.inSync = true;
            updateReplicationGroupAndNotify();
            logger.trace("marked [{}] as in-sync", allocationId);
            notifyAllWaiters();
        }
        boolean recomputeGlobalCheckpoint = cps.replicated && advanced && !stillPending;
        if (recomputeGlobalCheckpoint) {
            updateGlobalCheckpointOnPrimary();
        }
        assert (invariant());
    }

    /**
     * Computes the global checkpoint as the minimum local checkpoint over all copies that
     * are both in-sync and replicated.
     *
     * @param pendingInSync    allocation IDs still waiting to become in-sync
     * @param localCheckpoints tracked state of every copy
     * @param fallback         value returned when copies are pending or a relevant copy still
     *                         has a local checkpoint of -2
     * @return the computed global checkpoint, or {@code fallback}
     */
    private static long computeGlobalCheckpoint(Set<String> pendingInSync, Collection<CheckpointState> localCheckpoints, long fallback) {
        if (!pendingInSync.isEmpty()) {
            return fallback;
        }
        long lowest = Long.MAX_VALUE;
        for (CheckpointState state : localCheckpoints) {
            boolean counts = state.inSync && state.replicated;
            if (!counts) {
                continue;
            }
            if (state.localCheckpoint == -2L) {
                return fallback;
            }
            lowest = Math.min(state.localCheckpoint, lowest);
        }
        // At least one in-sync, replicated copy is expected to exist.
        assert (lowest != Long.MAX_VALUE);
        return lowest;
    }

    /**
     * Recomputes the global checkpoint on the primary and, when the value changed,
     * stores it and passes it to {@code onGlobalCheckpointUpdated}.
     */
    private synchronized void updateGlobalCheckpointOnPrimary() {
        assert (primaryMode);
        long recomputed = computeGlobalCheckpoint(pendingInSync, checkpoints.values(), getGlobalCheckpoint());
        assert (recomputed >= globalCheckpoint) : "new global checkpoint [" + recomputed + "] is lower than previous one [" + globalCheckpoint + "]";
        if (globalCheckpoint == recomputed) {
            return;
        }
        globalCheckpoint = recomputed;
        logger.trace("updated global checkpoint to [{}]", recomputed);
        onGlobalCheckpointUpdated.accept(recomputed);
    }

    /**
     * Starts handing the primary over to a relocation target and returns a snapshot of the
     * tracker state for it. Sets {@code handoffInProgress}.
     *
     * @param targetAllocationId allocation ID of the relocation target
     * @return a primary context holding the applied cluster state version, copies of all
     *         checkpoint states and the current routing table
     * @throws IllegalStateException if the target has no entry in the tracker
     */
    public synchronized PrimaryContext startRelocationHandoff(String targetAllocationId) {
        assert (invariant());
        assert (primaryMode);
        assert (!handoffInProgress);
        assert (pendingInSync.isEmpty()) : "relocation handoff started while there are still shard copies pending in-sync: " + pendingInSync;
        if (!checkpoints.containsKey(targetAllocationId)) {
            throw new IllegalStateException("relocation target [" + targetAllocationId + "] is no longer part of the replication group");
        }
        handoffInProgress = true;
        // Hand out copies so later changes on this tracker do not leak into the context.
        Map<String, CheckpointState> snapshot = new HashMap<>();
        checkpoints.forEach((id, state) -> snapshot.put(id, state.copy()));
        assert (invariant());
        return new PrimaryContext(appliedClusterStateVersion, snapshot, routingTable);
    }

    /**
     * Cancels a relocation handoff in progress by clearing {@code handoffInProgress}.
     */
    public synchronized void abortRelocationHandoff() {
        assert (invariant());
        assert (primaryMode);
        assert (handoffInProgress);
        handoffInProgress = false;
        assert (invariant());
    }

    /**
     * Finishes a relocation handoff: leaves primary mode, flags the tracker as relocated and
     * resets the local and global checkpoint of every tracked copy to -2.
     */
    public synchronized void completeRelocationHandoff() {
        assert (invariant());
        assert (primaryMode);
        assert (handoffInProgress);
        assert (!relocated);
        primaryMode = false;
        handoffInProgress = false;
        relocated = true;
        for (CheckpointState state : checkpoints.values()) {
            state.localCheckpoint = -2L;
            state.globalCheckpoint = -2L;
        }
        assert (invariant());
    }

    /**
     * Activates primary mode from a context handed over by a relocating primary.
     * The tracker's checkpoints and routing table are replaced with those of the context;
     * afterwards the cluster state captured before the switch is re-applied through
     * {@code updateFromClusterManager}.
     *
     * @param primaryContext context received from the previous primary
     * @throws IllegalStateException if the context has no entry for this shard's allocation ID
     */
    public synchronized void activateWithPrimaryContext(PrimaryContext primaryContext) {
        assert (invariant());
        assert (!primaryMode);
        Map<String, CheckpointState> handedOver = primaryContext.getCheckpointStates();
        if (!handedOver.containsKey(shardAllocationId)) {
            assert (indexSettings.getIndexVersionCreated().before(LegacyESVersion.V_7_3_0));
            throw new IllegalStateException("primary context [" + primaryContext + "] does not contain " + shardAllocationId);
        }
        // Must be captured before the state below is overwritten.
        Runnable reapplyClusterState = getClusterManagerUpdateOperationFromCurrentState();
        primaryMode = true;
        appliedClusterStateVersion = primaryContext.clusterStateVersion();
        checkpoints.clear();
        handedOver.forEach((id, state) -> checkpoints.put(id, state.copy()));
        routingTable = primaryContext.getRoutingTable();
        updateReplicationGroupAndNotify();
        updateGlobalCheckpointOnPrimary();
        reapplyClusterState.run();
        addPeerRecoveryRetentionLeaseForSolePrimary();
        assert (invariant());
    }

    /**
     * Sets the {@code hasAllPeerRecoveryRetentionLeases} flag to {@code true}.
     */
    private synchronized void setHasAllPeerRecoveryRetentionLeases() {
        hasAllPeerRecoveryRetentionLeases = true;
        assert (invariant());
    }

    /**
     * Sets the {@code createdMissingRetentionLeases} flag to {@code true}.
     */
    private synchronized void setCreatedMissingRetentionLeases() {
        createdMissingRetentionLeases = true;
        assert (invariant());
    }

    /**
     * @return the current value of the {@code hasAllPeerRecoveryRetentionLeases} flag
     */
    public synchronized boolean hasAllPeerRecoveryRetentionLeases() {
        return hasAllPeerRecoveryRetentionLeases;
    }

    /**
     * Adds a peer recovery retention lease for every assigned, tracked shard copy that does
     * not have one yet. Nothing is done when all leases are already known to exist, unless
     * the index is assigned on a remote node while some replication target is not on a
     * remote-enabled node.
     *
     * @param listener notified with {@code null} once every assigned shard has been handled,
     *                 or with the failure reported for a lease that could not be added
     */
    public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener<Void> listener) {
        boolean createMissingRetentionLeasesDuringMigration = indexSettings.isAssignedOnRemoteNode()
            && replicationGroup.getReplicationTargets().stream().anyMatch(target -> !isShardOnRemoteEnabledNode.apply(target.currentNodeId()));
        if (hasAllPeerRecoveryRetentionLeases && !createMissingRetentionLeasesDuringMigration) {
            logger.trace("createMissingPeerRecoveryRetentionLeases: nothing to do");
            listener.onResponse(null);
            return;
        }
        List<ShardRouting> assigned = routingTable.assignedShards();
        // One response is expected per assigned shard, including those that need no new lease.
        GroupedActionListener<ReplicationResponse> grouped = new GroupedActionListener<ReplicationResponse>(ActionListener.wrap(vs -> {
            setHasAllPeerRecoveryRetentionLeases();
            setCreatedMissingRetentionLeases();
            listener.onResponse(null);
        }, listener::onFailure), assigned.size());
        for (ShardRouting shardRouting : assigned) {
            if (retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shardRouting))) {
                grouped.onResponse(null);
                continue;
            }
            CheckpointState state = checkpoints.get(shardRouting.allocationId().getId());
            if (!state.tracked) {
                grouped.onResponse(null);
                continue;
            }
            logger.trace("createMissingPeerRecoveryRetentionLeases: adding missing lease for {}", shardRouting);
            try {
                addPeerRecoveryRetentionLease(shardRouting.currentNodeId(), Math.max(-1L, state.globalCheckpoint), grouped);
            } catch (Exception e) {
                grouped.onFailure(e);
            }
        }
    }

    /**
     * Captures the currently applied cluster state (version, in-sync allocation IDs and
     * routing table) and returns an action that re-applies it through
     * {@code updateFromClusterManager}. Must be called while not in primary mode.
     *
     * @return a runnable that replays the captured cluster state when run
     */
    private Runnable getClusterManagerUpdateOperationFromCurrentState() {
        assert (!this.primaryMode);
        long lastAppliedClusterStateVersion = this.appliedClusterStateVersion;
        // Typed set (the decompiled code used a raw HashSet) so it can be passed on without unchecked conversion.
        Set<String> inSyncAllocationIds = new HashSet<>();
        for (Map.Entry<String, CheckpointState> entry : this.checkpoints.entrySet()) {
            if (entry.getValue().inSync) {
                inSyncAllocationIds.add(entry.getKey());
            }
        }
        IndexShardRoutingTable lastAppliedRoutingTable = this.routingTable;
        return () -> this.updateFromClusterManager(lastAppliedClusterStateVersion, inSyncAllocationIds, lastAppliedRoutingTable);
    }

    /**
     * @return {@code true} if at least one shard copy is currently pending in-sync
     */
    public synchronized boolean pendingInSync() {
        assert (primaryMode);
        boolean nonePending = pendingInSync.isEmpty();
        return !nonePending;
    }

    /**
     * Looks up the tracked state of a shard copy.
     *
     * @param allocationId allocation ID of the copy
     * @return the state stored for that ID, or {@code null} when there is none
     */
    public synchronized CheckpointState getTrackedLocalCheckpointForShard(String allocationId) {
        assert (primaryMode);
        CheckpointState state = checkpoints.get(allocationId);
        return state;
    }

    /**
     * Wakes every thread waiting on this tracker's monitor.
     */
    @SuppressForbidden(reason="Object#notifyAll waiters for local checkpoint advancement")
    private synchronized void notifyAllWaiters() {
        notifyAll();
    }

    /**
     * Blocks on this tracker's monitor until another thread notifies it.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @SuppressForbidden(reason="Object#wait for local checkpoint advancement")
    private synchronized void waitForLocalCheckpointToAdvance() throws InterruptedException {
        wait();
    }

    /**
     * Per-copy tracking state: local and global checkpoint, the in-sync / tracked / replicated
     * flags, and segment replication bookkeeping (visible checkpoint, lag timers, last
     * completed lag). Only the two checkpoints and the three flags are serialized and take
     * part in {@code equals}, {@code hashCode}, {@code toString} and {@code copy}.
     */
    @PublicApi(since="1.0.0")
    public static class CheckpointState
    implements Writeable {
        long localCheckpoint;
        long globalCheckpoint;
        boolean inSync;
        boolean tracked;
        boolean replicated;
        // Not serialized and not carried over by copy().
        ReplicationCheckpoint visibleReplicationCheckpoint;
        Map<ReplicationCheckpoint, SegmentReplicationLagTimer> checkpointTimers;
        long lastCompletedReplicationLag;

        /**
         * Creates a state with the given checkpoints and flags and an empty timer map.
         */
        public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked, boolean replicated) {
            this.localCheckpoint = localCheckpoint;
            this.globalCheckpoint = globalCheckpoint;
            this.inSync = inSync;
            this.tracked = tracked;
            this.replicated = replicated;
            this.checkpointTimers = ConcurrentCollections.newConcurrentMap();
        }

        /**
         * Reads a state from the stream. {@code replicated} is only on the wire from
         * version 2.5.0 onwards and defaults to {@code true} for older senders.
         *
         * @param in stream to read from
         * @throws IOException if reading fails
         */
        public CheckpointState(StreamInput in) throws IOException {
            this.localCheckpoint = in.readZLong();
            this.globalCheckpoint = in.readZLong();
            this.inSync = in.readBoolean();
            this.tracked = in.readBoolean();
            if (in.getVersion().onOrAfter(Version.V_2_5_0)) {
                this.replicated = in.readBoolean();
            } else {
                this.replicated = true;
            }
            // Timers are not on the wire; start with an empty map so a deserialized
            // instance never exposes a null timer map.
            this.checkpointTimers = ConcurrentCollections.newConcurrentMap();
        }

        /**
         * Writes the checkpoints and flags; {@code replicated} is only written to
         * receivers on version 2.5.0 or later.
         */
        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeZLong(this.localCheckpoint);
            out.writeZLong(this.globalCheckpoint);
            out.writeBoolean(this.inSync);
            out.writeBoolean(this.tracked);
            if (out.getVersion().onOrAfter(Version.V_2_5_0)) {
                out.writeBoolean(this.replicated);
            }
        }

        /**
         * @return a new state with the same checkpoints and flags and a fresh, empty timer map
         */
        public CheckpointState copy() {
            return new CheckpointState(this.localCheckpoint, this.globalCheckpoint, this.inSync, this.tracked, this.replicated);
        }

        public long getLocalCheckpoint() {
            return this.localCheckpoint;
        }

        public long getGlobalCheckpoint() {
            return this.globalCheckpoint;
        }

        @Override
        public String toString() {
            return "LocalCheckpointState{localCheckpoint=" + this.localCheckpoint + ", globalCheckpoint=" + this.globalCheckpoint + ", inSync=" + this.inSync + ", tracked=" + this.tracked + ", replicated=" + this.replicated + "}";
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            CheckpointState that = (CheckpointState)o;
            return this.localCheckpoint == that.localCheckpoint
                && this.globalCheckpoint == that.globalCheckpoint
                && this.inSync == that.inSync
                && this.tracked == that.tracked
                && this.replicated == that.replicated;
        }

        @Override
        public int hashCode() {
            int result = Long.hashCode(this.localCheckpoint);
            result = 31 * result + Long.hashCode(this.globalCheckpoint);
            result = 31 * result + Boolean.hashCode(this.inSync);
            result = 31 * result + Boolean.hashCode(this.tracked);
            result = 31 * result + Boolean.hashCode(this.replicated);
            return result;
        }
    }

    /**
     * Snapshot of the primary's tracking state that is handed to a relocation target:
     * the applied cluster state version, the checkpoint state of every copy and the
     * routing table.
     */
    @PublicApi(since="1.0.0")
    public static class PrimaryContext
    implements Writeable {
        private final long clusterStateVersion;
        private final Map<String, CheckpointState> checkpoints;
        private final IndexShardRoutingTable routingTable;

        public PrimaryContext(long clusterStateVersion, Map<String, CheckpointState> checkpoints, IndexShardRoutingTable routingTable) {
            this.clusterStateVersion = clusterStateVersion;
            this.checkpoints = checkpoints;
            this.routingTable = routingTable;
        }

        /**
         * Reads a context from the stream in the order written by {@link #writeTo}.
         *
         * @param in stream to read from
         * @throws IOException if reading fails
         */
        public PrimaryContext(StreamInput in) throws IOException {
            this.clusterStateVersion = in.readVLong();
            this.checkpoints = in.readMap(StreamInput::readString, CheckpointState::new);
            this.routingTable = IndexShardRoutingTable.Builder.readFrom(in);
        }

        public long clusterStateVersion() {
            return this.clusterStateVersion;
        }

        public Map<String, CheckpointState> getCheckpointStates() {
            return this.checkpoints;
        }

        public IndexShardRoutingTable getRoutingTable() {
            return this.routingTable;
        }

        /**
         * Writes the cluster state version, the checkpoint map and the routing table, in that order.
         */
        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeVLong(this.clusterStateVersion);
            out.writeMap(this.checkpoints, (streamOutput, s) -> out.writeString((String)s), (streamOutput, cps) -> cps.writeTo(out));
            IndexShardRoutingTable.Builder.writeTo(this.routingTable, out);
        }

        @Override
        public String toString() {
            return "PrimaryContext{clusterStateVersion=" + this.clusterStateVersion + ", checkpoints=" + String.valueOf(this.checkpoints) + ", routingTable=" + String.valueOf(this.routingTable) + "}";
        }

        /**
         * Two contexts are equal when their cluster state version, checkpoint map and
         * routing table are all equal, matching the fields used by {@link #hashCode()}.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            PrimaryContext that = (PrimaryContext)o;
            // The previous version returned false whenever the routing tables matched and
            // never looked at the checkpoints, so distinct instances could never be equal.
            return this.clusterStateVersion == that.clusterStateVersion
                && this.checkpoints.equals(that.checkpoints)
                && this.routingTable.equals(that.routingTable);
        }

        @Override
        public int hashCode() {
            int result = Long.hashCode(this.clusterStateVersion);
            result = 31 * result + this.checkpoints.hashCode();
            result = 31 * result + this.routingTable.hashCode();
            return result;
        }
    }
}

