/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.clone.files;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.clone.files.DataFileInfo;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.migrate.FileMetaUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;

/**
 * Single-input operator that collects {@link DataFileInfo} records and, once the input is
 * exhausted, commits the collected data files to their target tables.
 *
 * <p>Incoming records are buffered in memory, grouped first by table {@link Identifier} and then
 * by partition. In {@link #endInput()} one commit message is built per partition and each table
 * is committed through a batch commit created with {@code withOverwrite()}.
 *
 * <p>The operator is declared with a {@code Long} output type, but no record is emitted by the
 * code in this class.
 */
public class CloneFilesCommitOperator
        extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<DataFileInfo, Long>, BoundedOneInput {

    private static final long serialVersionUID = 1L;

    /** Options used to create the Paimon catalog when the buffered files are committed. */
    private final Map<String, String> catalogConfig;

    /** Decodes the serialized file metadata carried by each {@link DataFileInfo}. */
    private transient DataFileMetaSerializer dataFileSerializer;

    /** Buffered file metadata: table identifier -> partition -> files of that partition. */
    private transient Map<Identifier, Map<BinaryRow, List<DataFileMeta>>> files;

    /**
     * Creates the operator.
     *
     * @param catalogConfig catalog options, later passed to {@code Options.fromMap} to create the
     *     catalog in {@link #endInput()}
     */
    public CloneFilesCommitOperator(Map<String, String> catalogConfig) {
        this.catalogConfig = catalogConfig;
    }

    /**
     * Initializes the transient state (serializer and file buffer) after the superclass has
     * been opened.
     *
     * @throws Exception if the superclass fails to open
     */
    public void open() throws Exception {
        super.open();
        this.dataFileSerializer = new DataFileMetaSerializer();
        this.files = new HashMap<>();
    }

    /**
     * Deserializes the file metadata of one record and buffers it under the record's table
     * identifier and partition.
     *
     * @param streamRecord record wrapping the {@link DataFileInfo} to buffer
     * @throws Exception if the file metadata cannot be deserialized
     */
    public void processElement(StreamRecord<DataFileInfo> streamRecord) throws Exception {
        DataFileInfo file = streamRecord.getValue();
        List<DataFileMeta> partitionFiles =
                this.files
                        .computeIfAbsent(file.identifier(), k -> new HashMap<>())
                        .computeIfAbsent(file.partition(), k -> new ArrayList<>());
        partitionFiles.add(this.dataFileSerializer.deserializeFromBytes(file.dataFileMeta()));
    }

    /**
     * Commits all buffered files, one overwrite batch commit per table.
     *
     * <p>Any failure while committing or closing a commit propagates to the caller; tables
     * later in the iteration are then not committed.
     *
     * @throws Exception if the catalog cannot be created or closed, a table cannot be loaded, or
     *     a commit fails
     */
    public void endInput() throws Exception {
        try (Catalog catalog =
                FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.catalogConfig))) {
            for (Map.Entry<Identifier, Map<BinaryRow, List<DataFileMeta>>> tableEntry :
                    this.files.entrySet()) {
                List<CommitMessage> commitMessages = new ArrayList<>();
                for (Map.Entry<BinaryRow, List<DataFileMeta>> partitionEntry :
                        tableEntry.getValue().entrySet()) {
                    // NOTE(review): the literal 0 is presumably the bucket id expected by
                    // FileMetaUtils.createCommitMessage - confirm against its signature.
                    commitMessages.add(
                            FileMetaUtils.createCommitMessage(
                                    partitionEntry.getKey(), 0, partitionEntry.getValue()));
                }

                Table table = catalog.getTable(tableEntry.getKey());
                // try-with-resources closes the commit on every path and lets a commit failure
                // propagate; a failure of close() is attached as a suppressed exception instead
                // of replacing (or hiding) the original error.
                try (BatchTableCommit commit =
                        table.newBatchWriteBuilder().withOverwrite().newCommit()) {
                    commit.commit(commitMessages);
                }
            }
        }
    }
}

