/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.clone;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.FileStore;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.clone.CloneFileInfo;
import org.apache.paimon.flink.clone.CloneFilesUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.SingleFileWriter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream operator that clones a single manifest file per input record.
 *
 * <p>For every incoming {@link CloneFileInfo} the operator places the manifest file under the
 * target table root (either as a byte copy or, when any entry references an external path, as a
 * rewritten manifest whose entries have their external path reset to {@code null}) and then emits
 * one {@link CloneFileInfo} for each data file listed in the source manifest.
 */
public class CopyManifestFileOperator
extends AbstractStreamOperator<CloneFileInfo>
implements OneInputStreamOperator<CloneFileInfo, CloneFileInfo> {
    private static final Logger LOG = LoggerFactory.getLogger(CopyManifestFileOperator.class);
    private final Map<String, String> sourceCatalogConfig;
    private final Map<String, String> targetCatalogConfig;
    private transient Catalog sourceCatalog;
    private transient Catalog targetCatalog;
    // Per-table caches keyed by table identifier string; filled through CloneFilesUtil.
    private transient Map<String, FileIO> srcFileIOs;
    private transient Map<String, FileIO> targetFileIOs;
    private transient Map<String, Path> targetLocations;

    /**
     * @param sourceCatalogConfig options used to create the source catalog in {@link #open()}
     * @param targetCatalogConfig options used to create the target catalog in {@link #open()}
     */
    public CopyManifestFileOperator(Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
        this.sourceCatalogConfig = sourceCatalogConfig;
        this.targetCatalogConfig = targetCatalogConfig;
    }

    /** Creates both catalogs and initializes the empty per-table caches. */
    @Override
    public void open() throws Exception {
        this.sourceCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.sourceCatalogConfig));
        this.targetCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.targetCatalogConfig));
        this.srcFileIOs = new HashMap<>();
        this.targetFileIOs = new HashMap<>();
        this.targetLocations = new HashMap<>();
    }

    /**
     * Clones the manifest file described by the record and emits its data files downstream.
     *
     * <p>The physical copy is skipped when the target file already exists with the same size as
     * the source; the data files of the manifest are emitted in either case.
     *
     * @param streamRecord record wrapping the manifest file to clone
     */
    @Override
    public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exception {
        CloneFileInfo cloneFileInfo = streamRecord.getValue();
        FileIO sourceTableFileIO = CloneFilesUtil.getFileIO(this.srcFileIOs, cloneFileInfo.getSourceIdentifier(), this.sourceCatalog);
        FileIO targetTableFileIO = CloneFilesUtil.getFileIO(this.targetFileIOs, cloneFileInfo.getTargetIdentifier(), this.targetCatalog);
        Path targetTableRootPath = CloneFilesUtil.getPath(this.targetLocations, cloneFileInfo.getTargetIdentifier(), this.targetCatalog);
        String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot();
        Path sourcePath = new Path(cloneFileInfo.getSourceFilePath());
        Path targetPath = new Path(targetTableRootPath + filePathExcludeTableRoot);

        // NOTE(review): a manifest that was rewritten (external paths stripped) presumably has a
        // different size than its source, so this shortcut may never apply to it - confirm.
        if (targetTableFileIO.exists(targetPath) && targetTableFileIO.getFileSize(targetPath) == sourceTableFileIO.getFileSize(sourcePath)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Skipping clone target file {} because it already exists and has the same size.", targetPath);
            }
            // The manifest is still read so that its data files are forwarded downstream.
            this.copyOrRewriteManifestFile(sourceTableFileIO, targetTableFileIO, sourcePath, targetPath, cloneFileInfo, false);
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Begin copy file from {} to {}.", sourcePath, targetPath);
        }
        this.copyOrRewriteManifestFile(sourceTableFileIO, targetTableFileIO, sourcePath, targetPath, cloneFileInfo, true);
        if (LOG.isDebugEnabled()) {
            LOG.debug("End copy file from {} to {}.", sourcePath, targetPath);
        }
    }

    /**
     * Reads the source manifest, optionally materializes it at {@code targetPath}, and emits the
     * data files it references.
     *
     * @param sourceTableFileIO file IO used to read the source manifest bytes
     * @param targetTableFileIO file IO used to write the target manifest bytes
     * @param sourcePath location of the source manifest file
     * @param targetPath location the manifest is written to
     * @param cloneFileInfo the record being processed; supplies the table identifiers
     * @param needCopyManifestFile when {@code false} nothing is written to {@code targetPath}
     * @throws IOException if reading, copying or rewriting the manifest fails
     * @throws Catalog.TableNotExistException if the source table cannot be found in the catalog
     */
    private void copyOrRewriteManifestFile(FileIO sourceTableFileIO, FileIO targetTableFileIO, Path sourcePath, Path targetPath, CloneFileInfo cloneFileInfo, boolean needCopyManifestFile) throws IOException, Catalog.TableNotExistException {
        Identifier sourceIdentifier = Identifier.fromString(cloneFileInfo.getSourceIdentifier());
        FileStoreTable sourceTable = (FileStoreTable)this.sourceCatalog.getTable(sourceIdentifier);
        FileStore<?> store = sourceTable.store();
        ManifestFile manifestFile = store.manifestFileFactory().create();
        List<ManifestEntry> manifestEntries = manifestFile.readWithIOException(sourcePath.getName());
        if (needCopyManifestFile) {
            if (this.containsExternalPath(manifestEntries)) {
                this.rewriteWithoutExternalPaths(manifestFile, manifestEntries, targetPath);
            } else {
                CopyManifestFileOperator.copyManifestBytes(sourceTableFileIO, sourcePath, targetTableFileIO, targetPath);
            }
        }
        this.pickDataFilesForClone(manifestEntries, store, cloneFileInfo);
    }

    /** Writes a new manifest at {@code targetPath} whose entries carry a {@code null} external path. */
    private void rewriteWithoutExternalPaths(ManifestFile manifestFile, List<ManifestEntry> manifestEntries, Path targetPath) throws IOException {
        List<ManifestEntry> rewrittenEntries = new ArrayList<>(manifestEntries.size());
        for (ManifestEntry entry : manifestEntries) {
            rewrittenEntries.add(new ManifestEntry(entry.kind(), entry.partition(), entry.bucket(), entry.totalBuckets(), entry.file().newExternalPath(null)));
        }
        // try-with-resources so the writer is closed even when writing fails.
        try (ManifestFile.ManifestEntryWriter writer = manifestFile.createManifestEntryWriter(targetPath)) {
            writer.write(rewrittenEntries);
        }
    }

    /** Copies the manifest bytes from {@code sourcePath} to {@code targetPath}, overwriting the target. */
    private static void copyManifestBytes(FileIO sourceTableFileIO, Path sourcePath, FileIO targetTableFileIO, Path targetPath) throws IOException {
        InputStream in = sourceTableFileIO.newInputStream(sourcePath);
        OutputStream out;
        try {
            out = targetTableFileIO.newOutputStream(targetPath, true);
        } catch (IOException | RuntimeException e) {
            // The input stream is already open at this point; release it before propagating.
            IOUtils.closeQuietly(in);
            throw e;
        }
        IOUtils.copyBytes(in, out);
    }

    /**
     * Emits one {@link CloneFileInfo} per manifest entry, pointing at the entry's data file and
     * at its path relative to the table root ({@code "/" + relative bucket path + file name}).
     */
    private void pickDataFilesForClone(List<ManifestEntry> manifestEntries, FileStore<?> store, CloneFileInfo cloneFileInfo) {
        // Loop-invariant: resolve the path factory once instead of once per entry.
        FileStorePathFactory fileStorePathFactory = store.pathFactory();
        for (ManifestEntry manifestEntry : manifestEntries) {
            Path dataFilePath = fileStorePathFactory.createDataFilePathFactory(manifestEntry.partition(), manifestEntry.bucket()).toPath(manifestEntry);
            Path relativeBucketPath = fileStorePathFactory.relativeBucketPath(manifestEntry.partition(), manifestEntry.bucket());
            Path relativeTablePath = new Path("/" + relativeBucketPath, dataFilePath.getName());
            this.output.collect(new StreamRecord<>(new CloneFileInfo(dataFilePath.toString(), relativeTablePath.toString(), cloneFileInfo.getSourceIdentifier(), cloneFileInfo.getTargetIdentifier())));
        }
    }

    /** Returns {@code true} if at least one entry's file has an external path set. */
    private boolean containsExternalPath(List<ManifestEntry> manifestEntries) {
        for (ManifestEntry manifestEntry : manifestEntries) {
            if (manifestEntry.file().externalPath().isPresent()) {
                return true;
            }
        }
        return false;
    }

    /** Closes both catalogs; the target catalog is closed even if closing the source one fails. */
    @Override
    public void close() throws Exception {
        try {
            if (this.sourceCatalog != null) {
                this.sourceCatalog.close();
            }
        } finally {
            if (this.targetCatalog != null) {
                this.targetCatalog.close();
            }
        }
    }
}

