/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server.rest;

import com.hazelcast.cluster.impl.MemberImpl;
import com.hazelcast.internal.ascii.TextCommand;
import com.hazelcast.internal.ascii.TextCommandService;
import com.hazelcast.internal.ascii.rest.HttpCommand;
import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
import com.hazelcast.internal.ascii.rest.HttpPostCommand;
import com.hazelcast.internal.ascii.rest.HttpStatusCode;
import com.hazelcast.internal.json.Json;
import com.hazelcast.internal.json.JsonArray;
import com.hazelcast.internal.json.JsonObject;
import com.hazelcast.internal.json.JsonValue;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.core.starter.utils.ConfigShadeUtils;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.CoordinatorService;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.log.Log4j2HttpPostCommandProcessor;
import org.apache.seatunnel.engine.server.operation.CancelJobOperation;
import org.apache.seatunnel.engine.server.operation.SavePointJobOperation;
import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
import org.apache.seatunnel.engine.server.rest.RestJobExecutionEnvironment;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.seatunnel.engine.server.utils.RestUtil;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
import scala.Tuple2;

public class RestHttpPostCommandProcessor
extends HttpCommandProcessor<HttpPostCommand> {
    private final Log4j2HttpPostCommandProcessor original;

    public RestHttpPostCommandProcessor(TextCommandService textCommandService) {
        this(textCommandService, new Log4j2HttpPostCommandProcessor(textCommandService));
    }

    protected RestHttpPostCommandProcessor(TextCommandService textCommandService, Log4j2HttpPostCommandProcessor log4j2HttpPostCommandProcessor) {
        super(textCommandService, textCommandService.getNode().getLogger(Log4j2HttpPostCommandProcessor.class));
        this.original = log4j2HttpPostCommandProcessor;
    }

    public void handle(HttpPostCommand httpPostCommand) {
        String uri = httpPostCommand.getURI();
        try {
            if (uri.startsWith("/hazelcast/rest/maps/submit-jobs")) {
                this.handleSubmitJobs(httpPostCommand);
            } else if (uri.startsWith("/hazelcast/rest/maps/submit-job")) {
                this.handleSubmitJob(httpPostCommand, uri);
            } else if (uri.startsWith("/hazelcast/rest/maps/stop-jobs")) {
                this.handleStopJobs(httpPostCommand);
            } else if (uri.startsWith("/hazelcast/rest/maps/stop-job")) {
                this.handleStopJob(httpPostCommand);
            } else if (uri.startsWith("/hazelcast/rest/maps/encrypt-config")) {
                this.handleEncrypt(httpPostCommand);
            } else if (uri.startsWith("/hazelcast/rest/maps/update-tags")) {
                this.handleUpdateTags(httpPostCommand);
            } else {
                this.original.handle(httpPostCommand);
            }
        }
        catch (IllegalArgumentException e) {
            this.prepareResponse(HttpStatusCode.SC_400, (HttpCommand)httpPostCommand, RestHttpPostCommandProcessor.exceptionResponse((Throwable)e));
        }
        catch (Throwable e) {
            this.logger.warning("An error occurred while handling request " + httpPostCommand, e);
            this.prepareResponse(HttpStatusCode.SC_500, (HttpCommand)httpPostCommand, RestHttpPostCommandProcessor.exceptionResponse((Throwable)e));
        }
        this.textCommandService.sendResponse((TextCommand)httpPostCommand);
    }

    private SeaTunnelServer getSeaTunnelServer() {
        Map extensionServices = this.textCommandService.getNode().getNodeExtension().createExtensionServices();
        return (SeaTunnelServer)extensionServices.get("st:impl:seaTunnelServer");
    }

    private void handleSubmitJobs(HttpPostCommand httpPostCommand) throws IllegalArgumentException {
        List<Tuple2<Map<String, String>, Config>> configTuples = RestUtil.buildConfigList(this.requestHandle(httpPostCommand), false);
        JsonArray jsonArray = configTuples.stream().map(tuple -> {
            String urlParams = this.mapToUrlParams((Map)tuple._1);
            HashMap<String, String> requestParams = new HashMap<String, String>();
            RestUtil.buildRequestParams(requestParams, urlParams);
            return this.submitJobInternal((Config)tuple._2, requestParams);
        }).collect(JsonArray::new, JsonArray::add, JsonArray::add);
        this.prepareResponse((HttpCommand)httpPostCommand, jsonArray);
    }

    private String mapToUrlParams(Map<String, String> params) {
        return params.entrySet().stream().map(entry -> (String)entry.getKey() + "=" + (String)entry.getValue()).collect(Collectors.joining("&", "?", ""));
    }

    private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri) throws IllegalArgumentException {
        HashMap<String, String> requestParams = new HashMap<String, String>();
        RestUtil.buildRequestParams(requestParams, uri);
        Config config = RestUtil.buildConfig(this.requestHandle(httpPostCommand), false);
        JsonObject jsonObject = this.submitJobInternal(config, requestParams);
        this.prepareResponse((HttpCommand)httpPostCommand, jsonObject);
    }

    private JsonObject submitJobInternal(Config config, Map<String, String> requestParams) {
        ReadonlyConfig envOptions = ReadonlyConfig.fromConfig((Config)config.getConfig("env"));
        String jobName = (String)envOptions.get(EnvCommonOptions.JOB_NAME);
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(StringUtils.isEmpty((CharSequence)requestParams.get("jobName")) ? jobName : requestParams.get("jobName"));
        boolean startWithSavePoint = Boolean.parseBoolean(requestParams.get("isStartWithSavePoint"));
        String jobIdStr = requestParams.get("jobId");
        Long finalJobId = StringUtils.isNotBlank((CharSequence)jobIdStr) ? Long.valueOf(Long.parseLong(jobIdStr)) : null;
        SeaTunnelServer seaTunnelServer = this.getSeaTunnelServer();
        RestJobExecutionEnvironment restJobExecutionEnvironment = new RestJobExecutionEnvironment(seaTunnelServer, jobConfig, config, this.textCommandService.getNode(), startWithSavePoint, finalJobId);
        JobImmutableInformation jobImmutableInformation = restJobExecutionEnvironment.build();
        long jobId = jobImmutableInformation.getJobId();
        if (!seaTunnelServer.isMasterNode()) {
            NodeEngineUtil.sendOperationToMasterNode((NodeEngine)this.getNode().nodeEngine, new SubmitJobOperation(jobId, this.getNode().nodeEngine.toData((Object)jobImmutableInformation), jobImmutableInformation.isStartWithSavePoint())).join();
        } else {
            this.submitJob(seaTunnelServer, jobImmutableInformation, jobConfig);
        }
        return new JsonObject().add("jobId", String.valueOf(jobId)).add("jobName", jobConfig.getName());
    }

    private void handleStopJobs(HttpPostCommand command) {
        List jobList = JsonUtils.toList((String)this.requestHandle(command).toString(), Map.class);
        JsonArray jsonResponse = new JsonArray();
        jobList.forEach(job -> {
            this.handleStopJob((Map<String, Object>)job);
            jsonResponse.add((JsonValue)new JsonObject().add("jobId", ((Long)job.get("jobId")).longValue()));
        });
        this.prepareResponse((HttpCommand)command, jsonResponse);
    }

    private void handleStopJob(HttpPostCommand httpPostCommand) {
        Map map = JsonUtils.toMap((JsonNode)this.requestHandle(httpPostCommand));
        this.handleStopJob(map);
        this.prepareResponse((HttpCommand)httpPostCommand, new JsonObject().add("jobId", map.get("jobId").toString()));
    }

    private void handleStopJob(Map<String, Object> map) {
        SeaTunnelServer seaTunnelServer;
        boolean isStopWithSavePoint = false;
        if (map.get("jobId") == null) {
            throw new IllegalArgumentException("jobId cannot be empty.");
        }
        long jobId = Long.parseLong(map.get("jobId").toString());
        if (map.get("isStopWithSavePoint") != null) {
            isStopWithSavePoint = Boolean.parseBoolean(map.get("isStopWithSavePoint").toString());
        }
        if (!(seaTunnelServer = this.getSeaTunnelServer()).isMasterNode()) {
            if (isStopWithSavePoint) {
                NodeEngineUtil.sendOperationToMasterNode((NodeEngine)this.getNode().nodeEngine, new SavePointJobOperation(jobId)).join();
            } else {
                NodeEngineUtil.sendOperationToMasterNode((NodeEngine)this.getNode().nodeEngine, new CancelJobOperation(jobId)).join();
            }
        } else {
            CoordinatorService coordinatorService = seaTunnelServer.getCoordinatorService();
            if (isStopWithSavePoint) {
                coordinatorService.savePoint(jobId);
            } else {
                coordinatorService.cancelJob(jobId);
            }
        }
        this.logger.info("Stop job with jobId: " + jobId);
    }

    private void handleEncrypt(HttpPostCommand httpPostCommand) {
        Config config = RestUtil.buildConfig(this.requestHandle(httpPostCommand), true);
        Config encryptConfig = ConfigShadeUtils.encryptConfig((Config)config);
        String encryptString = encryptConfig.root().render(ConfigRenderOptions.concise().setJson(true));
        JsonObject jsonObject = Json.parse((String)encryptString).asObject();
        this.prepareResponse((HttpCommand)httpPostCommand, jsonObject);
    }

    private void handleUpdateTags(HttpPostCommand httpPostCommand) {
        Map params = JsonUtils.toMap((JsonNode)this.requestHandle(httpPostCommand));
        SeaTunnelServer seaTunnelServer = this.getSeaTunnelServer();
        NodeEngineImpl nodeEngine = seaTunnelServer.getNodeEngine();
        MemberImpl localMember = nodeEngine.getLocalMember();
        Map<String, String> tags = params.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, value -> value.getValue() != null ? value.getValue().toString() : ""));
        localMember.updateAttribute(tags);
        this.prepareResponse((HttpCommand)httpPostCommand, new JsonObject().add("status", HttpCommandProcessor.ResponseType.SUCCESS.toString()).add("message", "update node tags done."));
    }

    public void handleRejection(HttpPostCommand httpPostCommand) {
        this.handle(httpPostCommand);
    }

    private JsonNode requestHandle(HttpPostCommand httpPostCommand) {
        JsonNode requestBodyJsonNode;
        byte[] requestBody = httpPostCommand.getData();
        if (requestBody.length == 0) {
            throw new IllegalArgumentException("Request body is empty.");
        }
        try {
            requestBodyJsonNode = RestUtil.convertByteToJsonNode(requestBody);
        }
        catch (IOException e) {
            throw new IllegalArgumentException("Invalid JSON format in request body.");
        }
        return requestBodyJsonNode;
    }

    private void submitJob(SeaTunnelServer seaTunnelServer, JobImmutableInformation jobImmutableInformation, JobConfig jobConfig) {
        CoordinatorService coordinatorService = seaTunnelServer.getCoordinatorService();
        Data data = this.textCommandService.getNode().nodeEngine.getSerializationService().toData((Object)jobImmutableInformation);
        PassiveCompletableFuture<Void> voidPassiveCompletableFuture = coordinatorService.submitJob(Long.parseLong(jobConfig.getJobContext().getJobId()), data, jobImmutableInformation.isStartWithSavePoint());
        voidPassiveCompletableFuture.join();
    }
}

