/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.system.processing.control;

import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.graylog2.cluster.Node;
import org.graylog2.cluster.nodes.NodeService;
import org.graylog2.cluster.nodes.ServerNodeDto;
import org.graylog2.rest.RemoteInterfaceProvider;
import org.graylog2.shared.utilities.StringUtils;
import org.graylog2.system.processing.control.ClusterProcessingControlException;
import org.graylog2.system.processing.control.OutputBufferDrainFailureException;
import org.graylog2.system.processing.control.RemoteProcessingControlResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import retrofit2.Call;
import retrofit2.Response;

public class ClusterProcessingControl<F extends RemoteProcessingControlResource> {
    private final Logger LOG = LoggerFactory.getLogger(ClusterProcessingControl.class);
    private static final String OUTPUT_RATE_METRIC_NAME = "org.graylog2.throughput.output.1-sec-rate";
    protected final String authorizationToken;
    protected final RemoteInterfaceProvider remoteInterfaceProvider;
    protected final NodeService<ServerNodeDto> nodeService;
    protected final com.github.joschi.jadconfig.util.Duration connectionTimeout;
    private final com.github.joschi.jadconfig.util.Duration bufferDrainInterval;
    private final int maxBufferDrainRetries;

    public ClusterProcessingControl(String authorizationToken, RemoteInterfaceProvider remoteInterfaceProvider, NodeService<ServerNodeDto> nodeService, com.github.joschi.jadconfig.util.Duration connectionTimeout, com.github.joschi.jadconfig.util.Duration bufferDrainInterval, int maxBufferDrainRetries) {
        this.authorizationToken = authorizationToken;
        this.remoteInterfaceProvider = remoteInterfaceProvider;
        this.nodeService = nodeService;
        this.connectionTimeout = connectionTimeout;
        this.bufferDrainInterval = bufferDrainInterval;
        this.maxBufferDrainRetries = maxBufferDrainRetries;
    }

    public void pauseProcessing() {
        this.runOnAllActiveNodes("pause processing", RemoteProcessingControlResource::pauseProcessing, true);
    }

    protected <R> Map<String, R> runOnAllActiveNodes(String operationName, Function<F, Call<R>> callRemoteResource, boolean stopOnFirstException) {
        HashMap result = new HashMap();
        ArrayList exceptions = new ArrayList();
        this.printNodeDebugInfo();
        this.nodeService.allActive().entrySet().forEach(entry -> {
            block5: {
                Node nodeValue = (Node)entry.getValue();
                try {
                    this.LOG.info("Attempting to call '{}' on node [{}].", (Object)operationName, (Object)nodeValue.getNodeId());
                    Response response = this.getrResponse(callRemoteResource, (Map.Entry<String, ServerNodeDto>)entry);
                    if (!response.isSuccessful()) {
                        String message = StringUtils.f("Unable to call '%s' on node [%s] code [%s] body [%s]", operationName, nodeValue.getNodeId(), response.code(), response.body());
                        this.LOG.error("Unable to call '{}' on node [{}] code [{}] body [{}].", new Object[]{operationName, nodeValue.getNodeId(), response.code(), response.body()});
                        throw new ClusterProcessingControlException(message);
                    }
                    result.put((String)entry.getKey(), response.body());
                    this.LOG.info("Successfully called '{}' on node [{}].", (Object)operationName, (Object)nodeValue.getNodeId());
                }
                catch (Exception e) {
                    if (e instanceof ClusterProcessingControlException) {
                        exceptions.add((ClusterProcessingControlException)e);
                    } else {
                        String message = StringUtils.f("Unable to call '%s' on node [%s]", operationName, nodeValue.getNodeId());
                        this.LOG.error(message, (Throwable)e);
                        exceptions.add(new ClusterProcessingControlException(message, e));
                    }
                    if (!stopOnFirstException) break block5;
                    throw (ClusterProcessingControlException)exceptions.get(0);
                }
            }
        });
        if (!exceptions.isEmpty()) {
            throw (ClusterProcessingControlException)exceptions.get(0);
        }
        return result;
    }

