/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.alert.service;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertData;
import org.apache.dolphinscheduler.alert.api.AlertInfo;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.AlertSendStatus;
import org.apache.dolphinscheduler.dao.entity.ListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.AbstractListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionCreatedListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionDeletedListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionUpdatedListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessEndListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessFailListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessStartListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ServerDownListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.TaskEndListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.TaskFailListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.TaskStartListenerEvent;
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public final class ListenerEventPostService
extends BaseDaemonThread
implements AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ListenerEventPostService.class);
    @Value(value="${alert.query_alert_threshold:100}")
    private Integer QUERY_ALERT_THRESHOLD;
    @Autowired
    private ListenerEventMapper listenerEventMapper;
    @Autowired
    private AlertPluginInstanceMapper alertPluginInstanceMapper;
    @Autowired
    private AlertPluginManager alertPluginManager;
    @Autowired
    private AlertConfig alertConfig;

    public ListenerEventPostService() {
        super("ListenerEventPostService");
    }

    public void run() {
        log.info("listener event post thread started");
        while (!ServerLifeCycleManager.isStopped()) {
            try {
                List listenerEvents = this.listenerEventMapper.listingListenerEventByStatus(AlertStatus.WAIT_EXECUTION, this.QUERY_ALERT_THRESHOLD.intValue());
                if (CollectionUtils.isEmpty((Collection)listenerEvents)) {
                    log.debug("There is no waiting listener events");
                    continue;
                }
                this.send(listenerEvents);
            }
            catch (Exception e) {
                log.error("listener event post thread meet an exception", (Throwable)e);
            }
            finally {
                ThreadUtils.sleep((long)5000L);
            }
        }
        log.info("listener event post thread stopped");
    }

    public void send(List<ListenerEvent> listenerEvents) {
        for (ListenerEvent listenerEvent : listenerEvents) {
            int eventId = listenerEvent.getId();
            List globalAlertInstanceList = this.alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList();
            if (CollectionUtils.isEmpty((Collection)globalAlertInstanceList)) {
                log.error("post listener event fail,no bind global plugin instance.");
                this.listenerEventMapper.updateListenerEvent(eventId, AlertStatus.EXECUTION_FAILURE, "no bind plugin instance", new Date());
                continue;
            }
            AbstractListenerEvent event = this.generateEventFromContent(listenerEvent);
            if (event == null) {
                log.error("parse listener event to abstract listener event fail.ed {}", (Object)listenerEvent.getContent());
                this.listenerEventMapper.updateListenerEvent(eventId, AlertStatus.EXECUTION_FAILURE, "parse listener event to abstract listener event failed", new Date());
                continue;
            }
            ArrayList events = Lists.newArrayList((Object[])new AbstractListenerEvent[]{event});
            AlertData alertData = AlertData.builder().id(eventId).content(JSONUtils.toJsonString((Object)events)).log(listenerEvent.getLog()).title(event.getTitle()).warnType(WarningType.GLOBAL.getCode()).alertType(event.getEventType().getCode()).build();
            int sendSuccessCount = 0;
            ArrayList<AlertSendStatus> failedPostResults = new ArrayList<AlertSendStatus>();
            for (AlertPluginInstance instance : globalAlertInstanceList) {
                AlertStatus sendStatus;
                AlertResult alertResult = this.alertResultHandler(instance, alertData);
                if (alertResult == null) continue;
                AlertStatus alertStatus = sendStatus = Boolean.parseBoolean(alertResult.getStatus()) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE;
                if (AlertStatus.EXECUTION_SUCCESS.equals((Object)sendStatus)) {
                    ++sendSuccessCount;
                    continue;
                }
                AlertSendStatus alertSendStatus = AlertSendStatus.builder().alertId(eventId).alertPluginInstanceId(instance.getId().intValue()).sendStatus(sendStatus).log(JSONUtils.toJsonString((Object)alertResult)).createTime(new Date()).build();
                failedPostResults.add(alertSendStatus);
            }
            if (sendSuccessCount == globalAlertInstanceList.size()) {
                this.listenerEventMapper.deleteById((Serializable)Integer.valueOf(eventId));
                continue;
            }
            AlertStatus alertStatus = sendSuccessCount == 0 ? AlertStatus.EXECUTION_FAILURE : AlertStatus.EXECUTION_PARTIAL_SUCCESS;
            this.listenerEventMapper.updateListenerEvent(eventId, alertStatus, JSONUtils.toJsonString(failedPostResults), new Date());
        }
    }

    @Nullable
    private AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) {
        String pluginInstanceName = instance.getInstanceName();
        int pluginDefineId = instance.getPluginDefineId();
        Optional<AlertChannel> alertChannelOptional = this.alertPluginManager.getAlertChannel(instance.getPluginDefineId());
        if (!alertChannelOptional.isPresent()) {
            String message = String.format("Global Alert Plugin %s send error: the channel doesn't exist, pluginDefineId: %s", pluginInstanceName, pluginDefineId);
            log.error("Global Alert Plugin {} send error : not found plugin {}", (Object)pluginInstanceName, (Object)pluginDefineId);
            return new AlertResult("false", message);
        }
        AlertChannel alertChannel = alertChannelOptional.get();
        Map paramsMap = JSONUtils.toMap((String)instance.getPluginInstanceParams());
        AlertInfo alertInfo = AlertInfo.builder().alertData(alertData).alertParams(paramsMap).alertPluginInstanceId(instance.getId().intValue()).build();
        int waitTimeout = this.alertConfig.getWaitTimeout();
        try {
            AlertResult alertResult;
            if (waitTimeout <= 0) {
                alertResult = alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode() ? alertChannel.closeAlert(alertInfo) : alertChannel.process(alertInfo);
            } else {
                CompletableFuture<AlertResult> future = alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode() ? CompletableFuture.supplyAsync(() -> alertChannel.closeAlert(alertInfo)) : CompletableFuture.supplyAsync(() -> alertChannel.process(alertInfo));
                alertResult = future.get(waitTimeout, TimeUnit.MILLISECONDS);
            }
            if (alertResult == null) {
                throw new RuntimeException("Alert result cannot be null");
            }
            return alertResult;
        }
        catch (InterruptedException e) {
            log.error("post listener event error alert data id :{},", (Object)alertData.getId(), (Object)e);
            Thread.currentThread().interrupt();
            return new AlertResult("false", e.getMessage());
        }
        catch (Exception e) {
            log.error("post listener event error alert data id :{},", (Object)alertData.getId(), (Object)e);
            return new AlertResult("false", e.getMessage());
        }
    }

    private AbstractListenerEvent generateEventFromContent(ListenerEvent listenerEvent) {
        String content = listenerEvent.getContent();
        switch (listenerEvent.getEventType()) {
            case SERVER_DOWN: {
                return (AbstractListenerEvent)JSONUtils.parseObject((String)content, ServerDownListenerEvent.class);
            }
            case PROCESS_DEFINITION_CREATED: {
                return (AbstractListenerEvent)JSONUtils.parseObject((String)content, ProcessDefinitionCreatedListenerEvent.class);
            }
            case PROCESS_DEFINITION_UPDATED: {
                return (AbstractListenerEvent)JSONUtils.parseObject((String)content, ProcessDefinitionUpdatedListenerEvent.class);
            }
            case PROCESS_DEFINITION_DELETED: {
                return (AbstractListenerEvent)JSONUtils.parseObject((String)content, ProcessDefinitionDeletedListenerEvent.class);
            }
            case PROCESS_START: {
                return (AbstractListenerEvent)JSONUtils.parseObject((String)content, ProcessStartListenerEvent.class);
            }
            case PROCESS_END: {
                return (AbstractListenerEvent)JSONUtils.parseObject((String)content, ProcessEndListenerEvent.class);
            }
            case PROCESS_FAIL: {
                return (AbstractListenerEvent)JSONUtils.parseObject((String)content, ProcessFailListenerEvent.class);
            }
            case TASK_START: {
                return (AbstractListenerEvent)JSONUtils.parseObject((String)content, TaskStartListenerEvent.class);
            }
            case TASK_END: {
                return (AbstractListenerEvent)JSONUtils.parseObject((String)content, TaskEndListenerEvent.class);
            }
            case TASK_FAIL: {
                return (AbstractListenerEvent)JSONUtils.parseObject((String)content, TaskFailListenerEvent.class);
            }
        }
        return null;
    }

    @Override
    public void close() {
        log.info("Closed ListenerEventPostService...");
    }
}

