/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaNoTransactionSender;
import org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaProduceSender;
import org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender;
import org.apache.seatunnel.connectors.seatunnel.kafka.sink.MessageContentPartitioner;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;

public class KafkaSinkWriter
implements SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaSinkState> {
    private final SinkWriter.Context context;
    private String transactionPrefix;
    private long lastCheckpointId = 0L;
    private SeaTunnelRowType seaTunnelRowType;
    private final KafkaProduceSender<byte[], byte[]> kafkaProducerSender;
    private final SeaTunnelRowSerializer<byte[], byte[]> seaTunnelRowSerializer;
    private static final int PREFIX_RANGE = 10000;

    public KafkaSinkWriter(SinkWriter.Context context, SeaTunnelRowType seaTunnelRowType, ReadonlyConfig pluginConfig, List<KafkaSinkState> kafkaStates) {
        this.context = context;
        this.seaTunnelRowType = seaTunnelRowType;
        if (pluginConfig.get(Config.ASSIGN_PARTITIONS) != null && !CollectionUtils.isEmpty((Collection)((Collection)pluginConfig.get(Config.ASSIGN_PARTITIONS)))) {
            MessageContentPartitioner.setAssignPartitions((List)pluginConfig.get(Config.ASSIGN_PARTITIONS));
        }
        if (pluginConfig.get(Config.TRANSACTION_PREFIX) != null) {
            this.transactionPrefix = (String)pluginConfig.get(Config.TRANSACTION_PREFIX);
        } else {
            Random random = new Random();
            this.transactionPrefix = String.format("SeaTunnel%04d", random.nextInt(10000));
        }
        this.restoreState(kafkaStates);
        this.seaTunnelRowSerializer = this.getSerializer(pluginConfig, seaTunnelRowType);
        if (KafkaSemantics.EXACTLY_ONCE.equals((Object)this.getKafkaSemantics(pluginConfig))) {
            this.kafkaProducerSender = new KafkaTransactionSender<byte[], byte[]>(this.transactionPrefix, this.getKafkaProperties(pluginConfig));
            if (!kafkaStates.isEmpty()) {
                this.kafkaProducerSender.abortTransaction(kafkaStates.get(0).getCheckpointId() + 1L);
            }
            this.kafkaProducerSender.beginTransaction(KafkaSinkWriter.generateTransactionId(this.transactionPrefix, this.lastCheckpointId + 1L));
        } else {
            this.kafkaProducerSender = new KafkaNoTransactionSender<byte[], byte[]>(this.getKafkaProperties(pluginConfig));
        }
    }

    public void write(SeaTunnelRow element) {
        ProducerRecord<byte[], byte[]> producerRecord = this.seaTunnelRowSerializer.serializeRow(element);
        this.kafkaProducerSender.send(producerRecord);
    }

    public List<KafkaSinkState> snapshotState(long checkpointId) {
        List<KafkaSinkState> states = this.kafkaProducerSender.snapshotState(checkpointId);
        this.lastCheckpointId = checkpointId;
        this.kafkaProducerSender.beginTransaction(KafkaSinkWriter.generateTransactionId(this.transactionPrefix, this.lastCheckpointId + 1L));
        return states;
    }

    public Optional<KafkaCommitInfo> prepareCommit() {
        return this.kafkaProducerSender.prepareCommit();
    }

    public void abortPrepare() {
        this.kafkaProducerSender.abortTransaction();
    }

    public void close() {
        try {
            this.kafkaProducerSender.close();
        }
        catch (Exception e) {
            throw new KafkaConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, "Close kafka sink writer error", e);
        }
    }

    private Properties getKafkaProperties(ReadonlyConfig pluginConfig) {
        Properties kafkaProperties = new Properties();
        if (pluginConfig.get(Config.KAFKA_CONFIG) != null) {
            ((Map)pluginConfig.get(Config.KAFKA_CONFIG)).forEach((key, value) -> kafkaProperties.put(key, value));
        }
        if (pluginConfig.get(Config.ASSIGN_PARTITIONS) != null) {
            kafkaProperties.put("partitioner.class", "org.apache.seatunnel.connectors.seatunnel.kafka.sink.MessageContentPartitioner");
        }
        kafkaProperties.put("bootstrap.servers", pluginConfig.get(Config.BOOTSTRAP_SERVERS));
        kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName());
        kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName());
        return kafkaProperties;
    }

    private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
        MessageFormat messageFormat = (MessageFormat)((Object)pluginConfig.get(Config.FORMAT));
        String delimiter = ",";
        if (pluginConfig.get(Config.FIELD_DELIMITER) != null) {
            delimiter = (String)pluginConfig.get(Config.FIELD_DELIMITER);
        }
        String topic = (String)pluginConfig.get(Config.TOPIC);
        if (pluginConfig.get(Config.PARTITION_KEY_FIELDS) != null && pluginConfig.get(Config.PARTITION) != null) {
            throw new KafkaConnectorException((SeaTunnelErrorCode)KafkaConnectorErrorCode.GET_TRANSACTIONMANAGER_FAILED, "Cannot select both `partiton` and `partition_key_fields`. You can configure only one of them");
        }
        if (pluginConfig.get(Config.PARTITION_KEY_FIELDS) != null) {
            return DefaultSeaTunnelRowSerializer.create(topic, this.getPartitionKeyFields(pluginConfig, seaTunnelRowType), seaTunnelRowType, messageFormat, delimiter);
        }
        if (pluginConfig.get(Config.PARTITION) != null) {
            return DefaultSeaTunnelRowSerializer.create(topic, (Integer)pluginConfig.get(Config.PARTITION), seaTunnelRowType, messageFormat, delimiter);
        }
        return DefaultSeaTunnelRowSerializer.create(topic, Arrays.asList(new String[0]), seaTunnelRowType, messageFormat, delimiter);
    }

    private KafkaSemantics getKafkaSemantics(ReadonlyConfig pluginConfig) {
        if (pluginConfig.get(Config.SEMANTICS) != null) {
            return (KafkaSemantics)((Object)pluginConfig.get(Config.SEMANTICS));
        }
        return KafkaSemantics.NON;
    }

    protected static String generateTransactionId(String transactionPrefix, long checkpointId) {
        return transactionPrefix + "-" + checkpointId;
    }

    private void restoreState(List<KafkaSinkState> states) {
        if (!states.isEmpty()) {
            this.transactionPrefix = states.get(0).getTransactionIdPrefix();
            this.lastCheckpointId = states.get(0).getCheckpointId();
        }
    }

    private List<String> getPartitionKeyFields(ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
        if (pluginConfig.get(Config.PARTITION_KEY_FIELDS) != null) {
            List partitionKeyFields = (List)pluginConfig.get(Config.PARTITION_KEY_FIELDS);
            List<String> rowTypeFieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
            for (String partitionKeyField : partitionKeyFields) {
                if (rowTypeFieldNames.contains(partitionKeyField)) continue;
                throw new KafkaConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, String.format("Partition key field not found: %s, rowType: %s", partitionKeyField, rowTypeFieldNames));
            }
            return partitionKeyFields;
        }
        return Collections.emptyList();
    }
}

