/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.clone.spits;

import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.clone.files.ListCloneFilesFunction;
import org.apache.paimon.flink.clone.schema.CloneSchemaInfo;
import org.apache.paimon.flink.clone.spits.CloneSplitInfo;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableScan;

public class ListCloneSplitsFunction
extends ProcessFunction<CloneSchemaInfo, CloneSplitInfo> {
    private static final long serialVersionUID = 1L;
    private final Map<String, String> sourceCatalogConfig;
    private final Map<String, String> targetCatalogConfig;
    @Nullable
    private final String whereSql;
    private transient Catalog sourceCatalog;
    private transient Catalog targetCatalog;

    public ListCloneSplitsFunction(Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig, @Nullable String whereSql) {
        this.sourceCatalogConfig = sourceCatalogConfig;
        this.targetCatalogConfig = targetCatalogConfig;
        this.whereSql = whereSql;
    }

    public void open(OpenContext openContext) throws Exception {
        this.open(new Configuration());
    }

    public void open(Configuration conf) throws Exception {
        this.sourceCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.sourceCatalogConfig));
        this.targetCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.targetCatalogConfig));
    }

    public void processElement(CloneSchemaInfo cloneSchemaInfo, ProcessFunction.Context context, Collector<CloneSplitInfo> collector) throws Exception {
        Tuple2<Identifier, Identifier> tuple = cloneSchemaInfo.identifierTuple();
        Table sourceTable = this.sourceCatalog.getTable((Identifier)tuple.f0);
        PartitionPredicate predicate = ListCloneFilesFunction.getPartitionPredicate(this.whereSql, sourceTable.rowType().project(sourceTable.partitionKeys()), (Identifier)tuple.f0);
        TableScan scan = sourceTable.newReadBuilder().withPartitionFilter(predicate).newScan();
        List<Split> splits = scan.plan().splits();
        for (Split split : splits) {
            CloneSplitInfo splitInfo = new CloneSplitInfo((Identifier)tuple.f0, (Identifier)tuple.f1, split);
            collector.collect((Object)splitInfo);
        }
    }
}

