/*
 * Decompiled with CFR 0.152.
 */
package conductor.org.elasticsearch.indices.recovery;

import conductor.org.apache.logging.log4j.Logger;
import conductor.org.apache.logging.log4j.message.ParameterizedMessage;
import conductor.org.apache.lucene.index.CorruptIndexException;
import conductor.org.apache.lucene.index.IndexCommit;
import conductor.org.apache.lucene.index.IndexFormatTooNewException;
import conductor.org.apache.lucene.index.IndexFormatTooOldException;
import conductor.org.apache.lucene.store.IOContext;
import conductor.org.apache.lucene.store.IndexInput;
import conductor.org.apache.lucene.util.ArrayUtil;
import conductor.org.elasticsearch.ElasticsearchException;
import conductor.org.elasticsearch.ExceptionsHelper;
import conductor.org.elasticsearch.Version;
import conductor.org.elasticsearch.action.ActionListener;
import conductor.org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import conductor.org.elasticsearch.cluster.routing.ShardRouting;
import conductor.org.elasticsearch.common.Nullable;
import conductor.org.elasticsearch.common.StopWatch;
import conductor.org.elasticsearch.common.bytes.BytesArray;
import conductor.org.elasticsearch.common.lease.Releasable;
import conductor.org.elasticsearch.common.logging.Loggers;
import conductor.org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import conductor.org.elasticsearch.common.unit.ByteSizeValue;
import conductor.org.elasticsearch.common.util.CancellableThreads;
import conductor.org.elasticsearch.common.util.concurrent.FutureUtils;
import conductor.org.elasticsearch.core.internal.io.IOUtils;
import conductor.org.elasticsearch.core.internal.io.Streams;
import conductor.org.elasticsearch.index.engine.Engine;
import conductor.org.elasticsearch.index.engine.RecoveryEngineException;
import conductor.org.elasticsearch.index.seqno.LocalCheckpointTracker;
import conductor.org.elasticsearch.index.shard.IndexShard;
import conductor.org.elasticsearch.index.shard.IndexShardClosedException;
import conductor.org.elasticsearch.index.shard.IndexShardRelocatedException;
import conductor.org.elasticsearch.index.shard.IndexShardState;
import conductor.org.elasticsearch.index.store.Store;
import conductor.org.elasticsearch.index.store.StoreFileMetaData;
import conductor.org.elasticsearch.index.translog.Translog;
import conductor.org.elasticsearch.indices.recovery.DelayRecoveryException;
import conductor.org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
import conductor.org.elasticsearch.indices.recovery.RecoveryResponse;
import conductor.org.elasticsearch.indices.recovery.RecoveryTargetHandler;
import conductor.org.elasticsearch.indices.recovery.StartRecoveryRequest;
import conductor.org.elasticsearch.transport.RemoteTransportException;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;

public class RecoverySourceHandler {
    protected final Logger logger;
    private final IndexShard shard;
    private final int shardId;
    private final StartRecoveryRequest request;
    private final int chunkSizeInBytes;
    private final RecoveryTargetHandler recoveryTarget;
    protected final RecoveryResponse response;
    private final CancellableThreads cancellableThreads = new CancellableThreads(){

        @Override
        protected void onCancel(String reason, @Nullable Exception suppressedException) {
            ElasticsearchException e = RecoverySourceHandler.this.shard.state() == IndexShardState.CLOSED ? new IndexShardClosedException(RecoverySourceHandler.this.shard.shardId(), "shard is closed and recovery was canceled reason [" + reason + "]") : new CancellableThreads.ExecutionCancelledException("recovery was canceled reason [" + reason + "]");
            if (suppressedException != null) {
                e.addSuppressed(suppressedException);
            }
            throw e;
        }
    };

