/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.operator;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.compact.MultiTableScanBase;
import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan;
import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo;
import org.apache.paimon.flink.source.operator.CombinedCompactorSourceFunction;
import org.apache.paimon.flink.source.operator.MultiUnawareTablesReadOperator;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CombinedUnawareBatchSourceFunction
extends CombinedCompactorSourceFunction<MultiTableUnawareAppendCompactionTask> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CombinedUnawareBatchSourceFunction.class);
    private transient MultiTableScanBase<MultiTableUnawareAppendCompactionTask> tableScan;

    public CombinedUnawareBatchSourceFunction(Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern) {
        super(catalogLoader, includingPattern, excludingPattern, databasePattern, false);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.tableScan = new MultiUnawareBucketTableScan(this.catalogLoader, this.includingPattern, this.excludingPattern, this.databasePattern, this.isStreaming, this.isRunning);
    }

    @Override
    void scanTable() throws Exception {
        if (this.isRunning.get()) {
            MultiTableScanBase.ScanResult scanResult = this.tableScan.scanTable((SourceFunction.SourceContext<MultiTableUnawareAppendCompactionTask>)this.ctx);
            if (scanResult == MultiTableScanBase.ScanResult.FINISHED) {
                return;
            }
            if (scanResult == MultiTableScanBase.ScanResult.IS_EMPTY) {
                LOGGER.info("No file were collected for the table of unaware-bucket");
            }
        }
    }

    public static DataStream<MultiTableUnawareAppendCompactionTask> buildSource(StreamExecutionEnvironment env, String name, Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, @Nullable Duration partitionIdleTime) {
        CombinedUnawareBatchSourceFunction function = new CombinedUnawareBatchSourceFunction(catalogLoader, includingPattern, excludingPattern, databasePattern);
        StreamSource sourceOperator = new StreamSource((SourceFunction)function);
        MultiTableCompactionTaskTypeInfo compactionTaskTypeInfo = new MultiTableCompactionTaskTypeInfo();
        SingleOutputStreamOperator source = new DataStreamSource(env, (TypeInformation)compactionTaskTypeInfo, sourceOperator, false, name, Boundedness.BOUNDED).forceNonParallel();
        if (partitionIdleTime != null) {
            source = source.transform(name, (TypeInformation)compactionTaskTypeInfo, (OneInputStreamOperator)new MultiUnawareTablesReadOperator(catalogLoader, partitionIdleTime));
        }
        PartitionTransformation transformation = new PartitionTransformation(source.getTransformation(), (StreamPartitioner)new RebalancePartitioner());
        return new DataStream(env, (Transformation)transformation);
    }

    private static Long getPartitionInfo(Identifier tableIdentifier, BinaryRow partition, Map<Identifier, Map<BinaryRow, Long>> multiTablesPartitionInfo, Catalog catalog) {
        Map<BinaryRow, Long> partitionInfo = multiTablesPartitionInfo.get(tableIdentifier);
        if (partitionInfo == null) {
            try {
                Table table = catalog.getTable(tableIdentifier);
                if (!(table instanceof FileStoreTable)) {
                    LOGGER.error(String.format("Only FileStoreTable supports compact action. The table type is '%s'.", table.getClass().getName()));
                }
                FileStoreTable fileStoreTable = (FileStoreTable)table;
                List<PartitionEntry> partitions = fileStoreTable.newSnapshotReader().partitionEntries();
                partitionInfo = partitions.stream().collect(Collectors.toMap(PartitionEntry::partition, PartitionEntry::lastFileCreationTime));
                multiTablesPartitionInfo.put(tableIdentifier, partitionInfo);
            }
            catch (Catalog.TableNotExistException e) {
                LOGGER.error(String.format("table: %s not found.", tableIdentifier.getFullName()));
            }
        }
        return partitionInfo.get(partition);
    }

    public void close() throws Exception {
        super.close();
        if (this.tableScan != null) {
            this.tableScan.close();
        }
    }
}

