/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSemantics;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.text.TextSerializationSchema;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;

/**
 * Sink writer that serializes {@link SeaTunnelRow}s and publishes them to a Pulsar topic.
 *
 * <p>The delivery behaviour is driven by the configured {@link PulsarSemantics}:
 * <ul>
 *   <li>{@code NON}: messages are sent asynchronously and never tracked;</li>
 *   <li>{@code EXACTLY_ONCE}: a transaction is opened, its id is exposed through
 *       {@link #prepareCommit()} / {@link #snapshotState(long)}, and a new transaction is opened
 *       on every snapshot;</li>
 *   <li>any other value: in-flight messages are counted and flushed on snapshot.</li>
 * </ul>
 */
public class PulsarSinkWriter
        implements SinkWriter<SeaTunnelRow, PulsarCommitInfo, PulsarSinkState> {
    private SinkWriter.Context context;
    private Producer<byte[]> producer;
    private PulsarClient pulsarClient;
    private SerializationSchema serializationSchema;
    private SerializationSchema keySerializationSchema;
    private TransactionImpl transaction;
    // Both start at the option defaults and are overwritten from the plugin config in the constructor.
    private int transactionTimeout = SinkProperties.TRANSACTION_TIMEOUT.defaultValue();
    private PulsarSemantics pulsarSemantics = SinkProperties.SEMANTICS.defaultValue();
    /** Number of tracked asynchronous sends that have not completed yet. */
    private final AtomicLong pendingMessages = new AtomicLong(0L);
    /** First failure reported by an asynchronous send; surfaced on the next write or snapshot. */
    private final AtomicReference<Throwable> sendFailure = new AtomicReference<>();

    /**
     * Builds the serializers, the Pulsar client, the producer and (for {@code EXACTLY_ONCE}) the
     * first transaction.
     *
     * @param context writer context supplied by the engine
     * @param clientConfig connection settings used to create the Pulsar client
     * @param seaTunnelRowType schema of the rows that will be written
     * @param pluginConfig sink options (topic, format, delimiter, semantics, ...)
     * @param pulsarStates restored states; not used by this constructor
     * @throws PulsarConnectorException if the transaction or the producer cannot be created, or
     *     if a configured partition key field is not part of {@code seaTunnelRowType}
     */
    public PulsarSinkWriter(
            SinkWriter.Context context,
            PulsarClientConfig clientConfig,
            SeaTunnelRowType seaTunnelRowType,
            ReadonlyConfig pluginConfig,
            List<PulsarSinkState> pulsarStates) {
        this.context = context;
        String topic = pluginConfig.get(SinkProperties.TOPIC);
        String format = pluginConfig.get(SinkProperties.FORMAT);
        String delimiter = pluginConfig.get(SinkProperties.FIELD_DELIMITER);
        MessageRoutingMode messageRoutingMode =
                pluginConfig.get(SinkProperties.MESSAGE_ROUTING_MODE);

        // Store the configured values in the FIELDS. Previously they were only kept in
        // same-named locals, so every other method silently ran with the option defaults.
        Integer configuredTimeout = pluginConfig.get(SinkProperties.TRANSACTION_TIMEOUT);
        if (configuredTimeout != null) {
            this.transactionTimeout = configuredTimeout;
        }
        PulsarSemantics configuredSemantics = pluginConfig.get(SinkProperties.SEMANTICS);
        if (configuredSemantics != null) {
            this.pulsarSemantics = configuredSemantics;
        }

        this.serializationSchema = this.createSerializationSchema(seaTunnelRowType, format, delimiter);
        List<String> partitionKeyList = this.getPartitionKeyFields(pluginConfig, seaTunnelRowType);
        this.keySerializationSchema =
                PulsarSinkWriter.createKeySerializationSchema(partitionKeyList, seaTunnelRowType);

        this.pulsarClient = PulsarConfigUtil.createClient(clientConfig, this.pulsarSemantics);
        if (PulsarSemantics.EXACTLY_ONCE == this.pulsarSemantics) {
            try {
                this.transaction =
                        (TransactionImpl)
                                PulsarConfigUtil.getTransaction(
                                        this.pulsarClient, this.transactionTimeout);
            } catch (Exception e) {
                this.closeClientAfterFailure(e);
                throw new PulsarConnectorException(
                        PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
                        "Pulsar transaction create fail.",
                        e);
            }
        }
        try {
            this.producer =
                    PulsarConfigUtil.createProducer(
                            this.pulsarClient,
                            topic,
                            this.pulsarSemantics,
                            pluginConfig,
                            messageRoutingMode);
        } catch (PulsarClientException e) {
            this.closeClientAfterFailure(e);
            throw new PulsarConnectorException(
                    PulsarConnectorErrorCode.CREATE_PRODUCER_FAILED,
                    "Pulsar Producer create fail.",
                    e);
        }
    }

    /**
     * Serializes the row (and its partition key, when key fields are configured) and sends it
     * asynchronously.
     *
     * @param element row to publish
     * @throws IOException declared by the {@link SinkWriter} contract
     * @throws PulsarConnectorException if an earlier tracked asynchronous send has failed
     */
    @Override
    public void write(SeaTunnelRow element) throws IOException {
        this.throwIfSendFailed();

        byte[] message = this.serializationSchema.serialize(element);
        byte[] key = null;
        if (this.keySerializationSchema != null) {
            key = this.keySerializationSchema.serialize(element);
        }
        TypedMessageBuilder<byte[]> typedMessageBuilder =
                PulsarConfigUtil.createTypedMessageBuilder(this.producer, this.transaction);
        if (key != null) {
            typedMessageBuilder.keyBytes(key);
        }
        typedMessageBuilder.value(message);

        if (PulsarSemantics.NON == this.pulsarSemantics) {
            // Fire-and-forget: the result of the send is deliberately not tracked.
            typedMessageBuilder.sendAsync();
            return;
        }

        this.pendingMessages.incrementAndGet();
        CompletableFuture<MessageId> future = typedMessageBuilder.sendAsync();
        future.whenComplete(
                (id, ex) -> {
                    // Throwing here would only fail the derived future, which nobody observes.
                    // Record the failure BEFORE decrementing so that a snapshot that sees
                    // pendingMessages == 0 is guaranteed to also see the failure.
                    if (ex != null) {
                        this.sendFailure.compareAndSet(null, ex);
                    }
                    this.pendingMessages.decrementAndGet();
                });
    }

    /**
     * Returns the commit info for the current transaction.
     *
     * @return the current transaction id wrapped in a {@link PulsarCommitInfo} when running with
     *     {@code EXACTLY_ONCE}, otherwise an empty {@link Optional}
     */
    @Override
    public Optional<PulsarCommitInfo> prepareCommit() throws IOException {
        if (PulsarSemantics.EXACTLY_ONCE == this.pulsarSemantics) {
            PulsarCommitInfo pulsarCommitInfo = new PulsarCommitInfo(this.transaction.getTxnID());
            return Optional.of(pulsarCommitInfo);
        }
        return Optional.empty();
    }

    /**
     * Flushes tracked in-flight messages and, for {@code EXACTLY_ONCE}, captures the current
     * transaction id and opens a new transaction for subsequent writes.
     *
     * @param checkpointId id of the checkpoint being taken; not used by this implementation
     * @return a single-element list holding the flushed transaction's id for
     *     {@code EXACTLY_ONCE}, otherwise an empty list
     * @throws PulsarConnectorException if a tracked send failed or the next transaction cannot be
     *     created
     */
    @Override
    public List<PulsarSinkState> snapshotState(long checkpointId) throws IOException {
        if (PulsarSemantics.NON != this.pulsarSemantics) {
            // Flush at least once, then keep flushing until every tracked send has completed.
            do {
                this.producer.flush();
            } while (this.pendingMessages.get() > 0L);
            this.throwIfSendFailed();
        }
        if (PulsarSemantics.EXACTLY_ONCE == this.pulsarSemantics) {
            List<PulsarSinkState> pulsarSinkStates =
                    Lists.newArrayList(new PulsarSinkState(this.transaction.getTxnID()));
            try {
                this.transaction =
                        (TransactionImpl)
                                PulsarConfigUtil.getTransaction(
                                        this.pulsarClient, this.transactionTimeout);
            } catch (Exception e) {
                throw new PulsarConnectorException(
                        PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
                        "Pulsar transaction create fail.",
                        e);
            }
            return pulsarSinkStates;
        }
        return Collections.emptyList();
    }

    /** Aborts the current transaction when running with {@code EXACTLY_ONCE}; otherwise a no-op. */
    @Override
    public void abortPrepare() {
        if (PulsarSemantics.EXACTLY_ONCE == this.pulsarSemantics) {
            this.transaction.abort();
        }
    }

    /**
     * Closes the producer and then the client. The client is closed even when closing the
     * producer throws.
     */
    @Override
    public void close() throws IOException {
        try {
            this.producer.close();
        } finally {
            this.pulsarClient.close();
        }
    }

    /**
     * Rethrows, on the calling thread, the first failure recorded by an asynchronous send.
     * The failure is intentionally left in place so later calls keep failing as well.
     */
    private void throwIfSendFailed() {
        Throwable failure = this.sendFailure.get();
        if (failure != null) {
            throw new PulsarConnectorException(
                    PulsarConnectorErrorCode.SEND_MESSAGE_FAILED, "send message failed", failure);
        }
    }

    /**
     * Closes the client after a failed construction step so it does not leak; a failure while
     * closing is attached to {@code failure} as a suppressed exception.
     */
    private void closeClientAfterFailure(Exception failure) {
        try {
            this.pulsarClient.close();
        } catch (Exception closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }

    /**
     * Picks the value serializer for the configured format.
     *
     * @param rowType schema of the rows to serialize
     * @param format {@code "json"} or {@code "text"}
     * @param delimiter field delimiter, only used by the text format
     * @throws SeaTunnelJsonFormatException if {@code format} is neither {@code "json"} nor
     *     {@code "text"}
     */
    private SerializationSchema createSerializationSchema(
            SeaTunnelRowType rowType, String format, String delimiter) {
        if ("json".equals(format)) {
            return new JsonSerializationSchema(rowType);
        }
        if ("text".equals(format)) {
            return TextSerializationSchema.builder()
                    .seaTunnelRowType(rowType)
                    .delimiter(delimiter)
                    .build();
        }
        throw new SeaTunnelJsonFormatException(
                CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
    }

    /**
     * Builds a serializer that projects a row onto the given key fields and serializes that
     * projection with a {@link JsonSerializationSchema}.
     *
     * @param keyFieldNames names of the fields that form the message key
     * @param seaTunnelRowType schema of the full row
     * @return the key serializer, or {@code null} when {@code keyFieldNames} is null or empty
     */
    public static SerializationSchema createKeySerializationSchema(
            List<String> keyFieldNames, SeaTunnelRowType seaTunnelRowType) {
        if (keyFieldNames == null || keyFieldNames.isEmpty()) {
            return null;
        }
        int keyCount = keyFieldNames.size();
        int[] keyFieldIndexArr = new int[keyCount];
        SeaTunnelDataType<?>[] keyFieldDataTypeArr = new SeaTunnelDataType<?>[keyCount];
        for (int i = 0; i < keyCount; ++i) {
            int rowFieldIndex = seaTunnelRowType.indexOf(keyFieldNames.get(i));
            keyFieldIndexArr[i] = rowFieldIndex;
            keyFieldDataTypeArr[i] = seaTunnelRowType.getFieldType(rowFieldIndex);
        }
        SeaTunnelRowType keyType =
                new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
        JsonSerializationSchema keySerializationSchema = new JsonSerializationSchema(keyType);
        // Copies only the key columns, in keyFieldNames order, into a fresh row.
        Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor =
                row -> {
                    Object[] keyFields = new Object[keyFieldIndexArr.length];
                    for (int i = 0; i < keyFieldIndexArr.length; ++i) {
                        keyFields[i] = row.getField(keyFieldIndexArr[i]);
                    }
                    return new SeaTunnelRow(keyFields);
                };
        return (SerializationSchema & Serializable)
                row -> keySerializationSchema.serialize(keyDataExtractor.apply(row));
    }

    /**
     * Reads the configured partition key fields and verifies that each one exists in the row type.
     *
     * @return the configured key fields, or an empty list when the option is not set
     * @throws PulsarConnectorException if a configured field is not part of
     *     {@code seaTunnelRowType}
     */
    private List<String> getPartitionKeyFields(
            ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
        List<String> partitionKeyFields = pluginConfig.get(SinkProperties.PARTITION_KEY_FIELDS);
        if (partitionKeyFields == null) {
            return Collections.emptyList();
        }
        List<String> rowTypeFieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
        for (String partitionKeyField : partitionKeyFields) {
            if (!rowTypeFieldNames.contains(partitionKeyField)) {
                throw new PulsarConnectorException(
                        CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                        String.format(
                                "Partition key field not found: %s, rowType: %s",
                                partitionKeyField, rowTypeFieldNames));
            }
        }
        return partitionKeyFields;
    }
}

