/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.iceberg.sink;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.schema.handler.DataTypeChangeEventDispatcher;
import org.apache.seatunnel.api.table.schema.handler.DataTypeChangeEventHandler;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergTableLoader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.commit.IcebergCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.commit.IcebergFilesCommitter;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.state.IcebergSinkState;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.writer.IcebergWriterFactory;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.writer.RecordWriter;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.writer.WriteResult;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink writer that forwards {@link SeaTunnelRow}s to an Iceberg {@link RecordWriter} and tracks
 * the resulting {@link WriteResult}s so they can be reported at commit time and stored in
 * checkpoint state.
 *
 * <p>The underlying {@link RecordWriter} is created lazily, on the first write or the first
 * applied schema change.
 */
public class IcebergSinkWriter
        implements SinkWriter<SeaTunnelRow, IcebergCommitInfo, IcebergSinkState>,
                SupportMultiTableSinkWriter<Void>,
                SupportSchemaEvolutionSinkWriter {
    private static final Logger log = LoggerFactory.getLogger(IcebergSinkWriter.class);

    /** Schema handed to the writer factory when the record writer is created. */
    private TableSchema tableSchema;
    /** Row type passed along with every written row; replaced when a schema change is applied. */
    private SeaTunnelRowType rowType;
    private final IcebergSinkConfig config;
    private final IcebergTableLoader icebergTableLoader;
    /** Lazily created by {@link #tryCreateRecordWriter()}; {@code null} until first needed. */
    private volatile RecordWriter writer;
    private final IcebergFilesCommitter filesCommitter;
    /** Write results accumulated by {@link #prepareCommit()} since the last snapshot. */
    private final List<WriteResult> results = Lists.newArrayList();
    /** Random UUID by default; overwritten with the restored value when states are supplied. */
    private String commitUser = UUID.randomUUID().toString();
    private final DataTypeChangeEventHandler dataTypeChangeEventHandler;

    /**
     * Creates a writer and, when restored states are supplied, adopts the commit user of the
     * first state and commits the write results carried by every state.
     *
     * @param icebergTableLoader loader used by the committer and the writer factory
     * @param config sink configuration
     * @param tableSchema schema of the table being written; also used to derive the row type
     * @param states previously snapshotted states; may be {@code null} or empty
     */
    public IcebergSinkWriter(
            IcebergTableLoader icebergTableLoader,
            IcebergSinkConfig config,
            TableSchema tableSchema,
            List<IcebergSinkState> states) {
        this.config = config;
        this.icebergTableLoader = icebergTableLoader;
        this.tableSchema = tableSchema;
        this.rowType = tableSchema.toPhysicalRowDataType();
        this.filesCommitter = IcebergFilesCommitter.of(config, icebergTableLoader);
        this.dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher();
        if (Objects.nonNull(states) && !states.isEmpty()) {
            this.commitUser = states.get(0).getCommitUser();
            this.preCommit(states);
        }
    }

    /** Commits the write results held by each of the given states, in list order. */
    private void preCommit(List<IcebergSinkState> states) {
        states.forEach(
                icebergSinkState ->
                        this.filesCommitter.doCommit(icebergSinkState.getWriteResults()));
    }

    /** Creates the record writer from the current table schema if it does not exist yet. */
    private void tryCreateRecordWriter() {
        if (this.writer == null) {
            IcebergWriterFactory icebergWriterFactory =
                    new IcebergWriterFactory(this.icebergTableLoader, this.config);
            this.writer = icebergWriterFactory.createWriter(this.tableSchema);
        }
    }

    /**
     * Creates a writer without any restored state.
     *
     * @param config sink configuration
     * @param catalogTable table to write to
     * @return a new writer
     */
    public static IcebergSinkWriter of(IcebergSinkConfig config, CatalogTable catalogTable) {
        return IcebergSinkWriter.of(config, catalogTable, null);
    }

    /**
     * Creates a writer, building the table loader from the given config and catalog table.
     *
     * @param config sink configuration
     * @param catalogTable table to write to; its schema becomes the writer's table schema
     * @param states previously snapshotted states; may be {@code null}
     * @return a new writer
     */
    public static IcebergSinkWriter of(
            IcebergSinkConfig config, CatalogTable catalogTable, List<IcebergSinkState> states) {
        IcebergTableLoader icebergTableLoader = IcebergTableLoader.create(config, catalogTable);
        return new IcebergSinkWriter(
                icebergTableLoader, config, catalogTable.getTableSchema(), states);
    }

    /**
     * Writes one row with the current row type, creating the record writer on first use.
     *
     * @param element row to write
     * @throws IOException if the underlying writer fails
     */
    @Override
    public void write(SeaTunnelRow element) throws IOException {
        this.tryCreateRecordWriter();
        this.writer.write(element, this.rowType);
    }

    /**
     * Completes the record writer (if one was created) and wraps its write results in a commit
     * info. The results are also remembered for the next {@link #snapshotState(long)} call.
     *
     * @return a commit info that is always present; its result list is empty when nothing was
     *     written
     * @throws IOException if completing the writer fails
     */
    @Override
    public Optional<IcebergCommitInfo> prepareCommit() throws IOException {
        List<WriteResult> writeResults =
                this.writer != null ? this.writer.complete() : Collections.emptyList();
        IcebergCommitInfo icebergCommitInfo = new IcebergCommitInfo(writeResults);
        this.results.addAll(writeResults);
        return Optional.of(icebergCommitInfo);
    }

    /**
     * Applies a schema change to the tracked row type and forwards it to the record writer.
     * Does nothing when schema evolution is disabled in the config.
     *
     * @param event schema change to apply
     * @throws IOException if the underlying writer fails to apply the change
     */
    @Override
    public void applySchemaChange(SchemaChangeEvent event) throws IOException {
        if (!this.config.isTableSchemaEvolutionEnabled()) {
            return;
        }
        log.info("changed rowType before: {}", this.fieldsInfo(this.rowType));
        this.rowType = this.dataTypeChangeEventHandler.reset(this.rowType).apply(event);
        log.info("changed rowType after: {}", this.fieldsInfo(this.rowType));
        this.tryCreateRecordWriter();
        this.writer.applySchemaChange(this.rowType, event);
    }

    /**
     * Captures the write results accumulated since the previous snapshot, then resets the
     * accumulator.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return a single-element list holding the captured state
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override
    public List<IcebergSinkState> snapshotState(long checkpointId) throws IOException {
        // Hand the state its own copy: passing the live list and clearing it afterwards would
        // also empty the snapshot if the state object keeps the reference it was given.
        List<WriteResult> pendingResults = Lists.newArrayList(this.results);
        IcebergSinkState icebergSinkState =
                new IcebergSinkState(pendingResults, this.commitUser, checkpointId);
        this.results.clear();
        return Collections.singletonList(icebergSinkState);
    }

    /** No-op: this writer performs no work on abort. */
    @Override
    public void abortPrepare() {
    }

    /**
     * Closes the record writer (if created) and the table loader, then drops any pending write
     * results. The table loader is closed even when closing the record writer throws.
     *
     * @throws IOException if closing either resource fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (this.writer != null) {
                this.writer.close();
            }
        } finally {
            // Nested so that a failure in writer.close() cannot leak the table loader.
            try {
                this.icebergTableLoader.close();
            } finally {
                this.results.clear();
            }
        }
    }

    /** Renders the row type as a comma-separated list of {@code name<type>} entries for logging. */
    private String fieldsInfo(SeaTunnelRowType seaTunnelRowType) {
        int totalFields = seaTunnelRowType.getTotalFields();
        String[] fieldsInfo = new String[totalFields];
        for (int i = 0; i < totalFields; ++i) {
            fieldsInfo[i] =
                    String.format(
                            "%s<%s>",
                            seaTunnelRowType.getFieldName(i), seaTunnelRowType.getFieldType(i));
        }
        return StringUtils.join(fieldsInfo, ", ");
    }
}

