/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.jobs;

import java.util.Collections;
import java.util.List;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.ErrorCodeSupplier;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.streaming.common.MergeJobEntry;
import org.apache.kylin.streaming.jobs.StreamingDFMergeJob;
import org.apache.kylin.streaming.request.StreamingSegmentRequest;
import org.apache.kylin.streaming.rest.RestSupport;
import org.apache.kylin.streaming.util.JobExecutionIdHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SyncMerger {
    private static final Logger logger = LoggerFactory.getLogger(SyncMerger.class);
    private MergeJobEntry mergeJobEntry;

    public SyncMerger(MergeJobEntry mergeJobEntry) {
        this.mergeJobEntry = mergeJobEntry;
    }

    public void run(StreamingDFMergeJob merger) {
        block28: {
            logger.info("start merge streaming segment");
            logger.info(this.mergeJobEntry.toString());
            long start = System.currentTimeMillis();
            try {
                merger.streamingMergeSegment(this.mergeJobEntry);
                logger.info("merge segment cost {}", (Object)(System.currentTimeMillis() - start));
                logger.info("delete merged segment and change the status");
                this.mergeJobEntry.globalMergeTime().set(System.currentTimeMillis() - start);
                KylinConfig config = KylinConfig.getInstanceFromEnv();
                if (config.isUTEnv()) {
                    EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                        NDataflowManager dfMgr = NDataflowManager.getInstance((KylinConfig)KylinConfig.getInstanceFromEnv(), (String)this.mergeJobEntry.project());
                        NDataflow copy = dfMgr.getDataflow(this.mergeJobEntry.dataflowId()).copy();
                        NDataSegment seg = copy.getSegment(this.mergeJobEntry.afterMergeSegment().getId());
                        seg.setStatus(SegmentStatusEnum.READY);
                        seg.setSourceCount(this.mergeJobEntry.afterMergeSegmentSourceCount());
                        NDataflowUpdate dfUpdate = new NDataflowUpdate(this.mergeJobEntry.dataflowId());
                        dfUpdate.setToUpdateSegs(new NDataSegment[]{seg});
                        List<NDataSegment> toRemoveSegs = this.mergeJobEntry.unMergedSegments();
                        dfUpdate.setToRemoveSegs(toRemoveSegs.toArray(new NDataSegment[0]));
                        dfMgr.updateDataflow(dfUpdate);
                        return 0;
                    }, (String)this.mergeJobEntry.project());
                    break block28;
                }
                String url = "/streaming_jobs/dataflow/segment";
                StreamingSegmentRequest req = new StreamingSegmentRequest(this.mergeJobEntry.project(), this.mergeJobEntry.dataflowId(), this.mergeJobEntry.afterMergeSegmentSourceCount());
                req.setRemoveSegment(this.mergeJobEntry.unMergedSegments());
                req.setNewSegId(this.mergeJobEntry.afterMergeSegment().getId());
                req.setJobType(JobTypeEnum.STREAMING_MERGE.name());
                String jobId = StreamingUtils.getJobId((String)this.mergeJobEntry.dataflowId(), (String)req.getJobType());
                req.setJobExecutionId(JobExecutionIdHolder.getJobExecutionId(jobId));
                try (RestSupport rest = new RestSupport(config);){
                    rest.execute((HttpRequestBase)rest.createHttpPut(url), req);
                }
                StreamingUtils.replayAuditlog();
            }
            catch (Exception e) {
                logger.info("merge failed reason: {} stackTrace is: {}", (Object)e.toString(), (Object)e.getStackTrace());
                KylinConfig config = KylinConfig.getInstanceFromEnv();
                if (!config.isUTEnv()) {
                    String url = "/streaming_jobs/dataflow/segment/deletion";
                    StreamingSegmentRequest req = new StreamingSegmentRequest(this.mergeJobEntry.project(), this.mergeJobEntry.dataflowId());
                    req.setRemoveSegment(Collections.singletonList(this.mergeJobEntry.afterMergeSegment()));
                    req.setJobType(JobTypeEnum.STREAMING_MERGE.name());
                    String jobId = StreamingUtils.getJobId((String)this.mergeJobEntry.dataflowId(), (String)req.getJobType());
                    req.setJobExecutionId(JobExecutionIdHolder.getJobExecutionId(jobId));
                    try (RestSupport rest = new RestSupport(config);){
                        rest.execute((HttpRequestBase)rest.createHttpPost(url), req);
                    }
                    StreamingUtils.replayAuditlog();
                }
                throw new KylinException((ErrorCodeSupplier)ServerErrorCode.SEGMENT_MERGE_FAILURE, this.mergeJobEntry.afterMergeSegment().getId());
            }
        }
    }
}

