/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.metrics.lib.impl.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kylin.metrics.lib.ActiveReservoir;
import org.apache.kylin.metrics.lib.ActiveReservoirListener;
import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
import org.apache.kylin.metrics.lib.Record;
import org.apache.kylin.metrics.lib.impl.ReporterBuilder;
import org.apache.kylin.metrics.lib.impl.kafka.KafkaActiveReserviorListener;
import org.apache.kylin.metrics.lib.impl.kafka.KafkaSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reporter that forwards records from an {@link ActiveReservoir} to Kafka.
 *
 * <p>On {@link #start()} a Kafka-backed listener is registered with the reservoir; on
 * {@link #stop()} / {@link #close()} it is deregistered again. Instances are created through
 * {@link #forRegistry(ActiveReservoir)} and {@link Builder#build()}.
 */
public class KafkaReservoirReporter
extends ActiveReservoirReporter {
    /** Reporter-type token embedded in every decorated topic name. */
    public static final String KAFKA_REPORTER_SUFFIX = "KAFKA";
    /** Shared sink used to map a subject to its table name. */
    public static final KafkaSink sink = new KafkaSink();
    protected static final Logger logger = LoggerFactory.getLogger(KafkaReservoirReporter.class);
    private final ActiveReservoir activeReservoir;
    private final KafkaReservoirListener listener;

    /**
     * Creates the reporter and its listener. The listener builds its Kafka producer from
     * {@code props} right away, i.e. before {@link #start()} is called.
     *
     * @param activeReservoir reservoir the listener will be attached to
     * @param props           Kafka producer configuration
     */
    private KafkaReservoirReporter(ActiveReservoir activeReservoir, Properties props) {
        this.activeReservoir = activeReservoir;
        this.listener = new KafkaReservoirListener(props);
    }

    /**
     * Starts building a reporter for the given reservoir.
     *
     * @param activeReservoir reservoir whose records should be reported
     * @return a new builder bound to {@code activeReservoir}
     */
    public static Builder forRegistry(ActiveReservoir activeReservoir) {
        return new Builder(activeReservoir);
    }

    /**
     * Builds the Kafka topic name used for a logical topic:
     * {@code KYLIN_PREFIX + "_" + KAFKA_REPORTER_SUFFIX + "_" + topic}.
     *
     * @param topic logical topic name
     * @return the decorated topic name
     */
    public static String decorateTopic(String topic) {
        return ActiveReservoirReporter.KYLIN_PREFIX + "_" + KAFKA_REPORTER_SUFFIX + "_" + topic;
    }

    /**
     * Resolves the table name for a subject by delegating to {@link #sink}.
     *
     * @param subject subject to look up
     * @return whatever {@code sink.getTableFromSubject(subject)} returns
     */
    public static String getTableFromSubject(String subject) {
        return sink.getTableFromSubject(subject);
    }

    /** Registers the Kafka listener with the reservoir. */
    public void start() {
        this.activeReservoir.addListener((ActiveReservoirListener)this.listener);
    }

    /** Deregisters the Kafka listener from the reservoir. */
    public void stop() {
        this.activeReservoir.removeListener((ActiveReservoirListener)this.listener);
    }

    /**
     * Equivalent to {@link #stop()}.
     *
     * <p>NOTE(review): the listener's producer is not closed here directly; this relies on
     * whatever {@code removeListener} does with the listener - confirm the producer is released.
     */
    public void close() {
        this.stop();
    }

    /** Returns the listener owned by this reporter (package-private accessor). */
    KafkaReservoirListener getListener() {
        return this.listener;
    }

    /** Listener that publishes each record's key and value bytes through a Kafka producer. */
    class KafkaReservoirListener
    extends KafkaActiveReserviorListener {
        protected final Producer<byte[], byte[]> producer;

        /**
         * @param props Kafka producer configuration handed to {@link KafkaProducer}
         */
        private KafkaReservoirListener(Properties props) {
            // Diamond instead of a raw type so the producer is checked as <byte[], byte[]>.
            this.producer = new KafkaProducer<>(props);
        }

        /**
         * Asks the producer for the partitions of {@code topic}; the result is discarded, so
         * the call is made only for its effect on the producer.
         *
         * @param topic topic to query
         */
        @Override
        public void tryFetchMetadataFor(String topic) {
            this.producer.partitionsFor(topic);
        }

        /** Applies {@link KafkaReservoirReporter#decorateTopic(String)} to {@code topic}. */
        @Override
        protected String decorateTopic(String topic) {
            return KafkaReservoirReporter.decorateTopic(topic);
        }

        /**
         * Sends the record's key and value to {@code topic}.
         *
         * @param topic    destination topic, used as given
         * @param record   source of the message key and value
         * @param callback passed through to the producer's {@code send}
         */
        @Override
        protected void send(String topic, Record record, Callback callback) {
            // Typed record: key/value must be byte[] to match the producer's type parameters.
            ProducerRecord<byte[], byte[]> message =
                    new ProducerRecord<>(topic, record.getKey(), record.getValue());
            this.producer.send(message, callback);
        }

        /** Closes the underlying Kafka producer. */
        public void close() {
            this.producer.close();
        }
    }

    /** Builder for {@link KafkaReservoirReporter}; obtain one via {@link #forRegistry}. */
    public static class Builder
    extends ReporterBuilder {
        private Builder(ActiveReservoir activeReservoir) {
            super(activeReservoir);
        }

        /**
         * Sets {@code key.serializer} and {@code value.serializer} to
         * {@link ByteArraySerializer}, overwriting any values already present in the
         * builder's properties.
         */
        private void setFixedProperties() {
            String byteArraySerializer = ByteArraySerializer.class.getName();
            this.props.put("key.serializer", byteArraySerializer);
            this.props.put("value.serializer", byteArraySerializer);
        }

        /**
         * Applies the fixed serializer settings and creates the reporter.
         *
         * @return a new reporter built from this builder's reservoir and properties
         */
        public KafkaReservoirReporter build() {
            this.setFixedProperties();
            return new KafkaReservoirReporter(this.registry, this.props);
        }
    }
}

