/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.clone.spits;

import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.clone.spits.CloneSplitInfo;
import org.apache.paimon.flink.clone.spits.CommitMessageInfo;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.TableRead;

public class CloneSplitsFunction
extends ProcessFunction<CloneSplitInfo, CommitMessageInfo> {
    private static final long serialVersionUID = 1L;
    private final Map<String, String> sourceCatalogConfig;
    private final Map<String, String> targetCatalogConfig;
    private transient Catalog sourceCatalog;
    private transient Catalog targetCatalog;

    public CloneSplitsFunction(Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
        this.sourceCatalogConfig = sourceCatalogConfig;
        this.targetCatalogConfig = targetCatalogConfig;
    }

    public void open(OpenContext openContext) throws Exception {
        this.open(new Configuration());
    }

    public void open(Configuration conf) throws Exception {
        this.sourceCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.sourceCatalogConfig));
        this.targetCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.targetCatalogConfig));
    }

    public void processElement(CloneSplitInfo cloneSplitInfo, ProcessFunction.Context context, Collector<CommitMessageInfo> collector) throws Exception {
        TableRead tableRead = this.sourceCatalog.getTable(cloneSplitInfo.sourceIdentifier()).newReadBuilder().newRead();
        BatchTableWrite write = this.targetCatalog.getTable(cloneSplitInfo.targetidentifier()).newBatchWriteBuilder().newWrite();
        tableRead.createReader(cloneSplitInfo.split()).forEachRemaining(row -> {
            try {
                write.write((InternalRow)row);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        List<CommitMessage> commitMessages = write.prepareCommit();
        for (CommitMessage commitMessage : commitMessages) {
            CommitMessageInfo messageInfo = new CommitMessageInfo(cloneSplitInfo.targetidentifier(), commitMessage);
            collector.collect((Object)messageInfo);
        }
        write.close();
    }
}

