/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.kafka.client.producer.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.KafkaCodecs;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.Serializer;

/**
 * {@link KafkaWriteStream} implementation that forwards records to a Kafka {@link Producer}
 * and dispatches every user callback on the {@link Context} supplied at construction.
 *
 * <p>Back-pressure is tracked with a simple counter: each written record adds its estimated
 * size (see {@link #len(Object)}) to {@code size}, and the amount is subtracted again once the
 * producer reports the outcome of the send. All access to the mutable state ({@code size},
 * {@code maxSize} and the handlers) is guarded by this instance's monitor.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class KafkaWriteStreamImpl<K, V>
implements KafkaWriteStream<K, V> {
    /** Queue size at or above which {@link #writeQueueFull()} returns {@code true}; defaults to 0x100000 (1048576). */
    private long maxSize = 0x100000L;
    /** Sum of the estimated sizes of the records whose send outcome is not yet known. */
    private long size;
    private final Producer<K, V> producer;
    /** One-shot handler, cleared when it is scheduled after the queue falls below half of {@code maxSize}. */
    private Handler<Void> drainHandler;
    private Handler<Throwable> exceptionHandler;
    private final Context context;

    /**
     * Creates a stream backed by a new {@link KafkaProducer} built from {@code config}.
     *
     * @param vertx  Vert.x instance whose current-or-new context is used for callbacks
     * @param config Kafka producer configuration
     * @return the new write stream
     */
    public static <K, V> KafkaWriteStreamImpl<K, V> create(Vertx vertx, Properties config) {
        return new KafkaWriteStreamImpl<K, V>(vertx.getOrCreateContext(), new KafkaProducer<>(config));
    }

    /**
     * Creates a stream backed by a new {@link KafkaProducer} whose serializers are looked up
     * through {@link KafkaCodecs#serializer} for the given key and value classes.
     *
     * @param vertx     Vert.x instance whose current-or-new context is used for callbacks
     * @param config    Kafka producer configuration
     * @param keyType   class of the record keys
     * @param valueType class of the record values
     * @return the new write stream
     */
    public static <K, V> KafkaWriteStreamImpl<K, V> create(Vertx vertx, Properties config, Class<K> keyType, Class<V> valueType) {
        Serializer<K> keySerializer = KafkaCodecs.serializer(keyType);
        Serializer<V> valueSerializer = KafkaCodecs.serializer(valueType);
        return new KafkaWriteStreamImpl<K, V>(vertx.getOrCreateContext(), new KafkaProducer<>(config, keySerializer, valueSerializer));
    }

    /**
     * Creates a stream backed by a new {@link KafkaProducer} built from {@code config}.
     *
     * @param vertx  Vert.x instance whose current-or-new context is used for callbacks
     * @param config Kafka producer configuration
     * @return the new write stream
     */
    public static <K, V> KafkaWriteStreamImpl<K, V> create(Vertx vertx, Map<String, Object> config) {
        return new KafkaWriteStreamImpl<K, V>(vertx.getOrCreateContext(), new KafkaProducer<>(config));
    }

    /**
     * Creates a stream backed by a new {@link KafkaProducer} whose serializers are looked up
     * through {@link KafkaCodecs#serializer} for the given key and value classes.
     *
     * @param vertx     Vert.x instance whose current-or-new context is used for callbacks
     * @param config    Kafka producer configuration
     * @param keyType   class of the record keys
     * @param valueType class of the record values
     * @return the new write stream
     */
    public static <K, V> KafkaWriteStreamImpl<K, V> create(Vertx vertx, Map<String, Object> config, Class<K> keyType, Class<V> valueType) {
        Serializer<K> keySerializer = KafkaCodecs.serializer(keyType);
        Serializer<V> valueSerializer = KafkaCodecs.serializer(valueType);
        return new KafkaWriteStreamImpl<K, V>(vertx.getOrCreateContext(), new KafkaProducer<>(config, keySerializer, valueSerializer));
    }

    /**
     * Wraps an existing producer.
     *
     * @param context  context on which user handlers are invoked
     * @param producer producer that performs the actual sends
     */
    public KafkaWriteStreamImpl(Context context, Producer<K, V> producer) {
        this.producer = producer;
        this.context = context;
    }

    /**
     * Estimates how much a record value contributes to the write queue.
     *
     * @param value record value, may be {@code null}
     * @return the array length for {@code byte[]}, the character count for {@code String},
     *         and 1 for anything else (including {@code null})
     */
    private int len(Object value) {
        if (value instanceof byte[]) {
            return ((byte[]) value).length;
        }
        if (value instanceof String) {
            return ((String) value).length();
        }
        return 1;
    }

    /**
     * Sends {@code record} through the producer and accounts for it in the write queue.
     *
     * <p>The record's estimated size is released again as soon as the send outcome is known,
     * whether it succeeded or failed, so failed sends cannot leave the queue permanently full.
     * A failure (reported by the producer callback or thrown by {@code send} itself) is passed
     * to the exception handler, if set, and to {@code handler}, if non-null.
     *
     * @param record  record to send
     * @param handler notified with the record metadata or the failure; may be {@code null}
     * @return this stream
     */
    @Override
    public synchronized KafkaWriteStreamImpl<K, V> write(ProducerRecord<K, V> record, Handler<AsyncResult<RecordMetadata>> handler) {
        int len = this.len(record.value());
        this.size += len;
        try {
            this.producer.send(record, (metadata, err) -> {
                synchronized (this) {
                    // Release the queued size on both outcomes; the record is no longer pending.
                    this.size -= len;
                    if (err != null) {
                        this.reportFailure(err, handler);
                    } else if (handler != null) {
                        this.context.runOnContext(v -> handler.handle(Future.succeededFuture(metadata)));
                    }
                    long lowWaterMark = this.maxSize / 2L;
                    if (this.size < lowWaterMark && this.drainHandler != null) {
                        Handler<Void> pendingDrain = this.drainHandler;
                        this.drainHandler = null;
                        this.context.runOnContext(pendingDrain);
                    }
                }
            });
        }
        catch (Exception e) {
            // send() rejected the record: undo the accounting and surface the error
            // instead of dropping it silently.
            this.size -= len;
            this.reportFailure(e, handler);
        }
        return this;
    }

    /**
     * Schedules failure notifications on the stream's context. Must be called while holding
     * this instance's monitor, because it reads {@code exceptionHandler}.
     *
     * @param cause   the failure to report
     * @param handler per-write handler to fail, or {@code null}
     */
    private void reportFailure(Throwable cause, Handler<AsyncResult<RecordMetadata>> handler) {
        Handler<Throwable> currentExceptionHandler = this.exceptionHandler;
        if (currentExceptionHandler != null) {
            this.context.runOnContext(v -> currentExceptionHandler.handle(cause));
        }
        if (handler != null) {
            this.context.runOnContext(v -> handler.handle(Future.failedFuture(cause)));
        }
    }

    /**
     * Sends {@code record} without a per-write completion handler.
     *
     * @param record record to send
     * @return this stream
     */
    public KafkaWriteStreamImpl<K, V> write(ProducerRecord<K, V> record) {
        return this.write(record, null);
    }

    /**
     * Sets the queue size at or above which {@link #writeQueueFull()} reports {@code true}.
     *
     * @param size new maximum queue size
     * @return this stream
     */
    public synchronized KafkaWriteStreamImpl<K, V> setWriteQueueMaxSize(int size) {
        this.maxSize = size;
        return this;
    }

    /**
     * @return {@code true} when the pending size has reached the configured maximum
     */
    public synchronized boolean writeQueueFull() {
        return this.size >= this.maxSize;
    }

    /**
     * Registers a one-shot handler that is scheduled when, after a send completes, the pending
     * size is below half of the maximum queue size.
     *
     * @param handler drain handler, or {@code null} to clear it
     * @return this stream
     */
    public synchronized KafkaWriteStreamImpl<K, V> drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        return this;
    }

    /** Does nothing; use one of the {@code close} methods to release the producer. */
    public void end() {
    }

    /**
     * Registers the handler that receives send failures.
     *
     * @param handler exception handler, or {@code null} to clear it
     * @return this stream
     */
    public synchronized KafkaWriteStreamImpl<K, V> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    /**
     * Looks up the partitions of {@code topic} on a worker thread, failing with
     * "Kafka connect timeout" if no result has been delivered within 2000 ms.
     *
     * <p>{@code handler} is invoked at most once: whichever of the timeout and the lookup
     * result wins the {@code done} flag delivers its outcome and the other is discarded.
     *
     * @param topic   topic to query
     * @param handler receives the partition list or the failure
     * @return this stream
     */
    @Override
    public KafkaWriteStreamImpl<K, V> partitionsFor(String topic, Handler<AsyncResult<List<PartitionInfo>>> handler) {
        AtomicBoolean done = new AtomicBoolean();
        Vertx vertx = this.context.owner();
        long timerId = vertx.setTimer(2000L, id -> {
            if (done.compareAndSet(false, true)) {
                handler.handle(Future.failedFuture("Kafka connect timeout"));
            }
        });
        // The flag is checked where the result is delivered, so a lookup that throws
        // (surfaced by executeBlocking as a failed result) also claims the flag and
        // cannot be followed by a second, spurious timeout callback.
        this.context.<List<PartitionInfo>>executeBlocking(
            future -> future.complete(this.producer.partitionsFor(topic)),
            ar -> {
                if (done.compareAndSet(false, true)) {
                    vertx.cancelTimer(timerId);
                    handler.handle(ar);
                }
            });
        return this;
    }

    /**
     * Flushes the producer on a worker thread.
     *
     * @param completionHandler invoked with {@code null} once the blocking flush has finished;
     *                          may be {@code null} when the caller does not need notification
     * @return this stream
     */
    @Override
    public KafkaWriteStreamImpl<K, V> flush(Handler<Void> completionHandler) {
        this.context.executeBlocking(future -> {
            this.producer.flush();
            future.complete();
        }, ar -> {
            if (completionHandler != null) {
                completionHandler.handle(null);
            }
        });
        return this;
    }

    /** Closes the producer without a timeout and ignores the outcome. */
    @Override
    public void close() {
        this.close(ar -> {});
    }

    /**
     * Closes the producer without a timeout.
     *
     * @param completionHandler notified when the close has completed or failed
     */
    @Override
    public void close(Handler<AsyncResult<Void>> completionHandler) {
        this.close(0L, completionHandler);
    }

    /**
     * Closes the producer on a worker thread.
     *
     * @param timeout           maximum time to wait in milliseconds; values {@code <= 0} use the
     *                          producer's no-argument {@code close()}
     * @param completionHandler notified when the close has completed or failed
     */
    @Override
    public void close(long timeout, Handler<AsyncResult<Void>> completionHandler) {
        this.context.executeBlocking(future -> {
            if (timeout > 0L) {
                this.producer.close(timeout, TimeUnit.MILLISECONDS);
            } else {
                this.producer.close();
            }
            future.complete();
        }, completionHandler);
    }

    /**
     * @return the underlying Kafka producer
     */
    @Override
    public Producer<K, V> producer() {
        return this.producer;
    }
}

