/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.shaded.opensearch2.org.opensearch.indices.replication;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.graylog.shaded.opensearch2.org.opensearch.OpenSearchException;
import org.graylog.shaded.opensearch2.org.opensearch.action.ActionListener;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.node.DiscoveryNode;
import org.graylog.shaded.opensearch2.org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.graylog.shaded.opensearch2.org.opensearch.index.IndexService;
import org.graylog.shaded.opensearch2.org.opensearch.index.shard.IndexShard;
import org.graylog.shaded.opensearch2.org.opensearch.index.shard.ShardId;
import org.graylog.shaded.opensearch2.org.opensearch.indices.IndicesService;
import org.graylog.shaded.opensearch2.org.opensearch.indices.recovery.FileChunkWriter;
import org.graylog.shaded.opensearch2.org.opensearch.indices.recovery.RecoverySettings;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.CheckpointInfoRequest;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.GetSegmentFilesRequest;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.GetSegmentFilesResponse;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.SegmentReplicationSourceHandler;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.common.CopyState;

class OngoingSegmentReplications {
    private static final Logger logger = LogManager.getLogger(OngoingSegmentReplications.class);
    private final RecoverySettings recoverySettings;
    private final IndicesService indicesService;
    private final Map<ReplicationCheckpoint, CopyState> copyStateMap;
    private final Map<String, SegmentReplicationSourceHandler> allocationIdToHandlers;

    OngoingSegmentReplications(IndicesService indicesService, RecoverySettings recoverySettings) {
        this.indicesService = indicesService;
        this.recoverySettings = recoverySettings;
        this.copyStateMap = Collections.synchronizedMap(new HashMap());
        this.allocationIdToHandlers = ConcurrentCollections.newConcurrentMap();
    }

    synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException {
        if (this.isInCopyStateMap(checkpoint)) {
            CopyState copyState = this.fetchFromCopyStateMap(checkpoint);
            copyState.incRef();
            return copyState;
        }
        ShardId shardId = checkpoint.getShardId();
        IndexService indexService = this.indicesService.indexServiceSafe(shardId.getIndex());
        IndexShard indexShard = indexService.getShard(shardId.id());
        CopyState copyState = new CopyState(checkpoint, indexShard);
        this.addToCopyStateMap(checkpoint, copyState);
        return copyState;
    }

    void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
        SegmentReplicationSourceHandler handler = this.allocationIdToHandlers.get(request.getTargetAllocationId());
        if (handler != null) {
            if (handler.isReplicating()) {
                throw new OpenSearchException("Replication to shard {}, on node {} has already started", request.getCheckpoint().getShardId(), request.getTargetNode());
            }
            ActionListener<GetSegmentFilesResponse> wrappedListener = ActionListener.runBefore(listener, () -> {
                SegmentReplicationSourceHandler sourceHandler = this.allocationIdToHandlers.remove(request.getTargetAllocationId());
                if (sourceHandler != null) {
                    this.removeCopyState(sourceHandler.getCopyState());
                }
            });
            handler.sendFiles(request, wrappedListener);
        } else {
            listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
        }
    }

    CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
        CopyState copyState = this.getCachedCopyState(request.getCheckpoint());
        this.allocationIdToHandlers.compute(request.getTargetAllocationId(), (allocationId, segrepHandler) -> {
            if (segrepHandler != null) {
                logger.warn("Override handler for allocation id {}", (Object)request.getTargetAllocationId());
                this.cancelHandlers(handler -> handler.getAllocationId().equals(request.getTargetAllocationId()), "cancel due to retry");
            }
            return this.createTargetHandler(request.getTargetNode(), copyState, request.getTargetAllocationId(), fileChunkWriter);
        });
        return copyState;
    }

    synchronized void cancel(IndexShard shard, String reason) {
        this.cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason);
    }

    synchronized void cancel(String allocationId, String reason) {
        SegmentReplicationSourceHandler handler = this.allocationIdToHandlers.remove(allocationId);
        if (handler != null) {
            handler.cancel(reason);
            this.removeCopyState(handler.getCopyState());
        }
    }

    void cancelReplication(DiscoveryNode node) {
        this.cancelHandlers(handler -> handler.getTargetNode().equals(node), "Node left");
    }

    boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) {
        return this.copyStateMap.containsKey(replicationCheckpoint);
    }

    int size() {
        return this.allocationIdToHandlers.size();
    }

    Map<String, SegmentReplicationSourceHandler> getHandlers() {
        return this.allocationIdToHandlers;
    }

    int cachedCopyStateSize() {
        return this.copyStateMap.size();
    }

    private SegmentReplicationSourceHandler createTargetHandler(DiscoveryNode node, CopyState copyState, String allocationId, FileChunkWriter fileChunkWriter) {
        return new SegmentReplicationSourceHandler(node, fileChunkWriter, copyState.getShard().getThreadPool(), copyState, allocationId, Math.toIntExact(this.recoverySettings.getChunkSize().getBytes()), this.recoverySettings.getMaxConcurrentFileChunks());
    }

    private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) {
        this.copyStateMap.putIfAbsent(checkpoint, copyState);
    }

    private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) {
        return this.copyStateMap.get(replicationCheckpoint);
    }

    private synchronized void removeCopyState(CopyState copyState) {
        if (copyState.decRef()) {
            this.copyStateMap.remove(copyState.getRequestedReplicationCheckpoint());
        }
    }

    private void cancelHandlers(Predicate<? super SegmentReplicationSourceHandler> predicate, String reason) {
        List allocationIds = this.allocationIdToHandlers.values().stream().filter(predicate).map(SegmentReplicationSourceHandler::getAllocationId).collect(Collectors.toList());
        if (allocationIds.size() == 0) {
            return;
        }
        logger.warn(() -> new ParameterizedMessage("Cancelling replications for allocationIds {}", (Object)allocationIds));
        for (String allocationId : allocationIds) {
            this.cancel(allocationId, reason);
        }
    }

    public void clearOutOfSyncIds(ShardId shardId, Set<String> inSyncAllocationIds) {
        this.cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shardId) && !inSyncAllocationIds.contains(handler.getAllocationId()), "Shard is no longer in-sync with the primary");
    }
}

