/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.plugins.views.storage.migration.state.actions;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.google.inject.assistedinject.Assisted;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.graylog.plugins.views.storage.migration.state.actions.DatanodeDirectoryCompatibilityCheckResource;
import org.graylog.plugins.views.storage.migration.state.actions.DatanodeOpensearchClusterCheckResource;
import org.graylog.plugins.views.storage.migration.state.actions.MigrationActions;
import org.graylog.plugins.views.storage.migration.state.actions.OpensearchNodeLock;
import org.graylog.plugins.views.storage.migration.state.actions.TrafficSnapshot;
import org.graylog.plugins.views.storage.migration.state.machine.MigrationStateMachineContext;
import org.graylog.security.certutil.CaKeystore;
import org.graylog2.bootstrap.preflight.PreflightConfigResult;
import org.graylog2.bootstrap.preflight.PreflightConfigService;
import org.graylog2.cluster.NodeNotFoundException;
import org.graylog2.cluster.nodes.DataNodeDto;
import org.graylog2.cluster.nodes.DataNodeStatus;
import org.graylog2.cluster.nodes.NodeService;
import org.graylog2.datanode.DataNodeCommandService;
import org.graylog2.datanode.DatanodeStartType;
import org.graylog2.featureflag.FeatureFlags;
import org.graylog2.indexer.datanode.RemoteReindexRequest;
import org.graylog2.indexer.datanode.RemoteReindexingMigrationAdapter;
import org.graylog2.notifications.Notification;
import org.graylog2.notifications.NotificationService;
import org.graylog2.plugin.certificates.RenewalPolicy;
import org.graylog2.plugin.cluster.ClusterConfigService;
import org.graylog2.rest.resources.datanodes.DatanodeRestApiProxy;
import org.graylog2.shared.utilities.StringUtils;
import org.graylog2.storage.providers.ElasticsearchVersionProvider;
import org.graylog2.system.processing.control.ClusterProcessingControl;
import org.graylog2.system.processing.control.ClusterProcessingControlFactory;
import org.graylog2.system.processing.control.RemoteProcessingControlResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MigrationActionsImpl
implements MigrationActions {
    private static final Logger LOG = LoggerFactory.getLogger(MigrationActionsImpl.class);
    private static final String FEATURE_FLAG_REMOTE_REINDEX_MIGRATION = "remote_reindex_migration";
    private final ClusterConfigService clusterConfigService;
    private final ClusterProcessingControlFactory clusterProcessingControlFactory;
    private final NodeService<DataNodeDto> nodeService;
    private final CaKeystore caKeystore;
    private final PreflightConfigService preflightConfigService;
    private final MigrationStateMachineContext stateMachineContext;
    private final DataNodeCommandService dataNodeCommandService;
    private final RemoteReindexingMigrationAdapter migrationService;
    private final MetricRegistry metricRegistry;
    private final DatanodeRestApiProxy datanodeProxy;
    private final ElasticsearchVersionProvider searchVersionProvider;
    private final List<URI> elasticsearchHosts;
    private final NotificationService notificationService;
    private final FeatureFlags featureFlags;

    @Inject
    public MigrationActionsImpl(@Assisted MigrationStateMachineContext stateMachineContext, ClusterConfigService clusterConfigService, NodeService<DataNodeDto> nodeService, CaKeystore caKeystore, DataNodeCommandService dataNodeCommandService, RemoteReindexingMigrationAdapter migrationService, ClusterProcessingControlFactory clusterProcessingControlFactory, PreflightConfigService preflightConfigService, MetricRegistry metricRegistry, DatanodeRestApiProxy datanodeProxy, ElasticsearchVersionProvider searchVersionProvider, @Named(value="elasticsearch_hosts") List<URI> elasticsearchHosts, NotificationService notificationService, FeatureFlags featureFlags) {
        this.stateMachineContext = stateMachineContext;
        this.clusterConfigService = clusterConfigService;
        this.nodeService = nodeService;
        this.caKeystore = caKeystore;
        this.dataNodeCommandService = dataNodeCommandService;
        this.clusterProcessingControlFactory = clusterProcessingControlFactory;
        this.migrationService = migrationService;
        this.preflightConfigService = preflightConfigService;
        this.metricRegistry = metricRegistry;
        this.datanodeProxy = datanodeProxy;
        this.searchVersionProvider = searchVersionProvider;
        this.elasticsearchHosts = elasticsearchHosts;
        this.notificationService = notificationService;
        this.featureFlags = featureFlags;
    }

    @Override
    public void runDirectoryCompatibilityCheck() {
        Collection results = this.datanodeProxy.remoteInterface("all", DatanodeDirectoryCompatibilityCheckResource.class, DatanodeDirectoryCompatibilityCheckResource::compatibility).values();
        this.stateMachineContext.addExtendedState("compatibilityCheckResult", results.stream().allMatch(r -> r.compatibilityErrors().isEmpty()));
        this.stateMachineContext.setResponse(results);
    }

    @Override
    public boolean isOldClusterStopped() {
        Map results = this.datanodeProxy.remoteInterface("all", DatanodeOpensearchClusterCheckResource.class, DatanodeOpensearchClusterCheckResource::checkLocks);
        boolean anyLocked = results.values().stream().anyMatch(v -> v.locks().stream().anyMatch(OpensearchNodeLock::locked));
        if (anyLocked) {
            results.forEach((key, value) -> value.locks().stream().filter(OpensearchNodeLock::locked).forEach(v -> LOG.info("Data directory of datanode {} is still locked by another Opensearch process. Lock file: {}", key, (Object)v.path().toAbsolutePath())));
        }
        return !anyLocked;
    }

    @Override
    public void rollingUpgradeSelected() {
        Counter traffic = (Counter)this.metricRegistry.getMetrics().get("org.graylog2.traffic.input");
        this.stateMachineContext.addExtendedState("traffic_snapshot", new TrafficSnapshot(traffic.getCount()));
    }

    @Override
    public boolean directoryCompatibilityCheckOk() {
        return this.stateMachineContext.getExtendedState("compatibilityCheckResult", Boolean.class).orElse(false);
    }

    @Override
    public void reindexUpgradeSelected() {
    }

    @Override
    public void stopMessageProcessing() {
        String authToken = (String)this.stateMachineContext.getExtendedState("authToken");
        ClusterProcessingControl<RemoteProcessingControlResource> control = this.clusterProcessingControlFactory.create(authToken);
        LOG.info("Attempting to pause processing on all nodes...");
        control.pauseProcessing();
        LOG.info("Done pausing processing on all nodes.");
        LOG.info("Waiting for output buffer to drain on all nodes...");
        control.waitForEmptyBuffers();
        LOG.info("Done waiting for output buffer to drain on all nodes.");
    }

    @Override
    public void startMessageProcessing() {
        String authToken = (String)this.stateMachineContext.getExtendedState("authToken");
        ClusterProcessingControl<RemoteProcessingControlResource> control = this.clusterProcessingControlFactory.create(authToken);
        LOG.info("Resuming message processing.");
        control.resumeGraylogMessageProcessing();
    }

    @Override
    public boolean caDoesNotExist() {
        return !this.caKeystore.exists();
    }

    @Override
    public boolean renewalPolicyDoesNotExist() {
        return this.clusterConfigService.get(RenewalPolicy.class) == null;
    }

    @Override
    public boolean caAndRenewalPolicyExist() {
        return !this.caDoesNotExist() && !this.renewalPolicyDoesNotExist();
    }

    @Override
    public boolean compatibleDatanodesRunning() {
        Map<String, DataNodeDto> nodes = this.nodeService.allActive();
        return !nodes.isEmpty() && nodes.values().stream().allMatch(DataNodeDto::isCompatibleWithVersion);
    }

    @Override
    public void provisionDataNodes() {
        PreflightConfigResult preflight = this.preflightConfigService.getPreflightConfigResult();
        if (preflight == null || !preflight.equals((Object)PreflightConfigResult.PREPARED)) {
            this.preflightConfigService.setConfigResult(PreflightConfigResult.PREPARED);
        }
        Map<String, DataNodeDto> activeDataNodes = this.nodeService.allActive();
        activeDataNodes.values().stream().filter(node -> node.getDataNodeStatus() != DataNodeStatus.AVAILABLE).forEach(nodeDto -> this.triggerCSR((DataNodeDto)nodeDto, DatanodeStartType.MANUALLY));
    }

    private void triggerCSR(DataNodeDto nodeDto, DatanodeStartType startType) {
        try {
            this.dataNodeCommandService.triggerCertificateSigningRequest(nodeDto.getNodeId(), startType);
        }
        catch (NodeNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void provisionAndStartDataNodes() {
        Map<String, DataNodeDto> activeDataNodes = this.nodeService.allActive();
        activeDataNodes.values().stream().filter(node -> node.getDataNodeStatus() != DataNodeStatus.AVAILABLE).forEach(nodeDto -> this.triggerCSR((DataNodeDto)nodeDto, DatanodeStartType.AUTOMATICALLY));
    }

    @Override
    public boolean provisioningFinished() {
        return this.nodeService.allActive().values().stream().allMatch(node -> node.getDataNodeStatus() == DataNodeStatus.AVAILABLE);
    }

    @Override
    public boolean allDatanodesPrepared() {
        return this.nodeService.allActive().values().stream().allMatch(node -> node.getDataNodeStatus() == DataNodeStatus.PREPARED);
    }

    @Override
    public void startDataNodes() {
        Map<String, DataNodeDto> activeDataNodes = this.nodeService.allActive();
        activeDataNodes.values().forEach(this::startDataNode);
    }

    private void startDataNode(DataNodeDto node) {
        try {
            this.dataNodeCommandService.startNode(node.getNodeId());
        }
        catch (NodeNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean allDatanodesAvailable() {
        Map<String, DataNodeDto> activeNodes = this.nodeService.allActive();
        return !activeNodes.isEmpty() && activeNodes.values().stream().allMatch(node -> node.getDataNodeStatus() == DataNodeStatus.AVAILABLE);
    }

    @Override
    public void setPreflightFinished() {
        PreflightConfigResult preflight = this.preflightConfigService.getPreflightConfigResult();
        if (preflight == null || !preflight.equals((Object)PreflightConfigResult.FINISHED)) {
            this.preflightConfigService.setConfigResult(PreflightConfigResult.FINISHED);
        }
    }

    @Override
    public void startRemoteReindex() {
        String allowlist = this.stateMachineContext.getActionArgumentOpt("allowlist", String.class).orElse(null);
        String host = StringUtils.requireNonBlank(this.stateMachineContext.getActionArgument("hostname", String.class), "hostname has to be provided");
        if (host.endsWith("/")) {
            host = host.substring(0, host.length() - 1);
        }
        URI hostname = URI.create(host);
        String user = this.stateMachineContext.getActionArgumentOpt("user", String.class).orElse(null);
        String password = this.stateMachineContext.getActionArgumentOpt("password", String.class).orElse(null);
        List<String> indices = this.stateMachineContext.getActionArgumentOpt("indices", List.class).orElse(Collections.emptyList());
        boolean trustUnknownCerts = this.stateMachineContext.getActionArgumentOpt("trust_unknown_certs", Boolean.class).orElse(false);
        int threadsCount = this.stateMachineContext.getActionArgumentOpt("threads", Integer.class).orElse(4);
        String migrationID = this.migrationService.start(new RemoteReindexRequest(allowlist, hostname, user, password, indices, threadsCount, trustUnknownCerts));
        this.stateMachineContext.addExtendedState("migrationID", migrationID);
    }

    @Override
    public void requestMigrationStatus() {
        this.stateMachineContext.getExtendedState("migrationID", String.class).map(this.migrationService::status).ifPresent(status -> this.stateMachineContext.setResponse(status));
    }

    @Override
    public void calculateTrafficEstimate() {
        Counter currentTraffic = (Counter)this.metricRegistry.getMetrics().get("org.graylog2.traffic.input");
        MigrationStateMachineContext context = this.stateMachineContext;
        if (context.getExtendedState("estimated_traffic") == null) {
            context.getExtendedState("traffic_snapshot", TrafficSnapshot.class).ifPresent(traffic -> context.addExtendedState("estimated_traffic", traffic.calculateEstimatedTrafficPerMinute(currentTraffic.getCount())));
        }
    }

    @Override
    public void verifyRemoteIndexerConnection() {
        URI hostname = Objects.requireNonNull(URI.create(this.stateMachineContext.getActionArgument("hostname", String.class)), "hostname has to be provided");
        String user = this.stateMachineContext.getActionArgumentOpt("user", String.class).orElse(null);
        String password = this.stateMachineContext.getActionArgumentOpt("password", String.class).orElse(null);
        boolean trustUnknownCerts = this.stateMachineContext.getActionArgumentOpt("trust_unknown_certs", Boolean.class).orElse(false);
        String allowlist = this.stateMachineContext.getActionArgumentOpt("allowlist", String.class).orElse(null);
        this.stateMachineContext.setResponse(this.migrationService.checkConnection(hostname, user, password, allowlist, trustUnknownCerts));
    }

    @Override
    public boolean isCompatibleInPlaceMigrationVersion() {
        return !this.searchVersionProvider.get().isElasticsearch();
    }

    @Override
    public void getElasticsearchHosts() {
        this.stateMachineContext.setResponse(Map.of("elasticsearch_hosts", this.elasticsearchHosts.stream().map(URI::toString).collect(Collectors.joining(",")), "allowlist_hosts", this.elasticsearchHosts.stream().map(host -> host.getHost() + ":" + host.getPort()).collect(Collectors.joining(","))));
    }

    @Override
    public boolean isRemoteReindexingFinished() {
        return Optional.ofNullable(this.stateMachineContext).flatMap(ctx -> ctx.getExtendedState("migrationID", String.class)).map(this.migrationService::status).filter(m -> m.status() == RemoteReindexingMigrationAdapter.Status.FINISHED).isPresent();
    }

    @Override
    public void stopDatanodes() {
        this.nodeService.allActive().values().stream().filter(n -> n.getDataNodeStatus() == DataNodeStatus.AVAILABLE).forEach(node -> {
            try {
                this.dataNodeCommandService.stopNode(node.getNodeId());
            }
            catch (Exception exception) {
                // empty catch block
            }
        });
    }

    @Override
    public void finishRemoteReindexMigration() {
        this.notificationService.destroyAllByType(Notification.Type.REMOTE_REINDEX_FINISHED);
    }

    @Override
    public boolean isRemoteReindexMigrationEnabled() {
        return this.featureFlags.isOn(FEATURE_FLAG_REMOTE_REINDEX_MIGRATION);
    }
}

