/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.pulsar;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.pulsar.ConfigResolver;
import io.smallrye.reactive.messaging.pulsar.OutgoingMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.pulsar.PulsarOutgoingMessageMetadata;
import io.smallrye.reactive.messaging.pulsar.PulsarSenderProcessor;
import io.smallrye.reactive.messaging.pulsar.SchemaResolver;
import io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarAttributesExtractor;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTrace;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTraceTextMapSetter;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactionMetadata;
import io.smallrye.reactive.messaging.tracing.TracingUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.eclipse.microprofile.reactive.messaging.Message;

public class PulsarOutgoingChannel<T> {
    private final Producer<T> producer;
    private final PulsarSenderProcessor processor;
    private final Flow.Subscriber<? extends Message<?>> subscriber;
    private final String channel;
    private final boolean healthEnabled;
    private final List<Throwable> failures = new ArrayList<Throwable>();
    private final boolean tracingEnabled;
    private final Instrumenter<PulsarTrace, Void> instrumenter;

    public PulsarOutgoingChannel(PulsarClient client, Schema<T> schema, PulsarConnectorOutgoingConfiguration oc, ConfigResolver configResolver) throws PulsarClientException {
        this.channel = oc.getChannel();
        this.healthEnabled = oc.getHealthEnabled();
        this.tracingEnabled = oc.getTracingEnabled();
        ProducerConfigurationData conf = configResolver.getProducerConf(oc);
        if (conf.getProducerName() == null) {
            conf.setProducerName(this.channel);
        }
        if (conf.getTopicName() == null) {
            conf.setTopicName(oc.getTopic().orElse(this.channel));
        }
        Map<String, Object> producerConf = configResolver.configToMap(conf);
        ProducerBuilder builder = client.newProducer(schema).loadConf(producerConf);
        if (conf.getBatcherBuilder() != null) {
            builder.batcherBuilder(conf.getBatcherBuilder());
        }
        if (conf.getCryptoKeyReader() != null) {
            builder.cryptoKeyReader(conf.getCryptoKeyReader());
        }
        for (String encryptionKey : conf.getEncryptionKeys()) {
            builder.addEncryptionKey(encryptionKey);
        }
        this.producer = builder.create();
        PulsarLogging.log.createdProducerWithConfig(this.channel, SchemaResolver.getSchemaName(schema), conf);
        long requests = oc.getMaxPendingMessages().intValue();
        if (requests <= 0L) {
            requests = Long.MAX_VALUE;
        }
        this.processor = new PulsarSenderProcessor(requests, oc.getWaitForWriteCompletion(), this::sendMessage);
        this.subscriber = MultiUtils.via((Flow.Processor)this.processor, m -> m.onFailure().invoke(f -> {
            PulsarLogging.log.unableToDispatch((Throwable)f);
            this.reportFailure((Throwable)f);
        }));
        PulsarAttributesExtractor AttributesExtractor2 = new PulsarAttributesExtractor();
        MessagingAttributesGetter<PulsarTrace, Void> messagingAttributesGetter = AttributesExtractor2.getMessagingAttributesGetter();
        InstrumenterBuilder instrumenterBuilder = Instrumenter.builder((OpenTelemetry)GlobalOpenTelemetry.get(), (String)"io.smallrye.reactive.messaging", (SpanNameExtractor)MessagingSpanNameExtractor.create(messagingAttributesGetter, (MessageOperation)MessageOperation.SEND));
        this.instrumenter = instrumenterBuilder.addAttributesExtractor(MessagingAttributesExtractor.create(messagingAttributesGetter, (MessageOperation)MessageOperation.SEND)).addAttributesExtractor((AttributesExtractor)AttributesExtractor2).buildProducerInstrumenter((TextMapSetter)PulsarTraceTextMapSetter.INSTANCE);
    }

    private Uni<Void> sendMessage(Message<?> message) {
        return Uni.createFrom().item(message).onItem().transform(m -> this.toMessageBuilder((Message<?>)m, this.producer)).onItem().transformToUni(mb -> Uni.createFrom().completionStage(() -> ((TypedMessageBuilder)mb).sendAsync())).onItemOrFailure().transformToUni((mid, t) -> {
            if (t == null) {
                OutgoingMessageMetadata.setResultOnMessage((Message)message, (Object)mid);
                return Uni.createFrom().completionStage(message.ack());
            }
            return Uni.createFrom().completionStage(message.nack(t));
        });
    }

    private TypedMessageBuilder<T> createMessageBuilder(Message<?> message, Transaction fallback) {
        Transaction transaction = message.getMetadata(PulsarTransactionMetadata.class).map(PulsarTransactionMetadata::getTransaction).orElse(fallback);
        return transaction != null ? this.producer.newMessage(transaction) : this.producer.newMessage();
    }

