/*
 * Decompiled with CFR 0.152.
 */
package com.azure.data.schemaregistry.avro;

import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.core.util.serializer.TypeReference;
import com.azure.data.schemaregistry.SchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.avro.AvroSchemaRegistryUtils;
import com.azure.data.schemaregistry.models.SchemaProperties;
import com.azure.data.schemaregistry.models.SerializationType;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import reactor.core.publisher.Mono;

public final class SchemaRegistryAvroSerializer
implements ObjectSerializer {
    private final ClientLogger logger = new ClientLogger(SchemaRegistryAvroSerializer.class);
    static final int SCHEMA_ID_SIZE = 32;
    private final SchemaRegistryAsyncClient schemaRegistryClient;
    private final AvroSchemaRegistryUtils codec;
    private final String schemaGroup;
    private final Boolean autoRegisterSchemas;

    SchemaRegistryAvroSerializer(SchemaRegistryAsyncClient registryClient, AvroSchemaRegistryUtils codec, String schemaGroup, Boolean autoRegisterSchemas) {
        this.schemaRegistryClient = registryClient;
        this.codec = codec;
        this.schemaGroup = schemaGroup;
        this.autoRegisterSchemas = autoRegisterSchemas;
    }

    public <T> T deserialize(InputStream stream, TypeReference<T> typeReference) {
        return (T)this.deserializeAsync(stream, typeReference).block();
    }

    public <T> Mono<T> deserializeAsync(InputStream stream, TypeReference<T> typeReference) {
        if (stream == null) {
            return Mono.empty();
        }
        return Mono.fromCallable(() -> {
            byte[] payload = new byte[stream.available()];
            while (stream.read(payload) != -1) {
            }
            return payload;
        }).flatMap(payload -> {
            if (payload == null || ((byte[])payload).length == 0) {
                return Mono.empty();
            }
            ByteBuffer buffer = ByteBuffer.wrap(payload);
            String schemaId = this.getSchemaIdFromPayload(buffer);
            return this.schemaRegistryClient.getSchema(schemaId).handle((registryObject, sink) -> {
                byte[] payloadSchema = registryObject.getSchema();
                if (payloadSchema == null) {
                    sink.error((Throwable)this.logger.logExceptionAsError((RuntimeException)new NullPointerException(String.format("Payload schema returned as null. Schema type: %s, Schema ID: %s", registryObject.getSerializationType(), registryObject.getSchemaId()))));
                    return;
                }
                int start = buffer.position() + buffer.arrayOffset();
                int length = buffer.limit() - 32;
                byte[] b = Arrays.copyOfRange(buffer.array(), start, start + length);
                sink.next(this.codec.decode(b, payloadSchema));
            });
        });
    }

    public void serialize(OutputStream outputStream, Object value) {
        this.serializeAsync(outputStream, value).block();
    }

    public Mono<Void> serializeAsync(OutputStream outputStream, Object object) {
        if (object == null) {
            return FluxUtil.monoError((ClientLogger)this.logger, (RuntimeException)new NullPointerException("Null object, behavior should be defined in concrete serializer implementation."));
        }
        String schemaString = this.codec.getSchemaString(object);
        String schemaName = this.codec.getSchemaName(object);
        return this.maybeRegisterSchema(this.schemaGroup, schemaName, schemaString).handle((id, sink) -> {
            ByteBuffer idBuffer = ByteBuffer.allocate(32).put(id.getBytes(StandardCharsets.UTF_8));
            try {
                outputStream.write(idBuffer.array());
                outputStream.write(this.codec.encode(object));
                sink.complete();
            }
            catch (IOException e) {
                sink.error((Throwable)new UncheckedIOException(e.getMessage(), e));
            }
        });
    }

    private String getSchemaIdFromPayload(ByteBuffer buffer) {
        byte[] schemaGuidByteArray = new byte[32];
        buffer.get(schemaGuidByteArray);
        return new String(schemaGuidByteArray, StandardCharsets.UTF_8);
    }

    private Mono<String> maybeRegisterSchema(String schemaGroup, String schemaName, String schemaString) {
        if (this.autoRegisterSchemas.booleanValue()) {
            return this.schemaRegistryClient.registerSchema(schemaGroup, schemaName, schemaString, SerializationType.AVRO).map(SchemaProperties::getSchemaId);
        }
        return this.schemaRegistryClient.getSchemaId(schemaGroup, schemaName, schemaString, SerializationType.AVRO);
    }
}

