/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.source.kafka;

import java.util.List;
import java.util.Map;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.IInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.source.hive.CreateFlatHiveTableStep;
import org.apache.kylin.source.hive.GarbageCollectionStep;
import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
import org.apache.kylin.source.kafka.job.MergeOffsetStep;
import org.apache.kylin.source.kafka.model.StreamCubeFactTableDesc;

public class KafkaInputBase {
    protected static AbstractExecutable createMergeOffsetStep(String jobId, CubeSegment cubeSegment) {
        MergeOffsetStep result = new MergeOffsetStep();
        result.setName("Merge offset step");
        CubingExecutableUtil.setCubeName((String)cubeSegment.getCubeInstance().getName(), (Map)result.getParams());
        CubingExecutableUtil.setSegmentId((String)cubeSegment.getUuid(), (Map)result.getParams());
        CubingExecutableUtil.setCubingJobId((String)jobId, (Map)result.getParams());
        return result;
    }

    protected static MapReduceExecutable createSaveKafkaDataStep(String jobId, String location, CubeSegment seg) {
        MapReduceExecutable result = new MapReduceExecutable();
        result.setName("Save data from Kafka");
        result.setMapReduceJobClass(KafkaFlatTableJob.class);
        JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system");
        StringBuilder cmd = new StringBuilder();
        jobBuilderSupport.appendMapReduceParameters(cmd);
        JobBuilderSupport.appendExecCmdParameters((StringBuilder)cmd, (String)"cubename", (String)seg.getRealization().getName());
        JobBuilderSupport.appendExecCmdParameters((StringBuilder)cmd, (String)"output", (String)location);
        JobBuilderSupport.appendExecCmdParameters((StringBuilder)cmd, (String)"segmentId", (String)seg.getUuid());
        JobBuilderSupport.appendExecCmdParameters((StringBuilder)cmd, (String)"jobname", (String)("Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step"));
        result.setMapReduceParams(cmd.toString());
        return result;
    }

    protected static AbstractExecutable createFlatTable(String hiveTableDatabase, String baseLocation, String cubeName, StreamCubeFactTableDesc streamFactDesc, List<String> intermediateTables, List<String> intermediatePaths) {
        IJoinedFlatTableDesc flatDesc = streamFactDesc.getFlatTableDesc();
        String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements((String)hiveTableDatabase);
        String dropFactTableHql = JoinedFlatTable.generateDropTableStatement((IJoinedFlatTableDesc)streamFactDesc);
        String createFactTableHql = JoinedFlatTable.generateCreateTableStatement((IJoinedFlatTableDesc)streamFactDesc, (String)baseLocation, (String)"SEQUENCEFILE");
        String dropTableHql = JoinedFlatTable.generateDropTableStatement((IJoinedFlatTableDesc)flatDesc);
        String createTableHql = JoinedFlatTable.generateCreateTableStatement((IJoinedFlatTableDesc)flatDesc, (String)baseLocation);
        String insertDataHqls = JoinedFlatTable.generateInsertDataStatement((IJoinedFlatTableDesc)flatDesc);
        insertDataHqls = insertDataHqls.replace(FlatTableSqlQuoteUtils.quoteTableIdentity((TableRef)flatDesc.getDataModel().getRootFactTable(), null) + " ", FlatTableSqlQuoteUtils.quoteTableIdentity((String)hiveTableDatabase, (String)streamFactDesc.getTableName(), null) + " ");
        CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
        CubingExecutableUtil.setCubeName((String)cubeName, (Map)step.getParams());
        step.setInitStatement(hiveInitStatements);
        step.setCreateTableStatement(dropFactTableHql + createFactTableHql + dropTableHql + createTableHql + insertDataHqls);
        step.setName("Create Intermediate Flat Hive Table");
        intermediateTables.add(flatDesc.getTableName());
        intermediateTables.add(streamFactDesc.getTableName());
        intermediatePaths.add(baseLocation + "/" + flatDesc.getTableName());
        intermediatePaths.add(baseLocation + "/" + streamFactDesc.getTableName());
        return step;
    }

    protected static AbstractExecutable createGCStep(List<String> intermediateTables, List<String> intermediatePaths) {
        GarbageCollectionStep step = new GarbageCollectionStep();
        step.setName("Hive Cleanup");
        step.setIntermediateTables(intermediateTables);
        step.setExternalDataPaths(intermediatePaths);
        return step;
    }

    public static class BaseBatchMergeInputSide
    implements IInput.IBatchMergeInputSide {
        private CubeSegment cubeSegment;

        BaseBatchMergeInputSide(CubeSegment cubeSegment) {
            this.cubeSegment = cubeSegment;
        }

        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
            jobFlow.addTask(KafkaInputBase.createMergeOffsetStep(jobFlow.getId(), this.cubeSegment));
        }
    }

    public static class BaseBatchCubingInputSide
    implements IInput.IBatchCubingInputSide {
        final JobEngineConfig conf;
        final CubeSegment seg;
        private CubeDesc cubeDesc;
        private KylinConfig config;
        protected IJoinedFlatTableDesc flatDesc;
        protected String hiveTableDatabase;
        private final List<String> intermediateTables = Lists.newArrayList();
        private final List<String> intermediatePaths = Lists.newArrayList();
        private String cubeName;

        public BaseBatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
            this.config = seg.getConfig();
            this.flatDesc = flatDesc;
            this.hiveTableDatabase = this.config.getHiveDatabaseForIntermediateTable();
            this.seg = seg;
            this.cubeDesc = seg.getCubeDesc();
            this.cubeName = seg.getCubeInstance().getName();
        }

        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
            boolean onlyOneTable = this.cubeDesc.getModel().getLookupTables().size() == 0;
            String baseLocation = this.getJobWorkingDir(jobFlow);
            if (onlyOneTable) {
                String intermediateFactTable = this.flatDesc.getTableName();
                String tableLocation = baseLocation + "/" + intermediateFactTable;
                jobFlow.addTask((AbstractExecutable)KafkaInputBase.createSaveKafkaDataStep(jobFlow.getId(), tableLocation, this.seg));
                this.intermediatePaths.add(tableLocation);
            } else {
                StreamCubeFactTableDesc streamFactDesc = new StreamCubeFactTableDesc(this.cubeDesc, this.seg, this.flatDesc);
                jobFlow.addTask((AbstractExecutable)KafkaInputBase.createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + streamFactDesc.getTableName(), this.seg));
                jobFlow.addTask(KafkaInputBase.createFlatTable(this.hiveTableDatabase, baseLocation, this.cubeName, streamFactDesc, this.intermediateTables, this.intermediatePaths));
            }
        }

        public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow) {
        }

        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
            return JobBuilderSupport.getJobWorkingDir((String)this.config.getHdfsWorkingDirectory(), (String)jobFlow.getId());
        }

        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
            jobFlow.addTask(KafkaInputBase.createGCStep(this.intermediateTables, this.intermediatePaths));
        }
    }
}

