/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.apm.client.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.hawkular.apm.api.services.Publisher;
import org.hawkular.apm.api.services.PublisherMetricHandler;
import org.hawkular.apm.api.services.ServiceStatus;
import org.hawkular.apm.api.utils.PropertyUtil;

/**
 * Base class for publishers that serialise each item to JSON and send it as a
 * record to a Kafka topic.
 *
 * <p>The publisher URI is read from the {@code HAWKULAR_APM_URI_PUBLISHER}
 * property, with the value of {@code HAWKULAR_APM_URI} supplied as the second
 * argument to the lookup (presumably the fallback - confirm against
 * {@code PropertyUtil}). The Kafka producer is only created by the constructor
 * when that URI starts with {@code kafka:}.
 *
 * @param <T> the type of the items being published
 */
public abstract class AbstractPublisherKafka<T> implements Publisher<T>, ServiceStatus {

    /** URI scheme prefix that selects the Kafka publisher. */
    private static final String KAFKA_PREFIX = "kafka:";

    /** Property holding the publisher specific URI. */
    private static final String URI_PUBLISHER_PROPERTY = "HAWKULAR_APM_URI_PUBLISHER";

    /** Property holding the general URI, passed as the second argument to the lookup. */
    private static final String URI_PROPERTY = "HAWKULAR_APM_URI";

    /** Property used to configure the producer's {@code retries} setting. */
    private static final String PRODUCER_RETRIES_PROPERTY = "HAWKULAR_APM_KAFKA_PRODUCER_RETRIES";

    /** Value passed to the retries lookup alongside the property name. */
    private static final Integer DEFAULT_RETRIES = 3;

    /** Producer {@code batch.size} setting. */
    private static final int BATCH_SIZE = 16384;

    /** Producer {@code linger.ms} setting. */
    private static final int LINGER_MS = 1;

    /** Producer {@code buffer.memory} setting (32 MiB). */
    private static final int BUFFER_MEMORY = 32 * 1024 * 1024;

    /** Fully qualified name of the serializer used for both keys and values. */
    private static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

    /** Shared JSON mapper used to serialise the published items. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Kafka producer; remains {@code null} until {@link #init()} has been run. */
    private Producer<String, String> producer;

    /** Name of the topic that records are sent to. */
    private final String topic;

    /** Optional handler notified after each batch has been handed to the producer. */
    private PublisherMetricHandler<T> handler = null;

    /**
     * Creates the publisher for the supplied topic and, if {@link #isAvailable()}
     * reports {@code true}, initialises the Kafka producer.
     *
     * <p>NOTE: {@link #isAvailable()} and {@link #init()} are overridable and are
     * invoked from this constructor, so overrides run before the subclass's own
     * fields have been initialised.
     *
     * @param topic the Kafka topic to publish to
     */
    public AbstractPublisherKafka(String topic) {
        this.topic = topic;
        if (isAvailable()) {
            init();
        }
    }

    /**
     * Reports whether a Kafka publisher URI has been configured.
     *
     * @return {@code true} if the configured URI is non-null and starts with {@code kafka:}
     */
    public boolean isAvailable() {
        String uri = publisherUri();
        return uri != null && uri.startsWith(KAFKA_PREFIX);
    }

    /**
     * Builds the producer configuration and creates the Kafka producer. The
     * bootstrap servers are the configured URI with the leading {@code kafka:}
     * prefix removed.
     *
     * <p>This method does not validate the URI itself; the constructor only calls
     * it after {@link #isAvailable()} has returned {@code true}.
     */
    protected void init() {
        Properties props = new Properties();
        props.put("bootstrap.servers", publisherUri().substring(KAFKA_PREFIX.length()));
        props.put("acks", "all");
        props.put("retries", PropertyUtil.getPropertyAsInteger(PRODUCER_RETRIES_PROPERTY, DEFAULT_RETRIES));
        props.put("batch.size", BATCH_SIZE);
        props.put("linger.ms", LINGER_MS);
        props.put("buffer.memory", BUFFER_MEMORY);
        props.put("key.serializer", STRING_SERIALIZER);
        props.put("value.serializer", STRING_SERIALIZER);
        this.producer = new KafkaProducer<>(props);
    }

    /**
     * Returns the retry count used when publishing without an explicit count.
     *
     * @return always {@code 0}
     */
    public int getInitialRetryCount() {
        return 0;
    }

    /**
     * Publishes the items immediately, using {@link #getInitialRetryCount()} as
     * the retry count and no delay.
     *
     * @param tenantId the tenant id, passed on to the metric handler
     * @param items the items to publish
     * @throws Exception if an item cannot be serialised or sent
     */
    public void publish(String tenantId, List<T> items) throws Exception {
        publish(tenantId, items, getInitialRetryCount(), 0L);
    }

    /**
     * Optionally waits for {@code delay} milliseconds, then serialises each item
     * to JSON and sends it to the topic. If a metric handler has been set, it is
     * told how long (in microseconds) the serialise-and-send loop took.
     *
     * <p>If the calling thread is interrupted during the delay, the batch is still
     * published and the thread's interrupt status is restored before returning.
     *
     * @param tenantId the tenant id, passed on to the metric handler
     * @param items the items to publish
     * @param retryCount not used by this implementation
     * @param delay time to wait before publishing, in milliseconds; values
     *        {@code <= 0} mean no wait
     * @throws IllegalStateException if there is an item to send but the producer
     *         has not been initialised
     * @throws Exception if an item cannot be serialised or sent
     */
    public void publish(String tenantId, List<T> items, int retryCount, long delay) throws Exception {
        boolean interrupted = false;
        if (delay > 0L) {
            try {
                synchronized (this) {
                    wait(delay);
                }
            } catch (InterruptedException e) {
                // Remember the interrupt; it is re-asserted once the batch has been sent
                // so that the publish itself is not cut short.
                interrupted = true;
            }
        }

        try {
            // Read the handler once so the timing and the notification always agree,
            // even if setMetricHandler is called while this batch is being published.
            PublisherMetricHandler<T> metricHandler = this.handler;
            long startTime = metricHandler != null ? System.nanoTime() : 0L;

            for (T item : items) {
                String data = MAPPER.writeValueAsString(item);
                requireProducer().send(new ProducerRecord<String, String>(topic, data));
            }

            if (metricHandler != null) {
                metricHandler.published(tenantId, items,
                        TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTime));
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Retry is not supported by this publisher.
     *
     * @throws UnsupportedOperationException always
     */
    public void retry(String tenantId, List<T> items, String subscriber, int retryCount, long delay) throws Exception {
        throw new UnsupportedOperationException("Retry not supported for this publisher");
    }

    /**
     * Sets the handler to be notified after each published batch.
     *
     * @param handler the metric handler, or {@code null} to disable notifications
     */
    public void setMetricHandler(PublisherMetricHandler<T> handler) {
        this.handler = handler;
    }

    /** Looks up the configured publisher URI. */
    private static String publisherUri() {
        return PropertyUtil.getProperty(URI_PUBLISHER_PROPERTY, PropertyUtil.getProperty(URI_PROPERTY));
    }

    /** Returns the producer, failing with a descriptive error if it was never initialised. */
    private Producer<String, String> requireProducer() {
        Producer<String, String> current = this.producer;
        if (current == null) {
            throw new IllegalStateException(
                    "Kafka producer has not been initialised for topic '" + topic + "'");
        }
        return current;
    }
}

