/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.testing.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.druid.indexer.TaskIdUtils;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.utils.StreamEventWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * {@link StreamEventWriter} implementation that publishes events to Kafka through a
 * {@link KafkaProducer} with {@code String} keys and {@code byte[]} values.
 *
 * <p>Every {@link #write(String, byte[])} call hands the record to the producer and remembers the
 * returned future; {@link #flush()} later waits on those futures and reports any failure.
 *
 * <p>The list of pending futures is a plain {@link ArrayList} with no synchronization, so a single
 * instance should not be shared between threads without external coordination.
 */
public class KafkaEventWriter
implements StreamEventWriter {
    /** Config entries whose key starts with this prefix are forwarded to the producer (prefix stripped). */
    private static final String TEST_PROPERTY_PREFIX = "kafka.test.property.";
    /** Message used when a transactional operation is requested on a non-transactional writer. */
    private static final String TXN_DISABLED_MESSAGE = "Kafka writer was initialized with transaction disabled";

    private final KafkaProducer<String, byte[]> producer;
    private final boolean txnEnabled;
    /** Futures of records handed to the producer since the last {@link #flush()}. */
    private final List<Future<RecordMetadata>> pendingWriteRecords = new ArrayList<>();

    /**
     * Creates the writer and its underlying producer.
     *
     * <p>Producer properties are assembled from the {@code kafka.test.property.}-prefixed entries of
     * {@code config}, then the fixed settings below are applied on top of them (so the fixed settings
     * win on conflict).
     *
     * @param config     integration test configuration supplying the Kafka host and extra properties
     * @param txnEnabled when {@code true}, the producer is configured with idempotence and a random
     *                   transactional id, and {@code initTransactions()} is called on it
     */
    public KafkaEventWriter(IntegrationTestingConfig config, boolean txnEnabled) {
        Properties properties = new Properties();
        addFilteredProperties(config, properties);
        properties.setProperty("bootstrap.servers", config.getKafkaHost());
        properties.setProperty("acks", "all");
        properties.setProperty("retries", "3");
        // Keep the configured serializer classes in line with the serializer instances passed to the
        // producer below (String keys, byte[] values).
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
        this.txnEnabled = txnEnabled;
        if (txnEnabled) {
            properties.setProperty("enable.idempotence", "true");
            properties.setProperty("transactional.id", TaskIdUtils.getRandomId());
        }
        this.producer = new KafkaProducer<>(properties, new StringSerializer(), new ByteArraySerializer());
        if (txnEnabled) {
            this.producer.initTransactions();
        }
    }

    /**
     * @return whether this writer was constructed with transactions enabled
     */
    @Override
    public boolean isTransactionEnabled() {
        return this.txnEnabled;
    }

    /**
     * Begins a new producer transaction.
     *
     * @throws IllegalStateException if the writer was constructed with transactions disabled
     */
    @Override
    public void initTransaction() {
        requireTransactionsEnabled();
        this.producer.beginTransaction();
    }

    /**
     * Commits the current producer transaction.
     *
     * @throws IllegalStateException if the writer was constructed with transactions disabled
     */
    @Override
    public void commitTransaction() {
        requireTransactionsEnabled();
        this.producer.commitTransaction();
    }

    /**
     * Sends {@code event} to {@code topic} without a key and records the resulting future so that
     * {@link #flush()} can wait for it.
     *
     * @param topic destination topic
     * @param event record value
     */
    @Override
    public void write(String topic, byte[] event) {
        Future<RecordMetadata> future = this.producer.send(new ProducerRecord<>(topic, event));
        this.pendingWriteRecords.add(future);
    }

    /**
     * Flushes pending writes and closes the producer.
     *
     * <p>The producer is closed even when {@link #flush()} fails, so a failed send does not leak the
     * producer. The flush failure is rethrown; a failure while closing in that situation is attached
     * to it as a suppressed exception.
     */
    @Override
    public void close() {
        try {
            this.flush();
        }
        catch (RuntimeException | Error flushFailure) {
            try {
                this.producer.close();
            }
            catch (RuntimeException closeFailure) {
                flushFailure.addSuppressed(closeFailure);
            }
            throw flushFailure;
        }
        this.producer.close();
    }

    /**
     * Waits for every pending write to complete, then clears the pending list.
     *
     * <p>All futures are visited even if some fail. The first failure becomes the cause of the thrown
     * exception and later failures are added to it as suppressed exceptions. If the wait is
     * interrupted, the thread's interrupt flag is restored.
     *
     * @throws RuntimeException wrapping the first {@link InterruptedException} or
     *                          {@link ExecutionException} encountered, if any
     */
    @Override
    public void flush() {
        Exception firstFailure = null;
        for (Future<RecordMetadata> future : this.pendingWriteRecords) {
            try {
                future.get();
            }
            catch (InterruptedException | ExecutionException ex) {
                if (ex instanceof InterruptedException) {
                    // Preserve the interruption for callers further up the stack.
                    Thread.currentThread().interrupt();
                }
                if (firstFailure == null) {
                    firstFailure = ex;
                } else {
                    firstFailure.addSuppressed(ex);
                }
            }
        }
        this.pendingWriteRecords.clear();
        if (firstFailure != null) {
            throw new RuntimeException(firstFailure);
        }
    }

    /** Throws {@link IllegalStateException} unless the writer was constructed with transactions enabled. */
    private void requireTransactionsEnabled() {
        if (!this.txnEnabled) {
            throw new IllegalStateException(TXN_DISABLED_MESSAGE);
        }
    }

    /**
     * Copies every entry of {@code config.getProperties()} whose key starts with
     * {@link #TEST_PROPERTY_PREFIX} into {@code properties}, with the prefix removed from the key.
     */
    private static void addFilteredProperties(IntegrationTestingConfig config, Properties properties) {
        for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(TEST_PROPERTY_PREFIX)) {
                properties.setProperty(key.substring(TEST_PROPERTY_PREFIX.length()), entry.getValue());
            }
        }
    }
}