    protected <R> Response<R> getrResponse(Function<F, Call<R>> callRemoteResource, Map.Entry<String, ServerNodeDto> entry) throws IOException {
        RemoteProcessingControlResource remoteProcessingControlResource = this.remoteInterfaceProvider.get(entry.getValue(), this.authorizationToken, RemoteProcessingControlResource.class, Duration.ofSeconds(this.connectionTimeout.toSeconds()));
        return callRemoteResource.apply(remoteProcessingControlResource).execute();
    }

    public void waitForEmptyBuffers() throws OutputBufferDrainFailureException {
        this.printNodeDebugInfo();
        Retryer retryer = RetryerBuilder.newBuilder().retryIfResult(value -> !value.success).withWaitStrategy(WaitStrategies.fixedWait((long)this.bufferDrainInterval.toSeconds(), (TimeUnit)TimeUnit.SECONDS)).withStopStrategy(StopStrategies.stopAfterAttempt((int)this.maxBufferDrainRetries)).withRetryListener(new RetryListener(){

            public <V> void onRetry(Attempt<V> attempt) {
                if (attempt.getAttemptNumber() > 1L) {
                    ClusterProcessingControl.this.LOG.info("Checking again for empty output buffers (attempt #{}).", (Object)attempt.getAttemptNumber());
                }
            }
        }).build();
        try {
            retryer.call(() -> {
                Map<String, Double> nodeOutputRateMap = this.runOnAllActiveNodes("fetching output rate metric value", res -> res.getMetric(OUTPUT_RATE_METRIC_NAME), true).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> (Double)((HashMap)entry.getValue()).get("value")));
                boolean allZero = new HashSet<Double>(nodeOutputRateMap.values()).stream().allMatch(this::isOutputRateCloseToZero);
                Set<String> nonZeroNodes = nodeOutputRateMap.entrySet().stream().filter(e -> !this.isOutputRateCloseToZero((Double)e.getValue())).map(Map.Entry::getKey).collect(Collectors.toSet());
                if (allZero) {
                    this.LOG.info("Output buffer is now empty on all nodes.");
                } else {
                    this.LOG.info("Output rate has not yet reached zero on nodes [{}].", nonZeroNodes);
                }
                return new NodeOperationResult(allZero, nonZeroNodes);
            });
        }
        catch (RetryException e) {
            String message = StringUtils.f("The [%s] rate failed to reach zero on all nodes in [%s] with [%s] retries. Giving up. This is configurable with the [%s] and [%s] configuration properties", OUTPUT_RATE_METRIC_NAME, this.bufferDrainInterval.toSeconds(), this.maxBufferDrainRetries, "install_output_buffer_drain_interval", "install_output_buffer_max_retries");
            this.LOG.error(message);
            throw new OutputBufferDrainFailureException(this.bufferDrainInterval.toSeconds(), this.maxBufferDrainRetries, ClusterProcessingControl.tryGetExceptionNodes(e));
        }
        catch (Exception e) {
            throw new ClusterProcessingControlException("Failed to request node output rate on all nodes.", e);
        }
    }

    protected static Set<String> tryGetExceptionNodes(RetryException e) {
        try {
            return ((NodeOperationResult)e.getLastFailedAttempt().get()).nonZeroOutputRateNodeIds();
        }
        catch (ExecutionException ex) {
            return Collections.emptySet();
        }
    }

    protected boolean isOutputRateCloseToZero(double outputRate) {
        return outputRate < 1.0E-4;
    }

    public void resumeGraylogMessageProcessing() {
        this.LOG.info("Attempting to resume processing on all nodes...");
        this.runOnAllActiveNodes("resume processing", RemoteProcessingControlResource::resumeProcessing, false);
        this.LOG.info("Done resuming processing on all nodes.");
    }

    protected void printNodeDebugInfo() {
        if (this.LOG.isDebugEnabled()) {
            this.LOG.debug("The Graylog cluster contains the following nodes:");
            this.nodeService.allActive().entrySet().forEach(entry -> {
                Node node = (Node)entry.getValue();
                this.LOG.debug("Node ID [{}] Transport Address [{}] Last Seen [{}]", new Object[]{node.getNodeId(), node.getTransportAddress(), node.getLastSeen()});
            });
        }
    }

    public record NodeOperationResult(boolean success, Set<String> nonZeroOutputRateNodeIds) {
    }
}

