/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.dynamic;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.dynamic.CompareSchemasVisitor;
import org.apache.iceberg.flink.sink.dynamic.DataConverter;
import org.apache.iceberg.flink.sink.dynamic.EvolveSchemaVisitor;
import org.apache.iceberg.flink.sink.dynamic.PartitionSpecEvolution;
import org.apache.iceberg.flink.sink.dynamic.TableMetadataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Brings a catalog table in line with what an incoming record requires: the table itself, the
 * target branch, the schema and the partition spec are each looked up in the
 * {@link TableMetadataCache} first and only created or evolved through the {@link Catalog} when the
 * cache cannot satisfy the request.
 *
 * <p>Every catalog mutation tolerates a concurrent writer performing the same change: when a
 * create or commit fails, the current state is re-read and the failure is only propagated if the
 * desired state is still not present.
 */
@Internal
class TableUpdater {
    private static final Logger LOG = LoggerFactory.getLogger(TableUpdater.class);
    private final TableMetadataCache cache;
    private final Catalog catalog;

    /**
     * @param cache metadata cache consulted before the catalog and refreshed after each change
     * @param catalog catalog used to create namespaces/tables and to load tables for updates
     */
    TableUpdater(TableMetadataCache cache, Catalog catalog) {
        this.cache = cache;
        this.catalog = catalog;
    }

    /**
     * Ensures, in this order, that the table exists, the branch exists, the table schema can
     * accept {@code schema} and the table has a partition spec equivalent to {@code spec}.
     *
     * @param tableIdentifier table to create or update
     * @param branch branch that must exist on the table
     * @param schema schema of the incoming data
     * @param spec partition spec requested for the incoming data
     * @return the resolved schema information together with the partition spec to write with
     * @throws CommitFailedException if a branch, schema or spec commit fails and the desired state
     *     was not produced concurrently by another writer
     */
    Tuple2<TableMetadataCache.ResolvedSchemaInfo, PartitionSpec> update(TableIdentifier tableIdentifier, String branch, Schema schema, PartitionSpec spec) {
        findOrCreateTable(tableIdentifier, schema, spec);
        findOrCreateBranch(tableIdentifier, branch);
        TableMetadataCache.ResolvedSchemaInfo newSchemaInfo = findOrCreateSchema(tableIdentifier, schema);
        PartitionSpec newSpec = findOrCreateSpec(tableIdentifier, spec);
        return Tuple2.of(newSchemaInfo, newSpec);
    }

    /**
     * Creates the table (and its namespace, when the lookup failed with
     * {@link NoSuchNamespaceException}) if the cache reports it as missing.
     */
    private void findOrCreateTable(TableIdentifier identifier, Schema schema, PartitionSpec spec) {
        Tuple2<Boolean, Exception> exists = cache.exists(identifier);
        if (!Boolean.FALSE.equals(exists.f0)) {
            return;
        }

        if (exists.f1 instanceof NoSuchNamespaceException) {
            // NOTE(review): assumes the catalog implements SupportsNamespaces; otherwise this cast
            // fails with a ClassCastException.
            SupportsNamespaces catalogWithNameSpace = (SupportsNamespaces) catalog;
            LOG.info("Namespace {} not found during table search. Creating namespace", identifier.namespace());
            try {
                catalogWithNameSpace.createNamespace(identifier.namespace());
            } catch (AlreadyExistsException e) {
                LOG.debug("Namespace {} created concurrently", identifier.namespace(), e);
            }
        }

        LOG.info("Table {} not found during table search. Creating table.", identifier);
        try {
            Table table = catalog.createTable(identifier, schema, spec);
            cache.update(identifier, table);
        } catch (AlreadyExistsException e) {
            LOG.debug("Table {} created concurrently. Skipping creation.", identifier, e);
            // Drop the stale "missing" entry and re-run the lookup against fresh metadata.
            cache.invalidate(identifier);
            findOrCreateTable(identifier, schema, spec);
        }
    }

    /**
     * Creates {@code branch} on the table when the cache does not know it.
     *
     * @throws CommitFailedException if the branch could not be created and is still absent after
     *     refreshing the table
     */
    private void findOrCreateBranch(TableIdentifier identifier, String branch) {
        String fromCache = cache.branch(identifier, branch);
        if (fromCache != null) {
            return;
        }

        Table table = catalog.loadTable(identifier);
        try {
            table.manageSnapshots().createBranch(branch).commit();
            LOG.info("Branch {} for {} created", branch, identifier);
        } catch (CommitFailedException e) {
            table.refresh();
            // Only a branch that is still missing after the refresh is a real failure; if another
            // writer created it in the meantime the goal is already reached.
            if (!table.refs().containsKey(branch)) {
                LOG.error("Failed to create branch {} for {}.", branch, identifier, e);
                throw e;
            }
            LOG.debug("Branch {} concurrently created for {}.", branch, identifier);
        }
        cache.update(identifier, table);
    }

