/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.app;

import io.kyligence.kap.guava20.shaded.common.base.Preconditions;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.kylin.cluster.IClusterManager;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.metadata.AuditLogStore;
import org.apache.kylin.common.persistence.metadata.JdbcPartialAuditLogStore;
import org.apache.kylin.common.response.RestResponse;
import org.apache.kylin.common.util.AddressUtil;
import org.apache.kylin.common.util.Application;
import org.apache.kylin.common.util.TimeZoneUtils;
import org.apache.kylin.common.util.Unsafe;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.engine.spark.job.UdfManager;
import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.streaming.jobs.GracefulStopInterface;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.request.StreamingJobUpdateRequest;
import org.apache.kylin.streaming.rest.RestSupport;
import org.apache.kylin.streaming.util.JobExecutionIdHolder;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.KylinSession;
import org.apache.spark.sql.KylinSession$;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSessionExtensions;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.rules.Rule;
import org.apache.spark.sql.execution.datasource.AlignmentTableStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/**
 * Base class for streaming jobs that run on a {@link SparkSession}.
 *
 * <p>{@link #execute(String[])} drives a run: it parses the arguments, prepares the Kylin
 * configuration and Spark session, reports the application to the REST endpoint
 * {@code /streaming_jobs/spark}, starts a daemon thread that watches the job execution id,
 * and finally delegates to {@link #doExecute()}.
 */
