/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.compact;

import com.beust.jcommander.JCommander;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieFlinkCompactor {
    protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);

    public static void main(String[] args) throws Exception {
        HoodieCompactionPlan compactionPlan;
        Option requested;
        Option<String> compactionInstantTimeOption;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkCompactionConfig cfg = new FlinkCompactionConfig();
        JCommander cmd = new JCommander((Object)cfg, null, args);
        if (cfg.help.booleanValue() || args.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        CompactionUtil.setAvroSchema(conf, metaClient);
        CompactionUtil.inferChangelogMode(conf, metaClient);
        HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
        HoodieFlinkTable table = writeClient.getHoodieTable();
        if (cfg.schedule.booleanValue() && (compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient)).isPresent()) {
            boolean scheduled = writeClient.scheduleCompactionAtInstant((String)compactionInstantTimeOption.get(), Option.empty());
            if (!scheduled) {
                LOG.info("No compaction plan for this job ");
                return;
            }
            table.getMetaClient().reloadActiveTimeline();
        }
        HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline();
        Option option = requested = CompactionUtil.isLIFO(cfg.compactionSeq) ? timeline.lastInstant() : timeline.firstInstant();
        if (!requested.isPresent()) {
            LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option");
            return;
        }
        String compactionInstantTime = ((HoodieInstant)requested.get()).getTimestamp();
        HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant((String)compactionInstantTime);
        if (timeline.containsInstant(inflightInstant)) {
            LOG.info("Rollback inflight compaction instant: [" + compactionInstantTime + "]");
            table.rollbackInflightCompaction(inflightInstant);
            table.getMetaClient().reloadActiveTimeline();
        }
        if ((compactionPlan = CompactionUtils.getCompactionPlan((HoodieTableMetaClient)table.getMetaClient(), (String)compactionInstantTime)) == null || compactionPlan.getOperations() == null || compactionPlan.getOperations().isEmpty()) {
            LOG.info("No compaction plan for instant " + compactionInstantTime);
            return;
        }
        HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant((String)compactionInstantTime);
        HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
        if (!pendingCompactionTimeline.containsInstant(instant)) {
            LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\nClean the compaction plan in auxiliary path and cancels the compaction");
            CompactionUtil.cleanInstant(table.getMetaClient(), instant);
            return;
        }
        int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1 ? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS);
        table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
        env.addSource((SourceFunction)new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime)).name("compaction_source").uid("uid_compaction_source").rebalance().transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), (OneInputStreamOperator)new ProcessOperator((ProcessFunction)new CompactFunction(conf))).setParallelism(compactionParallelism).addSink((SinkFunction)new CompactionCommitSink(conf)).name("clean_commits").uid("uid_clean_commits").setParallelism(1);
        env.execute("flink_hudi_compaction");
        writeClient.close();
    }
}

