/*
 * Decompiled with CFR 0.152.
 */
package io.apicurio.registry.serde.avro;

import io.apicurio.registry.resolver.ParsedSchema;
import io.apicurio.registry.resolver.SchemaParser;
import io.apicurio.registry.resolver.SchemaResolver;
import io.apicurio.registry.resolver.strategy.ArtifactReferenceResolverStrategy;
import io.apicurio.registry.resolver.utils.Utils;
import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.serde.AbstractKafkaSerializer;
import io.apicurio.registry.serde.avro.AvroDatumProvider;
import io.apicurio.registry.serde.avro.AvroEncoding;
import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig;
import io.apicurio.registry.serde.avro.AvroSchemaParser;
import io.apicurio.registry.serde.avro.AvroSerdeHeaders;
import io.apicurio.registry.serde.avro.NonRecordContainer;
import io.apicurio.registry.serde.config.BaseKafkaSerDeConfig;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.common.header.Headers;

public class AvroKafkaSerializer<U>
extends AbstractKafkaSerializer<Schema, U> {
    private final EncoderFactory encoderFactory = EncoderFactory.get();
    private AvroSchemaParser<U> parser;
    private AvroDatumProvider<U> avroDatumProvider;
    private AvroEncoding encoding;
    private AvroSerdeHeaders avroHeaders;

    public AvroKafkaSerializer() {
    }

    public AvroKafkaSerializer(RegistryClient client) {
        super(client);
    }

    public AvroKafkaSerializer(SchemaResolver<Schema, U> schemaResolver) {
        super(schemaResolver);
    }

    public AvroKafkaSerializer(RegistryClient client, ArtifactReferenceResolverStrategy<Schema, U> artifactResolverStrategy, SchemaResolver<Schema, U> schemaResolver) {
        super(client, artifactResolverStrategy, schemaResolver);
    }

    private AvroKafkaSerializer<U> setAvroDatumProvider(AvroDatumProvider<U> avroDatumProvider) {
        this.avroDatumProvider = Objects.requireNonNull(avroDatumProvider);
        return this;
    }

    public void configure(Map<String, ?> configs, boolean isKey) {
        AvroKafkaSerdeConfig config = new AvroKafkaSerdeConfig(configs);
        this.encoding = config.getAvroEncoding();
        Class<?> adp = config.getAvroDatumProvider();
        Consumer<AvroDatumProvider> consumer = this::setAvroDatumProvider;
        Utils.instantiate(AvroDatumProvider.class, adp, consumer);
        this.avroDatumProvider.configure(config);
        this.avroHeaders = new AvroSerdeHeaders(isKey);
        this.parser = new AvroSchemaParser<U>(this.avroDatumProvider);
        super.configure((BaseKafkaSerDeConfig)config, isKey);
    }

    public SchemaParser<Schema, U> schemaParser() {
        return this.parser;
    }

    protected void serializeData(ParsedSchema<Schema> schema, U data, OutputStream out) throws IOException {
        Encoder encoder = this.createEncoder((Schema)schema.getParsedSchema(), out);
        if (data instanceof NonRecordContainer) {
            data = ((NonRecordContainer)NonRecordContainer.class.cast(data)).getValue();
        }
        DatumWriter<U> writer = this.avroDatumProvider.createDatumWriter(data, (Schema)schema.getParsedSchema());
        writer.write(data, encoder);
        encoder.flush();
    }

    protected void serializeData(Headers headers, ParsedSchema<Schema> schema, U data, OutputStream out) throws IOException {
        if (headers != null) {
            this.avroHeaders.addEncodingHeader(headers, this.encoding.name());
        }
        this.serializeData(schema, data, out);
    }

    private Encoder createEncoder(Schema schema, OutputStream os) throws IOException {
        if (this.encoding == AvroEncoding.JSON) {
            return this.encoderFactory.jsonEncoder(schema, os);
        }
        return this.encoderFactory.directBinaryEncoder(os, null);
    }
}

