/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.iceberg.migrate;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.factories.FactoryException;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.iceberg.IcebergOptions;
import org.apache.paimon.iceberg.IcebergPathFactory;
import org.apache.paimon.iceberg.manifest.IcebergDataFileMeta;
import org.apache.paimon.iceberg.manifest.IcebergManifestEntry;
import org.apache.paimon.iceberg.manifest.IcebergManifestFile;
import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta;
import org.apache.paimon.iceberg.manifest.IcebergManifestList;
import org.apache.paimon.iceberg.metadata.IcebergDataField;
import org.apache.paimon.iceberg.metadata.IcebergMetadata;
import org.apache.paimon.iceberg.metadata.IcebergPartitionField;
import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
import org.apache.paimon.iceberg.metadata.IcebergSchema;
import org.apache.paimon.iceberg.migrate.IcebergMigrateMetadata;
import org.apache.paimon.iceberg.migrate.IcebergMigrateMetadataFactory;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.migrate.FileMetaUtils;
import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.shade.org.apache.avro.file.DataFileStream;
import org.apache.paimon.shade.org.apache.avro.generic.GenericDatumReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Migrator} that converts an Iceberg table into a Paimon table.
 *
 * <p>The Iceberg metadata is obtained from an {@link IcebergMigrateMetadata} discovered by the
 * value of {@code metadata.iceberg.storage}. Every Iceberg schema is recreated as a Paimon schema.
 * The live data files of the current Iceberg snapshot are then handed to {@link FileMetaUtils}
 * and committed to the Paimon table in a single batch commit.
 *
 * <p>While files are being imported, a {@code new path -> original path} map is kept. If a task
 * fails, files are renamed back using that map.
 *
 * <p>Only targets with {@code bucket = -1} and no primary key are accepted. Iceberg delete
 * manifests and delete files are rejected.
 */
