/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.rest.config.initialize;

import io.kyligence.kap.guava20.shaded.common.eventbus.Subscribe;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.scheduler.EpochStartedNotifier;
import org.apache.kylin.common.scheduler.ProjectControlledNotifier;
import org.apache.kylin.common.scheduler.ProjectEscapedNotifier;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.metadata.epoch.EpochManager;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.rest.service.UserAclService;
import org.apache.kylin.rest.service.UserService;
import org.apache.kylin.rest.service.task.QueryHistoryTaskScheduler;
import org.apache.kylin.rest.service.task.RecommendationTopNUpdateScheduler;
import org.apache.kylin.rest.util.CreateAdminUserUtils;
import org.apache.kylin.rest.util.InitResourceGroupUtils;
import org.apache.kylin.rest.util.InitUserGroupUtils;
import org.apache.kylin.streaming.jobs.scheduler.StreamingScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

@Component
public class EpochChangedListener {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(EpochChangedListener.class);
    private static final String GLOBAL = "_global";
    @Autowired
    Environment env;
    @Autowired
    @Qualifier(value="userService")
    UserService userService;
    @Autowired
    @Qualifier(value="userAclService")
    UserAclService userAclService;
    @Autowired
    @Qualifier(value="recommendationUpdateScheduler")
    RecommendationTopNUpdateScheduler recommendationUpdateScheduler;

    @Subscribe
    public void onProjectControlled(ProjectControlledNotifier notifier) throws IOException {
        String project = notifier.getProject();
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        EpochManager epochManager = EpochManager.getInstance();
        if (!GLOBAL.equals(project)) {
            if (!EpochManager.getInstance().checkEpochValid(project)) {
                log.warn("epoch:{} is invalid in project controlled", (Object)project);
                return;
            }
            NDefaultScheduler oldScheduler = NDefaultScheduler.getInstance((String)project);
            if (oldScheduler.hasStarted() && epochManager.checkEpochId(oldScheduler.getContext().getEpochId(), project)) {
                return;
            }
            if (oldScheduler.hasStarted()) {
                oldScheduler.forceShutdown();
            }
            log.info("start thread of project: {}", (Object)project);
            NDefaultScheduler scheduler = NDefaultScheduler.getInstance((String)project);
            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                scheduler.init(new JobEngineConfig(kylinConfig));
                if (!scheduler.hasStarted()) {
                    throw new RuntimeException("Scheduler for " + project + " has not been started");
                }
                StreamingScheduler ss = StreamingScheduler.getInstance((String)project);
                ss.init();
                if (!ss.getHasStarted().get()) {
                    throw new RuntimeException("Streaming Scheduler for " + project + " has not been started");
                }
                QueryHistoryTaskScheduler qhAccelerateScheduler = QueryHistoryTaskScheduler.getInstance(project);
                qhAccelerateScheduler.init();
                if (!qhAccelerateScheduler.hasStarted()) {
                    throw new RuntimeException("Query history accelerate scheduler for " + project + " has not been started");
                }
                this.recommendationUpdateScheduler.addProject(project);
                return 0;
            }, (String)project, (int)1);
            scheduler.setHasFinishedTransactions(new AtomicBoolean(true));
        } else {
            CreateAdminUserUtils.createAllAdmins(this.userService, this.env);
            InitUserGroupUtils.initUserGroups(this.env);
            UnitOfWork.doInTransactionWithRetry(() -> {
                ResourceStore.getKylinMetaStore((KylinConfig)KylinConfig.getInstanceFromEnv()).createMetaStoreUuidIfNotExist();
                return null;
            }, (String)"", (int)1);
            InitResourceGroupUtils.initResourceGroup();
            this.userAclService.syncAdminUserAcl();
        }
    }

    @Subscribe
    public void onProjectEscaped(ProjectEscapedNotifier notifier) {
        String project = notifier.getProject();
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        if (!GLOBAL.equals(project)) {
            log.info("Shutdown related thread: {}", (Object)project);
            try {
                NExecutableManager.getInstance((KylinConfig)kylinConfig, (String)project).destoryAllProcess();
                QueryHistoryTaskScheduler.shutdownByProject(project);
                NDefaultScheduler.shutdownByProject((String)project);
                StreamingScheduler.shutdownByProject((String)project);
                this.recommendationUpdateScheduler.removeProject(project);
            }
            catch (Exception e) {
                log.warn("error when shutdown " + project + " thread", (Throwable)e);
            }
        }
    }

    @Subscribe
    public void onEpochStarted(EpochStartedNotifier notifier) {
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        ResourceStore resourceStore = ResourceStore.getKylinMetaStore((KylinConfig)kylinConfig);
        resourceStore.leaderCatchup();
    }
}

