/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.copy;

import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.copy.CopyFileInfo;
import org.apache.paimon.flink.copy.CopyFilesUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CopyDataFileOperator
extends AbstractStreamOperator<CopyFileInfo>
implements OneInputStreamOperator<CopyFileInfo, CopyFileInfo> {
    private static final Logger LOG = LoggerFactory.getLogger(CopyDataFileOperator.class);
    private final Map<String, String> sourceCatalogConfig;
    private final Map<String, String> targetCatalogConfig;
    private transient Catalog sourceCatalog;
    private transient Catalog targetCatalog;
    private transient Map<String, FileIO> srcFileIOs;
    private transient Map<String, FileIO> targetFileIOs;
    private transient Map<String, Path> targetLocations;

    public CopyDataFileOperator(Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
        this.sourceCatalogConfig = sourceCatalogConfig;
        this.targetCatalogConfig = targetCatalogConfig;
    }

    public void open() throws Exception {
        this.sourceCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.sourceCatalogConfig));
        this.targetCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.targetCatalogConfig));
        this.srcFileIOs = new HashMap<String, FileIO>();
        this.targetFileIOs = new HashMap<String, FileIO>();
        this.targetLocations = new HashMap<String, Path>();
    }

    public void processElement(StreamRecord<CopyFileInfo> streamRecord) throws Exception {
        CopyFileInfo copyFileInfo = (CopyFileInfo)streamRecord.getValue();
        FileIO sourceTableFileIO = CopyFilesUtil.getFileIO(this.srcFileIOs, copyFileInfo.getSourceIdentifier(), this.sourceCatalog);
        FileIO targetTableFileIO = CopyFilesUtil.getFileIO(this.targetFileIOs, copyFileInfo.getTargetIdentifier(), this.targetCatalog);
        Path targetTableRootPath = CopyFilesUtil.getPath(this.targetLocations, copyFileInfo.getTargetIdentifier(), this.targetCatalog);
        String filePathExcludeTableRoot = copyFileInfo.getFilePathExcludeTableRoot();
        Path sourcePath = new Path(copyFileInfo.getSourceFilePath());
        Path targetPath = new Path(targetTableRootPath + filePathExcludeTableRoot);
        try {
            if (targetTableFileIO.exists(targetPath) && targetTableFileIO.getFileSize(targetPath) == sourceTableFileIO.getFileSize(sourcePath)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Skipping copy target file {} because it already exists and has the same size.", (Object)targetPath);
                }
                this.output.collect(streamRecord);
                return;
            }
        }
        catch (FileNotFoundException e) {
            LOG.warn("File {} does not exist. ignore it", (Object)sourcePath, (Object)e);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Begin copy file from {} to {}.", (Object)sourcePath, (Object)targetPath);
        }
        try {
            IOUtils.copyBytes(sourceTableFileIO.newInputStream(sourcePath), targetTableFileIO.newOutputStream(targetPath, true));
        }
        catch (FileNotFoundException e) {
            LOG.warn("File {} does not exist. ignore it", (Object)sourcePath, (Object)e);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("End copy file from {} to {}.", (Object)sourcePath, (Object)targetPath);
        }
        this.output.collect(streamRecord);
    }

    public void close() throws Exception {
        if (this.sourceCatalog != null) {
            this.sourceCatalog.close();
        }
        if (this.targetCatalog != null) {
            this.targetCatalog.close();
        }
    }
}

