/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.clone;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.clone.CloneFileInfo;
import org.apache.paimon.flink.clone.PickFilesUtil;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PickFilesForCloneOperator
extends AbstractStreamOperator<CloneFileInfo>
implements OneInputStreamOperator<Tuple2<String, String>, CloneFileInfo> {
    private static final Logger LOG = LoggerFactory.getLogger(PickFilesForCloneOperator.class);
    private final Map<String, String> sourceCatalogConfig;
    private final Map<String, String> targetCatalogConfig;
    private Catalog sourceCatalog;
    private Catalog targetCatalog;

    public PickFilesForCloneOperator(Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
        this.sourceCatalogConfig = sourceCatalogConfig;
        this.targetCatalogConfig = targetCatalogConfig;
    }

    public void open() throws Exception {
        this.sourceCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.sourceCatalogConfig));
        this.targetCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.targetCatalogConfig));
    }

    public void processElement(StreamRecord<Tuple2<String, String>> streamRecord) throws Exception {
        String sourceIdentifierStr = (String)((Tuple2)streamRecord.getValue()).f0;
        Identifier sourceIdentifier = Identifier.fromString(sourceIdentifierStr);
        String targetIdentifierStr = (String)((Tuple2)streamRecord.getValue()).f1;
        Identifier targetIdentifier = Identifier.fromString(targetIdentifierStr);
        FileStoreTable sourceTable = (FileStoreTable)this.sourceCatalog.getTable(sourceIdentifier);
        this.targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true);
        this.targetCatalog.createTable(targetIdentifier, Schema.fromTableSchema(sourceTable.schema()), true);
        List<CloneFileInfo> result = this.toCloneFileInfos(PickFilesUtil.getUsedFilesForLatestSnapshot(sourceTable), sourceTable.location(), sourceIdentifierStr, targetIdentifierStr);
        if (LOG.isDebugEnabled()) {
            LOG.debug("The CloneFileInfo of table {} is {} : ", (Object)sourceTable.location(), result);
        }
        for (CloneFileInfo info : result) {
            this.output.collect((Object)new StreamRecord((Object)info));
        }
    }

    private List<CloneFileInfo> toCloneFileInfos(List<Path> files, Path sourceTableRoot, String sourceIdentifier, String targetIdentifier) {
        ArrayList<CloneFileInfo> result = new ArrayList<CloneFileInfo>();
        for (Path file : files) {
            Path relativePath = this.getPathExcludeTableRoot(file, sourceTableRoot);
            result.add(new CloneFileInfo(relativePath.toString(), sourceIdentifier, targetIdentifier));
        }
        return result;
    }

    private Path getPathExcludeTableRoot(Path absolutePath, Path sourceTableRoot) {
        String fileAbsolutePath = absolutePath.toUri().toString();
        String sourceTableRootPath = sourceTableRoot.toString();
        Preconditions.checkState(fileAbsolutePath.startsWith(sourceTableRootPath), "File absolute path does not start with source table root path. This is unexpected. fileAbsolutePath is: " + fileAbsolutePath + ", sourceTableRootPath is: " + sourceTableRootPath);
        return new Path(fileAbsolutePath.substring(sourceTableRootPath.length()));
    }

    public void close() throws Exception {
        if (this.sourceCatalog != null) {
            this.sourceCatalog.close();
        }
        if (this.targetCatalog != null) {
            this.targetCatalog.close();
        }
    }
}

