/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.alert;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.dolphinscheduler.alert.AlertConfig;
import org.apache.dolphinscheduler.alert.AlertPluginManager;
import org.apache.dolphinscheduler.alert.AlertServerMetrics;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertData;
import org.apache.dolphinscheduler.alert.api.AlertInfo;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Background thread that repeatedly loads pending alerts from {@link AlertDao} and delivers
 * each of them through every alert plugin instance bound to the alert's group.
 *
 * <p>The per-instance outcome is stored via {@link AlertDao#addAlertSendStatus} and the
 * aggregated outcome via {@link AlertDao#updateAlert}. {@link #syncHandler} offers the same
 * delivery logic for a single, caller-supplied message without touching the alert table.
 */
@Service
public final class AlertSenderService extends Thread {

    private static final Logger logger = LoggerFactory.getLogger(AlertSenderService.class);

    /** Pause between two polling rounds, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 5000L;

    /** Status value placed into an {@link AlertResult} when delivery did not succeed. */
    private static final String STATUS_FALSE = "false";

    /** Key of the plugin instance parameter that restricts which warning types are sent. */
    private static final String PARAM_WARNING_TYPE = "WarningType";

    private final AlertDao alertDao;
    private final AlertPluginManager alertPluginManager;
    private final AlertConfig alertConfig;

    public AlertSenderService(AlertDao alertDao, AlertPluginManager alertPluginManager, AlertConfig alertConfig) {
        this.alertDao = alertDao;
        this.alertPluginManager = alertPluginManager;
        this.alertConfig = alertConfig;
    }

    /** Names the thread {@code AlertSenderService} and starts it. */
    @Override
    public synchronized void start() {
        super.setName("AlertSenderService");
        super.start();
    }

    /**
     * Polling loop: fetches pending alerts, sends them, then pauses, until
     * {@link ServerLifeCycleManager#isStopped()} reports {@code true}.
     */
    @Override
    public void run() {
        logger.info("alert sender started");
        while (!ServerLifeCycleManager.isStopped()) {
            try {
                List<Alert> alerts = alertDao.listPendingAlerts();
                AlertServerMetrics.registerPendingAlertGauge(alerts::size);
                send(alerts);
            } catch (Exception e) {
                logger.error("alert sender thread error", e);
            }
            // Pause after every round, including failed ones; otherwise a persistent
            // failure (e.g. the database being unavailable) turns this loop into a busy spin.
            ThreadUtils.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /**
     * Sends every alert in {@code alerts} and records the outcome through the DAO.
     *
     * <p>Alerts are processed independently: a failure while handling one alert is logged and
     * does not prevent the remaining alerts from being sent.
     *
     * @param alerts alerts to deliver; {@code null} or empty is a no-op
     */
    public void send(List<Alert> alerts) {
        if (CollectionUtils.isEmpty(alerts)) {
            return;
        }
        for (Alert alert : alerts) {
            try {
                sendAlert(alert);
            } catch (Exception e) {
                // Isolate the failure so that one broken alert cannot block the rest of the batch.
                logger.error("send alert error, alert id: {}", alert == null ? null : alert.getId(), e);
            }
        }
    }

    /** Delivers one alert to all plugin instances of its group and persists the results. */
    private void sendAlert(Alert alert) {
        int alertId = Optional.ofNullable(alert.getId()).orElse(0);
        int alertGroupId = Optional.ofNullable(alert.getAlertGroupId()).orElse(0);
        List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
        if (CollectionUtils.isEmpty(alertInstanceList)) {
            logger.error("send alert msg fail,no bind plugin instance.");
            List<AlertResult> alertResults = Lists.newArrayList(failure("no bind plugin instance"));
            alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, JSONUtils.toJsonString(alertResults), alertId);
            return;
        }

        AlertData alertData = AlertData.builder()
                .id(alertId)
                .content(alert.getContent())
                .log(alert.getLog())
                .title(alert.getTitle())
                .warnType(alert.getWarningType().getCode())
                .alertType(alert.getAlertType().getCode())
                .build();

        int sendSuccessCount = 0;
        List<AlertResult> alertResults = new ArrayList<>();
        for (AlertPluginInstance instance : alertInstanceList) {
            AlertResult alertResult = alertResultHandler(instance, alertData);
            if (alertResult == null) {
                // Defensive: the handler is annotated @Nullable.
                continue;
            }
            AlertStatus sendStatus =
                    isSuccess(alertResult) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE;
            alertDao.addAlertSendStatus(sendStatus, JSONUtils.toJsonString(alertResult), alertId, instance.getId());
            if (sendStatus == AlertStatus.EXECUTION_SUCCESS) {
                ++sendSuccessCount;
                AlertServerMetrics.incAlertSuccessCount();
            } else {
                AlertServerMetrics.incAlertFailCount();
            }
            alertResults.add(alertResult);
        }

        AlertStatus alertStatus = AlertStatus.EXECUTION_SUCCESS;
        if (sendSuccessCount == 0) {
            alertStatus = AlertStatus.EXECUTION_FAILURE;
        } else if (sendSuccessCount < alertInstanceList.size()) {
            alertStatus = AlertStatus.EXECUTION_PARTIAL_SUCCESS;
        }
        alertDao.updateAlert(alertStatus, JSONUtils.toJsonString(alertResults), alertId);
    }

    /**
     * Sends a single message to every plugin instance of an alert group and reports the
     * per-instance results to the caller. Nothing is written to the DAO.
     *
     * @param alertGroupId id of the alert group whose plugin instances are used
     * @param title        alert title
     * @param content      alert content
     * @param warnType     warning type code of the message
     * @return a response whose status is {@code true} only if every instance succeeded;
     *         {@code false} with a single explanatory result if the group has no instance
     */
    public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content, int warnType) {
        List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
        AlertData alertData = AlertData.builder().content(content).title(title).warnType(warnType).build();
        List<AlertSendResponseResult> sendResponseResults = new ArrayList<>();

        if (CollectionUtils.isEmpty(alertInstanceList)) {
            AlertSendResponseResult notFound = new AlertSendResponseResult();
            notFound.setSuccess(false);
            notFound.setMessage(
                    String.format("Alert GroupId %s send error : not found alert instance", alertGroupId));
            sendResponseResults.add(notFound);
            logger.error("Alert GroupId {} send error : not found alert instance", alertGroupId);
            return new AlertSendResponseCommand(false, sendResponseResults);
        }

        boolean sendResponseStatus = true;
        for (AlertPluginInstance instance : alertInstanceList) {
            AlertResult alertResult = alertResultHandler(instance, alertData);
            if (alertResult == null) {
                continue;
            }
            AlertSendResponseResult responseResult =
                    new AlertSendResponseResult(isSuccess(alertResult), alertResult.getMessage());
            sendResponseStatus = sendResponseStatus && responseResult.isSuccess();
            sendResponseResults.add(responseResult);
        }
        return new AlertSendResponseCommand(sendResponseStatus, sendResponseResults);
    }

    /**
     * Delivers {@code alertData} through one plugin instance.
     *
     * <p>Returns a failed result (status {@code "false"}) when the channel of the plugin is not
     * registered, when the instance's warning type cannot be resolved, when the instance's
     * warning type does not match the alert, or when the channel throws, times out, or yields
     * {@code null}.
     *
     * @param instance  plugin instance to send through
     * @param alertData alert payload
     * @return the result reported by the channel, or a failed result describing the problem
     */
    @Nullable
    private AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) {
        String pluginInstanceName = instance.getInstanceName();
        int pluginDefineId = instance.getPluginDefineId();
        Optional<AlertChannel> alertChannelOptional = alertPluginManager.getAlertChannel(pluginDefineId);
        if (!alertChannelOptional.isPresent()) {
            String message = String.format(
                    "Alert Plugin %s send error: the channel doesn't exist, pluginDefineId: %s",
                    pluginInstanceName, pluginDefineId);
            logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginDefineId);
            return failure(message);
        }
        AlertChannel alertChannel = alertChannelOptional.get();

        Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams());
        // Instances without an explicit warning type setting receive every alert.
        String instanceWarnType = WarningType.ALL.getDescp();
        if (MapUtils.isNotEmpty(paramsMap)) {
            instanceWarnType = paramsMap.getOrDefault(PARAM_WARNING_TYPE, WarningType.ALL.getDescp());
        }
        WarningType warningType = WarningType.of(instanceWarnType);
        if (warningType == null) {
            String message = String.format("Alert Plugin %s send error : plugin warnType is null", pluginInstanceName);
            logger.error("Alert Plugin {} send error : plugin warnType is null", pluginInstanceName);
            return failure(message);
        }

        if (!matchesWarningType(warningType, alertData.getWarnType())) {
            String message = String.format(
                    "Alert Plugin %s send ignore warning type not match: plugin warning type is %s, alert data warning type is %s",
                    pluginInstanceName, warningType.getCode(), alertData.getWarnType());
            logger.info(
                    "Alert Plugin {} send ignore warning type not match: plugin warning type is {}, alert data warning type is {}",
                    pluginInstanceName, warningType.getCode(), alertData.getWarnType());
            return failure(message);
        }

        AlertInfo alertInfo = AlertInfo.builder()
                .alertData(alertData)
                .alertParams(paramsMap)
                .alertPluginInstanceId(instance.getId())
                .build();
        int waitTimeout = alertConfig.getWaitTimeout();
        try {
            AlertResult alertResult;
            if (waitTimeout <= 0) {
                // No timeout configured: call the channel on the current thread.
                alertResult = invokeChannel(alertChannel, alertInfo);
            } else {
                CompletableFuture<AlertResult> future =
                        CompletableFuture.supplyAsync(() -> invokeChannel(alertChannel, alertInfo));
                alertResult = future.get(waitTimeout, TimeUnit.MILLISECONDS);
            }
            if (alertResult == null) {
                throw new RuntimeException("Alert result cannot be null");
            }
            return alertResult;
        } catch (InterruptedException e) {
            logger.error("send alert error alert data id :{},", alertData.getId(), e);
            // Restore the interrupt flag so the owning thread can observe it.
            Thread.currentThread().interrupt();
            return failure(describe(e));
        } catch (Exception e) {
            logger.error("send alert error alert data id :{},", alertData.getId(), e);
            return failure(describe(e));
        }
    }

    /** Tells whether an instance configured for {@code pluginWarningType} accepts the given warn type code. */
    private static boolean matchesWarningType(WarningType pluginWarningType, int alertWarnType) {
        switch (pluginWarningType) {
            case ALL:
                return true;
            case SUCCESS:
                return alertWarnType == WarningType.SUCCESS.getCode();
            case FAILURE:
                return alertWarnType == WarningType.FAILURE.getCode();
            default:
                return false;
        }
    }

    /** Calls {@code closeAlert} for close-alert payloads and {@code process} for everything else. */
    private static AlertResult invokeChannel(AlertChannel alertChannel, AlertInfo alertInfo) {
        if (alertInfo.getAlertData().getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
            return alertChannel.closeAlert(alertInfo);
        }
        return alertChannel.process(alertInfo);
    }

    /** Builds a failed {@link AlertResult} carrying {@code message}. */
    private static AlertResult failure(String message) {
        return new AlertResult(STATUS_FALSE, message);
    }

    /** Interprets the status of {@code alertResult} as a boolean. */
    private static boolean isSuccess(AlertResult alertResult) {
        return Boolean.parseBoolean(String.valueOf(alertResult.getStatus()));
    }

    /**
     * Returns a non-null description of {@code e}. Exceptions such as the
     * {@code TimeoutException} raised by {@code Future.get} carry no message, so fall back to
     * {@code toString()} to keep the stored result informative.
     */
    private static String describe(Exception e) {
        String message = e.getMessage();
        return message != null ? message : e.toString();
    }
}

