/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.source;

import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.metrics.FlinkStreamReadMetrics;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamReadMonitoringFunction
extends RichSourceFunction<MergeOnReadInputSplit>
implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(StreamReadMonitoringFunction.class);
    private static final long serialVersionUID = 1L;
    private final Path path;
    private final long interval;
    private final boolean cdcEnabled;
    private transient Object checkpointLock;
    private volatile boolean isRunning = true;
    private String issuedInstant;
    private String issuedOffset;
    private transient ListState<String> instantState;
    private final Configuration conf;
    private HoodieTableMetaClient metaClient;
    private final IncrementalInputSplits incrementalInputSplits;
    private transient FlinkStreamReadMetrics readMetrics;

    public StreamReadMonitoringFunction(Configuration conf, Path path, RowType rowType, long maxCompactionMemoryInBytes, @Nullable PartitionPruners.PartitionPruner partitionPruner) {
        this.conf = conf;
        this.path = path;
        this.interval = conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
        this.cdcEnabled = conf.getBoolean(FlinkOptions.CDC_ENABLED);
        this.incrementalInputSplits = IncrementalInputSplits.builder().conf(conf).path(path).rowType(rowType).maxCompactionMemoryInBytes(maxCompactionMemoryInBytes).partitionPruner(partitionPruner).skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT)).skipClustering(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING)).skipInsertOverwrite(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_INSERT_OVERWRITE)).build();
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        ValidationUtils.checkState((this.instantState == null ? 1 : 0) != 0, (String)("The " + ((Object)((Object)this)).getClass().getSimpleName() + " has already been initialized."));
        this.registerMetrics();
        this.instantState = context.getOperatorStateStore().getListState(new ListStateDescriptor("file-monitoring-state", (TypeSerializer)StringSerializer.INSTANCE));
        if (context.isRestored()) {
            LOG.info("Restoring state for the class {} with table {} and base path {}.", new Object[]{((Object)((Object)this)).getClass().getSimpleName(), this.conf.getString(FlinkOptions.TABLE_NAME), this.path});
            ArrayList<String> retrievedStates = new ArrayList<String>();
            for (String entry : (Iterable)this.instantState.get()) {
                retrievedStates.add(entry);
            }
            ValidationUtils.checkArgument((retrievedStates.size() <= 2 ? 1 : 0) != 0, (String)(((Object)((Object)this)).getClass().getSimpleName() + " retrieved invalid state."));
            if (retrievedStates.size() == 1 && this.issuedInstant != null) {
                throw new IllegalArgumentException("The " + ((Object)((Object)this)).getClass().getSimpleName() + " has already restored from a previous Flink version.");
            }
            if (retrievedStates.size() == 1) {
                this.issuedInstant = (String)retrievedStates.get(0);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} retrieved an issued instant of time {} for table {} with path {}.", new Object[]{((Object)((Object)this)).getClass().getSimpleName(), this.issuedInstant, this.conf.get(FlinkOptions.TABLE_NAME), this.path});
                }
            } else if (retrievedStates.size() == 2) {
                this.issuedInstant = (String)retrievedStates.get(0);
                this.issuedOffset = (String)retrievedStates.get(1);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} retrieved an issued instant of time [{}, {}] for table {} with path {}.", new Object[]{((Object)((Object)this)).getClass().getSimpleName(), this.issuedInstant, this.issuedOffset, this.conf.get(FlinkOptions.TABLE_NAME), this.path});
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<MergeOnReadInputSplit> context) throws Exception {
        this.checkpointLock = context.getCheckpointLock();
        while (this.isRunning) {
            Object object = this.checkpointLock;
            synchronized (object) {
                this.monitorDirAndForwardSplits(context);
            }
            TimeUnit.SECONDS.sleep(this.interval);
        }
    }

    @Nullable
    private HoodieTableMetaClient getOrCreateMetaClient() {
        if (this.metaClient != null) {
            return this.metaClient;
        }
        org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
        if (StreamerUtil.tableExists(this.path.toString(), hadoopConf)) {
            this.metaClient = StreamerUtil.createMetaClient(this.path.toString(), hadoopConf);
            return this.metaClient;
        }
        return null;
    }

    @VisibleForTesting
    public void monitorDirAndForwardSplits(SourceFunction.SourceContext<MergeOnReadInputSplit> context) {
        HoodieTableMetaClient metaClient = this.getOrCreateMetaClient();
        if (metaClient == null) {
            return;
        }
        IncrementalInputSplits.Result result = this.incrementalInputSplits.inputSplits(metaClient, this.issuedOffset, this.cdcEnabled);
        if (result.isEmpty() && StringUtils.isNullOrEmpty((String)result.getEndInstant())) {
            LOG.warn("No new instants to read for current run.");
            return;
        }
        for (MergeOnReadInputSplit split : result.getInputSplits()) {
            context.collect((Object)split);
        }
        this.issuedInstant = result.getEndInstant();
        this.issuedOffset = result.getOffset();
        LOG.info("\n------------------------------------------------------------\n---------- table: {}\n---------- consumed to instant: {}\n------------------------------------------------------------", (Object)this.conf.getString(FlinkOptions.TABLE_NAME), (Object)this.issuedInstant);
        if (result.isEmpty()) {
            LOG.warn("No new files to read for current run.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() throws Exception {
        super.close();
        if (this.checkpointLock != null) {
            Object object = this.checkpointLock;
            synchronized (object) {
                this.issuedInstant = null;
                this.isRunning = false;
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Closed File Monitoring Source for path: " + this.path + ".");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancel() {
        if (this.checkpointLock != null) {
            Object object = this.checkpointLock;
            synchronized (object) {
                this.isRunning = false;
            }
        } else {
            this.isRunning = false;
        }
    }

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.instantState.clear();
        if (this.issuedInstant != null) {
            this.instantState.add((Object)this.issuedInstant);
            this.readMetrics.setIssuedInstant(this.issuedInstant);
        }
        if (this.issuedOffset != null) {
            this.instantState.add((Object)this.issuedOffset);
        }
    }

    private void registerMetrics() {
        OperatorMetricGroup metrics = this.getRuntimeContext().getMetricGroup();
        this.readMetrics = new FlinkStreamReadMetrics((MetricGroup)metrics);
        this.readMetrics.registerMetrics();
    }

    public String getIssuedOffset() {
        return this.issuedOffset;
    }
}

