/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.clone;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.clone.CloneFileInfo;
import org.apache.paimon.flink.clone.CloneFilesUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CopyMetaFilesForCloneOperator
extends ProcessFunction<Tuple2<String, String>, Void> {
    public static final OutputTag<CloneFileInfo> INDEX_FILES_TAG = new OutputTag<CloneFileInfo>("index-files"){};
    public static final OutputTag<CloneFileInfo> DATA_MANIFEST_FILES_TAG = new OutputTag<CloneFileInfo>("data-manifest-files"){};
    private static final Logger LOG = LoggerFactory.getLogger(CopyMetaFilesForCloneOperator.class);
    private final Map<String, String> sourceCatalogConfig;
    private final Map<String, String> targetCatalogConfig;
    private Catalog sourceCatalog;
    private Catalog targetCatalog;

    public CopyMetaFilesForCloneOperator(Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
        this.sourceCatalogConfig = sourceCatalogConfig;
        this.targetCatalogConfig = targetCatalogConfig;
    }

    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
        this.sourceCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.sourceCatalogConfig));
        this.targetCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.targetCatalogConfig));
    }

    public void processElement(Tuple2<String, String> tuple, ProcessFunction.Context context, Collector<Void> collector) throws Exception {
        String sourceIdentifierStr = (String)tuple.f0;
        Identifier sourceIdentifier = Identifier.fromString(sourceIdentifierStr);
        String targetIdentifierStr = (String)tuple.f1;
        Identifier targetIdentifier = Identifier.fromString(targetIdentifierStr);
        FileStoreTable sourceTable = (FileStoreTable)this.sourceCatalog.getTable(sourceIdentifier);
        this.targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true);
        this.targetCatalog.createTable(targetIdentifier, CopyMetaFilesForCloneOperator.newSchemaFromTableSchema(sourceTable.schema()), true);
        FileStoreTable targetTable = (FileStoreTable)this.targetCatalog.getTable(targetIdentifier);
        SchemaManager sourceSchemaManager = sourceTable.schemaManager();
        SchemaManager targetSchemaManager = targetTable.schemaManager();
        FileIO sourceTableFileIO = sourceTable.fileIO();
        FileIO targetTableFileIO = targetTable.fileIO();
        for (long schemaId : sourceSchemaManager.listAllIds()) {
            IOUtils.copyBytes(sourceTableFileIO.newInputStream(sourceSchemaManager.toSchemaPath(schemaId)), targetTableFileIO.newOutputStream(targetSchemaManager.toSchemaPath(schemaId), true));
        }
        FileStore<?> sourceStore = sourceTable.store();
        FileStore<?> targetStore = targetTable.store();
        SnapshotManager sourceSnapshotManager = sourceStore.snapshotManager();
        SnapshotManager targetSnapshotManager = targetStore.snapshotManager();
        Snapshot latestSnapshot = sourceSnapshotManager.latestSnapshot();
        if (latestSnapshot != null) {
            long snapshotId = latestSnapshot.id();
            IOUtils.copyBytes(sourceTableFileIO.newInputStream(sourceSnapshotManager.snapshotPath(snapshotId)), targetTableFileIO.newOutputStream(targetSnapshotManager.snapshotPath(snapshotId), true));
        }
        FileStorePathFactory sourcePathFactory = sourceStore.pathFactory();
        FileStorePathFactory targetPathFactory = targetStore.pathFactory();
        if (latestSnapshot != null) {
            IOUtils.copyBytes(sourceTableFileIO.newInputStream(sourcePathFactory.toManifestListPath(latestSnapshot.baseManifestList())), targetTableFileIO.newOutputStream(targetPathFactory.toManifestListPath(latestSnapshot.baseManifestList()), true));
            IOUtils.copyBytes(sourceTableFileIO.newInputStream(sourcePathFactory.toManifestListPath(latestSnapshot.deltaManifestList())), targetTableFileIO.newOutputStream(targetPathFactory.toManifestListPath(latestSnapshot.deltaManifestList()), true));
            String changelogManifestList = latestSnapshot.changelogManifestList();
            if (changelogManifestList != null) {
                IOUtils.copyBytes(sourceTableFileIO.newInputStream(sourcePathFactory.toManifestListPath(changelogManifestList)), targetTableFileIO.newOutputStream(targetPathFactory.toManifestListPath(changelogManifestList), true));
            }
        }
        List<Object> indexFiles = new ArrayList();
        if (latestSnapshot != null) {
            IndexFileHandler indexFileHandler = sourceStore.newIndexFileHandler();
            String indexManifest = latestSnapshot.indexManifest();
            if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
                IOUtils.copyBytes(sourceTableFileIO.newInputStream(sourcePathFactory.indexManifestFileFactory().toPath(indexManifest)), targetTableFileIO.newOutputStream(targetPathFactory.indexManifestFileFactory().toPath(indexManifest), true));
                List indexManifestEntries = CloneFilesUtil.retryReadingFiles(() -> indexFileHandler.readManifestWithIOException(indexManifest));
                ArrayList<Path> indexFileList = new ArrayList<Path>();
                if (indexManifestEntries != null) {
                    indexManifestEntries.stream().map(IndexManifestEntry::indexFile).map(indexFileHandler::filePath).forEach(indexFileList::add);
                }
                indexFiles = CloneFilesUtil.toCloneFileInfos(indexFileList, sourceTable.location(), sourceIdentifierStr, targetIdentifierStr);
                for (CloneFileInfo cloneFileInfo : indexFiles) {
                    context.output(INDEX_FILES_TAG, (Object)cloneFileInfo);
                }
            }
        }
        if (latestSnapshot != null && latestSnapshot.statistics() != null) {
            IOUtils.copyBytes(sourceTableFileIO.newInputStream(sourcePathFactory.statsFileFactory().toPath(latestSnapshot.statistics())), targetTableFileIO.newOutputStream(targetPathFactory.statsFileFactory().toPath(latestSnapshot.statistics()), true));
        }
        List<Object> dataManifestFiles = new ArrayList();
        if (latestSnapshot != null) {
            List<Path> list = CloneFilesUtil.getManifestUsedFilesForSnapshot(sourceTable, latestSnapshot.id());
            dataManifestFiles = CloneFilesUtil.toCloneFileInfos(list, sourceTable.location(), sourceIdentifierStr, targetIdentifierStr);
        }
        for (CloneFileInfo info : dataManifestFiles) {
            context.output(DATA_MANIFEST_FILES_TAG, (Object)info);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("The CloneFileInfo of table {} is: indexFiles={}, dataManifestFiles={}", new Object[]{sourceTable.location(), indexFiles, dataManifestFiles});
        }
    }

    private static Schema newSchemaFromTableSchema(TableSchema tableSchema) {
        return new Schema(ImmutableList.copyOf(tableSchema.fields()), ImmutableList.copyOf(tableSchema.partitionKeys()), ImmutableList.copyOf(tableSchema.primaryKeys()), ImmutableMap.copyOf(Iterables.filter(tableSchema.options().entrySet(), entry -> !Objects.equals(entry.getKey(), CoreOptions.PATH.key()))), tableSchema.comment());
    }

    public void close() throws Exception {
        if (this.sourceCatalog != null) {
            this.sourceCatalog.close();
        }
        if (this.targetCatalog != null) {
            this.targetCatalog.close();
        }
    }
}

