/*
 * Decompiled with CFR 0.152.
 */
package com.zendesk.maxwell.producer;

import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.producer.AbstractProducer;
import com.zendesk.maxwell.row.RowMap;
import com.zendesk.maxwell.util.TopicInterpolator;
import io.nats.client.Connection;
import io.nats.client.Nats;
import io.nats.client.Options;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Producer that publishes each outputtable row, serialized as JSON, to a NATS subject.
 *
 * <p>The server list and the subject template are read from the Maxwell configuration
 * ({@code natsUrl} and {@code natsSubject}). The subject for a row is produced by running
 * the template through {@link TopicInterpolator}.
 */
public class NatsProducer extends AbstractProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(NatsProducer.class);

    private final Connection natsConnection;
    private final String natsSubjectTemplate;

    /**
     * Builds the NATS options from the comma-separated {@code natsUrl} setting and opens
     * the connection.
     *
     * @param context Maxwell context supplying the configuration
     * @throws RuntimeException if connecting fails with an {@link IOException} or the
     *         connecting thread is interrupted; the original exception is kept as the cause
     */
    public NatsProducer(MaxwellContext context) {
        super(context);

        Options.Builder optionBuilder = new Options.Builder();
        // natsUrl may list several servers separated by commas; register each one.
        for (String url : context.getConfig().natsUrl.split(",")) {
            optionBuilder.server(url);
        }
        Options option = optionBuilder.build();

        this.natsSubjectTemplate = context.getConfig().natsSubject;

        try {
            this.natsConnection = Nats.connect(option);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Publishes a single row to NATS.
     *
     * <p>Rows that should not be output only advance the stored position. Rows whose
     * UTF-8 encoded JSON exceeds the connection's maximum payload are logged at error
     * level and dropped.
     *
     * @param r the row to publish
     * @throws Exception if serializing the row or computing its subject fails
     */
    @Override
    public void push(RowMap r) throws Exception {
        if (!r.shouldOutput(this.outputConfig)) {
            // Nothing to publish, but the position still has to move past this row.
            this.context.setPosition(r.getNextPosition());
            return;
        }

        String value = r.toJSON(this.outputConfig);
        String natsSubject = new TopicInterpolator(this.natsSubjectTemplate)
                .generateFromRowMapAndCleanUpIllegalCharacters(r);

        long maxPayloadSize = this.natsConnection.getMaxPayload();
        byte[] messageBytes = value.getBytes(StandardCharsets.UTF_8);

        if (messageBytes.length > maxPayloadSize) {
            // The row is dropped; note that the position is NOT advanced on this path.
            LOGGER.error("->  nats message size ({}) > max payload size ({})",
                    messageBytes.length, maxPayloadSize);
            return;
        }

        this.natsConnection.publish(natsSubject, messageBytes);

        // Only record progress at transaction boundaries.
        if (r.isTXCommit()) {
            this.context.setPosition(r.getNextPosition());
        }

        LOGGER.debug("->  nats subject:{}, message:{}", natsSubject, value);
    }
}

