/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.connect.data;

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.data.IcebergWriterFactory;
import org.apache.iceberg.connect.data.IcebergWriterResult;
import org.apache.iceberg.connect.data.Offset;
import org.apache.iceberg.connect.data.RecordUtils;
import org.apache.iceberg.connect.data.RecordWriter;
import org.apache.iceberg.connect.data.SinkWriterResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;

/**
 * Routes Kafka Connect {@link SinkRecord}s to per-table {@link RecordWriter}s and tracks, for
 * every topic partition seen, the offset to report once the pending writes are completed.
 *
 * <p>Writers are created lazily through the {@link IcebergWriterFactory} and cached by table
 * name until {@link #completeWrite()} is called. The class uses plain hash maps without any
 * synchronization.
 */
public class SinkWriter {
    private final IcebergSinkConfig config;
    private final IcebergWriterFactory writerFactory;
    /** Open writers keyed by the table name they were requested for. */
    private final Map<String, RecordWriter> writers;
    /** Latest offset recorded per topic partition since the last {@link #completeWrite()}. */
    private final Map<TopicPartition, Offset> sourceOffsets;

    /**
     * Creates a sink writer with no open table writers and no recorded offsets.
     *
     * @param catalog catalog handed to the writer factory
     * @param config sink configuration that drives routing and is handed to the writer factory
     */
    public SinkWriter(Catalog catalog, IcebergSinkConfig config) {
        this.config = config;
        this.writerFactory = new IcebergWriterFactory(catalog, config);
        this.writers = Maps.newHashMap();
        this.sourceOffsets = Maps.newHashMap();
    }

    /**
     * Closes every cached writer. The writer cache and the recorded offsets are left as they
     * are; only {@link #completeWrite()} clears them.
     */
    public void close() {
        this.writers.values().forEach(RecordWriter::close);
    }

    /**
     * Completes all cached writers and returns their results together with a snapshot of the
     * offsets recorded so far. Afterwards both the writer cache and the recorded offsets are
     * empty, so subsequent records get fresh writers.
     *
     * @return the collected writer results and a copy of the recorded offsets
     */
    public SinkWriterResult completeWrite() {
        List<IcebergWriterResult> writerResults = this.writers.values().stream()
                .flatMap(writer -> writer.complete().stream())
                .collect(Collectors.toList());
        // Copy before clearing so the returned result does not alias the internal map.
        Map<TopicPartition, Offset> offsets = Maps.newHashMap(this.sourceOffsets);
        this.writers.clear();
        this.sourceOffsets.clear();
        return new SinkWriterResult(writerResults, offsets);
    }

    /**
     * Records the offset of, and routes, each of the given records in iteration order.
     *
     * @param sinkRecords records to write
     */
    public void save(Collection<SinkRecord> sinkRecords) {
        sinkRecords.forEach(this::save);
    }

    /**
     * Records the offset for the record's topic partition and then routes the record, either
     * dynamically or statically depending on the configuration.
     */
    private void save(SinkRecord record) {
        // A record without a timestamp is tracked with a null timestamp; otherwise the epoch
        // millis are converted to a UTC date-time.
        OffsetDateTime timestamp = record.timestamp() == null
                ? null
                : OffsetDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneOffset.UTC);
        // The stored offset is the record's offset plus one -- presumably the position of the
        // next record to consume (Kafka's commit convention); confirm against the consumer of
        // SinkWriterResult. The offset is recorded even if the record ends up routed nowhere.
        this.sourceOffsets.put(
                new TopicPartition(record.topic(), record.kafkaPartition()),
                new Offset(record.kafkaOffset() + 1L, timestamp));
        if (this.config.dynamicTablesEnabled()) {
            this.routeRecordDynamically(record);
        } else {
            this.routeRecordStatically(record);
        }
    }

    /**
     * Writes the record to the configured tables. Without a route field the record goes to every
     * configured table. With a route field it goes to each table whose route regex matches the
     * whole route value; tables without a regex are skipped, and a record whose route value is
     * null is not written at all.
     */
    private void routeRecordStatically(SinkRecord record) {
        String routeField = this.config.tablesRouteField();
        if (routeField == null) {
            for (String tableName : this.config.tables()) {
                this.writerForTable(tableName, record, false).write(record);
            }
            return;
        }
        String routeValue = this.extractRouteValue(record.value(), routeField);
        if (routeValue == null) {
            return;
        }
        for (String tableName : this.config.tables()) {
            Pattern regex = this.config.tableConfig(tableName).routeRegex();
            if (regex != null && regex.matcher(routeValue).matches()) {
                this.writerForTable(tableName, record, false).write(record);
            }
        }
    }

    /**
     * Writes the record to the table named by its route value, lower-cased with
     * {@link Locale#ROOT}. A record whose route value is null is not written.
     *
     * @throws NullPointerException if no route field is configured
     */
    private void routeRecordDynamically(SinkRecord record) {
        String routeField = this.config.tablesRouteField();
        Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");
        String routeValue = this.extractRouteValue(record.value(), routeField);
        if (routeValue != null) {
            String tableName = routeValue.toLowerCase(Locale.ROOT);
            // ignoreMissingTable is true only on this path; how a missing table is then handled
            // is decided by the writer factory, which is not visible here.
            this.writerForTable(tableName, record, true).write(record);
        }
    }

    /**
     * Extracts the route field from the record value and converts it to a string.
     *
     * @return the string form of the route field, or null when the record value or the
     *     extracted field is null
     */
    private String extractRouteValue(Object recordValue, String routeField) {
        if (recordValue == null) {
            return null;
        }
        Object routeValue = RecordUtils.extractFromRecordValue(recordValue, routeField);
        return routeValue == null ? null : routeValue.toString();
    }

    /**
     * Returns the cached writer for the table, creating it through the factory on first use.
     * The sample record and the ignore flag are only consulted when a writer is created.
     */
    private RecordWriter writerForTable(String tableName, SinkRecord sample, boolean ignoreMissingTable) {
        return this.writers.computeIfAbsent(
                tableName,
                notUsed -> this.writerFactory.createWriter(tableName, sample, ignoreMissingTable));
    }
}

