/*
 * Decompiled with CFR 0.152.
 */
package org.radarbase.producer.direct;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.radarbase.data.RecordData;
import org.radarbase.producer.KafkaSender;
import org.radarbase.producer.KafkaTopicSender;
import org.radarbase.topic.AvroTopic;

public class DirectSender
implements KafkaSender {
    private final KafkaProducer producer;

    public DirectSender(Properties properties) {
        this.producer = new KafkaProducer(properties);
    }

    public <K, V> KafkaTopicSender<K, V> sender(AvroTopic<K, V> topic) {
        return new DirectTopicSender<K, V>(topic);
    }

    public boolean resetConnection() {
        return true;
    }

    public boolean isConnected() {
        return true;
    }

    public void close() {
        this.producer.flush();
        this.producer.close();
    }

    private class DirectTopicSender<K, V>
    implements KafkaTopicSender<K, V> {
        private final String name;

        private DirectTopicSender(AvroTopic<K, V> topic) {
            this.name = topic.getName();
        }

        public void send(K key, V value) {
            DirectSender.this.producer.send(new ProducerRecord(this.name, key, value));
            DirectSender.this.producer.flush();
        }

        public void send(RecordData<K, V> records) {
            for (Object record : records) {
                DirectSender.this.producer.send(new ProducerRecord(this.name, records.getKey(), record));
            }
            DirectSender.this.producer.flush();
        }

        public void clear() {
        }

        public void flush() {
        }

        public void close() {
        }
    }
}

