/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.clients.pipeline.airflow;

import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.security.KeyStoreException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import org.apache.http.client.utils.URIBuilder;
import org.json.JSONObject;
import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppMarketPlaceDefinition;
import org.openmetadata.schema.entity.automations.Workflow;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.sdk.PipelineServiceClient;
import org.openmetadata.sdk.exception.PipelineServiceClientException;
import org.openmetadata.service.exception.IngestionPipelineDeploymentException;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.SSLUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AirflowRESTClient
extends PipelineServiceClient {
    private static final Logger LOG = LoggerFactory.getLogger(AirflowRESTClient.class);
    private static final String PLATFORM = "Airflow";
    private static final String USERNAME_KEY = "username";
    private static final String PASSWORD_KEY = "password";
    private static final String TIMEOUT_KEY = "timeout";
    private static final String TRUSTSTORE_PATH_KEY = "truststorePath";
    private static final String TRUSTSTORE_PASSWORD_KEY = "truststorePassword";
    private static final String DOCS_LINK = "Follow [this guide](https://docs.open-metadata.org/deployment/ingestion/openmetadata) for further details.";
    protected final String username;
    protected final String password;
    protected final HttpClient client;
    protected final URL serviceURL;
    private static final List<String> API_ENDPOINT_SEGMENTS = List.of("api", "v1", "openmetadata");
    private static final String DAG_ID = "dag_id";

    public AirflowRESTClient(PipelineServiceClientConfiguration config) throws KeyStoreException {
        super(config);
        this.setPlatform(PLATFORM);
        this.username = (String)config.getParameters().getAdditionalProperties().get(USERNAME_KEY);
        this.password = (String)config.getParameters().getAdditionalProperties().get(PASSWORD_KEY);
        this.serviceURL = this.validateServiceURL(config.getApiEndpoint());
        SSLContext sslContext = AirflowRESTClient.createAirflowSSLContext(config);
        HttpClient.Builder clientBuilder = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).connectTimeout(Duration.ofSeconds(((Integer)config.getParameters().getAdditionalProperties().get(TIMEOUT_KEY)).intValue()));
        this.client = sslContext == null ? clientBuilder.build() : clientBuilder.sslContext(sslContext).build();
    }

    private static SSLContext createAirflowSSLContext(PipelineServiceClientConfiguration config) throws KeyStoreException {
        String truststorePath = (String)config.getParameters().getAdditionalProperties().get(TRUSTSTORE_PATH_KEY);
        String truststorePassword = (String)config.getParameters().getAdditionalProperties().get(TRUSTSTORE_PASSWORD_KEY);
        return SSLUtil.createSSLContext(truststorePath, truststorePassword, PLATFORM);
    }

    public final HttpResponse<String> post(String endpoint, String payload, boolean authenticate) throws IOException, InterruptedException {
        HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(URI.create(endpoint)).header("Content-Type", "application/json").POST(HttpRequest.BodyPublishers.ofString(payload));
        if (authenticate) {
            requestBuilder.header("Authorization", this.getBasicAuthenticationHeader(this.username, this.password));
        }
        return this.client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString());
    }

    public final HttpResponse<String> post(String endpoint, String payload) throws IOException, InterruptedException {
        return this.post(endpoint, payload, true);
    }

    public PipelineServiceClientResponse deployPipeline(IngestionPipeline ingestionPipeline, ServiceEntityInterface service) {
        HttpResponse<String> response;
        try {
            String deployUrl = this.buildURI("deploy").build().toString();
            String pipelinePayload = JsonUtils.pojoToJson(ingestionPipeline);
            response = this.post(deployUrl, pipelinePayload);
            if (response.statusCode() == 200) {
                ingestionPipeline.setDeployed(Boolean.valueOf(true));
                return this.getResponse(200, response.body());
            }
        }
        catch (IOException | URISyntaxException e) {
            throw IngestionPipelineDeploymentException.byMessage(ingestionPipeline.getName(), "DEPLOYMENT_ERROR", e.getMessage());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw IngestionPipelineDeploymentException.byMessage(ingestionPipeline.getName(), "DEPLOYMENT_ERROR", e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("%s Failed to deploy Ingestion Pipeline due to airflow API returned %s and response %s", ingestionPipeline.getName(), Response.Status.fromStatusCode((int)response.statusCode()), response.body()));
    }

    public PipelineServiceClientResponse deletePipeline(IngestionPipeline ingestionPipeline) {
        String pipelineName = ingestionPipeline.getName();
        try {
            URIBuilder uri = this.buildURI("delete");
            uri.addParameter(DAG_ID, pipelineName);
            HttpResponse<String> response = this.deleteRequestAuthenticatedForJsonContent(uri.build().toString());
            if (response.statusCode() == 200) {
                return this.getResponse(200, response.body());
            }
        }
        catch (IOException | URISyntaxException e) {
            LOG.error(String.format("Failed to delete Airflow Pipeline %s from Airflow DAGS", pipelineName));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error(String.format("Failed to delete Airflow Pipeline %s from Airflow DAGS", pipelineName));
        }
        return this.getResponse(500, String.format("Failed to delete Airflow Pipeline %s from Airflow DAGS", pipelineName));
    }

    public PipelineServiceClientResponse runPipeline(IngestionPipeline ingestionPipeline, ServiceEntityInterface service) {
        HttpResponse<String> response;
        String pipelineName = ingestionPipeline.getName();
        try {
            String triggerUrl = this.buildURI("trigger").build().toString();
            JSONObject requestPayload = new JSONObject();
            requestPayload.put(DAG_ID, (Object)pipelineName);
            response = this.post(triggerUrl, requestPayload.toString());
            if (response.statusCode() == 200) {
                return this.getResponse(200, response.body());
            }
        }
        catch (IOException | URISyntaxException e) {
            throw IngestionPipelineDeploymentException.byMessage(pipelineName, "TRIGGER_ERROR", e.getMessage());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw IngestionPipelineDeploymentException.byMessage(pipelineName, "TRIGGER_ERROR", e.getMessage());
        }
        throw IngestionPipelineDeploymentException.byMessage(pipelineName, "TRIGGER_ERROR", "Failed to trigger IngestionPipeline", Response.Status.fromStatusCode((int)response.statusCode()));
    }

    public PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestionPipeline) {
        HttpResponse<String> response;
        try {
            JSONObject requestPayload = new JSONObject();
            requestPayload.put(DAG_ID, (Object)ingestionPipeline.getName());
            if (ingestionPipeline.getEnabled().equals(Boolean.TRUE)) {
                String toggleUrl = this.buildURI("disable").build().toString();
                response = this.post(toggleUrl, requestPayload.toString());
                if (response.statusCode() == 200) {
                    ingestionPipeline.setEnabled(Boolean.valueOf(false));
                    return this.getResponse(200, response.body());
                }
                if (response.statusCode() == 404) {
                    ingestionPipeline.setDeployed(Boolean.valueOf(false));
                    return this.getResponse(404, response.body());
                }
            } else {
                String toggleUrl = this.buildURI("enable").build().toString();
                response = this.post(toggleUrl, requestPayload.toString());
                if (response.statusCode() == 200) {
                    ingestionPipeline.setEnabled(Boolean.valueOf(true));
                    ingestionPipeline.setEnabled(Boolean.valueOf(false));
                } else if (response.statusCode() == 404) {
                    ingestionPipeline.setDeployed(Boolean.valueOf(false));
                    return this.getResponse(404, response.body());
                }
            }
        }
        catch (IOException | URISyntaxException e) {
            throw this.clientException(ingestionPipeline, e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw this.clientException(ingestionPipeline, (Exception)e);
        }
        throw this.clientException(ingestionPipeline, "Failed to toggle ingestion pipeline state", response);
    }

    public List<PipelineStatus> getQueuedPipelineStatusInternal(IngestionPipeline ingestionPipeline) {
        HttpResponse<String> response;
        try {
            URIBuilder uri = this.buildURI("status");
            uri.addParameter(DAG_ID, ingestionPipeline.getName());
            uri.addParameter("only_queued", "true");
            response = this.getRequestAuthenticatedForJsonContent(uri.build().toString());
            if (response.statusCode() == 200) {
                return JsonUtils.readObjects(response.body(), PipelineStatus.class);
            }
        }
        catch (IOException | URISyntaxException e) {
            throw this.clientException(ingestionPipeline, e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw this.clientException(ingestionPipeline, (Exception)e);
        }
        LOG.error(String.format("Got status code [%s] trying to get queued statuses: [%s]", response.statusCode(), response.body()));
        return new ArrayList<PipelineStatus>();
    }

    public PipelineServiceClientResponse getServiceStatusInternal() {
        try {
            String healthUrl = this.buildURI("health-auth").build().toString();
            HttpResponse<String> response = this.getRequestAuthenticatedForJsonContent(healthUrl);
            if (response.statusCode() == 200) {
                JSONObject responseJSON = new JSONObject(response.body());
                String ingestionVersion = responseJSON.getString("version");
                return Boolean.TRUE.equals(this.validServerClientVersions(ingestionVersion)) ? this.buildHealthyStatus(ingestionVersion) : this.buildUnhealthyStatus(this.buildVersionMismatchErrorMessage(ingestionVersion, SERVER_VERSION));
            }
            if (response.statusCode() == 401 || response.statusCode() == 403) {
                return this.buildUnhealthyStatus(String.format("Authentication failed for user [%s] trying to access the Airflow APIs.", this.username));
            }
            if (response.statusCode() == 404) {
                return this.buildUnhealthyStatus(String.format("Airflow APIs not found. Please validate if the OpenMetadata Airflow plugin is installed correctly. %s", DOCS_LINK));
            }
            return this.buildUnhealthyStatus(String.format("Unexpected status response: code [%s] - [%s]", response.statusCode(), response.body()));
        }
        catch (IOException | URISyntaxException e) {
            String exceptionMsg = e.getMessage() != null ? String.format("Failed to get Airflow status due to [%s].", e.getMessage()) : "Failed to connect to Airflow.";
            return this.buildUnhealthyStatus(String.format("%s %s", exceptionMsg, DOCS_LINK));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return this.buildUnhealthyStatus(String.format("Failed to connect to Airflow. %s", DOCS_LINK));
        }
    }

    public PipelineServiceClientResponse runAutomationsWorkflow(Workflow workflow) {
        HttpResponse<String> response;
        try {
            String automationsUrl = this.buildURI("run_automation").build().toString();
            String workflowPayload = JsonUtils.pojoToJson(workflow);
            response = this.post(automationsUrl, workflowPayload);
            if (response.statusCode() == 200) {
                return this.getResponse(200, response.body());
            }
        }
        catch (IOException | URISyntaxException e) {
            throw IngestionPipelineDeploymentException.byMessage(workflow.getName(), "TRIGGER_ERROR", "No response from the test connection. Make sure your service is reachable and accepting connections");
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw IngestionPipelineDeploymentException.byMessage(workflow.getName(), "TRIGGER_ERROR", e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("%s Failed to trigger workflow due to airflow API returned %s and response %s", workflow.getName(), Response.Status.fromStatusCode((int)response.statusCode()), response.body()));
    }

    public PipelineServiceClientResponse runApplicationFlow(App application) {
        return this.sendPost("run_application", application);
    }

    public PipelineServiceClientResponse validateAppRegistration(AppMarketPlaceDefinition appMarketPlaceDefinition) {
        return this.getResponse(200, "Success");
    }

    private PipelineServiceClientResponse sendPost(String endpoint, Object request) {
        HttpResponse<String> response;
        String workflowPayload = JsonUtils.pojoToJson(request);
        try {
            String automationsUrl = this.buildURI(endpoint).build().toString();
            response = this.post(automationsUrl, workflowPayload);
            if (response.statusCode() == 200) {
                return this.getResponse(200, response.body());
            }
        }
        catch (IOException | URISyntaxException e) {
            throw IngestionPipelineDeploymentException.byMessage(workflowPayload, "DEPLOYMENT_ERROR", e.getMessage());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw IngestionPipelineDeploymentException.byMessage(workflowPayload, "DEPLOYMENT_ERROR", e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("%s Failed to trigger flow due to airflow API returned %s and response %s", workflowPayload, Response.Status.fromStatusCode((int)response.statusCode()), response.body()));
    }

    public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline) {
        HttpResponse<String> response;
        try {
            String killUrl = this.buildURI("kill").build().toString();
            JSONObject requestPayload = new JSONObject();
            requestPayload.put(DAG_ID, (Object)ingestionPipeline.getName());
            response = this.post(killUrl, requestPayload.toString());
            if (response.statusCode() == 200) {
                return this.getResponse(200, response.body());
            }
        }
        catch (IOException | URISyntaxException e) {
            throw this.clientException("Failed to kill running workflows", e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw this.clientException("Failed to kill running workflows", (Exception)e);
        }
        throw new PipelineServiceClientException(String.format("Failed to kill running workflows due to %s", response.body()));
    }

    public Map<String, String> requestGetHostIp() {
        HttpResponse<String> response;
        try {
            response = this.getRequestAuthenticatedForJsonContent(this.buildURI("ip").build().toString());
            if (response.statusCode() == 200) {
                return JsonUtils.readValue(response.body(), new TypeReference<Map<String, String>>(){});
            }
        }
        catch (IOException | URISyntaxException e) {
            throw this.clientException("Failed to get Pipeline Service host IP.", e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw this.clientException("Failed to get Pipeline Service host IP.", (Exception)e);
        }
        throw new PipelineServiceClientException(String.format("Failed to get Pipeline Service host IP due to %s", response.body()));
    }

    public Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline, String after) {
        HttpResponse<String> response;
        String taskId = (String)TYPE_TO_TASK.get(ingestionPipeline.getPipelineType().toString());
        URIBuilder uri = this.buildURI("last_dag_logs");
        if (after != null) {
            uri.addParameter("after", after);
        }
        uri.addParameter(DAG_ID, ingestionPipeline.getName());
        uri.addParameter("task_id", taskId);
        try {
            response = this.getRequestAuthenticatedForJsonContent(uri.build().toString());
            if (response.statusCode() == 200) {
                return JsonUtils.readValue(response.body(), new TypeReference<Map<String, String>>(){});
            }
        }
        catch (IOException | URISyntaxException e) {
            throw this.clientException("Failed to get last ingestion logs.", e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw this.clientException("Failed to get last ingestion logs.", (Exception)e);
        }
        throw new PipelineServiceClientException(String.format("Failed to get last ingestion logs due to %s", response.body()));
    }

    public URIBuilder buildURI(String path) {
        try {
            ArrayList<String> pathInternal = new ArrayList<String>(API_ENDPOINT_SEGMENTS);
            pathInternal.add(path);
            URIBuilder builder = new URIBuilder(String.valueOf(this.serviceURL));
            ArrayList<String> segments = new ArrayList<String>(builder.getPathSegments());
            segments.addAll(pathInternal);
            return builder.setPathSegments(segments);
        }
        catch (Exception e) {
            throw this.clientException(String.format("Failed to built request URI for path [%s].", path), e);
        }
    }

    private HttpResponse<String> getRequestAuthenticatedForJsonContent(String url) throws IOException, InterruptedException {
        HttpRequest request = this.authenticatedRequestBuilder(url).GET().build();
        return this.client.send(request, HttpResponse.BodyHandlers.ofString());
    }

    private HttpResponse<String> deleteRequestAuthenticatedForJsonContent(String url) throws IOException, InterruptedException {
        HttpRequest request = this.authenticatedRequestBuilder(url).DELETE().build();
        return this.client.send(request, HttpResponse.BodyHandlers.ofString());
    }

    private HttpRequest.Builder authenticatedRequestBuilder(String url) {
        return HttpRequest.newBuilder(URI.create(url)).header("Content-Type", "application/json").header("Authorization", this.getBasicAuthenticationHeader(this.username, this.password));
    }

    private PipelineServiceClientResponse getResponse(int code, String body) {
        return new PipelineServiceClientResponse().withCode(Integer.valueOf(code)).withReason(body).withPlatform(this.getPlatform());
    }

    private PipelineServiceClientException clientException(String message, Exception e) {
        return PipelineServiceClientException.byMessage((String)message, (String)e.getMessage());
    }

    private PipelineServiceClientException clientException(IngestionPipeline pipeline, Exception e) {
        return this.clientException(pipeline.getName(), e);
    }

    private PipelineServiceClientException clientException(IngestionPipeline pipeline, String message, HttpResponse<String> response) {
        return PipelineServiceClientException.byMessage((String)pipeline.getName(), (String)message, (Response.Status)Response.Status.fromStatusCode((int)response.statusCode()));
    }
}

