/*
 * Decompiled with CFR 0.152.
 */
package net.mguenther.kafka.junit.provider;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import net.mguenther.kafka.junit.KeyValue;
import net.mguenther.kafka.junit.RecordProducer;
import net.mguenther.kafka.junit.SendKeyValues;
import net.mguenther.kafka.junit.SendKeyValuesTransactional;
import net.mguenther.kafka.junit.SendValues;
import net.mguenther.kafka.junit.SendValuesTransactional;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class DefaultRecordProducer
implements RecordProducer {
    private final String bootstrapServers;

    @Override
    public <V> List<RecordMetadata> send(SendValues<V> sendRequest) throws InterruptedException {
        Collection records = sendRequest.getValues().stream().map(value -> new KeyValue<String, Object>(null, value)).collect(Collectors.toList());
        SendKeyValues keyValueRequest = SendKeyValues.to(sendRequest.getTopic(), records).withAll(sendRequest.getProducerProps()).build();
        return this.send(keyValueRequest);
    }

    @Override
    public <V> List<RecordMetadata> send(SendValuesTransactional<V> sendRequest) throws InterruptedException {
        HashMap<String, Collection<KeyValue<String, Collection>>> recordsPerTopic = new HashMap<String, Collection<KeyValue<String, Collection>>>();
        for (String topic : sendRequest.getValuesPerTopic().keySet()) {
            Collection records = sendRequest.getValuesPerTopic().get(topic).stream().map(value -> new KeyValue<String, Object>(null, value)).collect(Collectors.toList());
            recordsPerTopic.put(topic, records);
        }
        SendKeyValuesTransactional keyValueRequest = SendKeyValuesTransactional.inTransaction(recordsPerTopic).withAll(sendRequest.getProducerProps()).withFailTransaction(sendRequest.shouldFailTransaction()).build();
        return this.send(keyValueRequest);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public <K, V> List<RecordMetadata> send(SendKeyValues<K, V> sendRequest) throws InterruptedException {
        KafkaProducer producer = new KafkaProducer(this.effectiveProducerProps(sendRequest.getProducerProps()));
        ArrayList metadata = new ArrayList(sendRequest.getRecords().size());
        try {
            for (KeyValue<K, V> record : sendRequest.getRecords()) {
                ProducerRecord producerRecord = new ProducerRecord(sendRequest.getTopic(), null, record.getKey(), record.getValue(), (Iterable)record.getHeaders());
                Future f = producer.send(producerRecord);
                try {
                    metadata.add(f.get());
                }
                catch (ExecutionException e) {
                    if (!RuntimeException.class.isAssignableFrom(e.getCause().getClass())) throw new RuntimeException(e.getCause());
                    throw (RuntimeException)e.getCause();
                    return Collections.unmodifiableList(metadata);
                }
            }
        }
        finally {
            producer.flush();
            producer.close();
        }
    }

    @Override
    public <K, V> List<RecordMetadata> send(SendKeyValuesTransactional<K, V> sendRequest) throws InterruptedException {
        KafkaProducer producer = new KafkaProducer(this.effectiveProducerProps(sendRequest.getProducerProps()));
        ArrayList metadata = new ArrayList();
        try {
            producer.initTransactions();
            producer.beginTransaction();
            for (String topic : sendRequest.getRecordsPerTopic().keySet()) {
                for (KeyValue<K, V> record : sendRequest.getRecordsPerTopic().get(topic)) {
                    ProducerRecord producerRecord = new ProducerRecord(topic, null, record.getKey(), record.getValue(), (Iterable)record.getHeaders());
                    Future f = producer.send(producerRecord);
                    try {
                        metadata.add(f.get());
                    }
                    catch (ExecutionException e) {
                        if (RuntimeException.class.isAssignableFrom(e.getCause().getClass())) {
                            throw (RuntimeException)e.getCause();
                        }
                        throw new RuntimeException(e.getCause());
                    }
                }
            }
            if (sendRequest.shouldFailTransaction()) {
                producer.abortTransaction();
            } else {
                producer.commitTransaction();
            }
        }
        catch (ProducerFencedException e) {
            producer.abortTransaction();
            String message = String.format("There happens to be another producer that shares the transactional ID '%s'with this producer, but that has a newer epoch assigned to it. This producer has been fenced off, it can no longer write to the log transactionally. Hence, the ongoing transaction is aborted and the producer closed.", sendRequest.getProducerProps().get("transactional.id"));
            throw new RuntimeException(message, e);
        }
        catch (OutOfOrderSequenceException e) {
            producer.abortTransaction();
            String message = "This producer has received out-of-band sequence numbers. This is a fatal condition and thus, the producer is no longer able to log transactionally and reliably. Hence, the ongoing transaction is aborted and the producer closed.";
            throw new RuntimeException("This producer has received out-of-band sequence numbers. This is a fatal condition and thus, the producer is no longer able to log transactionally and reliably. Hence, the ongoing transaction is aborted and the producer closed.", e);
        }
        finally {
            producer.flush();
            producer.close();
        }
        return Collections.unmodifiableList(metadata);
    }

    private Properties effectiveProducerProps(Properties originalProducerProps) {
        Properties effectiveProducerProps = new Properties();
        effectiveProducerProps.putAll((Map<?, ?>)originalProducerProps);
        effectiveProducerProps.put("bootstrap.servers", this.bootstrapServers);
        return effectiveProducerProps;
    }

    public DefaultRecordProducer(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }
}

