/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.dataflow.util;

import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.ListJobMessagesResponse;
import java.io.IOException;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.base.MoreObjects;
import org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.base.Strings;
import org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.dataflow.DataflowClient;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.sdk.PipelineResult;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MonitoringUtil {
    /** Leading part of the gcloud command line produced by {@link #getGcloudCancelCommand}. */
    private static final String GCLOUD_DATAFLOW_PREFIX = "gcloud dataflow";

    /**
     * Environment variable name emitted as a {@code NAME=value} prefix of the cancel command when
     * the client's base URL is not the default Dataflow endpoint.
     */
    private static final String ENDPOINT_OVERRIDE_ENV_VAR = "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW";

    /**
     * Lookup table from Dataflow job state names to {@link PipelineResult.State}.
     *
     * <p>Note that {@code JOB_STATE_DRAINING} is reported as {@code RUNNING} and
     * {@code JOB_STATE_DRAINED} as {@code DONE}.
     *
     * <p>The explicit type arguments on {@code builder()} are required: without them the chained
     * call infers {@code ImmutableMap<Object, Object>}, which is not assignable to the declared type.
     */
    private static final Map<String, PipelineResult.State> DATAFLOW_STATE_TO_JOB_STATE =
        ImmutableMap.<String, PipelineResult.State>builder()
            .put("JOB_STATE_UNKNOWN", PipelineResult.State.UNKNOWN)
            .put("JOB_STATE_STOPPED", PipelineResult.State.STOPPED)
            .put("JOB_STATE_RUNNING", PipelineResult.State.RUNNING)
            .put("JOB_STATE_DONE", PipelineResult.State.DONE)
            .put("JOB_STATE_FAILED", PipelineResult.State.FAILED)
            .put("JOB_STATE_CANCELLED", PipelineResult.State.CANCELLED)
            .put("JOB_STATE_UPDATED", PipelineResult.State.UPDATED)
            .put("JOB_STATE_DRAINING", PipelineResult.State.RUNNING)
            .put("JOB_STATE_DRAINED", PipelineResult.State.DONE)
            .build();

    // Message importance names; the same literal values are matched by LoggingHandler.process.
    private static final String JOB_MESSAGE_ERROR = "JOB_MESSAGE_ERROR";
    private static final String JOB_MESSAGE_WARNING = "JOB_MESSAGE_WARNING";
    private static final String JOB_MESSAGE_BASIC = "JOB_MESSAGE_BASIC";
    private static final String JOB_MESSAGE_DETAILED = "JOB_MESSAGE_DETAILED";
    private static final String JOB_MESSAGE_DEBUG = "JOB_MESSAGE_DEBUG";

    /** Client used by {@link #getJobMessages} to list the messages of a job. */
    private final DataflowClient dataflowClient;

    /**
     * Creates a monitoring helper bound to the given client.
     *
     * @param dataflowClient client stored for later use when listing job messages
     */
    public MonitoringUtil(DataflowClient dataflowClient) {
        this.dataflowClient = dataflowClient;
    }

    /**
     * Collects the messages of a job that are strictly newer than a given time, sorted by
     * timestamp using {@link TimeStampComparator}.
     *
     * <p>Pages are requested from the client until no next-page token is returned. Messages whose
     * timestamp cannot be parsed (i.e. {@code TimeUtil.fromCloudTime} yields {@code null}) are
     * dropped. If a page comes back {@code null} or without a message list, paging stops and the
     * messages gathered so far are returned, still sorted.
     *
     * @param jobId identifier of the job whose messages are listed
     * @param startTimestampMs lower bound (exclusive) in epoch milliseconds
     * @return a mutable, time-ordered list of matching messages; never {@code null}
     * @throws IOException propagated from the underlying client call
     */
    public List<JobMessage> getJobMessages(String jobId, long startTimestampMs) throws IOException {
        Instant startTimestamp = new Instant(startTimestampMs);
        List<JobMessage> allMessages = new ArrayList<>();
        String pageToken = null;
        do {
            ListJobMessagesResponse response = this.dataflowClient.listJobMessages(jobId, pageToken);
            if (response == null || response.getJobMessages() == null) {
                // Stop paging, but fall through to the sort so that a truncated result
                // honours the same ordering guarantee as a complete one.
                break;
            }
            for (JobMessage m : response.getJobMessages()) {
                Instant timestamp = TimeUtil.fromCloudTime(m.getTime());
                if (timestamp != null && timestamp.isAfter(startTimestamp)) {
                    allMessages.add(m);
                }
            }
            pageToken = response.getNextPageToken();
        } while (pageToken != null);
        allMessages.sort(new TimeStampComparator());
        return allMessages;
    }

    /**
     * Builds the monitoring page URL for a job, assuming the {@code us-central1} region.
     *
     * @param projectName project that owns the job
     * @param jobId identifier of the job
     * @return the URL produced by the three-argument overload for region {@code us-central1}
     * @deprecated use {@link #getJobMonitoringPageURL(String, String, String)} and pass the region
     *     explicitly.
     */
    @Deprecated
    public static String getJobMonitoringPageURL(String projectName, String jobId) {
        final String assumedRegion = "us-central1";
        return MonitoringUtil.getJobMonitoringPageURL(projectName, assumedRegion, jobId);
    }

    /**
     * Builds the Cloud Console URL of the monitoring page for a job.
     *
     * <p>Each argument is passed through {@link URLEncoder#encode(String, String)} with UTF-8
     * before being inserted into the URL.
     *
     * @param projectName project that owns the job; placed in the {@code project} query parameter
     * @param regionId region of the job; placed in the {@code locations/} path segment
     * @param jobId identifier of the job; placed in the {@code jobs/} path segment
     * @return the assembled URL
     */
    public static String getJobMonitoringPageURL(String projectName, String regionId, String jobId) {
        final String utf8 = StandardCharsets.UTF_8.name();
        final String encodedRegion;
        final String encodedJob;
        final String encodedProject;
        try {
            encodedRegion = URLEncoder.encode(regionId, utf8);
            encodedJob = URLEncoder.encode(jobId, utf8);
            encodedProject = URLEncoder.encode(projectName, utf8);
        }
        catch (UnsupportedEncodingException e) {
            // UTF-8 is a charset every Java platform must provide, so this is not expected.
            throw new AssertionError("UTF-8 encoding is not supported by the environment", e);
        }
        return String.format("https://console.cloud.google.com/dataflow/jobsDetail/locations/%s/jobs/%s?project=%s", encodedRegion, encodedJob, encodedProject);
    }

    /**
     * Produces a gcloud command line that cancels the given job.
     *
     * <p>When the base URL of the options' Dataflow client is anything other than
     * {@code https://dataflow.googleapis.com/}, the command is prefixed with an
     * environment-variable assignment carrying that URL.
     *
     * @param options supplies the Dataflow client, project and region
     * @param jobId identifier of the job to cancel
     * @return the command line as a single string
     */
    public static String getGcloudCancelCommand(DataflowPipelineOptions options, String jobId) {
        String baseUrl = options.getDataflowClient().getBaseUrl();
        boolean usesDefaultEndpoint = baseUrl.equals("https://dataflow.googleapis.com/");
        // Only a non-default endpoint needs the override assignment in front of the command.
        String endpointOverride = usesDefaultEndpoint
            ? ""
            : String.format("%s=%s ", ENDPOINT_OVERRIDE_ENV_VAR, baseUrl);
        return String.format(
            "%s%s jobs --project=%s cancel --region=%s %s",
            endpointOverride,
            GCLOUD_DATAFLOW_PREFIX,
            options.getProject(),
            options.getRegion(),
            jobId);
    }

    /**
     * Translates a Dataflow job state name into a {@link PipelineResult.State}.
     *
     * @param stateName state name to look up in the state table
     * @return the mapped state, or {@code PipelineResult.State.UNKNOWN} when the table has no
     *     entry for the name
     */
    public static PipelineResult.State toState(String stateName) {
        PipelineResult.State mapped = DATAFLOW_STATE_TO_JOB_STATE.get(stateName);
        if (mapped != null) {
            return mapped;
        }
        return PipelineResult.State.UNKNOWN;
    }

    /**
     * Orders {@link JobMessage}s by their timestamp, earliest first.
     *
     * <p>Messages whose time cannot be parsed (i.e. {@code TimeUtil.fromCloudTime} yields
     * {@code null}) sort before all messages with a timestamp, and compare as equal to each other.
     * Treating two such messages as equal keeps the ordering antisymmetric, as the
     * {@link Comparator} contract requires.
     */
    public static class TimeStampComparator
    implements Comparator<JobMessage>,
    Serializable {
        /**
         * Compares two messages by timestamp.
         *
         * @param o1 first message; must not be {@code null}
         * @param o2 second message; must not be {@code null}
         * @return a negative value, zero, or a positive value when {@code o1} sorts before, with,
         *     or after {@code o2}
         */
        @Override
        public int compare(JobMessage o1, JobMessage o2) {
            Instant t1 = TimeUtil.fromCloudTime(o1.getTime());
            Instant t2 = TimeUtil.fromCloudTime(o2.getTime());
            if (t1 == null) {
                // Two missing timestamps are equal; a missing one precedes a present one.
                return t2 == null ? 0 : -1;
            }
            if (t2 == null) {
                return 1;
            }
            return t1.compareTo(t2);
        }
    }

    /**
     * A {@link JobMessagesHandler} that writes each job message to the class logger, choosing the
     * log level from the message importance.
     */
    public static class LoggingHandler
    implements JobMessagesHandler {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingHandler.class);

        /**
         * Logs every message that has non-empty text, prefixed with its timestamp (or
         * {@code "UNKNOWN TIMESTAMP: "} when the time cannot be parsed).
         *
         * <p>Importance to level mapping: error to error, warning to warn, basic and detailed to
         * info, debug to debug. A missing or unrecognized importance is logged at trace.
         *
         * @param messages messages to log
         */
        @Override
        public void process(List<JobMessage> messages) {
            for (JobMessage message : messages) {
                if (Strings.isNullOrEmpty(message.getMessageText())) {
                    continue;
                }
                Instant time = TimeUtil.fromCloudTime(message.getTime());
                String logMessage = (time == null ? "UNKNOWN TIMESTAMP: " : time + ": ") + message.getMessageText();
                String importance = message.getMessageImportance();
                if (importance == null) {
                    // A switch on a null String throws NullPointerException; handle it like
                    // any other unrecognized importance instead.
                    LOG.trace(logMessage);
                    continue;
                }
                switch (importance) {
                    case "JOB_MESSAGE_ERROR":
                        LOG.error(logMessage);
                        break;
                    case "JOB_MESSAGE_WARNING":
                        LOG.warn(logMessage);
                        break;
                    case "JOB_MESSAGE_BASIC":
                    case "JOB_MESSAGE_DETAILED":
                        LOG.info(logMessage);
                        break;
                    case "JOB_MESSAGE_DEBUG":
                        LOG.debug(logMessage);
                        break;
                    default:
                        LOG.trace(logMessage);
                        break;
                }
            }
        }
    }

    public static interface JobMessagesHandler {
        public void process(List<JobMessage> var1);
    }
}

