/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.kafka.client.producer.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.VertxInternal;
import io.vertx.kafka.client.common.impl.CloseHandler;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import io.vertx.kafka.client.producer.RecordMetadata;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.PartitionInfo;

public class KafkaProducerImpl<K, V>
implements KafkaProducer<K, V> {
    private static final Map<String, SharedProducer> sharedProducers = new HashMap<String, SharedProducer>();
    private final KafkaWriteStream<K, V> stream;
    private final CloseHandler closeHandler;

    public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, Properties config) {
        return KafkaProducerImpl.createShared(vertx, name, () -> KafkaWriteStream.create(vertx, config));
    }

    public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, Map<String, String> config) {
        return KafkaProducerImpl.createShared(vertx, name, () -> KafkaWriteStream.create(vertx, new HashMap<String, Object>(config)));
    }

    public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, Properties config, Class<K> keyType, Class<V> valueType) {
        return KafkaProducerImpl.createShared(vertx, name, () -> KafkaWriteStream.create(vertx, config, keyType, valueType));
    }

    public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, Map<String, String> config, Class<K> keyType, Class<V> valueType) {
        return KafkaProducerImpl.createShared(vertx, name, () -> KafkaWriteStream.create(vertx, new HashMap<String, Object>(config), keyType, valueType));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, Supplier<KafkaWriteStream> streamFactory) {
        Map<String, SharedProducer> map = sharedProducers;
        synchronized (map) {
            SharedProducer sharedProducer = sharedProducers.computeIfAbsent(name, key -> {
                KafkaWriteStream stream = (KafkaWriteStream)streamFactory.get();
                SharedProducer s = new SharedProducer(stream);
                s.closeHandler.registerCloseHook((VertxInternal)vertx);
                return s;
            });
            Object key2 = new Object();
            KafkaProducerImpl<K, V> producer = new KafkaProducerImpl<K, V>(sharedProducer.stream, new CloseHandler((timeout, ar) -> {
                Map<String, SharedProducer> map = sharedProducers;
                synchronized (map) {
                    sharedProducer.remove(key2);
                    if (sharedProducer.isEmpty()) {
                        sharedProducers.remove(name);
                        sharedProducer.closeHandler.close((long)timeout, (Handler<AsyncResult<Void>>)ar);
                        return;
                    }
                }
                ar.handle((Object)Future.succeededFuture());
            }));
            sharedProducer.put(key2, producer);
            return producer.registerCloseHook();
        }
    }

    public KafkaProducerImpl(KafkaWriteStream<K, V> stream, CloseHandler closeHandler) {
        this.stream = stream;
        this.closeHandler = closeHandler;
    }

    public KafkaProducerImpl(KafkaWriteStream<K, V> stream) {
        this(stream, new CloseHandler(stream::close));
    }

    public KafkaProducerImpl<K, V> registerCloseHook() {
        Context context = Vertx.currentContext();
        if (context == null) {
            return this;
        }
        this.closeHandler.registerCloseHook(context);
        return this;
    }

    @Override
    public KafkaProducer<K, V> exceptionHandler(Handler<Throwable> handler) {
        this.stream.exceptionHandler(handler);
        return this;
    }

    @Override
    public KafkaProducer<K, V> write(KafkaProducerRecord<K, V> kafkaProducerRecord) {
        return this.write(kafkaProducerRecord, null);
    }

    @Override
    public KafkaProducer<K, V> write(KafkaProducerRecord<K, V> record, Handler<AsyncResult<RecordMetadata>> handler) {
        this.stream.write(record.record(), (Handler<AsyncResult<org.apache.kafka.clients.producer.RecordMetadata>>)((Handler)done -> {
            if (done.succeeded()) {
                handler.handle((Object)Future.succeededFuture((Object)Helper.from((org.apache.kafka.clients.producer.RecordMetadata)done.result())));
            } else {
                handler.handle((Object)Future.failedFuture((Throwable)done.cause()));
            }
        }));
        return this;
    }

    @Override
    public KafkaProducer<K, V> partitionsFor(String topic, Handler<AsyncResult<List<io.vertx.kafka.client.common.PartitionInfo>>> handler) {
        this.stream.partitionsFor(topic, (Handler<AsyncResult<List<PartitionInfo>>>)((Handler)done -> {
            if (done.succeeded()) {
                ArrayList<io.vertx.kafka.client.common.PartitionInfo> partitions = new ArrayList<io.vertx.kafka.client.common.PartitionInfo>();
                for (PartitionInfo kafkaPartitionInfo : (List)done.result()) {
                    io.vertx.kafka.client.common.PartitionInfo partitionInfo = new io.vertx.kafka.client.common.PartitionInfo();
                    partitionInfo.setInSyncReplicas(Stream.of(kafkaPartitionInfo.inSyncReplicas()).map(Helper::from).collect(Collectors.toList())).setLeader(Helper.from(kafkaPartitionInfo.leader())).setPartition(kafkaPartitionInfo.partition()).setReplicas(Stream.of(kafkaPartitionInfo.replicas()).map(Helper::from).collect(Collectors.toList())).setTopic(kafkaPartitionInfo.topic());
                    partitions.add(partitionInfo);
                }
                handler.handle((Object)Future.succeededFuture(partitions));
            } else {
                handler.handle((Object)Future.failedFuture((Throwable)done.cause()));
            }
        }));
        return this;
    }

    @Override
    public void end() {
        this.stream.end();
    }

    @Override
    public void end(KafkaProducerRecord<K, V> kafkaProducerRecord) {
        this.stream.end(kafkaProducerRecord.record());
    }

    @Override
    public KafkaProducer<K, V> setWriteQueueMaxSize(int size) {
        this.stream.setWriteQueueMaxSize(size);
        return this;
    }

    @Override
    public boolean writeQueueFull() {
        return this.stream.writeQueueFull();
    }

    @Override
    public KafkaProducer<K, V> drainHandler(Handler<Void> handler) {
        this.stream.drainHandler(handler);
        return this;
    }

    @Override
    public KafkaProducer<K, V> flush(Handler<Void> completionHandler) {
        this.stream.flush(completionHandler);
        return this;
    }

    @Override
    public void close() {
        this.closeHandler.close();
    }

    @Override
    public void close(Handler<AsyncResult<Void>> completionHandler) {
        this.closeHandler.close(completionHandler);
    }

    @Override
    public void close(long timeout, Handler<AsyncResult<Void>> completionHandler) {
        this.closeHandler.close(completionHandler);
    }

    @Override
    public KafkaWriteStream<K, V> asStream() {
        return this.stream;
    }

    @Override
    public Producer<K, V> producer() {
        return this.stream.producer();
    }

    private static class SharedProducer
    extends HashMap<Object, KafkaProducer> {
        final KafkaWriteStream stream;
        final CloseHandler closeHandler;

        public SharedProducer(KafkaWriteStream stream) {
            this.stream = stream;
            this.closeHandler = new CloseHandler(stream::close);
        }
    }
}