public class IcebergMigrator implements Migrator {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrator.class);

    private final ThreadPoolExecutor executor;
    private final Catalog paimonCatalog;
    private final String paimonDatabaseName;
    private final String paimonTableName;
    private final CoreOptions coreOptions;
    private final String icebergDatabaseName;
    private final String icebergTableName;
    private final Options icebergOptions;
    private final IcebergMigrateMetadata icebergMigrateMetadata;
    private final IcebergPathFactory icebergMetaPathFactory;
    private final String icebergLatestMetadataLocation;
    private final IcebergMetadata icebergMetadata;

    /** Whether {@link #executeMigrate()} deletes the Iceberg table after a successful commit. */
    private boolean deleteOriginTable = true;

    /**
     * Creates a migrator and eagerly loads the latest Iceberg metadata.
     *
     * @param paimonCatalog catalog in which the target Paimon table is created
     * @param paimonDatabaseName target Paimon database
     * @param paimonTableName target Paimon table
     * @param icebergDatabaseName source Iceberg database
     * @param icebergTableName source Iceberg table
     * @param icebergOptions options for reading Iceberg metadata; must contain
     *     {@code metadata.iceberg.storage}
     * @param parallelism thread count for the migration executor
     * @param options Paimon table options; bucket must be -1 and no primary key may be set
     * @throws IllegalArgumentException if an option precondition is violated
     * @throws RuntimeException if no matching {@link IcebergMigrateMetadataFactory} is found
     */
    public IcebergMigrator(
            Catalog paimonCatalog,
            String paimonDatabaseName,
            String paimonTableName,
            String icebergDatabaseName,
            String icebergTableName,
            Options icebergOptions,
            Integer parallelism,
            Map<String, String> options) {
        this.paimonCatalog = paimonCatalog;
        this.paimonDatabaseName = paimonDatabaseName;
        this.paimonTableName = paimonTableName;

        this.coreOptions = new CoreOptions(options);
        Preconditions.checkArgument(
                coreOptions.bucket() == -1,
                "Iceberg migrator only support unaware-bucket target table, bucket should be -1");
        Preconditions.checkArgument(
                !options.containsKey(CoreOptions.PRIMARY_KEY.key()),
                "Iceberg migrator does not support define primary key for target table.");

        this.icebergDatabaseName = icebergDatabaseName;
        this.icebergTableName = icebergTableName;
        this.icebergOptions = icebergOptions;

        Preconditions.checkArgument(
                icebergOptions.containsKey(IcebergOptions.METADATA_ICEBERG_STORAGE.key()),
                "'metadata.iceberg.storage' is required, please make sure it has been set.");

        // The factory identifier is "<storage type>_migrate".
        IcebergMigrateMetadataFactory icebergMigrateMetadataFactory;
        try {
            icebergMigrateMetadataFactory =
                    FactoryUtil.discoverFactory(
                            IcebergMigrator.class.getClassLoader(),
                            IcebergMigrateMetadataFactory.class,
                            icebergOptions.get(IcebergOptions.METADATA_ICEBERG_STORAGE).toString()
                                    + "_migrate");
        } catch (FactoryException e) {
            throw new RuntimeException("create IcebergMigrateMetadataFactory failed.", e);
        }

        this.icebergMigrateMetadata =
                icebergMigrateMetadataFactory.create(
                        Identifier.create(icebergDatabaseName, icebergTableName), icebergOptions);
        this.icebergMetadata = icebergMigrateMetadata.icebergMetadata();
        this.icebergLatestMetadataLocation = icebergMigrateMetadata.icebergLatestMetadataLocation();
        // Manifest lists and manifest files are resolved relative to the metadata directory.
        this.icebergMetaPathFactory =
                new IcebergPathFactory(new Path(icebergLatestMetadataLocation).getParent());
        this.executor = ThreadPoolUtils.createCachedThreadPool(parallelism, "ICEBERG_MIGRATOR");
    }

    /**
     * Creates the Paimon table, imports the live Iceberg data files and commits them.
     *
     * <p>On any failure after the Paimon table is created, the table is dropped and the error is
     * rethrown wrapped in a {@link RuntimeException}. If the current snapshot has no live data
     * files, the method returns early. In that case the new Paimon table is left in place and the
     * origin table is not deleted.
     */
    @Override
    public void executeMigrate() throws Exception {
        List<TableSchema> paimonSchemas = icebergSchemasToPaimonSchemas(icebergMetadata);
        Preconditions.checkArgument(
                !paimonSchemas.isEmpty(),
                "paimon schemas transformed from iceberg table is empty.");
        Identifier paimonIdentifier = Identifier.create(paimonDatabaseName, paimonTableName);

        paimonCatalog.createDatabase(paimonDatabaseName, true);
        TableSchema firstSchema = paimonSchemas.get(0);
        Preconditions.checkArgument(firstSchema.id() == 0L, "Unexpected, first schema id is not 0.");
        paimonCatalog.createTable(paimonIdentifier, firstSchema.toSchema(), false);

        try {
            FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
            FileIO fileIO = paimonTable.fileIO();

            // The first schema was used to create the table; replay the remaining ones in order.
            SchemaManager schemaManager = paimonTable.schemaManager();
            for (int i = 1; i < paimonSchemas.size(); ++i) {
                TableSchema schema = paimonSchemas.get(i);
                LOG.info("commit new schema from iceberg, new schema id:{}", schema.id());
                schemaManager.commit(schema);
            }

            List<IcebergDataFileMeta> icebergDataFileMetas = readLiveDataFiles(paimonTable, fileIO);
            if (icebergDataFileMetas.isEmpty()) {
                LOG.info(
                        "No live iceberg data files in iceberg table for snapshot {}, iceberg table meta path is {}.",
                        icebergMetadata.currentSnapshotId(),
                        icebergLatestMetadataLocation);
                return;
            }
            checkAndFilterDataFiles(icebergDataFileMetas);
            LOG.info(
                    "Begin to create Migrate Task, the number of iceberg data files is {}",
                    icebergDataFileMetas.size());

            // Shared by all tasks; concurrent because tasks run on the executor.
            Map<Path, Path> rollback = new ConcurrentHashMap<>();
            List<MigrateTask> tasks = new ArrayList<>();
            if (paimonTable.partitionKeys().isEmpty()) {
                tasks.add(importUnPartitionedTable(icebergDataFileMetas, paimonTable, rollback));
            } else {
                tasks.addAll(importPartitionedTable(icebergDataFileMetas, paimonTable, rollback));
            }

            List<CommitMessage> commitMessages = runMigrateTasks(tasks, fileIO, rollback);
            try (BatchTableCommit commit = paimonTable.newBatchWriteBuilder().newCommit()) {
                commit.commit(commitMessages);
                LOG.info("paimon commit success! Iceberg data files have been migrated to paimon.");
            }
        } catch (Exception e) {
            paimonCatalog.dropTable(paimonIdentifier, true);
            throw new RuntimeException("Migrating failed", e);
        }

        if (deleteOriginTable) {
            icebergMigrateMetadata.deleteOriginTable();
        }
    }

    /**
     * Reads the live manifest entries of the current Iceberg snapshot.
     *
     * <p>Each data file is tagged with the Iceberg schema id recorded in the manifest file that
     * listed it.
     *
     * @throws IllegalArgumentException if a manifest file has {@code DELETES} content
     */
    private List<IcebergDataFileMeta> readLiveDataFiles(FileStoreTable paimonTable, FileIO fileIO)
            throws Exception {
        IcebergManifestFile manifestFile =
                IcebergManifestFile.create(paimonTable, icebergMetaPathFactory);
        IcebergManifestList manifestList =
                IcebergManifestList.create(paimonTable, icebergMetaPathFactory);

        List<IcebergManifestFileMeta> icebergManifestFileMetas =
                manifestList.read(icebergMetadata.currentSnapshot().manifestList());
        checkAndFilterManifestFiles(icebergManifestFileMetas);

        // schema id -> live entries from manifests written with that schema
        Map<Long, List<IcebergManifestEntry>> icebergEntries = new HashMap<>();
        for (IcebergManifestFileMeta icebergManifestFileMeta : icebergManifestFileMetas) {
            long schemaId =
                    getSchemaIdFromIcebergManifestFile(
                            new Path(icebergManifestFileMeta.manifestPath()), fileIO);
            List<IcebergManifestEntry> entries = manifestFile.read(icebergManifestFileMeta);
            icebergEntries
                    .computeIfAbsent(schemaId, k -> new ArrayList<>())
                    .addAll(
                            entries.stream()
                                    .filter(IcebergManifestEntry::isLive)
                                    .collect(Collectors.toList()));
        }

        List<IcebergDataFileMeta> icebergDataFileMetas = new ArrayList<>();
        for (Map.Entry<Long, List<IcebergManifestEntry>> kv : icebergEntries.entrySet()) {
            // Use the schema id of *this* map entry (the decompiled code referenced an undefined variable here).
            long schemaId = kv.getKey();
            for (IcebergManifestEntry entry : kv.getValue()) {
                icebergDataFileMetas.add(entry.file().withSchemaId(schemaId));
            }
        }
        return icebergDataFileMetas;
    }

    /**
     * Submits all tasks to the executor and collects their commit messages in task order.
     *
     * <p>If any task fails, all futures are cancelled. Every file recorded in {@code rollback}
     * that still exists is renamed back to its original path. The failure is then rethrown.
     */
    private List<CommitMessage> runMigrateTasks(
            List<MigrateTask> tasks, FileIO fileIO, Map<Path, Path> rollback) throws Exception {
        List<Future<CommitMessage>> futures =
                tasks.stream().map(executor::submit).collect(Collectors.toList());
        List<CommitMessage> commitMessages = new ArrayList<>();
        try {
            for (Future<CommitMessage> future : futures) {
                commitMessages.add(future.get());
            }
        } catch (Exception e) {
            futures.forEach(f -> f.cancel(true));
            // NOTE(review): Future#isDone() is already true once cancel() has succeeded, so this
            // loop does not wait for tasks that were already running. Their renames may still
            // race with the rollback below.
            for (Future<CommitMessage> future : futures) {
                while (!future.isDone()) {
                    Thread.sleep(100L);
                }
            }
            for (Map.Entry<Path, Path> moved : rollback.entrySet()) {
                Path newPath = moved.getKey();
                Path origin = moved.getValue();
                if (fileIO.exists(newPath)) {
                    fileIO.rename(newPath, origin);
                }
            }
            throw new RuntimeException("Migrating failed because exception happens", e);
        }
        return commitMessages;
    }

    /** Sets whether the Iceberg table is deleted after a successful migration. */
    @Override
    public void deleteOriginTable(boolean delete) throws Exception {
        this.deleteOriginTable = delete;
    }

    /** Renames the Paimon target table to the Iceberg source table's identifier. */
    @Override
    public void renameTable(boolean ignoreIfNotExists) throws Exception {
        Identifier targetTableId = Identifier.create(paimonDatabaseName, paimonTableName);
        Identifier sourceTableId = Identifier.create(icebergDatabaseName, icebergTableName);
        LOG.info("Last step: rename {} to {}.", targetTableId, sourceTableId);
        paimonCatalog.renameTable(targetTableId, sourceTableId, ignoreIfNotExists);
    }

    /** Converts every Iceberg schema into a Paimon {@link TableSchema} that keeps the same id. */
    private List<TableSchema> icebergSchemasToPaimonSchemas(IcebergMetadata icebergMetadata) {
        return icebergMetadata.schemas().stream()
                .map(
                        icebergSchema -> {
                            LOG.info(
                                    "Convert iceberg schema to paimon schema, iceberg schema id: {}",
                                    icebergSchema.schemaId());
                            return TableSchema.create(
                                    icebergSchema.schemaId(),
                                    icebergSchemaToPaimonSchema(icebergSchema));
                        })
                .collect(Collectors.toList());
    }

    /**
     * Converts one Iceberg schema to a Paimon {@link Schema}.
     *
     * <p>Partition keys are taken from the names of the fields in the default partition spec.
     */
    private Schema icebergSchemaToPaimonSchema(IcebergSchema icebergSchema) {
        // NOTE(review): the default spec id is used as a list index into partitionSpecs(); this
        // assumes specs are stored in id order — confirm.
        int currentPartitionSpecId = icebergMetadata.defaultSpecId();
        IcebergPartitionSpec currentIcebergPartitionSpec =
                icebergMetadata.partitionSpecs().get(currentPartitionSpecId);
        List<DataField> dataFields =
                icebergSchema.fields().stream()
                        .map(IcebergDataField::toDatafield)
                        .collect(Collectors.toList());
        List<String> partitionKeys =
                currentIcebergPartitionSpec.fields().stream()
                        .map(IcebergPartitionField::name)
                        .collect(Collectors.toList());
        return new Schema(
                dataFields, partitionKeys, Collections.emptyList(), Collections.emptyMap(), null);
    }

    /** Rejects manifest files whose content is {@code DELETES}; nothing is filtered out. */
    private void checkAndFilterManifestFiles(List<IcebergManifestFileMeta> icebergManifestFileMetas) {
        for (IcebergManifestFileMeta meta : icebergManifestFileMetas) {
            Preconditions.checkArgument(
                    meta.content() != IcebergManifestFileMeta.Content.DELETES,
                    "IcebergMigrator don't support analyzing manifest file with 'DELETE' content.");
        }
    }

    /** Rejects any data file whose content is not {@code DATA}; nothing is filtered out. */
    private void checkAndFilterDataFiles(List<IcebergDataFileMeta> icebergDataFileMetas) {
        for (IcebergDataFileMeta meta : icebergDataFileMetas) {
            Preconditions.checkArgument(
                    meta.content() == IcebergDataFileMeta.Content.DATA,
                    "IcebergMigrator don't support analyzing iceberg delete file.");
        }
    }

    /**
     * Reads the Iceberg schema id from the {@code "schema"} key in an Avro manifest file's metadata.
     *
     * @throws IllegalArgumentException if the manifest carries no {@code "schema"} metadata
     * @throws RuntimeException wrapping any {@link IOException} while reading the file
     */
    private long getSchemaIdFromIcebergManifestFile(Path manifestPath, FileIO fileIO) {
        // The raw stream is managed separately so it is closed even if the Avro header is invalid
        // and the DataFileStream constructor throws.
        try (InputStream in = fileIO.newInputStream(manifestPath);
                DataFileStream<Object> dataFileStream =
                        new DataFileStream<>(in, new GenericDatumReader<>())) {
            String schema = dataFileStream.getMetaString("schema");
            Preconditions.checkArgument(
                    schema != null,
                    "Missing 'schema' metadata in iceberg manifest file " + manifestPath);
            return JsonSerdeUtil.fromJson(schema, IcebergSchema.class).schemaId();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Builds a Paimon {@link DataFileMeta} for each Iceberg data file, in input order. */
    private static List<DataFileMeta> construct(
            List<IcebergDataFileMeta> icebergDataFileMetas,
            FileIO fileIO,
            Table paimonTable,
            Path newDir,
            Map<Path, Path> rollback) {
        return icebergDataFileMetas.stream()
                .map(
                        icebergDataFileMeta ->
                                constructFileMeta(
                                        icebergDataFileMeta, fileIO, paimonTable, newDir, rollback))
                .collect(Collectors.toList());
    }

    /**
     * Looks up the file status of one Iceberg data file and delegates to {@link
     * FileMetaUtils#constructFileMeta}. The file's format and schema id are passed along.
     */
    private static DataFileMeta constructFileMeta(
            IcebergDataFileMeta icebergDataFileMeta,
            FileIO fileIO,
            Table table,
            Path dir,
            Map<Path, Path> rollback) {
        FileStatus status;
        try {
            status = fileIO.getFileStatus(new Path(icebergDataFileMeta.filePath()));
        } catch (IOException e) {
            throw new RuntimeException(
                    "error when get file status. file path is " + icebergDataFileMeta.filePath(),
                    e);
        }
        String format = icebergDataFileMeta.fileFormat();
        long schemaId = icebergDataFileMeta.schemaId();
        return FileMetaUtils.constructFileMeta(
                format, status, fileIO, table, dir, rollback, schemaId);
    }

    /** Creates one task that imports all files into bucket 0 of the empty partition. */
    private MigrateTask importUnPartitionedTable(
            List<IcebergDataFileMeta> icebergDataFileMetas,
            FileStoreTable paimonTable,
            Map<Path, Path> rollback) {
        BinaryRow partitionRow = BinaryRow.EMPTY_ROW;
        Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
        return new MigrateTask(icebergDataFileMetas, paimonTable, partitionRow, newDir, rollback);
    }

    /** Groups files by partition and creates one task per partition, targeting bucket 0. */
    private List<MigrateTask> importPartitionedTable(
            List<IcebergDataFileMeta> icebergDataFileMetas,
            FileStoreTable paimonTable,
            Map<Path, Path> rollback) {
        Map<BinaryRow, List<IcebergDataFileMeta>> dataInPartition =
                icebergDataFileMetas.stream()
                        .collect(Collectors.groupingBy(IcebergDataFileMeta::partition));
        List<MigrateTask> migrateTasks = new ArrayList<>();
        for (Map.Entry<BinaryRow, List<IcebergDataFileMeta>> entry : dataInPartition.entrySet()) {
            BinaryRow partitionRow = entry.getKey();
            Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
            migrateTasks.add(
                    new MigrateTask(entry.getValue(), paimonTable, partitionRow, newDir, rollback));
        }
        return migrateTasks;
    }

    /**
     * Imports the data files of a single partition into {@code newDir} and returns the resulting
     * {@link CommitMessage}.
     */
    public static class MigrateTask implements Callable<CommitMessage> {
        private final List<IcebergDataFileMeta> icebergDataFileMetas;
        private final FileStoreTable paimonTable;
        private final BinaryRow partitionRow;
        private final Path newDir;
        private final Map<Path, Path> rollback;

        public MigrateTask(
                List<IcebergDataFileMeta> icebergDataFileMetas,
                FileStoreTable paimonTable,
                BinaryRow partitionRow,
                Path newDir,
                Map<Path, Path> rollback) {
            this.icebergDataFileMetas = icebergDataFileMetas;
            this.paimonTable = paimonTable;
            this.partitionRow = partitionRow;
            this.newDir = newDir;
            this.rollback = rollback;
        }

        @Override
        public CommitMessage call() throws Exception {
            FileIO fileIO = paimonTable.fileIO();
            if (!fileIO.exists(newDir)) {
                fileIO.mkdirs(newDir);
            }
            List<DataFileMeta> fileMetas =
                    construct(icebergDataFileMetas, fileIO, paimonTable, newDir, rollback);
            return FileMetaUtils.createCommitMessage(
                    partitionRow, paimonTable.coreOptions().bucket(), fileMetas);
        }
    }
}

