/*
 * Decompiled with CFR 0.152.
 */
package io.cloudevents.kafka;

import io.cloudevents.Attributes;
import io.cloudevents.CloudEvent;
import io.cloudevents.format.Wire;
import io.cloudevents.format.builder.EventStep;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CloudEventsKafkaProducer<K, A extends Attributes, T>
implements Producer<K, CloudEvent<A, T>> {
    private static final Logger log = LoggerFactory.getLogger(CloudEventsKafkaProducer.class);
    private final Producer<K, byte[]> producer;
    private final EventStep<A, T, byte[], byte[]> builder;

    public CloudEventsKafkaProducer(Properties configuration, EventStep<A, T, byte[], byte[]> builder) {
        Objects.requireNonNull(configuration);
        Objects.requireNonNull(builder);
        Optional.ofNullable(configuration.get("value.serializer")).map(config -> config.toString()).filter(config -> !config.contains(ByteArraySerializer.class.getName())).ifPresent(wrong -> {
            log.warn("Fixing the wrong deserializer {}", wrong);
            configuration.put("value.serializer", ByteArraySerializer.class);
        });
        this.builder = builder;
        this.producer = new KafkaProducer(configuration);
    }

    private Wire<byte[], String, byte[]> marshal(Supplier<CloudEvent<A, T>> event) {
        return Optional.ofNullable(this.builder).map(step -> step.withEvent(event)).map(marshaller -> marshaller.marshal()).get();
    }

    private Set<Header> marshal(Map<String, byte[]> headers) {
        return headers.entrySet().stream().map(header -> new AbstractMap.SimpleEntry(header.getKey(), header.getValue())).map(header -> new RecordHeader((String)header.getKey(), (byte[])header.getValue())).collect(Collectors.toSet());
    }

    private ProducerRecord<K, byte[]> marshal(ProducerRecord<K, CloudEvent<A, T>> event) {
        Wire<byte[], String, byte[]> wire = this.marshal(() -> (CloudEvent)event.value());
        Set<Header> headers = this.marshal(wire.getHeaders());
        byte[] payload = wire.getPayload().orElse(null);
        ProducerRecord record = new ProducerRecord(event.topic(), event.partition(), event.timestamp(), event.key(), (Object)payload, headers);
        return record;
    }

    public Future<RecordMetadata> send(ProducerRecord<K, CloudEvent<A, T>> event) {
        return this.producer.send(this.marshal(event));
    }

    public Future<RecordMetadata> send(ProducerRecord<K, CloudEvent<A, T>> event, Callback callback) {
        return this.producer.send(this.marshal(event), callback);
    }

    public void abortTransaction() throws ProducerFencedException {
        this.producer.abortTransaction();
    }

    public void beginTransaction() throws ProducerFencedException {
        this.producer.beginTransaction();
    }

    public void close() {
        this.producer.close();
    }

    public void close(long arg0, TimeUnit arg1) {
        this.producer.close(arg0, arg1);
    }

    public void commitTransaction() throws ProducerFencedException {
        this.producer.commitTransaction();
    }

    public void flush() {
        this.producer.flush();
    }

    public void initTransactions() {
        this.producer.initTransactions();
    }

    public Map<MetricName, ? extends Metric> metrics() {
        return this.producer.metrics();
    }

    public List<PartitionInfo> partitionsFor(String arg0) {
        return this.producer.partitionsFor(arg0);
    }

    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> arg0, String arg1) throws ProducerFencedException {
        this.producer.sendOffsetsToTransaction(arg0, arg1);
    }
}