public abstract class StreamingApplication
implements Application,
GracefulStopInterface {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamingApplication.class);

    /** Scheme of a metadata URL that requires the partial audit log store. */
    private static final String HDFS_SCHEME = "hdfs";

    protected final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
    protected SparkSession ss;
    protected String project;
    protected String dataflowId;
    protected String distMetaUrl;
    protected JobTypeEnum jobType;
    protected String jobId;
    /**
     * Holder for the lazily computed result of {@link #initMetaPathSet()}. The reference itself is
     * stored as a sentinel when the computed value is {@code null}; see {@link #getMetaResPathSet()}.
     */
    private final AtomicReference<Object> metaResPathSet = new AtomicReference<>();
    protected Integer jobExecId;

    /**
     * Points {@link #kylinConfig} at {@link #distMetaUrl}. When that URL uses the {@code hdfs}
     * scheme, a {@link JdbcPartialAuditLogStore} is additionally installed on the metadata store
     * and the resource store is caught up.
     *
     * <p>The audit log store is created <em>before</em> the metadata URL is switched, exactly as in
     * the original statement order.
     */
    private void prepareKylinConfig() throws Exception {
        StorageURL jobStorageUrl = StorageURL.valueOf(this.distMetaUrl);
        if (!HDFS_SCHEME.equals(jobStorageUrl.getScheme())) {
            this.kylinConfig.setMetadataUrl(this.distMetaUrl);
            return;
        }
        // Only audit-log entries under this dataflow's details path, or listed in the
        // meta resource path set, are accepted by the filter.
        JdbcPartialAuditLogStore auditLogStore = new JdbcPartialAuditLogStore(this.kylinConfig,
                resPath -> resPath.startsWith(String.format(Locale.ROOT, "/%s%s/%s", this.project, "/dataflow_details", this.dataflowId))
                        || this.getMetaResPathSet().contains(resPath));
        this.kylinConfig.setMetadataUrl(this.distMetaUrl);
        Preconditions.checkState(HDFS_SCHEME.equals(this.kylinConfig.getMetadataUrl().getScheme()));
        ResourceStore resourceStore = ResourceStore.getKylinMetaStore(this.kylinConfig);
        resourceStore.getMetadataStore().setAuditLogStore(auditLogStore);
        resourceStore.catchup();
        log.info("start job from offset:{}", auditLogStore.getLogOffset());
    }

    /**
     * Builds the set of metadata resource paths for this job: the dataflow's precalculation
     * resources plus this job's {@code /<project>/streaming/<jobId>} path.
     */
    private Set<String> initMetaPathSet() {
        Set<String> dumpMetaPathSet = NDataflowManager.getInstance(this.kylinConfig, this.project)
                .getDataflow(this.dataflowId)
                .collectPrecalculationResource();
        dumpMetaPathSet.add(String.format(Locale.ROOT, "/%s%s/%s", this.project, "/streaming", this.jobId));
        return dumpMetaPathSet;
    }

    /**
     * Prepares the runtime before {@link #doExecute()}: sets the default time zone, prepares the
     * Kylin configuration when running on a cluster, creates the Spark session, reports the
     * application and starts the job-execution-id check thread.
     *
     * @throws ExecuteException wrapping any exception raised during preparation
     */
    protected void prepareBeforeExecute() throws ExecuteException {
        try {
            TimeZoneUtils.setDefaultTimeZone(this.kylinConfig);
            if (this.isJobOnCluster()) {
                this.prepareKylinConfig();
            }
            this.getOrCreateSparkSession(KylinBuildEnv.getOrCreate(this.kylinConfig).sparkConf());
            this.jobExecId = this.reportApplicationInfo();
            JobExecutionIdHolder.setJobExecutionId(this.jobId, this.jobExecId);
            this.startJobExecutionIdCheckThread();
        }
        catch (Exception e) {
            throw new ExecuteException(e);
        }
    }

    /**
     * Parses the command line arguments of the job.
     *
     * @param var1 raw arguments as passed to {@link #execute(String[])}
     */
    public abstract void parseParams(String[] var1);

    /**
     * Runs the job: parse arguments, prepare, execute. Any exception is logged and rethrown
     * through {@link ExceptionUtils#rethrow(Throwable)}.
     *
     * @param args raw job arguments
     */
    public void execute(String[] args) {
        try {
            this.parseParams(args);
            this.prepareBeforeExecute();
            this.doExecute();
        }
        catch (Exception e) {
            // The trailing throwable is logged with its stack trace by SLF4J.
            log.error("{} execute error", this.getClass().getCanonicalName(), e);
            ExceptionUtils.rethrow(e);
        }
    }

    /**
     * Job specific work, invoked after {@link #prepareBeforeExecute()} succeeded.
     *
     * @throws ExecuteException when the job fails
     */
    protected abstract void doExecute() throws ExecuteException;

    /**
     * Creates (or reuses) the Spark session and stores it in {@link #ss}, then registers UDFs and
     * the job metrics listener. On a cluster the {@code kylin.env} system property is also set
     * from the deploy environment.
     *
     * <p>A plain {@code SparkSession} is obtained only when the job is not on a cluster, Spark is
     * already available in {@link SparderEnv} and the existing session is not a
     * {@link KylinSession}; otherwise a Kylin session is built.
     *
     * @param sparkConf Spark configuration applied to the session builder
     */
    public void getOrCreateSparkSession(SparkConf sparkConf) {
        SparkSession.Builder sessionBuilder = SparkSession.builder()
                .withExtensions(new AbstractFunction1<SparkSessionExtensions, BoxedUnit>() {

                    @Override
                    public BoxedUnit apply(SparkSessionExtensions v1) {
                        v1.injectPostHocResolutionRule(new AbstractFunction1<SparkSession, Rule<LogicalPlan>>() {

                            @Override
                            public Rule<LogicalPlan> apply(SparkSession session) {
                                return new AlignmentTableStats(session);
                            }
                        });
                        return BoxedUnit.UNIT;
                    }
                })
                .enableHiveSupport()
                .config(sparkConf)
                .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false");

        boolean createWithSparkSession = !this.isJobOnCluster() && SparderEnv.isSparkAvailable();
        if (createWithSparkSession) {
            boolean isKylinSession = SparderEnv.getSparkSession() instanceof KylinSession;
            createWithSparkSession = !isKylinSession;
        }
        this.ss = createWithSparkSession
                ? sessionBuilder.getOrCreate()
                : KylinSession$.MODULE$.KylinBuilder(sessionBuilder).buildCluster().getOrCreateKylinSession();
        UdfManager.create(this.ss);
        JobMetricsUtils.registerListener(this.ss);
        if (this.isJobOnCluster()) {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            Unsafe.setProperty("kylin.env", config.getDeployEnv());
        }
    }

    /**
     * When running on a cluster, unregisters the job metrics listener from {@code ss} and closes
     * the audit log store of the environment's metadata store. An {@link IOException} raised on
     * close is logged and not propagated.
     *
     * @param ss session whose metrics listener is unregistered
     */
    public void closeAuditLogStore(SparkSession ss) {
        if (!this.isJobOnCluster()) {
            return;
        }
        JobMetricsUtils.unRegisterListener(ss);
        ResourceStore store = ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv());
        try {
            store.getAuditLogStore().close();
        }
        catch (IOException e) {
            log.error("close audit log error", e);
        }
    }

    /**
     * Reports the Spark application id, tracking URL, process id and node info of this job via
     * {@code PUT /streaming_jobs/spark}.
     *
     * @return the integer parsed from the data of the REST response; it is stored by
     *         {@link #prepareBeforeExecute()} as the job execution id
     */
    public Integer reportApplicationInfo() {
        KylinBuildEnv buildEnv = this.getOrCreateKylinBuildEnv(this.kylinConfig);
        String appId = this.ss.sparkContext().applicationId();
        IClusterManager cm = buildEnv.clusterManager();
        String trackingUrl = this.getTrackingUrl(cm, this.ss);
        boolean isIpPreferred = this.kylinConfig.isTrackingUrlIpAddressEnabled();
        try {
            if (StringUtils.isBlank(trackingUrl)) {
                log.info("Get tracking url of application {}, but empty url found.", appId);
            }
            if (isIpPreferred && !StringUtils.isEmpty(trackingUrl)) {
                trackingUrl = this.tryReplaceHostAddress(trackingUrl);
            }
        }
        catch (Exception e) {
            // Best effort: on failure the unmodified tracking URL is reported.
            log.error("get tracking url failed!", e);
        }
        StreamingJobUpdateRequest request = new StreamingJobUpdateRequest(this.project, this.dataflowId, this.jobType.name(), appId, trackingUrl);
        request.setProcessId(StreamingUtils.getProcessId());
        request.setNodeInfo(AddressUtil.getZkLocalInstance());
        try (RestSupport rest = this.createRestSupport(this.kylinConfig)) {
            RestResponse<String> restResp = rest.execute((HttpRequestBase) rest.createHttpPut("/streaming_jobs/spark"), request);
            return Integer.valueOf(restResp.getData());
        }
    }

    /**
     * Returns the build environment for {@code config}; a separate method so subclasses can
     * override it.
     */
    public KylinBuildEnv getOrCreateKylinBuildEnv(KylinConfig config) {
        return KylinBuildEnv.getOrCreate(config);
    }

    /**
     * Asks the cluster manager for the build tracking URL of {@code sparkSession}.
     */
    public String getTrackingUrl(IClusterManager cm, SparkSession sparkSession) {
        return cm.getBuildTrackingUrl(sparkSession);
    }

    /**
     * Replaces the host component of {@code url} with its resolved IP address.
     *
     * <p>Only the host inside the authority part is replaced; other occurrences of the same text
     * (for example in the scheme or the path) are left untouched. The URL is returned unchanged
     * when it has no host component or when the host cannot be resolved.
     *
     * @param url URL to rewrite
     * @return the URL with the host replaced by its IP address, or {@code url} itself
     * @throws IllegalArgumentException if {@code url} is not a valid URI (from {@link URI#create})
     */
    public String tryReplaceHostAddress(String url) {
        String originHost = null;
        try {
            URI uri = URI.create(url);
            originHost = uri.getHost();
            if (originHost == null) {
                // No host component (e.g. opaque URI): nothing to replace.
                return url;
            }
            String hostAddress = InetAddress.getByName(originHost).getHostAddress();

            // Locate the host after the "//" authority marker and any "user:password@" prefix.
            int authorityMarker = url.indexOf("//");
            int searchFrom = authorityMarker < 0 ? 0 : authorityMarker + 2;
            String rawUserInfo = uri.getRawUserInfo();
            if (rawUserInfo != null) {
                searchFrom += rawUserInfo.length() + 1;
            }
            int hostStart = url.indexOf(originHost, searchFrom);
            if (hostStart < 0) {
                return url;
            }
            return url.substring(0, hostStart) + hostAddress + url.substring(hostStart + originHost.length());
        }
        catch (UnknownHostException uhe) {
            log.error("failed to get the ip address of {}, step back to use the origin tracking url.", originHost, uhe);
            return url;
        }
    }

    /**
     * Exits the process with {@code code}, but only when the job runs on a cluster.
     */
    public void systemExit(int code) {
        if (this.isJobOnCluster()) {
            Unsafe.systemExit(code);
        }
    }

    /**
     * @return {@code true} when streaming is not in local mode and the environment configuration
     *         is not a UT environment
     */
    public boolean isJobOnCluster() {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        return !StreamingUtils.isLocalMode() && !config.isUTEnv();
    }

    /**
     * Stops the Spark session unless streaming is in local mode or the context is already stopped.
     */
    protected void closeSparkSession() {
        if (!StreamingUtils.isLocalMode() && !this.ss.sparkContext().isStopped()) {
            this.ss.stop();
        }
    }

    public SparkSession getSparkSession() {
        return this.ss;
    }

    public void setSparkSession(SparkSession ss) {
        this.ss = ss;
    }

    /**
     * @return the parameters stored on {@code jobMeta}
     */
    public Map<String, String> getJobParams(StreamingJobMeta jobMeta) {
        return jobMeta.getParams();
    }

    /**
     * @return {@code true} when the action recorded on the streaming job {@code uuid} of
     *         {@code project} equals {@code "GRACEFUL_SHUTDOWN"}
     */
    public boolean isGracefulShutdown(String project, String uuid) {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        StreamingJobManager mgr = StreamingJobManager.getInstance(config, project);
        StreamingJobMeta meta = mgr.getStreamingJobByUuid(uuid);
        return "GRACEFUL_SHUTDOWN".equals(meta.getAction());
    }

    /**
     * @return {@code true} while the stop flag is not set and the Spark context is not stopped
     */
    public boolean isRunning() {
        return !this.getStopFlag() && !this.ss.sparkContext().isStopped();
    }

    /**
     * Starts a daemon thread that, while {@link #isRunning()}, replays the audit log and compares
     * the job execution id stored in metadata with {@link #jobExecId}. On mismatch the Spark
     * session is closed and the thread ends. Errors during a check are logged and the loop
     * continues after sleeping for the configured interval (in minutes).
     */
    public void startJobExecutionIdCheckThread() {
        Thread processCheckThread = new Thread(() -> {
            KylinConfig conf = KylinConfig.getInstanceFromEnv();
            long jobExecutionIdCheckInterval = conf.getStreamingJobExecutionIdCheckInterval();
            while (this.isRunning()) {
                try {
                    StreamingUtils.replayAuditlog();
                    StreamingJobManager mgr = StreamingJobManager.getInstance(conf, this.project);
                    StreamingJobMeta meta = mgr.getStreamingJobByUuid(this.jobId);
                    if (!Objects.equals(this.jobExecId, meta.getJobExecutionId())) {
                        this.closeSparkSession();
                        break;
                    }
                }
                catch (Exception e) {
                    log.warn("check JobExecutionId error:", e);
                }
                StreamingUtils.sleep(TimeUnit.MINUTES.toMillis(jobExecutionIdCheckInterval));
            }
        });
        processCheckThread.setDaemon(true);
        processCheckThread.start();
    }

    /**
     * Creates the REST client used to report application info; a separate method so subclasses
     * can override it.
     */
    public RestSupport createRestSupport(KylinConfig config) {
        return new RestSupport(config);
    }

    /**
     * Returns the meta resource path set, computing it through {@link #initMetaPathSet()} on first
     * use and caching the result in {@link #metaResPathSet}.
     *
     * <p>Initialisation is guarded by synchronising on the holder, with the cached value re-read
     * inside the lock. A {@code null} result is cached by storing the holder itself as a sentinel,
     * so the computation is not repeated.
     *
     * @return the cached set, or {@code null} when the computed value was {@code null}
     */
    @Generated
    @SuppressWarnings("unchecked")
    public Set<String> getMetaResPathSet() {
        Object value = this.metaResPathSet.get();
        if (value == null) {
            synchronized (this.metaResPathSet) {
                value = this.metaResPathSet.get();
                if (value == null) {
                    Set<String> actualValue = this.initMetaPathSet();
                    value = actualValue == null ? this.metaResPathSet : actualValue;
                    this.metaResPathSet.set(value);
                }
            }
        }
        return (Set<String>) (value == this.metaResPathSet ? null : value);
    }
}