    /**
     * Resolves {@code schema} against the table schema, evolving the table schema when the
     * comparison reports {@code SCHEMA_UPDATE_NEEDED}.
     *
     * @return the cached result when it does not require a schema update; otherwise the result of
     *     comparing against the freshly loaded (and possibly evolved) table
     * @throws CommitFailedException if the schema update commit fails and the table still requires
     *     an update afterwards
     * @throws IllegalArgumentException if the comparison yields an unhandled result
     */
    private TableMetadataCache.ResolvedSchemaInfo findOrCreateSchema(TableIdentifier identifier, Schema schema) {
        TableMetadataCache.ResolvedSchemaInfo fromCache = cache.schema(identifier, schema);
        if (fromCache.compareResult() != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) {
            return fromCache;
        }

        // The cache may be stale: compare again against the table as currently stored in the catalog.
        Table table = catalog.loadTable(identifier);
        Schema tableSchema = table.schema();
        CompareSchemasVisitor.Result result = CompareSchemasVisitor.visit(schema, tableSchema, true);
        switch (result) {
            case SAME:
                cache.update(identifier, table);
                return new TableMetadataCache.ResolvedSchemaInfo(tableSchema, result, DataConverter.identity());
            case DATA_CONVERSION_NEEDED:
                cache.update(identifier, table);
                return new TableMetadataCache.ResolvedSchemaInfo(
                        tableSchema,
                        result,
                        DataConverter.get((LogicalType) FlinkSchemaUtil.convert(schema), (LogicalType) FlinkSchemaUtil.convert(tableSchema)));
            case SCHEMA_UPDATE_NEEDED:
                return evolveTableSchema(identifier, table, tableSchema, schema);
            default:
                throw new IllegalArgumentException("Unknown comparison result");
        }
    }

    /**
     * Commits the schema changes needed to go from {@code tableSchema} to {@code schema} and
     * returns the comparison result computed by the cache afterwards.
     */
    private TableMetadataCache.ResolvedSchemaInfo evolveTableSchema(TableIdentifier identifier, Table table, Schema tableSchema, Schema schema) {
        LOG.info("Triggering schema update for table {} {} to {}", identifier, tableSchema, schema);
        UpdateSchema updateApi = table.updateSchema();
        EvolveSchemaVisitor.visit(updateApi, tableSchema, schema);
        try {
            updateApi.commit();
            cache.update(identifier, table);
            TableMetadataCache.ResolvedSchemaInfo comparisonAfterMigration = cache.schema(identifier, schema);
            Schema newSchema = comparisonAfterMigration.resolvedTableSchema();
            LOG.info("Table {} schema updated from {} to {}", identifier, tableSchema, newSchema);
            return comparisonAfterMigration;
        } catch (CommitFailedException e) {
            // Another writer may have committed a compatible schema; re-check before failing.
            cache.invalidate(identifier);
            TableMetadataCache.ResolvedSchemaInfo afterConflict = cache.schema(identifier, schema);
            if (afterConflict.compareResult() != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) {
                LOG.debug("Table {} schema updated concurrently to {}", identifier, schema);
                return afterConflict;
            }
            LOG.error("Schema update failed for {} from {} to {}", identifier, tableSchema, schema, e);
            throw e;
        }
    }

    /**
     * Returns a partition spec of the table that is equivalent to {@code targetSpec}, updating the
     * table's spec when the current one differs.
     *
     * @throws CommitFailedException if the spec update commit fails and no equivalent spec is
     *     available afterwards
     */
    private PartitionSpec findOrCreateSpec(TableIdentifier identifier, PartitionSpec targetSpec) {
        PartitionSpec currentSpec = cache.spec(identifier, targetSpec);
        if (currentSpec != null) {
            return currentSpec;
        }

        Table table = catalog.loadTable(identifier);
        currentSpec = table.spec();
        PartitionSpecEvolution.PartitionSpecChanges result = PartitionSpecEvolution.evolve(currentSpec, targetSpec);
        if (result.isEmpty()) {
            LOG.info("Returning equivalent existing spec {} for {}", currentSpec, targetSpec);
            return currentSpec;
        }

        LOG.info("Spec for table {} has been altered. Updating from {} to {}", identifier, currentSpec, targetSpec);
        UpdatePartitionSpec updater = table.updateSpec();
        result.termsToRemove().forEach(updater::removeField);
        result.termsToAdd().forEach(updater::addField);
        try {
            updater.commit();
            cache.update(identifier, table);
        } catch (CommitFailedException e) {
            cache.invalidate(identifier);
            PartitionSpec newSpec = cache.spec(identifier, targetSpec);
            // The cache lookup yields null on a miss (see the check at the top of this method);
            // in that case there is nothing to compare and the commit failure is propagated.
            if (newSpec != null && PartitionSpecEvolution.evolve(targetSpec, newSpec).isEmpty()) {
                LOG.debug("Table {} partition spec updated concurrently to {}", identifier, newSpec);
                return newSpec;
            }
            LOG.error("Partition spec update failed for {} from {} to {}", identifier, currentSpec, targetSpec, e);
            throw e;
        }
        return cache.spec(identifier, targetSpec);
    }
}

