/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.iceberg.sink.writer;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.types.Types;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.event.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableChangeColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableDropColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableModifyColumnEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.data.RowConverter;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaChangeWrapper;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.writer.IcebergRecord;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.writer.IcebergWriterFactory;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.writer.RecordWriter;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.writer.WriteResult;
import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link RecordWriter} that converts SeaTunnel rows to Iceberg records, writes them through an
 * Iceberg {@link TaskWriter}, and accumulates the files produced by each completed writer as
 * {@link WriteResult}s until {@link #complete()} hands them to the caller.
 *
 * <p>Whenever a schema update is applied, the current writer is completed first and a fresh
 * writer and row converter are created afterwards.
 */
public class IcebergRecordWriter
implements RecordWriter {
    private static final Logger log = LoggerFactory.getLogger(IcebergRecordWriter.class);
    private final Table table;
    private final SinkConfig config;
    private final IcebergWriterFactory writerFactory;
    /** Results of writers completed since the last call to {@link #complete()}. */
    private final List<WriteResult> writerResults;
    /** Active task writer; {@code null} between {@link #flush()} and {@link #resetWriter()}. */
    private TaskWriter<Record> writer;
    private RowConverter recordConverter;

    /**
     * Creates a record writer and immediately opens a task writer for {@code table}.
     *
     * @param table the Iceberg table to write to
     * @param writerFactory factory used to create (and re-create) the task writer
     * @param config sink configuration passed to the row converter and the writer factory
     */
    public IcebergRecordWriter(Table table, IcebergWriterFactory writerFactory, SinkConfig config) {
        this.config = config;
        this.table = table;
        this.writerFactory = writerFactory;
        this.writerResults = Lists.newArrayList();
        this.recordConverter = new RowConverter(table, config);
        this.writer = this.createTaskWriter();
    }

    /** Creates a new task writer for the table using the current configuration. */
    private TaskWriter<Record> createTaskWriter() {
        return this.writerFactory.createTaskWriter(this.table, this.config);
    }

    /**
     * Converts the row to an Iceberg record and writes it.
     *
     * <p>If the conversion reports pending schema changes, they are applied first and the row is
     * converted a second time against the rebuilt converter.
     *
     * @param seaTunnelRow the row to write; its row kind is carried along with the record
     * @param rowType the row type describing {@code seaTunnelRow}
     * @throws UncheckedIOException if the underlying task writer fails with an {@link IOException}
     */
    @Override
    public void write(SeaTunnelRow seaTunnelRow, SeaTunnelRowType rowType) {
        SchemaChangeWrapper updates = new SchemaChangeWrapper();
        Record record = this.recordConverter.convert(seaTunnelRow, rowType, updates);
        if (!updates.empty()) {
            // applySchemaUpdate replaces the converter, so the row has to be converted again.
            this.applySchemaUpdate(updates);
            record = this.recordConverter.convert(seaTunnelRow, rowType);
        }
        IcebergRecord icebergRecord = new IcebergRecord(record, seaTunnelRow.getRowKind());
        try {
            this.writer.write(icebergRecord);
        }
        catch (IOException e) {
            // Same wrapping as flush(); UncheckedIOException is still a RuntimeException.
            throw new UncheckedIOException("Failed to write record to Iceberg task writer", e);
        }
    }

    /**
     * Applies an explicit schema change event to the table.
     *
     * <p>Drop-column events delete the column and change-column events rename it. Add-column and
     * modify-column events produce no update in this method.
     *
     * @param afterRowType the row type after the change (not used by this implementation)
     * @param event the schema change event to apply
     */
    @Override
    public void applySchemaChange(SeaTunnelRowType afterRowType, SchemaChangeEvent event) {
        log.info("Apply schema change start.");
        SchemaChangeWrapper updates = new SchemaChangeWrapper();
        if (event instanceof AlterTableDropColumnEvent) {
            AlterTableDropColumnEvent dropColumnEvent = (AlterTableDropColumnEvent)event;
            updates.deleteColumn(dropColumnEvent.getColumn());
        } else if (event instanceof AlterTableChangeColumnEvent && !(event instanceof AlterTableModifyColumnEvent)) {
            // NOTE(review): the previous condition also excluded every AlterTableAddColumnEvent.
            // If AlterTableChangeColumnEvent is a subtype of AlterTableAddColumnEvent (it appears
            // to share its Column accessor - confirm against the event hierarchy), that exclusion
            // made this rename branch unreachable. Modify events stay excluded as before.
            AlterTableChangeColumnEvent changeColumnEvent = (AlterTableChangeColumnEvent)event;
            this.changeColumn(this.table.schema(), changeColumnEvent.getColumn(), changeColumnEvent.getOldColumn(), updates);
        }
        if (!updates.empty()) {
            this.applySchemaUpdate(updates);
        }
        log.info("Apply schema change end.");
    }

    /**
     * Registers a rename from {@code oldColumn} to the name of {@code column}, provided the old
     * column exists in {@code schema}; otherwise nothing is registered.
     */
    private void changeColumn(Schema schema, Column column, String oldColumn, SchemaChangeWrapper updates) {
        Types.NestedField nestedField = schema.findField(oldColumn);
        if (nestedField == null) {
            log.warn("Column {} not found in table schema, skip renaming it to {}.", oldColumn, column.getName());
            return;
        }
        updates.changeColumn(oldColumn, column.getName());
    }

    /**
     * Completes the current writer, applies {@code updates} to the table, then creates a new
     * writer and row converter.
     */
    private void applySchemaUpdate(SchemaChangeWrapper updates) {
        this.flush();
        SchemaUtils.applySchemaUpdates(this.table, updates);
        this.resetWriter();
    }

    /**
     * Completes the current writer and returns every result collected since the previous call.
     *
     * <p>The internal result list is cleared and a fresh writer is created, so the instance can
     * keep accepting rows afterwards.
     *
     * @return a new list holding the collected write results
     */
    @Override
    public List<WriteResult> complete() {
        this.flush();
        List<WriteResult> result = new ArrayList<>(this.writerResults);
        this.writerResults.clear();
        this.resetWriter();
        return result;
    }

    /** Replaces the task writer and the row converter with newly created instances. */
    private void resetWriter() {
        this.writer = this.createTaskWriter();
        this.recordConverter = new RowConverter(this.table, this.config);
    }

    /**
     * Completes the active writer, if any, and records its data files, delete files and the
     * table's partition type as a {@link WriteResult}. The writer reference is cleared afterwards.
     *
     * @throws UncheckedIOException if completing the writer fails with an {@link IOException}
     */
    private void flush() {
        if (this.writer == null) {
            return;
        }
        org.apache.iceberg.io.WriteResult writeResult;
        try {
            writeResult = this.writer.complete();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.writerResults.add(new WriteResult(Arrays.asList(writeResult.dataFiles()), Arrays.asList(writeResult.deleteFiles()), this.table.spec().partitionType()));
        this.writer = null;
    }

    /**
     * Closes the active task writer, if any, so that it is not left open when this record writer
     * is discarded without a final {@link #complete()}.
     *
     * @throws UncheckedIOException if closing the writer fails with an {@link IOException}
     */
    @Override
    public void close() {
        if (this.writer == null) {
            return;
        }
        try {
            this.writer.close();
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to close Iceberg task writer", e);
        }
        finally {
            this.writer = null;
        }
    }
}

