/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.iceberg.sink;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.IcebergSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.commit.IcebergAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.commit.IcebergAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.commit.IcebergCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.state.IcebergSinkState;

public class IcebergSink
implements SeaTunnelSink<SeaTunnelRow, IcebergSinkState, IcebergCommitInfo, IcebergAggregatedCommitInfo>,
SupportSaveMode,
SupportMultiTableSink {
    private static String PLUGIN_NAME = "Iceberg";
    private final SinkConfig config;
    private final ReadonlyConfig readonlyConfig;
    private final CatalogTable catalogTable;

    public IcebergSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
        this.readonlyConfig = pluginConfig;
        this.config = new SinkConfig(pluginConfig);
        this.catalogTable = catalogTable;
        if (this.config.getPrimaryKeys().isEmpty() && Objects.nonNull(this.catalogTable.getTableSchema().getPrimaryKey())) {
            this.config.setPrimaryKeys(this.catalogTable.getTableSchema().getPrimaryKey().getColumnNames());
        }
        if (this.config.getPartitionKeys().isEmpty() && Objects.nonNull(this.catalogTable.getPartitionKeys())) {
            this.config.setPartitionKeys(this.catalogTable.getPartitionKeys());
        }
    }

    public String getPluginName() {
        return PLUGIN_NAME;
    }

    public IcebergSinkWriter createWriter(SinkWriter.Context context) throws IOException {
        return IcebergSinkWriter.of(this.config, this.catalogTable);
    }

    public SinkWriter<SeaTunnelRow, IcebergCommitInfo, IcebergSinkState> restoreWriter(SinkWriter.Context context, List<IcebergSinkState> states) throws IOException {
        return IcebergSinkWriter.of(this.config, this.catalogTable, states);
    }

    public Optional<SinkAggregatedCommitter<IcebergCommitInfo, IcebergAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
        return Optional.of(new IcebergAggregatedCommitter(this.config, this.catalogTable));
    }

    public Optional<Serializer<IcebergAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
        return Optional.of(new DefaultSerializer());
    }

    public Optional<Serializer<IcebergCommitInfo>> getCommitInfoSerializer() {
        return Optional.of(new DefaultSerializer());
    }

    public Optional<SaveModeHandler> getSaveModeHandler() {
        CatalogFactory catalogFactory = (CatalogFactory)FactoryUtil.discoverFactory((ClassLoader)Thread.currentThread().getContextClassLoader(), CatalogFactory.class, (String)"Iceberg");
        if (catalogFactory == null) {
            throw new IcebergConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", this.getPluginName(), PluginType.SINK, "Cannot find Doris catalog factory"));
        }
        Catalog catalog = catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), this.readonlyConfig);
        return Optional.of(new DefaultSaveModeHandler(this.config.getSchemaSaveMode(), this.config.getDataSaveMode(), catalog, this.catalogTable, null));
    }
}