    public RecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, StartRecoveryRequest request, int fileChunkSizeInBytes) {
        this.shard = shard;
        this.recoveryTarget = recoveryTarget;
        this.request = request;
        this.shardId = this.request.shardId().id();
        this.logger = Loggers.getLogger(this.getClass(), request.shardId(), "recover to " + request.targetNode().getName());
        this.chunkSizeInBytes = fileChunkSizeInBytes;
        this.response = new RecoveryResponse();
    }

    public StartRecoveryRequest getRequest() {
        return this.request;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public RecoveryResponse recoverToTarget() throws IOException {
        RecoverySourceHandler.runUnderPrimaryPermit(() -> {
            IndexShardRoutingTable routingTable = this.shard.getReplicationGroup().getRoutingTable();
            ShardRouting targetShardRouting = routingTable.getByAllocationId(this.request.targetAllocationId());
            if (targetShardRouting == null) {
                this.logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", (Object)this.request.shardId(), (Object)this.request.targetNode());
                throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
            }
            assert (targetShardRouting.initializing()) : "expected recovery target to be initializing but was " + targetShardRouting;
        }, this.shardId + " validating recovery target [" + this.request.targetAllocationId() + "] registered ", this.shard, this.cancellableThreads, this.logger);
        try (Closeable ignored = this.shard.acquireRetentionLockForPeerRecovery();){
            long targetLocalCheckpoint;
            long requiredSeqNoRangeStart;
            long startingSeqNo;
            boolean isSequenceNumberBasedRecovery;
            boolean bl = isSequenceNumberBasedRecovery = this.request.startingSeqNo() != -2L && this.isTargetSameHistory() && this.shard.hasCompleteHistoryOperations("peer-recovery", this.request.startingSeqNo());
            if (isSequenceNumberBasedRecovery) {
                this.logger.trace("performing sequence numbers based recovery. starting at [{}]", (Object)this.request.startingSeqNo());
                requiredSeqNoRangeStart = startingSeqNo = this.request.startingSeqNo();
            } else {
                Engine.IndexCommitRef phase1Snapshot;
                try {
                    phase1Snapshot = this.shard.acquireSafeIndexCommit();
                }
                catch (Exception e) {
                    throw new RecoveryEngineException(this.shard.shardId(), 1, "snapshot failed", e);
                }
                requiredSeqNoRangeStart = Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get("local_checkpoint")) + 1L;
                startingSeqNo = this.shard.indexSettings().isSoftDeleteEnabled() ? requiredSeqNoRangeStart : 0L;
                try {
                    int estimateNumOps = this.shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
                    this.phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
                }
                catch (Exception e) {
                    try {
                        throw new RecoveryEngineException(this.shard.shardId(), 1, "phase1 failed", e);
                    }
                    catch (Throwable throwable) {
                        try {
                            IOUtils.close(phase1Snapshot);
                            throw throwable;
                        }
                        catch (IOException ex) {
                            this.logger.warn("releasing snapshot caused exception", (Throwable)ex);
                        }
                        throw throwable;
                    }
                }
                try {
                    IOUtils.close(phase1Snapshot);
                }
                catch (IOException ex) {
                    this.logger.warn("releasing snapshot caused exception", (Throwable)ex);
                }
            }
            assert (startingSeqNo >= 0L) : "startingSeqNo must be non negative. got: " + startingSeqNo;
            assert (requiredSeqNoRangeStart >= startingSeqNo) : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than [" + startingSeqNo + "]";
            try {
                this.prepareTargetForTranslog(!isSequenceNumberBasedRecovery, this.shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
            }
            catch (Exception e) {
                throw new RecoveryEngineException(this.shard.shardId(), 1, "prepare target for translog failed", e);
            }
            RecoverySourceHandler.runUnderPrimaryPermit(() -> this.shard.initiateTracking(this.request.targetAllocationId()), this.shardId + " initiating tracking of " + this.request.targetAllocationId(), this.shard, this.cancellableThreads, this.logger);
            long endingSeqNo = this.shard.seqNoStats().getMaxSeqNo();
            this.cancellableThreads.execute(() -> this.shard.waitForOpsToComplete(endingSeqNo));
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", (Object)endingSeqNo);
                this.logger.trace("snapshot translog for recovery; current size is [{}]", (Object)this.shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
            }
            try (Translog.Snapshot snapshot = this.shard.getHistoryOperations("peer-recovery", startingSeqNo);){
                long maxSeenAutoIdTimestamp = this.shard.getMaxSeenAutoIdTimestamp();
                long maxSeqNoOfUpdatesOrDeletes = this.shard.getMaxSeqNoOfUpdatesOrDeletes();
                targetLocalCheckpoint = this.phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
            }
            catch (Exception e) {
                throw new RecoveryEngineException(this.shard.shardId(), 2, "phase2 failed", e);
            }
            this.finalizeRecovery(targetLocalCheckpoint);
            return this.response;
        }
    }

    private boolean isTargetSameHistory() {
        String targetHistoryUUID = this.request.metadataSnapshot().getHistoryUUID();
        assert (targetHistoryUUID != null || this.shard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) : "incoming target history N/A but index was created after or on 6.0.0-rc1";
        return targetHistoryUUID != null && targetHistoryUUID.equals(this.shard.getHistoryUUID());
    }

    static void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason, IndexShard primary, CancellableThreads cancellableThreads, Logger logger) {
        cancellableThreads.execute(() -> {
            final CompletableFuture permit = new CompletableFuture();
            ActionListener<Releasable> onAcquired = new ActionListener<Releasable>(){

                @Override
                public void onResponse(Releasable releasable) {
                    if (!permit.complete(releasable)) {
                        releasable.close();
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    permit.completeExceptionally(e);
                }
            };
            primary.acquirePrimaryOperationPermit(onAcquired, "same", reason);
            try (Releasable ignored = (Releasable)FutureUtils.get(permit);){
                if (primary.isRelocatedPrimary()) {
                    throw new IndexShardRelocatedException(primary.shardId());
                }
                runnable.run();
            }
            finally {
                permit.whenComplete((r, e) -> {
                    if (r != null) {
                        r.close();
                    }
                    if (e != null) {
                        logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", (Throwable)e);
                    }
                });
            }
        });
    }

    public void phase1(IndexCommit snapshot, Supplier<Integer> translogOps) {
        this.cancellableThreads.checkForCancel();
        long totalSize = 0L;
        long existingTotalSize = 0L;
        Store store = this.shard.store();
        store.incRef();
        try {
            boolean recoverWithSyncId;
            Store.MetadataSnapshot recoverySourceMetadata;
            StopWatch stopWatch = new StopWatch().start();
            try {
                recoverySourceMetadata = store.getMetadata(snapshot);
            }
            catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
                this.shard.failShard("recovery", ex);
                throw ex;
            }
            for (String name : snapshot.getFileNames()) {
                StoreFileMetaData md2 = recoverySourceMetadata.get(name);
                if (md2 != null) continue;
                this.logger.info("Snapshot differs from actual index for file: {} meta: {}", (Object)name, (Object)recoverySourceMetadata.asMap());
                throw new CorruptIndexException("Snapshot differs from actual index - maybe index was removed metadata has " + recoverySourceMetadata.asMap().size() + " files", name);
            }
            String recoverySourceSyncId = recoverySourceMetadata.getSyncId();
            String recoveryTargetSyncId = this.request.metadataSnapshot().getSyncId();
            boolean bl = recoverWithSyncId = recoverySourceSyncId != null && recoverySourceSyncId.equals(recoveryTargetSyncId);
            if (recoverWithSyncId) {
                long l;
                long numDocsTarget = this.request.metadataSnapshot().getNumDocs();
                if (numDocsTarget != (l = recoverySourceMetadata.getNumDocs())) {
                    throw new IllegalStateException("try to recover " + this.request.shardId() + " from primary shard with sync id but number of docs differ: " + l + " (" + this.request.sourceNode().getName() + ", primary) vs " + numDocsTarget + "(" + this.request.targetNode().getName() + ")");
                }
                this.logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target", (Object)recoverySourceSyncId);
            } else {
                Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(this.request.metadataSnapshot());
                for (StoreFileMetaData storeFileMetaData : diff.identical) {
                    this.response.phase1ExistingFileNames.add(storeFileMetaData.name());
                    this.response.phase1ExistingFileSizes.add(storeFileMetaData.length());
                    existingTotalSize += storeFileMetaData.length();
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}], size [{}]", (Object)storeFileMetaData.name(), (Object)storeFileMetaData.checksum(), (Object)storeFileMetaData.length());
                    }
                    totalSize += storeFileMetaData.length();
                }
                ArrayList<StoreFileMetaData> phase1Files = new ArrayList<StoreFileMetaData>(diff.different.size() + diff.missing.size());
                phase1Files.addAll(diff.different);
                phase1Files.addAll(diff.missing);
                for (StoreFileMetaData md4 : phase1Files) {
                    if (this.request.metadataSnapshot().asMap().containsKey(md4.name())) {
                        this.logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", (Object)md4.name(), (Object)this.request.metadataSnapshot().asMap().get(md4.name()), (Object)md4);
                    } else {
                        this.logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", (Object)md4.name());
                    }
                    this.response.phase1FileNames.add(md4.name());
                    this.response.phase1FileSizes.add(md4.length());
                    totalSize += md4.length();
                }
                this.response.phase1TotalSize = totalSize;
                this.response.phase1ExistingTotalSize = existingTotalSize;
                this.logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", (Object)this.response.phase1FileNames.size(), (Object)new ByteSizeValue(totalSize), (Object)this.response.phase1ExistingFileNames.size(), (Object)new ByteSizeValue(existingTotalSize));
                this.cancellableThreads.execute(() -> this.recoveryTarget.receiveFileInfo(this.response.phase1FileNames, this.response.phase1FileSizes, this.response.phase1ExistingFileNames, this.response.phase1ExistingFileSizes, (Integer)translogOps.get()));
                Function<StoreFileMetaData, OutputStream> function = md -> new BufferedOutputStream(new RecoveryOutputStream((StoreFileMetaData)md, translogOps), this.chunkSizeInBytes);
                this.sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), function);
                try {
                    this.cancellableThreads.executeIO(() -> this.recoveryTarget.cleanFiles((Integer)translogOps.get(), recoverySourceMetadata));
                }
                catch (RemoteTransportException | IOException targetException) {
                    IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(targetException);
                    if (corruptIndexException != null) {
                        try {
                            Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot);
                            StoreFileMetaData[] metadata = (StoreFileMetaData[])StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(StoreFileMetaData[]::new);
                            ArrayUtil.timSort(metadata, Comparator.comparingLong(StoreFileMetaData::length));
                            for (StoreFileMetaData md5 : metadata) {
                                this.cancellableThreads.checkForCancel();
                                this.logger.debug("checking integrity for file {} after remove corruption exception", (Object)md5);
                                if (store.checkIntegrityNoException(md5)) continue;
                                this.shard.failShard("recovery", corruptIndexException);
                                this.logger.warn("Corrupted file detected {} checksum mismatch", (Object)md5);
                                throw corruptIndexException;
                            }
                        }
                        catch (IOException ex) {
                            targetException.addSuppressed(ex);
                            throw targetException;
                        }
                        RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
                        exception.addSuppressed(targetException);
                        this.logger.warn(() -> new ParameterizedMessage("{} Remote file corruption during finalization of recovery on node {}. local checksum OK", (Object)this.shard.shardId(), (Object)this.request.targetNode()), (Throwable)corruptIndexException);
                        throw exception;
                    }
                    throw targetException;
                }
            }
            this.logger.trace("recovery [phase1]: took [{}]", (Object)stopWatch.totalTime());
            this.response.phase1Time = stopWatch.totalTime().millis();
        }
        catch (Exception e) {
            throw new RecoverFilesRecoveryException(this.request.shardId(), this.response.phase1FileNames.size(), new ByteSizeValue(totalSize), e);
        }
        finally {
            store.decRef();
        }
    }

    void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
        StopWatch stopWatch = new StopWatch().start();
        this.logger.trace("recovery [phase1]: prepare remote engine for translog");
        long startEngineStart = stopWatch.totalTime().millis();
        this.cancellableThreads.executeIO(() -> this.recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps));
        stopWatch.stop();
        this.response.startTime = stopWatch.totalTime().millis() - startEngineStart;
        this.logger.trace("recovery [phase1]: remote engine start took [{}]", (Object)stopWatch.totalTime());
    }

    long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException {
        if (this.shard.state() == IndexShardState.CLOSED) {
            throw new IndexShardClosedException(this.request.shardId());
        }
        this.cancellableThreads.checkForCancel();
        StopWatch stopWatch = new StopWatch().start();
        this.logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]");
        SendSnapshotResult result = this.sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
        stopWatch.stop();
        this.logger.trace("recovery [phase2]: took [{}]", (Object)stopWatch.totalTime());
        this.response.phase2Time = stopWatch.totalTime().millis();
        this.response.phase2Operations = result.totalOperations;
        return result.targetLocalCheckpoint;
    }

    public void finalizeRecovery(long targetLocalCheckpoint) throws IOException {
        if (this.shard.state() == IndexShardState.CLOSED) {
            throw new IndexShardClosedException(this.request.shardId());
        }
        this.cancellableThreads.checkForCancel();
        StopWatch stopWatch = new StopWatch().start();
        this.logger.trace("finalizing recovery");
        RecoverySourceHandler.runUnderPrimaryPermit(() -> this.shard.markAllocationIdAsInSync(this.request.targetAllocationId(), targetLocalCheckpoint), this.shardId + " marking " + this.request.targetAllocationId() + " as in sync", this.shard, this.cancellableThreads, this.logger);
        long globalCheckpoint = this.shard.getGlobalCheckpoint();
        this.cancellableThreads.executeIO(() -> this.recoveryTarget.finalizeRecovery(globalCheckpoint));
        RecoverySourceHandler.runUnderPrimaryPermit(() -> this.shard.updateGlobalCheckpointForShard(this.request.targetAllocationId(), globalCheckpoint), this.shardId + " updating " + this.request.targetAllocationId() + "'s global checkpoint", this.shard, this.cancellableThreads, this.logger);
        if (this.request.isPrimaryRelocation()) {
            this.logger.trace("performing relocation hand-off");
            this.cancellableThreads.execute(() -> this.shard.relocated(this.recoveryTarget::handoffPrimaryContext));
        }
        stopWatch.stop();
        this.logger.trace("finalizing recovery took [{}]", (Object)stopWatch.totalTime());
    }

    protected SendSnapshotResult sendSnapshot(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException {
        Translog.Operation operation;
        assert (requiredSeqNoRangeStart <= endingSeqNo + 1L) : "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
        assert (startingSeqNo <= requiredSeqNoRangeStart) : "startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart;
        int ops = 0;
        long size = 0L;
        int skippedOps = 0;
        int totalSentOps = 0;
        AtomicLong targetLocalCheckpoint = new AtomicLong(-2L);
        ArrayList<Translog.Operation> operations = new ArrayList<Translog.Operation>();
        LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1L);
        int expectedTotalOps = snapshot.totalOperations();
        if (expectedTotalOps == 0) {
            this.logger.trace("no translog operations to send");
        }
        CancellableThreads.IOInterruptable sendBatch = () -> {
            long targetCheckpoint = this.recoveryTarget.indexTranslogOperations(operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
            targetLocalCheckpoint.set(targetCheckpoint);
        };
        while ((operation = snapshot.next()) != null) {
            if (this.shard.state() == IndexShardState.CLOSED) {
                throw new IndexShardClosedException(this.request.shardId());
            }
            this.cancellableThreads.checkForCancel();
            long seqNo = operation.seqNo();
            if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
                ++skippedOps;
                continue;
            }
            operations.add(operation);
            ++ops;
            ++totalSentOps;
            requiredOpsTracker.markSeqNoAsCompleted(seqNo);
            if ((size += operation.estimateSize()) < (long)this.chunkSizeInBytes) continue;
            this.cancellableThreads.executeIO(sendBatch);
            this.logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", (Object)ops, (Object)new ByteSizeValue(size), (Object)expectedTotalOps);
            ops = 0;
            size = 0L;
            operations.clear();
        }
        if (!operations.isEmpty() || totalSentOps == 0) {
            this.cancellableThreads.executeIO(sendBatch);
        }
        assert (expectedTotalOps == snapshot.skippedOperations() + skippedOps + totalSentOps) : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", expectedTotalOps, snapshot.skippedOperations(), skippedOps, totalSentOps);
        if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
            throw new IllegalStateException("translog replay failed to cover required sequence numbers (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is [" + (requiredOpsTracker.getCheckpoint() + 1L) + "]");
        }
        this.logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", (Object)ops, (Object)new ByteSizeValue(size), (Object)expectedTotalOps);
        return new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps);
    }

    public void cancel(String reason) {
        this.cancellableThreads.cancel(reason);
    }

    public String toString() {
        return "ShardRecoveryHandler{shardId=" + this.request.shardId() + ", sourceNode=" + this.request.sourceNode() + ", targetNode=" + this.request.targetNode() + '}';
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void sendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) throws Exception {
        store.incRef();
        try {
            ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length));
            for (int i = 0; i < files.length; ++i) {
                StoreFileMetaData md = files[i];
                try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);){
                    Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStreamFactory.apply(md));
                    continue;
                }
                catch (Exception e) {
                    IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e);
                    if (corruptIndexException != null) {
                        if (!store.checkIntegrityNoException(md)) {
                            this.logger.warn("{} Corrupted file detected {} checksum mismatch", (Object)this.shardId, (Object)md);
                            this.failEngine(corruptIndexException);
                            throw corruptIndexException;
                        }
                        RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
                        exception.addSuppressed(e);
                        this.logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK", this.shardId, this.request.targetNode(), md), (Throwable)corruptIndexException);
                        throw exception;
                    }
                    throw e;
                }
            }
        }
        finally {
            store.decRef();
        }
    }

    protected void failEngine(IOException cause) {
        this.shard.failShard("recovery", cause);
    }

    final class RecoveryOutputStream
    extends OutputStream {
        private final StoreFileMetaData md;
        private final Supplier<Integer> translogOps;
        private long position = 0L;

        RecoveryOutputStream(StoreFileMetaData md, Supplier<Integer> translogOps) {
            this.md = md;
            this.translogOps = translogOps;
        }

        @Override
        public void write(int b) throws IOException {
            throw new UnsupportedOperationException("we can't send single bytes over the wire");
        }

        @Override
        public void write(byte[] b, int offset, int length) throws IOException {
            this.sendNextChunk(this.position, new BytesArray(b, offset, length), this.md.length() == this.position + (long)length);
            this.position += (long)length;
            assert (this.md.length() >= this.position) : "length: " + this.md.length() + " but positions was: " + this.position;
        }

        private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException {
            RecoverySourceHandler.this.cancellableThreads.executeIO(() -> RecoverySourceHandler.this.recoveryTarget.writeFileChunk(this.md, position, content, lastChunk, this.translogOps.get()));
            if (RecoverySourceHandler.this.shard.state() == IndexShardState.CLOSED) {
                throw new IndexShardClosedException(RecoverySourceHandler.this.request.shardId());
            }
        }
    }

    static class SendSnapshotResult {
        final long targetLocalCheckpoint;
        final int totalOperations;

        SendSnapshotResult(long targetLocalCheckpoint, int totalOperations) {
            this.targetLocalCheckpoint = targetLocalCheckpoint;
            this.totalOperations = totalOperations;
        }
    }
}

