/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.connect.data;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.data.IcebergWriterResult;
import org.apache.iceberg.connect.data.RecordConverter;
import org.apache.iceberg.connect.data.RecordUtils;
import org.apache.iceberg.connect.data.RecordWriter;
import org.apache.iceberg.connect.data.SchemaUpdate;
import org.apache.iceberg.connect.data.SchemaUtils;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;

class IcebergWriter
implements RecordWriter {
    private final Table table;
    private final String tableName;
    private final IcebergSinkConfig config;
    private final List<IcebergWriterResult> writerResults;
    private RecordConverter recordConverter;
    private TaskWriter<Record> writer;

    IcebergWriter(Table table, String tableName, IcebergSinkConfig config) {
        this.table = table;
        this.tableName = tableName;
        this.config = config;
        this.writerResults = Lists.newArrayList();
        this.initNewWriter();
    }

    private void initNewWriter() {
        this.writer = RecordUtils.createTableWriter(this.table, this.tableName, this.config);
        this.recordConverter = new RecordConverter(this.table, this.config);
    }

    @Override
    public void write(SinkRecord record) {
        try {
            if (record.value() != null) {
                Record row = this.convertToRow(record);
                this.writer.write((Object)row);
            }
        }
        catch (Exception e) {
            throw new DataException(String.format(Locale.ROOT, "An error occurred converting record, topic: %s, partition, %d, offset: %d", record.topic(), record.kafkaPartition(), record.kafkaOffset()), (Throwable)e);
        }
    }

    private Record convertToRow(SinkRecord record) {
        if (!this.config.evolveSchemaEnabled()) {
            return this.recordConverter.convert(record.value());
        }
        SchemaUpdate.Consumer updates = new SchemaUpdate.Consumer();
        Record row = this.recordConverter.convert(record.value(), updates);
        if (!updates.empty()) {
            this.flush();
            SchemaUtils.applySchemaUpdates(this.table, updates);
            this.initNewWriter();
            row = this.recordConverter.convert(record.value(), null);
        }
        return row;
    }

    private void flush() {
        WriteResult writeResult;
        try {
            writeResult = this.writer.complete();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.writerResults.add(new IcebergWriterResult(TableIdentifier.parse((String)this.tableName), Arrays.asList(writeResult.dataFiles()), Arrays.asList(writeResult.deleteFiles()), this.table.spec().partitionType()));
    }

    @Override
    public List<IcebergWriterResult> complete() {
        this.flush();
        ArrayList result = Lists.newArrayList(this.writerResults);
        this.writerResults.clear();
        return result;
    }

    @Override
    public void close() {
        try {
            this.writer.close();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

