/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.jobs;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
import org.apache.kylin.engine.spark.job.BuildJobInfos;
import org.apache.kylin.engine.spark.job.DFBuildJob;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.cuboid.NSpanningTree;
import org.apache.kylin.metadata.cube.model.IndexEntity;
import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.streaming.common.BuildJobEntry;
import org.apache.kylin.streaming.metadata.BuildLayoutWithRestUpdate;
import org.apache.kylin.streaming.request.StreamingSegmentRequest;
import org.apache.kylin.streaming.rest.RestSupport;
import org.apache.kylin.streaming.util.JobExecutionIdHolder;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class StreamingDFBuildJob
extends DFBuildJob {
    private Map<Long, Dataset<Row>> cuboidDatasetMap;

    public StreamingDFBuildJob(String project) {
        this.buildLayoutWithUpdate = new BuildLayoutWithRestUpdate(JobTypeEnum.STREAMING_BUILD);
        this.config = KylinConfig.getInstanceFromEnv();
        this.dfMgr = NDataflowManager.getInstance((KylinConfig)this.config, (String)project);
        this.project = project;
    }

    public void streamBuild(BuildJobEntry buildJobEntry) throws IOException {
        if (this.ss == null) {
            this.ss = buildJobEntry.spark();
            this.ss.sparkContext().setLocalProperty("spark.sql.execution.id", null);
        }
        this.jobId = RandomUtil.randomUUIDStr();
        if (this.infos == null) {
            this.infos = new BuildJobInfos();
        }
        if (this.cuboidDatasetMap == null) {
            this.cuboidDatasetMap = new ConcurrentHashMap<Long, Dataset<Row>>();
        }
        this.setParam("dataflowId", buildJobEntry.dataflowId());
        Preconditions.checkState((buildJobEntry.toBuildTree().getRootIndexEntities().size() != 0 ? 1 : 0) != 0, (Object)"streaming mast have one root index");
        NBuildSourceInfo theRootLevelBuildInfos = new NBuildSourceInfo();
        theRootLevelBuildInfos.setFlattableDS(buildJobEntry.streamingFlatDS());
        theRootLevelBuildInfos.setSparkSession(this.ss);
        theRootLevelBuildInfos.setToBuildCuboids(buildJobEntry.toBuildTree().getRootIndexEntities());
        this.build(Sets.newHashSet((Object[])new NBuildSourceInfo[]{theRootLevelBuildInfos}), buildJobEntry.batchSegment().getId(), buildJobEntry.toBuildTree());
        logger.info("start update segment");
        if (this.config.isUTEnv()) {
            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                NDataflowManager dfMgr = NDataflowManager.getInstance((KylinConfig)KylinConfig.getInstanceFromEnv(), (String)this.project);
                NDataflow newDF = dfMgr.getDataflow(buildJobEntry.dataflowId()).copy();
                NDataSegment segUpdate = newDF.getSegment(buildJobEntry.batchSegment().getId());
                segUpdate.setStatus(SegmentStatusEnum.READY);
                segUpdate.setSourceCount(buildJobEntry.flatTableCount());
                NDataflowUpdate dfUpdate = new NDataflowUpdate(buildJobEntry.dataflowId());
                dfUpdate.setToUpdateSegs(new NDataSegment[]{segUpdate});
                dfUpdate.setStatus(RealizationStatusEnum.ONLINE);
                dfMgr.updateDataflow(dfUpdate);
                return 0;
            }, (String)this.project);
        } else {
            this.updateSegment(buildJobEntry);
        }
        this.infos.clear();
        this.cuboidDatasetMap.clear();
    }

    public void updateSegment(BuildJobEntry buildJobEntry) {
        String url = "/streaming_jobs/dataflow/segment";
        StreamingSegmentRequest req = new StreamingSegmentRequest(this.project, buildJobEntry.dataflowId(), buildJobEntry.flatTableCount());
        req.setNewSegId(buildJobEntry.batchSegment().getId());
        req.setStatus(RealizationStatusEnum.ONLINE.name());
        req.setJobType(JobTypeEnum.STREAMING_BUILD.name());
        String jobId = StreamingUtils.getJobId((String)buildJobEntry.dataflowId(), (String)req.getJobType());
        req.setJobExecutionId(JobExecutionIdHolder.getJobExecutionId(jobId));
        try (RestSupport rest = this.createRestSupport();){
            rest.execute((HttpRequestBase)rest.createHttpPut(url), req);
        }
        StreamingUtils.replayAuditlog();
    }

    public RestSupport createRestSupport() {
        return new RestSupport(this.config);
    }

    protected List<NBuildSourceInfo> constructTheNextLayerBuildInfos(NSpanningTree st, NDataSegment seg, Collection<IndexEntity> allIndexesInCurrentLayer) {
        ArrayList<NBuildSourceInfo> childrenBuildSourceInfos = new ArrayList<NBuildSourceInfo>();
        for (IndexEntity index : allIndexesInCurrentLayer) {
            Collection children = st.getChildrenByIndexPlan(index);
            if (children.isEmpty()) continue;
            NBuildSourceInfo theRootLevelBuildInfos = new NBuildSourceInfo();
            theRootLevelBuildInfos.setSparkSession(this.ss);
            LayoutEntity layout = (LayoutEntity)new ArrayList(st.getLayouts(index)).get(0);
            Dataset<Row> parentDataset = this.cuboidDatasetMap.get(layout.getId());
            theRootLevelBuildInfos.setLayoutId(layout.getId());
            theRootLevelBuildInfos.setToBuildCuboids(children);
            theRootLevelBuildInfos.setFlattableDS(parentDataset);
            childrenBuildSourceInfos.add(theRootLevelBuildInfos);
        }
        return childrenBuildSourceInfos;
    }

    protected NDataLayout saveAndUpdateLayout(Dataset<Row> dataset, NDataSegment seg, LayoutEntity layout) throws IOException {
        this.cuboidDatasetMap.put(layout.getId(), dataset);
        return super.saveAndUpdateLayout(dataset, seg, layout);
    }

    public NDataSegment getSegment(String segId) {
        StreamingUtils.replayAuditlog();
        return super.getSegment(segId);
    }

    public void shutdown() {
        if (this.buildLayoutWithUpdate != null) {
            this.buildLayoutWithUpdate.shutDown();
        }
    }
}

