/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink.dynamic;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
import org.apache.iceberg.flink.sink.dynamic.DynamicRecordInternal;
import org.apache.iceberg.flink.sink.dynamic.DynamicWriteResult;
import org.apache.iceberg.flink.sink.dynamic.DynamicWriterMetrics;
import org.apache.iceberg.flink.sink.dynamic.LRUCache;
import org.apache.iceberg.flink.sink.dynamic.WriteTarget;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class DynamicWriter
implements CommittingSinkWriter<DynamicRecordInternal, DynamicWriteResult> {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicWriter.class);
    private final Map<WriteTarget, RowDataTaskWriterFactory> taskWriterFactories;
    private final Map<WriteTarget, TaskWriter<RowData>> writers;
    private final DynamicWriterMetrics metrics;
    private final int subTaskId;
    private final int attemptId;
    private final Catalog catalog;
    private final FileFormat dataFileFormat;
    private final long targetDataFileSize;
    private final Map<String, String> commonWriteProperties;

    DynamicWriter(Catalog catalog, FileFormat dataFileFormat, long targetDataFileSize, Map<String, String> commonWriteProperties, int cacheMaximumSize, DynamicWriterMetrics metrics, int subTaskId, int attemptId) {
        this.catalog = catalog;
        this.dataFileFormat = dataFileFormat;
        this.targetDataFileSize = targetDataFileSize;
        this.commonWriteProperties = commonWriteProperties;
        this.metrics = metrics;
        this.subTaskId = subTaskId;
        this.attemptId = attemptId;
        this.taskWriterFactories = new LRUCache<WriteTarget, RowDataTaskWriterFactory>(cacheMaximumSize);
        this.writers = Maps.newHashMap();
        LOG.debug("DynamicIcebergSinkWriter created for subtask {} attemptId {}", (Object)subTaskId, (Object)attemptId);
    }

    public void write(DynamicRecordInternal element, SinkWriter.Context context) throws IOException, InterruptedException {
        this.writers.computeIfAbsent(new WriteTarget(element.tableName(), element.branch(), element.schema().schemaId(), element.spec().specId(), element.upsertMode(), element.equalityFields()), writerKey -> {
            RowDataTaskWriterFactory taskWriterFactory = this.taskWriterFactories.computeIfAbsent((WriteTarget)writerKey, factoryKey -> {
                Table table = this.catalog.loadTable(TableIdentifier.parse((String)factoryKey.tableName()));
                HashMap tableWriteProperties = Maps.newHashMap((Map)table.properties());
                tableWriteProperties.putAll(this.commonWriteProperties);
                Set<Integer> equalityFieldIds = DynamicWriter.getEqualityFields(table, element.equalityFields());
                if (element.upsertMode()) {
                    Preconditions.checkState((!equalityFieldIds.isEmpty() ? 1 : 0) != 0, (Object)"Equality field columns shouldn't be empty when configuring to use UPSERT data.");
                    if (!table.spec().isUnpartitioned()) {
                        for (PartitionField partitionField : table.spec().fields()) {
                            Preconditions.checkState((boolean)equalityFieldIds.contains(partitionField.sourceId()), (String)"In UPSERT mode, partition field '%s' should be included in equality fields: '%s'", (Object)partitionField, equalityFieldIds);
                        }
                    }
                }
                LOG.debug("Creating new writer factory for table '{}'", (Object)table.name());
                return new RowDataTaskWriterFactory((SerializableSupplier<Table>)(SerializableSupplier & Serializable)() -> table, FlinkSchemaUtil.convert(element.schema()), this.targetDataFileSize, this.dataFileFormat, tableWriteProperties, Lists.newArrayList(equalityFieldIds), element.upsertMode(), element.schema(), element.spec());
            });
            taskWriterFactory.initialize(this.subTaskId, this.attemptId);
            return taskWriterFactory.create();
        }).write((Object)element.rowData());
    }

    public void flush(boolean endOfInput) {
    }

    public void close() throws Exception {
        for (TaskWriter<RowData> writer : this.writers.values()) {
            writer.close();
        }
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).add("subtaskId", this.subTaskId).add("attemptId", this.attemptId).add("dataFileFormat", (Object)this.dataFileFormat).add("targetDataFileSize", this.targetDataFileSize).add("writeProperties", this.commonWriteProperties).toString();
    }

    public Collection<DynamicWriteResult> prepareCommit() throws IOException {
        ArrayList result = Lists.newArrayList();
        for (Map.Entry<WriteTarget, TaskWriter<RowData>> entry : this.writers.entrySet()) {
            long startNano = System.nanoTime();
            WriteResult writeResult = entry.getValue().complete();
            WriteTarget writeTarget = entry.getKey();
            this.metrics.updateFlushResult(writeTarget.tableName(), writeResult);
            this.metrics.flushDuration(writeTarget.tableName(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano));
            LOG.debug("Iceberg writer for table {} subtask {} attempt {} flushed {} data files and {} delete files", new Object[]{writeTarget.tableName(), this.subTaskId, this.attemptId, writeResult.dataFiles().length, writeResult.deleteFiles().length});
            result.add(new DynamicWriteResult(writeTarget, writeResult));
        }
        this.writers.clear();
        return result;
    }

    private static Set<Integer> getEqualityFields(Table table, Set<Integer> equalityFieldIds) {
        if (equalityFieldIds != null && !equalityFieldIds.isEmpty()) {
            return equalityFieldIds;
        }
        Set identifierFieldIds = table.schema().identifierFieldIds();
        if (identifierFieldIds != null && !identifierFieldIds.isEmpty()) {
            return identifierFieldIds;
        }
        return Collections.emptySet();
    }

    @VisibleForTesting
    DynamicWriterMetrics getMetrics() {
        return this.metrics;
    }
}

