/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.operator;

import java.util.regex.Pattern;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.compact.MultiTableScanBase;
import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan;
import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo;
import org.apache.paimon.flink.source.operator.CombinedCompactorSourceFunction;

/**
 * Streaming source function that repeatedly drives a {@link MultiUnawareBucketTableScan} and
 * hands it the source context, pausing for {@code monitorInterval} milliseconds whenever a scan
 * round reports {@link MultiTableScanBase.ScanResult#IS_EMPTY}.
 *
 * <p>The table scan is created in {@link #open(Configuration)} and released in {@link #close()}.
 * Use {@link #buildSource} to wire this function into a Flink job as a non-parallel, continuous
 * unbounded source.
 */
public class CombinedUnawareStreamingSourceFunction
        extends CombinedCompactorSourceFunction<MultiTableUnawareAppendCompactionTask> {

    /** Pause, in milliseconds, applied after a scan round that reported an empty result. */
    private final long monitorInterval;

    /** Scan over the matched tables; {@code null} until {@link #open(Configuration)} has run. */
    private MultiTableScanBase<MultiTableUnawareAppendCompactionTask> tableScan;

    /**
     * Creates the source function.
     *
     * @param catalogLoader loader forwarded to the parent class and, later, to the table scan
     * @param includingPattern pattern forwarded to the parent class (presumably selects the
     *     tables to include; semantics are defined by the scan implementation)
     * @param excludingPattern pattern forwarded to the parent class (presumably selects the
     *     tables to exclude)
     * @param databasePattern pattern forwarded to the parent class (presumably selects the
     *     databases to scan)
     * @param monitorInterval time in milliseconds to sleep after an empty scan round
     */
    public CombinedUnawareStreamingSourceFunction(
            Catalog.Loader catalogLoader,
            Pattern includingPattern,
            Pattern excludingPattern,
            Pattern databasePattern,
            long monitorInterval) {
        // The trailing "true" is presumably the parent's streaming flag (read back later as
        // isStreaming) -- confirm against CombinedCompactorSourceFunction.
        super(catalogLoader, includingPattern, excludingPattern, databasePattern, true);
        this.monitorInterval = monitorInterval;
    }

    /**
     * Runs the parent's {@code open} and then creates the table scan used by {@link
     * #scanTable()}.
     *
     * @param parameters configuration passed through to the parent class
     * @throws Exception if the parent's {@code open} or the scan construction fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        tableScan =
                new MultiUnawareBucketTableScan(
                        catalogLoader,
                        includingPattern,
                        excludingPattern,
                        databasePattern,
                        isStreaming,
                        isRunning);
    }

    /**
     * Scans in a loop for as long as the running flag is set.
     *
     * <p>The loop ends as soon as a round reports {@link MultiTableScanBase.ScanResult#FINISHED}.
     * A round reporting {@link MultiTableScanBase.ScanResult#IS_EMPTY} is followed by a sleep of
     * {@code monitorInterval} milliseconds; any other result triggers the next round immediately.
     *
     * @throws Exception if the scan fails or the sleeping thread is interrupted
     */
    @Override
    void scanTable() throws Exception {
        while (isRunning.get()) {
            MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(ctx);
            if (scanResult == MultiTableScanBase.ScanResult.FINISHED) {
                return;
            }
            if (scanResult == MultiTableScanBase.ScanResult.IS_EMPTY) {
                // Nothing was produced this round: back off before polling again.
                Thread.sleep(monitorInterval);
            }
        }
    }

    /**
     * Builds a non-parallel, continuous unbounded {@link DataStream} backed by this source
     * function and rebalances its output to the downstream operators.
     *
     * @param env execution environment the source is attached to
     * @param name name given to the source
     * @param catalogLoader loader passed to the source function
     * @param includingPattern pattern passed to the source function
     * @param excludingPattern pattern passed to the source function
     * @param databasePattern pattern passed to the source function
     * @param monitorInterval time in milliseconds to sleep after an empty scan round
     * @return the rebalanced stream of compaction tasks
     */
    public static DataStream<MultiTableUnawareAppendCompactionTask> buildSource(
            StreamExecutionEnvironment env,
            String name,
            Catalog.Loader catalogLoader,
            Pattern includingPattern,
            Pattern excludingPattern,
            Pattern databasePattern,
            long monitorInterval) {
        CombinedUnawareStreamingSourceFunction function =
                new CombinedUnawareStreamingSourceFunction(
                        catalogLoader,
                        includingPattern,
                        excludingPattern,
                        databasePattern,
                        monitorInterval);
        StreamSource<MultiTableUnawareAppendCompactionTask, ?> sourceOperator =
                new StreamSource<>(function);
        boolean isParallel = false;
        MultiTableCompactionTaskTypeInfo compactionTaskTypeInfo =
                new MultiTableCompactionTaskTypeInfo();
        return new DataStreamSource<>(
                        env,
                        compactionTaskTypeInfo,
                        sourceOperator,
                        isParallel,
                        name,
                        Boundedness.CONTINUOUS_UNBOUNDED)
                .forceNonParallel()
                .rebalance();
    }

    /**
     * Runs the parent's {@code close} and then closes the table scan, if one was created.
     *
     * <p>The scan is closed in a {@code finally} block so that it is released even when the
     * parent's {@code close} throws.
     *
     * @throws Exception if the parent's {@code close} or the scan's {@code close} fails
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            // tableScan stays null when open() never ran or failed before creating it.
            if (tableScan != null) {
                tableScan.close();
            }
        }
    }
}

