/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.shaded.elasticsearch7.org.elasticsearch.indices.recovery;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import java.util.stream.StreamSupport;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.graylog.shaded.elasticsearch7.org.apache.lucene.index.CorruptIndexException;
import org.graylog.shaded.elasticsearch7.org.apache.lucene.index.IndexCommit;
import org.graylog.shaded.elasticsearch7.org.apache.lucene.index.IndexFormatTooNewException;
import org.graylog.shaded.elasticsearch7.org.apache.lucene.index.IndexFormatTooOldException;
import org.graylog.shaded.elasticsearch7.org.apache.lucene.store.IOContext;
import org.graylog.shaded.elasticsearch7.org.apache.lucene.store.IndexInput;
import org.graylog.shaded.elasticsearch7.org.apache.lucene.util.ArrayUtil;
import org.graylog.shaded.elasticsearch7.org.apache.lucene.util.SetOnce;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.ElasticsearchException;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.ExceptionsHelper;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.Version;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.ActionListener;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.ActionRunnable;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.StepListener;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.support.PlainActionFuture;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.support.ThreadedActionListener;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.support.replication.ReplicationResponse;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.cluster.routing.ShardRouting;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.common.CheckedSupplier;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.common.StopWatch;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.common.bytes.BytesArray;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.common.bytes.BytesReference;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.common.lease.Releasable;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.common.lease.Releasables;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.common.logging.Loggers;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.common.unit.ByteSizeValue;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.common.unit.TimeValue;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.common.util.CancellableThreads;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.common.util.concurrent.FutureUtils;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.common.util.concurrent.ThreadContext;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.core.internal.io.IOUtils;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.engine.Engine;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.engine.RecoveryEngineException;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.seqno.ReplicationTracker;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.seqno.RetentionLease;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.seqno.RetentionLeases;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.seqno.SequenceNumbers;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.shard.IndexShard;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.shard.IndexShardClosedException;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.shard.IndexShardRelocatedException;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.shard.IndexShardState;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.store.Store;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.store.StoreFileMetadata;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.translog.Translog;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.indices.recovery.DelayRecoveryException;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.indices.recovery.MultiFileTransfer;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.indices.recovery.RecoveryResponse;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.indices.recovery.RecoveryTargetHandler;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.indices.recovery.StartRecoveryRequest;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.threadpool.ThreadPool;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.transport.RemoteTransportException;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.transport.Transports;

public class RecoverySourceHandler {
    protected final Logger logger;
    private final IndexShard shard;
    private final int shardId;
    private final StartRecoveryRequest request;
    private final int chunkSizeInBytes;
    private final RecoveryTargetHandler recoveryTarget;
    private final int maxConcurrentFileChunks;
    private final ThreadPool threadPool;
    private final CancellableThreads cancellableThreads = new CancellableThreads();
    private final List<Closeable> resources = new CopyOnWriteArrayList<Closeable>();

