/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.kafka;

import cz.o2.proxima.direct.core.AbstractOnlineAttributeWriter;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.kafka.ElementSerializer;
import cz.o2.proxima.direct.kafka.KafkaAccessor;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serdes;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Partitioner;
import cz.o2.proxima.util.Pair;
import java.io.Serializable;
import java.util.Properties;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Online attribute writer that publishes {@link StreamElement}s to a Kafka topic.
 *
 * <p>The topic, partitioner and element serializer are all taken from the supplied
 * {@link KafkaAccessor}. The underlying {@link KafkaProducer} is created lazily on the
 * first call to {@link #write(StreamElement, CommitCallback)} and released by
 * {@link #close()}; a later write creates a fresh producer.
 */
public class KafkaWriter
extends AbstractOnlineAttributeWriter {
    private static final Logger log = LoggerFactory.getLogger(KafkaWriter.class);
    final KafkaAccessor accessor;
    private final Partitioner partitioner;
    private final String topic;
    private final ElementSerializer<?, ?> serializer;
    /** Lazily created producer; {@code null} before the first write and after {@link #close()}. */
    @Nullable
    private transient KafkaProducer<String, byte[]> producer;

    /**
     * Creates a writer bound to the entity, URI, topic, partitioner and serializer
     * exposed by the given accessor.
     *
     * @param accessor accessor describing the target Kafka storage
     */
    KafkaWriter(KafkaAccessor accessor) {
        super(accessor.getEntityDescriptor(), accessor.getUri());
        this.accessor = accessor;
        this.partitioner = accessor.getPartitioner();
        this.topic = accessor.getTopic();
        this.serializer = accessor.getSerializer();
    }

    /**
     * Sends the element to the configured topic and reports the outcome through
     * {@code callback}.
     *
     * <p>The target partition is the partitioner's id for the element, masked to be
     * non-negative, modulo the number of partitions the producer reports for the topic.
     * Any exception thrown before the record is handed to the producer is logged and
     * reported as {@code callback.commit(false, ex)}; the asynchronous send result is
     * reported from the producer's completion callback.
     *
     * @param data element to write
     * @param callback callback notified exactly once with the result of the write
     */
    public void write(StreamElement data, CommitCallback callback) {
        try {
            if (this.producer == null) {
                this.producer = this.createProducer();
            }
            int partition = (this.partitioner.getPartitionId(data) & Integer.MAX_VALUE)
                    % this.producer.partitionsFor(this.topic).size();
            Pair<?, ?> output = this.serializer.write(data);
            // The producer is configured with String/byte[] serializers (see createProducer),
            // so the serialized key and value are narrowed to those types here. A mismatch
            // raises ClassCastException, which is reported through the catch block below.
            ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(
                    this.topic,
                    (Integer) partition,
                    data.getStamp(),
                    (String) output.getFirst(),
                    (byte[]) output.getSecond());
            this.producer.send(record, (metadata, exception) -> {
                // Never dereference metadata on the failure path: depending on the client
                // version it may be null when the send failed, and an exception thrown here
                // would prevent the commit callback from being notified at all.
                if (exception != null) {
                    log.warn("Failed to write {} to topic {}", data, this.topic, exception);
                } else if (metadata != null) {
                    log.debug(
                            "Written {} to topic {} offset {} and partition {}",
                            data, metadata.topic(), metadata.offset(), metadata.partition());
                }
                callback.commit(exception == null, exception);
            });
        }
        catch (Exception ex) {
            log.warn("Failed to write ingest {}", data, ex);
            callback.commit(false, ex);
        }
    }

    /**
     * Returns a serializable factory that creates a new {@code KafkaWriter} for the same
     * accessor.
     *
     * @return factory producing writers backed by this writer's accessor
     */
    public OnlineAttributeWriter.Factory<?> asFactory() {
        // Copy the field into a local so the lambda captures only the accessor,
        // not this writer instance.
        KafkaAccessor accessor = this.accessor;
        return (OnlineAttributeWriter.Factory & Serializable)repo -> new KafkaWriter(accessor);
    }

    /**
     * Builds a producer from the accessor's properties, overriding {@code acks},
     * {@code bootstrap.servers} (authority part of this writer's URI) and
     * {@code batch.size}.
     *
     * @return new producer using String keys and byte-array values
     */
    private KafkaProducer<String, byte[]> createProducer() {
        Properties props = this.accessor.createProps();
        props.put("acks", "all");
        props.put("bootstrap.servers", this.getUri().getAuthority());
        props.put("batch.size", 16384);
        return new KafkaProducer<String, byte[]>(props, Serdes.String().serializer(), Serdes.ByteArray().serializer());
    }

    /**
     * Closes the producer if one has been created and clears the reference so that a
     * subsequent write creates a new one. Does nothing when no producer exists.
     */
    public void close() {
        if (this.producer != null) {
            this.producer.close();
            this.producer = null;
        }
    }

    /**
     * Returns the accessor this writer was created from.
     *
     * @return the backing accessor
     */
    public KafkaAccessor getAccessor() {
        return this.accessor;
    }
}