    private TypedMessageBuilder<T> toMessageBuilder(Message<?> message, Producer<T> producer) {
        TypedMessageBuilder<T> messageBuilder;
        Optional optionalMetadata = message.getMetadata(PulsarOutgoingMessageMetadata.class);
        if (optionalMetadata.isPresent()) {
            PulsarOutgoingMessageMetadata metadata = (PulsarOutgoingMessageMetadata)optionalMetadata.get();
            if (this.tracingEnabled) {
                TracingUtils.traceOutgoing(this.instrumenter, message, (Object)new PulsarTrace.Builder().withProperties(metadata.getProperties()).withSequenceId(metadata.getSequenceId()).withTopic(producer.getTopic()).build());
            }
            messageBuilder = this.createMessageBuilder(message, metadata.getTransaction());
            if (metadata.hasKey()) {
                if (metadata.getKeyBytes() != null) {
                    messageBuilder.keyBytes(metadata.getKeyBytes());
                } else {
                    messageBuilder.key(metadata.getKey());
                }
            }
            if (metadata.getOrderingKey() != null) {
                messageBuilder.orderingKey(metadata.getOrderingKey());
            }
            if (metadata.getProperties() != null) {
                messageBuilder.properties(metadata.getProperties());
            }
            if (metadata.getReplicatedClusters() != null) {
                messageBuilder.replicationClusters(metadata.getReplicatedClusters());
            }
            if (metadata.getReplicationDisabled() != null) {
                messageBuilder.disableReplication();
            }
            if (metadata.getEventTime() != null) {
                messageBuilder.eventTime(metadata.getEventTime().longValue());
            }
            if (metadata.getSequenceId() != null) {
                messageBuilder.sequenceId(metadata.getSequenceId().longValue());
            }
            if (metadata.getDeliverAt() != null) {
                messageBuilder.deliverAt(metadata.getDeliverAt().longValue());
            }
        } else {
            messageBuilder = this.createMessageBuilder(message, null);
            if (this.tracingEnabled) {
                HashMap<String, String> properties = new HashMap<String, String>();
                TracingUtils.traceOutgoing(this.instrumenter, message, (Object)new PulsarTrace.Builder().withProperties(properties).withTopic(producer.getTopic()).build());
                messageBuilder.properties(properties);
            }
        }
        Object payload = message.getPayload();
        if (payload instanceof OutgoingMessage) {
            OutgoingMessage outgoing = (OutgoingMessage)payload;
            if (outgoing.hasKey()) {
                if (outgoing.getKeyBytes() != null) {
                    messageBuilder.keyBytes(outgoing.getKeyBytes());
                } else {
                    messageBuilder.key(outgoing.getKey());
                }
            }
            if (outgoing.getProperties() != null) {
                messageBuilder.properties(outgoing.getProperties());
            }
            if (outgoing.getOrderingKey() != null) {
                messageBuilder.orderingKey(outgoing.getOrderingKey());
            }
            if (outgoing.getSequenceId() != null) {
                messageBuilder.sequenceId(outgoing.getSequenceId().longValue());
            }
            if (outgoing.getEventTime() != null) {
                messageBuilder.eventTime(outgoing.getEventTime().longValue());
            }
            if (outgoing.getDeliverAt() != null) {
                messageBuilder.deliverAt(outgoing.getDeliverAt().longValue());
            }
            if (outgoing.getReplicationDisabled()) {
                messageBuilder.disableReplication();
            }
            if (outgoing.getReplicatedClusters() != null) {
                messageBuilder.replicationClusters(outgoing.getReplicatedClusters());
            }
            return messageBuilder.value(outgoing.getValue());
        }
        return messageBuilder.value(message.getPayload());
    }

    public Flow.Subscriber<? extends Message<?>> getSubscriber() {
        return this.subscriber;
    }

    public String getChannel() {
        return this.channel;
    }

    public Producer<T> getProducer() {
        return this.producer;
    }

    public void close() {
        if (this.processor != null) {
            this.processor.cancel();
        }
        try {
            this.producer.close();
        }
        catch (PulsarClientException e) {
            PulsarLogging.log.unableToCloseProducer(e);
        }
    }

    private synchronized void reportFailure(Throwable failure) {
        if (this.failures.size() == 10) {
            this.failures.remove(0);
        }
        this.failures.add(failure);
    }

    public void isStarted(HealthReport.HealthReportBuilder builder) {
        if (this.healthEnabled) {
            builder.add(this.channel, this.producer.isConnected());
        }
    }

    public void isReady(HealthReport.HealthReportBuilder builder) {
        this.isStarted(builder);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void isAlive(HealthReport.HealthReportBuilder builder) {
        if (this.healthEnabled) {
            ArrayList<Throwable> actualFailures;
            PulsarOutgoingChannel pulsarOutgoingChannel = this;
            synchronized (pulsarOutgoingChannel) {
                actualFailures = new ArrayList<Throwable>(this.failures);
            }
            if (!actualFailures.isEmpty()) {
                builder.add(this.channel, false, actualFailures.stream().map(Throwable::getMessage).collect(Collectors.joining()));
            } else {
                builder.add(this.channel, true);
            }
        }
    }
}