    public RecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, ThreadPool threadPool, StartRecoveryRequest request, int fileChunkSizeInBytes, int maxConcurrentFileChunks) {
        this.shard = shard;
        this.recoveryTarget = recoveryTarget;
        this.threadPool = threadPool;
        this.request = request;
        this.shardId = this.request.shardId().id();
        this.logger = Loggers.getLogger(this.getClass(), request.shardId(), "recover to " + request.targetNode().getName());
        this.chunkSizeInBytes = fileChunkSizeInBytes;
        this.maxConcurrentFileChunks = request.targetNode().getVersion().onOrAfter(Version.V_6_7_0) ? maxConcurrentFileChunks : 1;
    }

    public StartRecoveryRequest getRequest() {
        return this.request;
    }

    public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
        Closeable releaseResources = () -> IOUtils.close(this.resources);
        ActionListener<RecoveryResponse> wrappedListener = ActionListener.notifyOnce(listener);
        try {
            long startingSeqNo;
            boolean isSequenceNumberBasedRecovery;
            this.cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
                ElasticsearchException e = this.shard.state() == IndexShardState.CLOSED ? new IndexShardClosedException(this.shard.shardId(), "shard is closed and recovery was canceled reason [" + reason + "]") : new CancellableThreads.ExecutionCancelledException("recovery was canceled reason [" + reason + "]");
                if (beforeCancelEx != null) {
                    e.addSuppressed(beforeCancelEx);
                }
                IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
                throw e;
            });
            Consumer<Exception> onFailure = e -> {
                assert (Transports.assertNotTransportThread(this + "[onFailure]"));
                IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure((Exception)e));
            };
            boolean softDeletesEnabled = this.shard.indexSettings().isSoftDeleteEnabled();
            SetOnce retentionLeaseRef = new SetOnce();
            RecoverySourceHandler.runUnderPrimaryPermit(() -> {
                IndexShardRoutingTable routingTable = this.shard.getReplicationGroup().getRoutingTable();
                ShardRouting targetShardRouting = routingTable.getByAllocationId(this.request.targetAllocationId());
                if (targetShardRouting == null) {
                    this.logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", (Object)this.request.shardId(), (Object)this.request.targetNode());
                    throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
                }
                assert (targetShardRouting.initializing()) : "expected recovery target to be initializing but was " + targetShardRouting;
                retentionLeaseRef.set(this.shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)));
            }, this.shardId + " validating recovery target [" + this.request.targetAllocationId() + "] registered ", this.shard, this.cancellableThreads, this.logger);
            Engine.HistorySource historySource = softDeletesEnabled && (this.shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null) ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG;
            Closeable retentionLock = this.shard.acquireHistoryRetentionLock(historySource);
            this.resources.add(retentionLock);
            boolean bl = isSequenceNumberBasedRecovery = this.request.startingSeqNo() != -2L && this.isTargetSameHistory() && this.shard.hasCompleteHistoryOperations("peer-recovery", historySource, this.request.startingSeqNo()) && (historySource == Engine.HistorySource.TRANSLOG || retentionLeaseRef.get() != null && ((RetentionLease)retentionLeaseRef.get()).retainingSequenceNumber() <= this.request.startingSeqNo());
            if (isSequenceNumberBasedRecovery && softDeletesEnabled && retentionLeaseRef.get() != null) {
                retentionLock.close();
                this.logger.trace("history is retained by {}", retentionLeaseRef.get());
            } else {
                this.logger.trace("history is retained by retention lock");
            }
            StepListener<SendFileResult> sendFileStep = new StepListener<SendFileResult>();
            StepListener<TimeValue> prepareEngineStep = new StepListener<TimeValue>();
            StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<SendSnapshotResult>();
            StepListener<Void> finalizeStep = new StepListener<Void>();
            if (isSequenceNumberBasedRecovery) {
                this.logger.trace("performing sequence numbers based recovery. starting at [{}]", (Object)this.request.startingSeqNo());
                startingSeqNo = this.request.startingSeqNo();
                if (retentionLeaseRef.get() == null) {
                    this.createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, ignored -> SendFileResult.EMPTY));
                } else {
                    sendFileStep.onResponse(SendFileResult.EMPTY);
                }
            } else {
                Engine.IndexCommitRef safeCommitRef;
                try {
                    safeCommitRef = this.shard.acquireSafeIndexCommit();
                    this.resources.add(safeCommitRef);
                }
                catch (Exception e2) {
                    throw new RecoveryEngineException(this.shard.shardId(), 1, "snapshot failed", e2);
                }
                startingSeqNo = softDeletesEnabled ? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get("local_checkpoint")) + 1L : 0L;
                this.logger.trace("performing file-based recovery followed by history replay starting at [{}]", (Object)startingSeqNo);
                try {
                    int estimateNumOps = this.shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo);
                    Releasable releaseStore = this.acquireStore(this.shard.store());
                    this.resources.add(releaseStore);
                    sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
                        try {
                            IOUtils.close(safeCommitRef, releaseStore);
                        }
                        catch (IOException ex) {
                            this.logger.warn("releasing snapshot caused exception", (Throwable)ex);
                        }
                    });
                    StepListener<ReplicationResponse> deleteRetentionLeaseStep = new StepListener<ReplicationResponse>();
                    RecoverySourceHandler.runUnderPrimaryPermit(() -> {
                        try {
                            this.shard.removePeerRecoveryRetentionLease(this.request.targetNode().getId(), new ThreadedActionListener<ReplicationResponse>(this.logger, this.shard.getThreadPool(), "generic", deleteRetentionLeaseStep, false));
                        }
                        catch (RetentionLeaseNotFoundException e) {
                            this.logger.debug("no peer-recovery retention lease for " + this.request.targetAllocationId());
                            deleteRetentionLeaseStep.onResponse(null);
                        }
                    }, this.shardId + " removing retention lease for [" + this.request.targetAllocationId() + "]", this.shard, this.cancellableThreads, this.logger);
                    deleteRetentionLeaseStep.whenComplete(ignored -> {
                        assert (Transports.assertNotTransportThread(this + "[phase1]"));
                        this.phase1(safeCommitRef.getIndexCommit(), startingSeqNo, () -> estimateNumOps, sendFileStep);
                    }, onFailure);
                }
                catch (Exception e3) {
                    throw new RecoveryEngineException(this.shard.shardId(), 1, "sendFileStep failed", e3);
                }
            }
            assert (startingSeqNo >= 0L) : "startingSeqNo must be non negative. got: " + startingSeqNo;
            sendFileStep.whenComplete(r -> {
                assert (Transports.assertNotTransportThread(this + "[prepareTargetForTranslog]"));
                this.prepareTargetForTranslog(this.shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo), prepareEngineStep);
            }, onFailure);
            prepareEngineStep.whenComplete(prepareEngineTime -> {
                assert (Transports.assertNotTransportThread(this + "[phase2]"));
                RecoverySourceHandler.runUnderPrimaryPermit(() -> this.shard.initiateTracking(this.request.targetAllocationId()), this.shardId + " initiating tracking of " + this.request.targetAllocationId(), this.shard, this.cancellableThreads, this.logger);
                long endingSeqNo = this.shard.seqNoStats().getMaxSeqNo();
                this.logger.trace("snapshot translog for recovery; current size is [{}]", (Object)this.shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo));
                Translog.Snapshot phase2Snapshot = this.shard.getHistoryOperations("peer-recovery", historySource, startingSeqNo);
                this.resources.add(phase2Snapshot);
                retentionLock.close();
                long maxSeenAutoIdTimestamp = this.shard.getMaxSeenAutoIdTimestamp();
                long maxSeqNoOfUpdatesOrDeletes = this.shard.getMaxSeqNoOfUpdatesOrDeletes();
                RetentionLeases retentionLeases = this.shard.getRetentionLeases();
                long mappingVersionOnPrimary = this.shard.indexSettings().getIndexMetadata().getMappingVersion();
                this.phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, mappingVersionOnPrimary, sendSnapshotStep);
                sendSnapshotStep.whenComplete(r -> IOUtils.close(phase2Snapshot), e -> {
                    IOUtils.closeWhileHandlingException(phase2Snapshot);
                    onFailure.accept(new RecoveryEngineException(this.shard.shardId(), 2, "phase2 failed", (Throwable)e));
                });
            }, onFailure);
            long trimAboveSeqNo = startingSeqNo - 1L;
            sendSnapshotStep.whenComplete(r -> this.finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure);
            finalizeStep.whenComplete(r -> {
                long phase1ThrottlingWaitTime = 0L;
                SendSnapshotResult sendSnapshotResult = (SendSnapshotResult)sendSnapshotStep.result();
                SendFileResult sendFileResult = (SendFileResult)sendFileStep.result();
                RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes, sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize, sendFileResult.existingTotalSize, sendFileResult.took.millis(), 0L, ((TimeValue)prepareEngineStep.result()).millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
                try {
                    wrappedListener.onResponse(response);
                }
                finally {
                    IOUtils.close(this.resources);
                }
            }, onFailure);
        }
        catch (Exception e4) {
            IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e4));
        }
    }

    private boolean isTargetSameHistory() {
        String targetHistoryUUID = this.request.metadataSnapshot().getHistoryUUID();
        assert (targetHistoryUUID != null || this.shard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) : "incoming target history N/A but index was created after or on 6.0.0-rc1";
        return targetHistoryUUID != null && targetHistoryUUID.equals(this.shard.getHistoryUUID());
    }

    static void runUnderPrimaryPermit(CancellableThreads.Interruptible runnable, String reason, IndexShard primary, CancellableThreads cancellableThreads, Logger logger) {
        cancellableThreads.execute(() -> {
            final CompletableFuture permit = new CompletableFuture();
            ActionListener<Releasable> onAcquired = new ActionListener<Releasable>(){

                @Override
                public void onResponse(Releasable releasable) {
                    if (!permit.complete(releasable)) {
                        releasable.close();
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    permit.completeExceptionally(e);
                }
            };
            primary.acquirePrimaryOperationPermit(onAcquired, "same", reason);
            try (Releasable ignored = (Releasable)FutureUtils.get(permit);){
                if (primary.isRelocatedPrimary()) {
                    throw new IndexShardRelocatedException(primary.shardId());
                }
                runnable.run();
            }
            finally {
                permit.whenComplete((r, e) -> {
                    if (r != null) {
                        r.close();
                    }
                    if (e != null) {
                        logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e);
                    }
                });
            }
        });
    }

    private Releasable acquireStore(Store store) {
        store.incRef();
        return Releasables.releaseOnce(() -> {
            PlainActionFuture future = new PlainActionFuture();
            assert (!this.threadPool.generic().isShutdown());
            this.threadPool.generic().execute(ActionRunnable.run(future, store::decRef));
            FutureUtils.get(future);
        });
    }

    void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
        this.cancellableThreads.checkForCancel();
        Store store = this.shard.store();
        try {
            Store.MetadataSnapshot recoverySourceMetadata;
            StopWatch stopWatch = new StopWatch().start();
            try {
                recoverySourceMetadata = store.getMetadata(snapshot);
            }
            catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
                this.shard.failShard("recovery", ex);
                throw ex;
            }
            for (String name : snapshot.getFileNames()) {
                StoreFileMetadata md = recoverySourceMetadata.get(name);
                if (md != null) continue;
                this.logger.info("Snapshot differs from actual index for file: {} meta: {}", (Object)name, recoverySourceMetadata.asMap());
                throw new CorruptIndexException("Snapshot differs from actual index - maybe index was removed metadata has " + recoverySourceMetadata.asMap().size() + " files", name);
            }
            if (!this.canSkipPhase1(recoverySourceMetadata, this.request.metadataSnapshot())) {
                ArrayList<String> phase1FileNames = new ArrayList<String>();
                ArrayList<Long> phase1FileSizes = new ArrayList<Long>();
                ArrayList<String> phase1ExistingFileNames = new ArrayList<String>();
                ArrayList<Long> phase1ExistingFileSizes = new ArrayList<Long>();
                long totalSizeInBytes = 0L;
                long existingTotalSizeInBytes = 0L;
                Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(this.request.metadataSnapshot());
                for (StoreFileMetadata storeFileMetadata : diff.identical) {
                    phase1ExistingFileNames.add(storeFileMetadata.name());
                    phase1ExistingFileSizes.add(storeFileMetadata.length());
                    existingTotalSizeInBytes += storeFileMetadata.length();
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}], size [{}]", (Object)storeFileMetadata.name(), (Object)storeFileMetadata.checksum(), (Object)storeFileMetadata.length());
                    }
                    totalSizeInBytes += storeFileMetadata.length();
                }
                ArrayList<StoreFileMetadata> phase1Files = new ArrayList<StoreFileMetadata>(diff.different.size() + diff.missing.size());
                phase1Files.addAll(diff.different);
                phase1Files.addAll(diff.missing);
                for (StoreFileMetadata md : phase1Files) {
                    if (this.request.metadataSnapshot().asMap().containsKey(md.name())) {
                        this.logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", (Object)md.name(), (Object)this.request.metadataSnapshot().asMap().get(md.name()), (Object)md);
                    } else {
                        this.logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", (Object)md.name());
                    }
                    phase1FileNames.add(md.name());
                    phase1FileSizes.add(md.length());
                    totalSizeInBytes += md.length();
                }
                this.logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", (Object)phase1FileNames.size(), (Object)new ByteSizeValue(totalSizeInBytes), (Object)phase1ExistingFileNames.size(), (Object)new ByteSizeValue(existingTotalSizeInBytes));
                StepListener<Void> stepListener = new StepListener<Void>();
                StepListener<Void> sendFilesStep = new StepListener<Void>();
                StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<RetentionLease>();
                StepListener<Void> cleanFilesStep = new StepListener<Void>();
                this.cancellableThreads.checkForCancel();
                this.recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.getAsInt(), stepListener);
                stepListener.whenComplete(r -> this.sendFiles(store, phase1Files.toArray(new StoreFileMetadata[0]), translogOps, sendFilesStep), listener::onFailure);
                sendFilesStep.whenComplete(r -> this.createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure);
                createRetentionLeaseStep.whenComplete(retentionLease -> {
                    long lastKnownGlobalCheckpoint = this.shard.getLastKnownGlobalCheckpoint();
                    assert (retentionLease == null || retentionLease.retainingSequenceNumber() - 1L <= lastKnownGlobalCheckpoint) : retentionLease + " vs " + lastKnownGlobalCheckpoint;
                    this.cleanFiles(store, recoverySourceMetadata, translogOps, lastKnownGlobalCheckpoint, cleanFilesStep);
                }, listener::onFailure);
                long totalSize = totalSizeInBytes;
                long existingTotalSize = existingTotalSizeInBytes;
                cleanFilesStep.whenComplete(r -> {
                    TimeValue took = stopWatch.totalTime();
                    this.logger.trace("recovery [phase1]: took [{}]", (Object)took);
                    listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames, phase1ExistingFileSizes, existingTotalSize, took));
                }, listener::onFailure);
            } else {
                this.logger.trace("skipping [phase1] since source and target have identical sync id [{}]", (Object)recoverySourceMetadata.getSyncId());
                StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<RetentionLease>();
                this.createRetentionLease(startingSeqNo, createRetentionLeaseStep);
                createRetentionLeaseStep.whenComplete(retentionLease -> {
                    TimeValue took = stopWatch.totalTime();
                    this.logger.trace("recovery [phase1]: took [{}]", (Object)took);
                    listener.onResponse(new SendFileResult(Collections.emptyList(), Collections.emptyList(), 0L, Collections.emptyList(), Collections.emptyList(), 0L, took));
                }, listener::onFailure);
            }
        }
        catch (Exception e) {
            throw new RecoverFilesRecoveryException(this.request.shardId(), 0, new ByteSizeValue(0L), e);
        }
    }

    void createRetentionLease(long startingSeqNo, ActionListener<RetentionLease> listener) {
        RecoverySourceHandler.runUnderPrimaryPermit(() -> {
            this.logger.trace("cloning primary's retention lease");
            try {
                StepListener<ReplicationResponse> cloneRetentionLeaseStep = new StepListener<ReplicationResponse>();
                RetentionLease clonedLease = this.shard.cloneLocalPeerRecoveryRetentionLease(this.request.targetNode().getId(), new ThreadedActionListener<ReplicationResponse>(this.logger, this.shard.getThreadPool(), "generic", cloneRetentionLeaseStep, false));
                this.logger.trace("cloned primary's retention lease as [{}]", (Object)clonedLease);
                cloneRetentionLeaseStep.whenComplete(rr -> listener.onResponse(clonedLease), listener::onFailure);
            }
            catch (RetentionLeaseNotFoundException e) {
                assert (this.shard.indexSettings().getIndexVersionCreated().before(Version.V_7_4_0) || !this.shard.indexSettings().isSoftDeleteEnabled());
                StepListener<ReplicationResponse> addRetentionLeaseStep = new StepListener<ReplicationResponse>();
                long estimatedGlobalCheckpoint = startingSeqNo - 1L;
                RetentionLease newLease = this.shard.addPeerRecoveryRetentionLease(this.request.targetNode().getId(), estimatedGlobalCheckpoint, new ThreadedActionListener<ReplicationResponse>(this.logger, this.shard.getThreadPool(), "generic", addRetentionLeaseStep, false));
                addRetentionLeaseStep.whenComplete(rr -> listener.onResponse(newLease), listener::onFailure);
                this.logger.trace("created retention lease with estimated checkpoint of [{}]", (Object)estimatedGlobalCheckpoint);
            }
        }, this.shardId + " establishing retention lease for [" + this.request.targetAllocationId() + "]", this.shard, this.cancellableThreads, this.logger);
    }

    boolean canSkipPhase1(Store.MetadataSnapshot source, Store.MetadataSnapshot target) {
        if (source.getSyncId() == null || !source.getSyncId().equals(target.getSyncId())) {
            return false;
        }
        if (source.getNumDocs() != target.getNumDocs()) {
            throw new IllegalStateException("try to recover " + this.request.shardId() + " from primary shard with sync id but number of docs differ: " + source.getNumDocs() + " (" + this.request.sourceNode().getName() + ", primary) vs " + target.getNumDocs() + "(" + this.request.targetNode().getName() + ")");
        }
        SequenceNumbers.CommitInfo sourceSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(source.getCommitUserData().entrySet());
        SequenceNumbers.CommitInfo targetSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(target.getCommitUserData().entrySet());
        if (sourceSeqNos.localCheckpoint != targetSeqNos.localCheckpoint || targetSeqNos.maxSeqNo != sourceSeqNos.maxSeqNo) {
            String message = "try to recover " + this.request.shardId() + " with sync id but seq_no stats are mismatched: [" + source.getCommitUserData() + "] vs [" + target.getCommitUserData() + "]";
            assert (false) : message;
            throw new IllegalStateException(message);
        }
        return true;
    }

    void prepareTargetForTranslog(int totalTranslogOps, ActionListener<TimeValue> listener) {
        StopWatch stopWatch = new StopWatch().start();
        ActionListener<Void> wrappedListener = ActionListener.wrap(nullVal -> {
            stopWatch.stop();
            TimeValue tookTime = stopWatch.totalTime();
            this.logger.trace("recovery [phase1]: remote engine start took [{}]", (Object)tookTime);
            listener.onResponse(tookTime);
        }, e -> listener.onFailure(new RecoveryEngineException(this.shard.shardId(), 1, "prepare target for translog failed", (Throwable)e)));
        this.logger.trace("recovery [phase1]: prepare remote engine for translog");
        this.cancellableThreads.checkForCancel();
        this.recoveryTarget.prepareForTranslogOperations(totalTranslogOps, wrappedListener);
    }

    void phase2(long startingSeqNo, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, RetentionLeases retentionLeases, long mappingVersion, ActionListener<SendSnapshotResult> listener) throws IOException {
        if (this.shard.state() == IndexShardState.CLOSED) {
            throw new IndexShardClosedException(this.request.shardId());
        }
        this.logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]");
        AtomicInteger skippedOps = new AtomicInteger();
        AtomicInteger totalSentOps = new AtomicInteger();
        AtomicInteger lastBatchCount = new AtomicInteger();
        CheckedSupplier<List<Translog.Operation>, IOException> readNextBatch = () -> {
            Translog.Snapshot snapshot2 = snapshot;
            synchronized (snapshot2) {
                Translog.Operation operation;
                ArrayList<Translog.Operation> ops = lastBatchCount.get() > 0 ? new ArrayList<Translog.Operation>(lastBatchCount.get()) : new ArrayList();
                long batchSizeInBytes = 0L;
                while ((operation = snapshot.next()) != null) {
                    if (this.shard.state() == IndexShardState.CLOSED) {
                        throw new IndexShardClosedException(this.request.shardId());
                    }
                    this.cancellableThreads.checkForCancel();
                    long seqNo = operation.seqNo();
                    if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
                        skippedOps.incrementAndGet();
                        continue;
                    }
                    ops.add(operation);
                    totalSentOps.incrementAndGet();
                    if ((batchSizeInBytes += operation.estimateSize()) < (long)this.chunkSizeInBytes) continue;
                    break;
                }
                lastBatchCount.set(ops.size());
                return ops;
            }
        };
        StopWatch stopWatch = new StopWatch().start();
        ActionListener<Long> batchedListener = ActionListener.map(listener, targetLocalCheckpoint -> {
            assert (snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get()) : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get());
            stopWatch.stop();
            TimeValue tookTime = stopWatch.totalTime();
            this.logger.trace("recovery [phase2]: took [{}]", (Object)tookTime);
            return new SendSnapshotResult((long)targetLocalCheckpoint, totalSentOps.get(), tookTime);
        });
        this.sendBatch(readNextBatch, true, -2L, snapshot.totalOperations(), maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, mappingVersion, batchedListener);
    }

    private void sendBatch(CheckedSupplier<List<Translog.Operation>, IOException> nextBatch, boolean firstBatch, long targetLocalCheckpoint, int totalTranslogOps, long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, RetentionLeases retentionLeases, long mappingVersionOnPrimary, ActionListener<Long> listener) throws IOException {
        assert (ThreadPool.assertCurrentMethodIsNotCalledRecursively());
        assert (Transports.assertNotTransportThread(this + "[send translog]"));
        List<Translog.Operation> operations = nextBatch.get();
        if (!operations.isEmpty() || firstBatch) {
            this.cancellableThreads.checkForCancel();
            this.recoveryTarget.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, mappingVersionOnPrimary, ActionListener.wrap(newCheckpoint -> this.sendBatch(nextBatch, false, SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint), totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, mappingVersionOnPrimary, listener), listener::onFailure));
        } else {
            listener.onResponse(targetLocalCheckpoint);
        }
    }

    void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) throws IOException {
        if (this.shard.state() == IndexShardState.CLOSED) {
            throw new IndexShardClosedException(this.request.shardId());
        }
        this.cancellableThreads.checkForCancel();
        StopWatch stopWatch = new StopWatch().start();
        this.logger.trace("finalizing recovery");
        RecoverySourceHandler.runUnderPrimaryPermit(() -> this.shard.markAllocationIdAsInSync(this.request.targetAllocationId(), targetLocalCheckpoint), this.shardId + " marking " + this.request.targetAllocationId() + " as in sync", this.shard, this.cancellableThreads, this.logger);
        long globalCheckpoint = this.shard.getLastKnownGlobalCheckpoint();
        StepListener<Void> finalizeListener = new StepListener<Void>();
        this.cancellableThreads.checkForCancel();
        this.recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener);
        finalizeListener.whenComplete(r -> {
            RecoverySourceHandler.runUnderPrimaryPermit(() -> this.shard.updateGlobalCheckpointForShard(this.request.targetAllocationId(), globalCheckpoint), this.shardId + " updating " + this.request.targetAllocationId() + "'s global checkpoint", this.shard, this.cancellableThreads, this.logger);
            if (this.request.isPrimaryRelocation()) {
                this.logger.trace("performing relocation hand-off");
                this.cancellableThreads.execute(() -> this.shard.relocated(this.request.targetAllocationId(), this.recoveryTarget::handoffPrimaryContext));
            }
            stopWatch.stop();
            this.logger.trace("finalizing recovery took [{}]", (Object)stopWatch.totalTime());
            listener.onResponse(null);
        }, listener::onFailure);
    }

    public void cancel(String reason) {
        this.cancellableThreads.cancel(reason);
        this.recoveryTarget.cancel();
    }

    public String toString() {
        return "ShardRecoveryHandler{shardId=" + this.request.shardId() + ", sourceNode=" + this.request.sourceNode() + ", targetNode=" + this.request.targetNode() + '}';
    }

    void sendFiles(final Store store, StoreFileMetadata[] files, final IntSupplier translogOps, ActionListener<Void> listener) {
        ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetadata::length));
        ThreadContext threadContext = this.threadPool.getThreadContext();
        MultiFileTransfer<FileChunk> multiFileSender = new MultiFileTransfer<FileChunk>(this.logger, threadContext, listener, this.maxConcurrentFileChunks, Arrays.asList(files)){
            final Deque<byte[]> buffers;
            InputStreamIndexInput currentInput;
            long offset;
            {
                super(logger, threadContext, listener, maxConcurrentFileChunks, files);
                this.buffers = new ConcurrentLinkedDeque<byte[]>();
                this.currentInput = null;
                this.offset = 0L;
            }

            @Override
            protected void onNewFile(StoreFileMetadata md) throws IOException {
                this.offset = 0L;
                IOUtils.close(this.currentInput, () -> {
                    this.currentInput = null;
                });
                final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
                this.currentInput = new InputStreamIndexInput(indexInput, md.length()){

                    @Override
                    public void close() throws IOException {
                        IOUtils.close(indexInput, () -> super.close());
                    }
                };
            }

            private byte[] acquireBuffer() {
                byte[] buffer = this.buffers.pollFirst();
                if (buffer != null) {
                    return buffer;
                }
                return new byte[RecoverySourceHandler.this.chunkSizeInBytes];
            }

            @Override
            protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException {
                assert (Transports.assertNotTransportThread("read file chunk"));
                RecoverySourceHandler.this.cancellableThreads.checkForCancel();
                byte[] buffer = this.acquireBuffer();
                int bytesRead = this.currentInput.read(buffer);
                if (bytesRead == -1) {
                    throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + this.offset, md.name());
                }
                boolean lastChunk = this.offset + (long)bytesRead == md.length();
                FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, bytesRead), this.offset, lastChunk, () -> this.buffers.addFirst(buffer));
                this.offset += (long)bytesRead;
                return chunk;
            }

            @Override
            protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
                RecoverySourceHandler.this.cancellableThreads.checkForCancel();
                RecoverySourceHandler.this.recoveryTarget.writeFileChunk(request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), ActionListener.runBefore(listener, request::close));
            }

            @Override
            protected void handleError(StoreFileMetadata md, Exception e) throws Exception {
                RecoverySourceHandler.this.handleErrorOnSendFiles(store, e, new StoreFileMetadata[]{md});
            }

            @Override
            public void close() throws IOException {
                IOUtils.close(this.currentInput, () -> {
                    this.currentInput = null;
                });
            }
        };
        this.resources.add(multiFileSender);
        multiFileSender.start();
    }

    private void cleanFiles(Store store, Store.MetadataSnapshot sourceMetadata, IntSupplier translogOps, long globalCheckpoint, ActionListener<Void> listener) {
        this.cancellableThreads.checkForCancel();
        this.recoveryTarget.cleanFiles(translogOps.getAsInt(), globalCheckpoint, sourceMetadata, ActionListener.delegateResponse(listener, (l, e) -> ActionListener.completeWith(l, () -> {
            StoreFileMetadata[] mds = (StoreFileMetadata[])StreamSupport.stream(sourceMetadata.spliterator(), false).toArray(StoreFileMetadata[]::new);
            ArrayUtil.timSort(mds, Comparator.comparingLong(StoreFileMetadata::length));
            this.handleErrorOnSendFiles(store, (Exception)e, mds);
            throw e;
        })));
    }

    private void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetadata[] mds) throws Exception {
        IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e);
        assert (Transports.assertNotTransportThread(this + "[handle error on send/clean files]"));
        if (corruptIndexException != null) {
            IOException localException = null;
            for (StoreFileMetadata md : mds) {
                this.cancellableThreads.checkForCancel();
                this.logger.debug("checking integrity for file {} after remove corruption exception", (Object)md);
                if (store.checkIntegrityNoException(md)) continue;
                this.logger.warn("{} Corrupted file detected {} checksum mismatch", (Object)this.shardId, (Object)md);
                if (localException == null) {
                    localException = corruptIndexException;
                }
                this.failEngine(corruptIndexException);
            }
            if (localException != null) {
                throw localException;
            }
            RemoteTransportException remoteException = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
            remoteException.addSuppressed(e);
            this.logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK", new Object[]{this.shardId, this.request.targetNode(), mds}), (Throwable)corruptIndexException);
            throw remoteException;
        }
        throw e;
    }

    protected void failEngine(IOException cause) {
        this.shard.failShard("recovery", cause);
    }

    static final class SendFileResult {
        final List<String> phase1FileNames;
        final List<Long> phase1FileSizes;
        final long totalSize;
        final List<String> phase1ExistingFileNames;
        final List<Long> phase1ExistingFileSizes;
        final long existingTotalSize;
        final TimeValue took;
        static final SendFileResult EMPTY = new SendFileResult(Collections.emptyList(), Collections.emptyList(), 0L, Collections.emptyList(), Collections.emptyList(), 0L, TimeValue.ZERO);

        SendFileResult(List<String> phase1FileNames, List<Long> phase1FileSizes, long totalSize, List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes, long existingTotalSize, TimeValue took) {
            this.phase1FileNames = phase1FileNames;
            this.phase1FileSizes = phase1FileSizes;
            this.totalSize = totalSize;
            this.phase1ExistingFileNames = phase1ExistingFileNames;
            this.phase1ExistingFileSizes = phase1ExistingFileSizes;
            this.existingTotalSize = existingTotalSize;
            this.took = took;
        }
    }

    static final class SendSnapshotResult {
        final long targetLocalCheckpoint;
        final int totalOperations;
        final TimeValue tookTime;

        SendSnapshotResult(long targetLocalCheckpoint, int totalOperations, TimeValue tookTime) {
            this.targetLocalCheckpoint = targetLocalCheckpoint;
            this.totalOperations = totalOperations;
            this.tookTime = tookTime;
        }
    }

    private static class FileChunk
    implements MultiFileTransfer.ChunkRequest,
    Releasable {
        final StoreFileMetadata md;
        final BytesReference content;
        final long position;
        final boolean lastChunk;
        final Releasable onClose;

        FileChunk(StoreFileMetadata md, BytesReference content, long position, boolean lastChunk, Releasable onClose) {
            this.md = md;
            this.content = content;
            this.position = position;
            this.lastChunk = lastChunk;
            this.onClose = onClose;
        }

        @Override
        public boolean lastChunk() {
            return this.lastChunk;
        }

        @Override
        public void close() {
            this.onClose.close();
        }
    }
}

