/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.shaded.opensearch2.org.opensearch.indices.replication;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.graylog.shaded.opensearch2.org.opensearch.action.support.ChannelActionListener;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.ClusterChangedEvent;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.ClusterStateListener;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.metadata.IndexMetadata;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.node.DiscoveryNode;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.routing.ShardRouting;
import org.graylog.shaded.opensearch2.org.opensearch.cluster.service.ClusterService;
import org.graylog.shaded.opensearch2.org.opensearch.common.Nullable;
import org.graylog.shaded.opensearch2.org.opensearch.common.component.AbstractLifecycleComponent;
import org.graylog.shaded.opensearch2.org.opensearch.common.settings.Settings;
import org.graylog.shaded.opensearch2.org.opensearch.index.IndexService;
import org.graylog.shaded.opensearch2.org.opensearch.index.shard.IndexEventListener;
import org.graylog.shaded.opensearch2.org.opensearch.index.shard.IndexShard;
import org.graylog.shaded.opensearch2.org.opensearch.index.shard.ShardId;
import org.graylog.shaded.opensearch2.org.opensearch.indices.IndicesService;
import org.graylog.shaded.opensearch2.org.opensearch.indices.recovery.RecoverySettings;
import org.graylog.shaded.opensearch2.org.opensearch.indices.recovery.RetryableTransportClient;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.CheckpointInfoRequest;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.CheckpointInfoResponse;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.GetSegmentFilesRequest;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.OngoingSegmentReplications;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.RemoteSegmentFileChunkWriter;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.UpdateVisibleCheckpointRequest;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.common.CopyState;
import org.graylog.shaded.opensearch2.org.opensearch.indices.replication.common.ReplicationTimer;
import org.graylog.shaded.opensearch2.org.opensearch.tasks.Task;
import org.graylog.shaded.opensearch2.org.opensearch.transport.TransportChannel;
import org.graylog.shaded.opensearch2.org.opensearch.transport.TransportRequestHandler;
import org.graylog.shaded.opensearch2.org.opensearch.transport.TransportResponse;
import org.graylog.shaded.opensearch2.org.opensearch.transport.TransportService;

