/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.kafka.client.producer.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.Serializer;

public class KafkaWriteStreamImpl<K, V>
implements KafkaWriteStream<K, V> {
    private long maxSize = 0x100000L;
    private long pending;
    private final Producer<K, V> producer;
    private Handler<Void> drainHandler;
    private Handler<Throwable> exceptionHandler;
    private final Context context;

    public static <K, V> KafkaWriteStreamImpl<K, V> create(Vertx vertx, Properties config) {
        return new KafkaWriteStreamImpl<K, V>(vertx.getOrCreateContext(), new KafkaProducer(config));
    }

    public static <K, V> KafkaWriteStreamImpl<K, V> create(Vertx vertx, Properties config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        return new KafkaWriteStreamImpl<K, V>(vertx.getOrCreateContext(), new KafkaProducer(config, keySerializer, valueSerializer));
    }

    public static <K, V> KafkaWriteStreamImpl<K, V> create(Vertx vertx, Map<String, Object> config) {
        return new KafkaWriteStreamImpl<K, V>(vertx.getOrCreateContext(), new KafkaProducer(config));
    }

    public static <K, V> KafkaWriteStreamImpl<K, V> create(Vertx vertx, Map<String, Object> config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        return new KafkaWriteStreamImpl<K, V>(vertx.getOrCreateContext(), new KafkaProducer(config, keySerializer, valueSerializer));
    }

    public KafkaWriteStreamImpl(Context context, Producer<K, V> producer) {
        this.producer = producer;
        this.context = context;
    }

    private int len(Object value) {
        if (value instanceof byte[]) {
            return ((byte[])value).length;
        }
        if (value instanceof String) {
            return ((String)value).length();
        }
        return 1;
    }

    @Override
    public KafkaWriteStream<K, V> send(ProducerRecord<K, V> record) {
        return this.send((ProducerRecord)record, (Handler)null);
    }

    @Override
    public synchronized KafkaWriteStreamImpl<K, V> send(ProducerRecord<K, V> record, Handler<AsyncResult<RecordMetadata>> handler) {
        int len = this.len(record.value());
        this.pending += (long)len;
        this.context.executeBlocking(fut -> {
            block6: {
                try {
                    this.producer.send(record, (metadata, err) -> this.context.runOnContext(v1 -> {
                        KafkaWriteStreamImpl kafkaWriteStreamImpl = this;
                        synchronized (kafkaWriteStreamImpl) {
                            if (err != null && this.exceptionHandler != null) {
                                Handler<Throwable> exceptionHandler = this.exceptionHandler;
                                this.context.runOnContext(v2 -> exceptionHandler.handle((Object)err));
                            }
                            long lowWaterMark = this.maxSize / 2L;
                            this.pending -= (long)len;
                            if (this.pending < lowWaterMark && this.drainHandler != null) {
                                Handler<Void> drainHandler = this.drainHandler;
                                this.drainHandler = null;
                                this.context.runOnContext(drainHandler);
                            }
                        }
                        if (handler != null) {
                            handler.handle((Object)(err != null ? Future.failedFuture((Throwable)err) : Future.succeededFuture((Object)metadata)));
                        }
                    }));
                }
                catch (Throwable e) {
                    KafkaWriteStreamImpl kafkaWriteStreamImpl = this;
                    synchronized (kafkaWriteStreamImpl) {
                        if (this.exceptionHandler != null) {
                            Handler<Throwable> exceptionHandler = this.exceptionHandler;
                            this.context.runOnContext(v3 -> exceptionHandler.handle((Object)e));
                        }
                    }
                    if (handler == null) break block6;
                    handler.handle((Object)Future.failedFuture((Throwable)e));
                }
            }
        }, null);
        return this;
    }

    public KafkaWriteStreamImpl<K, V> write(ProducerRecord<K, V> data, Handler<AsyncResult<Void>> handler) {
        Handler mdHandler = null;
        if (handler != null) {
            mdHandler = ar -> handler.handle((Object)ar.mapEmpty());
        }
        return this.send((ProducerRecord)data, mdHandler);
    }

    public KafkaWriteStreamImpl<K, V> write(ProducerRecord<K, V> record) {
        return this.write(record, (Handler<AsyncResult<Void>>)null);
    }

    public KafkaWriteStreamImpl<K, V> setWriteQueueMaxSize(int size) {
        this.maxSize = size;
        return this;
    }

    public synchronized boolean writeQueueFull() {
        return this.pending >= this.maxSize;
    }

    public synchronized KafkaWriteStreamImpl<K, V> drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        return this;
    }

    public void end() {
    }

    public void end(Handler<AsyncResult<Void>> handler) {
        if (handler != null) {
            this.context.runOnContext(v -> handler.handle((Object)Future.succeededFuture()));
        }
    }

    public KafkaWriteStreamImpl<K, V> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    @Override
    public KafkaWriteStreamImpl<K, V> partitionsFor(String topic, Handler<AsyncResult<List<PartitionInfo>>> handler) {
        AtomicBoolean done = new AtomicBoolean();
        this.context.owner().setTimer(2000L, id -> {
            if (done.compareAndSet(false, true)) {
                handler.handle((Object)Future.failedFuture((String)"Kafka connect timeout"));
            }
        });
        this.context.executeBlocking(future -> {
            List partitions = this.producer.partitionsFor(topic);
            if (done.compareAndSet(false, true)) {
                future.complete((Object)partitions);
            }
        }, handler);
        return this;
    }

    @Override
    public KafkaWriteStreamImpl<K, V> flush(Handler<Void> completionHandler) {
        this.context.executeBlocking(future -> {
            this.producer.flush();
            future.complete();
        }, ar -> completionHandler.handle(null));
        return this;
    }

    @Override
    public void close() {
        this.close((Handler<AsyncResult<Void>>)((Handler)ar -> {}));
    }

    @Override
    public void close(Handler<AsyncResult<Void>> completionHandler) {
        this.close(0L, completionHandler);
    }

    @Override
    public void close(long timeout, Handler<AsyncResult<Void>> completionHandler) {
        this.context.executeBlocking(future -> {
            if (timeout > 0L) {
                this.producer.close(Duration.ofMillis(timeout));
            } else {
                this.producer.close();
            }
            future.complete();
        }, completionHandler);
    }

    @Override
    public Producer<K, V> unwrap() {
        return this.producer;
    }
}