public class SegmentReplicationSourceService
extends AbstractLifecycleComponent
implements ClusterStateListener,
IndexEventListener {
    private static final Logger logger = LogManager.getLogger(SegmentReplicationSourceService.class);
    private final RecoverySettings recoverySettings;
    private final TransportService transportService;
    private final IndicesService indicesService;
    private final OngoingSegmentReplications ongoingSegmentReplications;

    protected SegmentReplicationSourceService(IndicesService indicesService, TransportService transportService, RecoverySettings recoverySettings, OngoingSegmentReplications ongoingSegmentReplications) {
        this.transportService = transportService;
        this.indicesService = indicesService;
        this.recoverySettings = recoverySettings;
        this.ongoingSegmentReplications = ongoingSegmentReplications;
        transportService.registerRequestHandler("internal:index/shard/replication/get_checkpoint_info", "generic", CheckpointInfoRequest::new, new CheckpointInfoRequestHandler());
        transportService.registerRequestHandler("internal:index/shard/replication/get_segment_files", "generic", GetSegmentFilesRequest::new, new GetSegmentFilesRequestHandler());
        transportService.registerRequestHandler("internal:index/shard/replication/update_visible_checkpoint", "generic", UpdateVisibleCheckpointRequest::new, new UpdateVisibleCheckpointRequestHandler());
    }

    public SegmentReplicationSourceService(IndicesService indicesService, TransportService transportService, RecoverySettings recoverySettings) {
        this(indicesService, transportService, recoverySettings, new OngoingSegmentReplications(indicesService, recoverySettings));
    }

    @Override
    public void clusterChanged(ClusterChangedEvent event) {
        if (event.nodesRemoved()) {
            for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) {
                this.ongoingSegmentReplications.cancelReplication(removedNode);
            }
        }
        if (event.routingTableChanged()) {
            for (IndexService indexService : this.indicesService) {
                if (!indexService.getIndexSettings().isSegRepEnabled()) continue;
                for (IndexShard indexShard : indexService) {
                    if (!indexShard.routingEntry().primary()) continue;
                    IndexMetadata indexMetadata = indexService.getIndexSettings().getIndexMetadata();
                    HashSet<String> inSyncAllocationIds = new HashSet<String>(indexMetadata.inSyncAllocationIds(indexShard.shardId().id()));
                    if (indexShard.isPrimaryMode()) {
                        Set<String> shardTrackerInSyncIds = indexShard.getReplicationGroup().getInSyncAllocationIds();
                        inSyncAllocationIds.addAll(shardTrackerInSyncIds);
                    }
                    this.ongoingSegmentReplications.clearOutOfSyncIds(indexShard.shardId(), inSyncAllocationIds);
                }
            }
        }
    }

    @Override
    protected void doStart() {
        ClusterService clusterService = this.indicesService.clusterService();
        if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
            clusterService.addListener(this);
        }
    }

    @Override
    protected void doStop() {
        ClusterService clusterService = this.indicesService.clusterService();
        if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
            this.indicesService.clusterService().removeListener(this);
        }
    }

    @Override
    protected void doClose() throws IOException {
    }

    @Override
    public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
        if (indexShard != null && indexShard.indexSettings().isSegRepEnabled()) {
            this.ongoingSegmentReplications.cancel(indexShard, "shard is closed");
        }
    }

    @Override
    public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
        if (indexShard != null && indexShard.indexSettings().isSegRepEnabled() && !oldRouting.primary() && newRouting.primary()) {
            this.ongoingSegmentReplications.cancel(indexShard.routingEntry().allocationId().getId(), "Relocating primary shard.");
        }
    }

    public static class Actions {
        public static final String GET_CHECKPOINT_INFO = "internal:index/shard/replication/get_checkpoint_info";
        public static final String GET_SEGMENT_FILES = "internal:index/shard/replication/get_segment_files";
        public static final String UPDATE_VISIBLE_CHECKPOINT = "internal:index/shard/replication/update_visible_checkpoint";
    }

    private class CheckpointInfoRequestHandler
    implements TransportRequestHandler<CheckpointInfoRequest> {
        private CheckpointInfoRequestHandler() {
        }

        @Override
        public void messageReceived(CheckpointInfoRequest request, TransportChannel channel, Task task) throws Exception {
            ReplicationTimer timer = new ReplicationTimer();
            timer.start();
            RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = new RemoteSegmentFileChunkWriter(request.getReplicationId(), SegmentReplicationSourceService.this.recoverySettings, new RetryableTransportClient(SegmentReplicationSourceService.this.transportService, request.getTargetNode(), SegmentReplicationSourceService.this.recoverySettings.internalActionRetryTimeout(), logger), request.getCheckpoint().getShardId(), "internal:index/shard/replication/file_chunk", new AtomicLong(0L), throttleTime -> {});
            CopyState copyState = SegmentReplicationSourceService.this.ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter);
            channel.sendResponse(new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()));
            timer.stop();
            logger.trace((Message)new ParameterizedMessage("[replication id {}] Source node sent checkpoint info [{}] to target node [{}], timing: {}", new Object[]{request.getReplicationId(), copyState.getCheckpoint(), request.getTargetNode().getId(), timer.time()}));
        }
    }

    private class GetSegmentFilesRequestHandler
    implements TransportRequestHandler<GetSegmentFilesRequest> {
        private GetSegmentFilesRequestHandler() {
        }

        @Override
        public void messageReceived(GetSegmentFilesRequest request, TransportChannel channel, Task task) throws Exception {
            SegmentReplicationSourceService.this.ongoingSegmentReplications.startSegmentCopy(request, new ChannelActionListener(channel, "internal:index/shard/replication/get_segment_files", request));
        }
    }

    private class UpdateVisibleCheckpointRequestHandler
    implements TransportRequestHandler<UpdateVisibleCheckpointRequest> {
        private UpdateVisibleCheckpointRequestHandler() {
        }

        @Override
        public void messageReceived(UpdateVisibleCheckpointRequest request, TransportChannel channel, Task task) throws Exception {
            try {
                IndexService indexService = SegmentReplicationSourceService.this.indicesService.indexServiceSafe(request.getPrimaryShardId().getIndex());
                IndexShard indexShard = indexService.getShard(request.getPrimaryShardId().id());
                indexShard.updateVisibleCheckpointForShard(request.getTargetAllocationId(), request.getCheckpoint());
                channel.sendResponse(TransportResponse.Empty.INSTANCE);
            }
            catch (Exception e) {
                channel.sendResponse(e);
            }
        }
    }
}

