/*
 * Decompiled with CFR 0.152.
 */
package com.azure.data.schemaregistry;

import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.implementation.AzureSchemaRegistry;
import com.azure.data.schemaregistry.implementation.models.SchemaId;
import com.azure.data.schemaregistry.implementation.models.SchemasGetByIdHeaders;
import com.azure.data.schemaregistry.models.SchemaProperties;
import com.azure.data.schemaregistry.models.SerializationType;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import reactor.core.publisher.Mono;

/**
 * Asynchronous client that registers schemas with, and looks schemas up from, the schema registry
 * REST service wrapped by {@link AzureSchemaRegistry}.
 *
 * <p>Successful service responses are remembered in two in-memory caches: one keyed by schema id and
 * one keyed by (schema group, schema name, schema text). {@link #getSchema(String)} and
 * {@link #getSchemaId(String, String, String, SerializationType)} answer from those caches when
 * possible and only call the service on a miss.</p>
 */
@ServiceClient(builder=SchemaRegistryClientBuilder.class, isAsync=true)
public final class SchemaRegistryAsyncClient {
    /** Charset used to convert schema text to and from the byte form held by {@link SchemaProperties}. */
    static final Charset SCHEMA_REGISTRY_SERVICE_ENCODING = StandardCharsets.UTF_8;

    /** Extracts the schema group and schema name from the path of the location reported in response headers. */
    private static final Pattern SCHEMA_PATTERN = Pattern.compile("/\\$schemagroups/(?<schemaGroup>.+)/schemas/(?<schemaName>.+?)/");

    private final ClientLogger logger = new ClientLogger(SchemaRegistryAsyncClient.class);
    private final AzureSchemaRegistry restService;

    /** Upper bound applied to each cache; a non-positive value leaves the caches unbounded. */
    private final int maxSchemaMapSize;

    /** Supplied by the constructor's caller; this class only clears it (see {@link #clearCache()}). */
    private final ConcurrentSkipListMap<String, Function<String, Object>> typeParserMap;

    /** Schema id -> schema properties. */
    private final Map<String, SchemaProperties> idCache;

    /** {@link #getSchemaStringCacheKey(String, String, String)} -> schema properties. */
    private final Map<String, SchemaProperties> schemaStringCache;

    /**
     * Creates a client backed by the given REST service.
     *
     * @param restService generated REST client used for every service call.
     * @param maxSchemaMapSize maximum number of entries kept in each cache; non-positive disables the limit.
     * @param typeParserMap map retained by this client and emptied by {@link #clearCache()}.
     */
    SchemaRegistryAsyncClient(AzureSchemaRegistry restService, int maxSchemaMapSize, ConcurrentSkipListMap<String, Function<String, Object>> typeParserMap) {
        this.restService = restService;
        this.maxSchemaMapSize = maxSchemaMapSize;
        this.typeParserMap = typeParserMap;
        this.idCache = new ConcurrentHashMap<>();
        this.schemaStringCache = new ConcurrentHashMap<>();
    }

    /**
     * Registers a schema and emits the resulting {@link SchemaProperties}.
     *
     * @param schemaGroup schema group the schema belongs to.
     * @param schemaName name of the schema.
     * @param schemaString schema text.
     * @param serializationType serialization type recorded on the returned properties.
     * @return a {@link Mono} emitting the properties of the registered schema.
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<SchemaProperties> registerSchema(String schemaGroup, String schemaName, String schemaString, SerializationType serializationType) {
        return this.registerSchemaWithResponse(schemaGroup, schemaName, schemaString, serializationType).map(Response::getValue);
    }

    /**
     * Registers a schema and emits the full service response with the {@link SchemaProperties} as its value.
     *
     * @param schemaGroup schema group the schema belongs to.
     * @param schemaName name of the schema.
     * @param schemaString schema text.
     * @param serializationType serialization type recorded on the returned properties.
     * @return a {@link Mono} emitting the response whose value is the registered schema's properties.
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Response<SchemaProperties>> registerSchemaWithResponse(String schemaGroup, String schemaName, String schemaString, SerializationType serializationType) {
        return FluxUtil.withContext(context -> this.registerSchemaWithResponse(schemaGroup, schemaName, schemaString, serializationType, context));
    }

    /**
     * Calls the service to register the schema, caches the result under both cache keys, and wraps it
     * in a {@link SimpleResponse} that reuses the request, status code and headers of the service response.
     *
     * <p>NOTE(review): the service call always sends the implementation-level {@code AVRO} type;
     * {@code serializationType} is only stored on the resulting {@link SchemaProperties}. Confirm this
     * is intended. {@code context} is accepted but not forwarded to the service call.</p>
     */
    Mono<Response<SchemaProperties>> registerSchemaWithResponse(String schemaGroup, String schemaName, String schemaString, SerializationType serializationType, Context context) {
        this.logger.verbose("Registering schema. Group: '{}', name: '{}', serialization type: '{}', payload: '{}'", schemaGroup, schemaName, serializationType, schemaString);
        return this.restService.getSchemas().registerWithResponseAsync(schemaGroup, schemaName, com.azure.data.schemaregistry.implementation.models.SerializationType.AVRO, schemaString).handle((response, sink) -> {
            SchemaId schemaId = response.getValue();
            SchemaProperties registered = new SchemaProperties(schemaId.getId(), serializationType, schemaName, schemaString.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING));
            this.putInCache(this.schemaStringCache, SchemaRegistryAsyncClient.getSchemaStringCacheKey(schemaGroup, schemaName, schemaString), registered);
            this.putInCache(this.idCache, schemaId.getId(), registered);
            this.logger.verbose("Cached schema string. Group: '{}', name: '{}'", schemaGroup, schemaName);
            SimpleResponse<SchemaProperties> schemaRegistryObjectSimpleResponse = new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), registered);
            sink.next(schemaRegistryObjectSimpleResponse);
        });
    }

    /**
     * Emits the properties of the schema with the given id, from the id cache when present and from
     * the service otherwise.
     *
     * @param schemaId id of the schema to look up.
     * @return a {@link Mono} emitting the schema's properties.
     * @throws NullPointerException if {@code schemaId} is null.
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<SchemaProperties> getSchema(String schemaId) {
        Objects.requireNonNull(schemaId, "'schemaId' should not be null");
        // Single lookup: a separate containsKey()/get() pair could observe the entry disappearing in
        // between (the caches can be cleared), which would surface as an empty Mono.
        SchemaProperties cached = this.idCache.get(schemaId);
        if (cached != null) {
            this.logger.verbose("Cache hit for schema id '{}'", schemaId);
            return Mono.just(cached);
        }
        return this.getSchemaWithResponse(schemaId).map(Response::getValue);
    }

    /**
     * Fetches the schema with the given id from the service, bypassing the cache lookup.
     *
     * @param schemaId id of the schema to fetch.
     * @return a {@link Mono} emitting the response whose value is the schema's properties.
     */
    Mono<Response<SchemaProperties>> getSchemaWithResponse(String schemaId) {
        return FluxUtil.withContext(context -> this.getSchemaWithResponse(schemaId, context));
    }

    /**
     * Fetches the schema with the given id from the service, derives its group and name from the
     * location in the response headers, caches the result and emits it.
     *
     * <p>The returned {@link Mono} fails with {@link IllegalArgumentException} when the location path
     * does not match the expected {@code /$schemagroups/{group}/schemas/{name}/} prefix.
     * NOTE(review): {@code context} is accepted but not forwarded to the service call.</p>
     *
     * @throws NullPointerException if {@code schemaId} is null.
     */
    Mono<Response<SchemaProperties>> getSchemaWithResponse(String schemaId, Context context) {
        Objects.requireNonNull(schemaId, "'schemaId' should not be null");
        return this.restService.getSchemas().getByIdWithResponseAsync(schemaId).handle((response, sink) -> {
            SchemasGetByIdHeaders headers = (SchemasGetByIdHeaders) response.getDeserializedHeaders();
            SerializationType serializationType = SerializationType.fromString(headers.getSchemaType());
            URI location = URI.create(headers.getLocation());
            Matcher matcher = SCHEMA_PATTERN.matcher(location.getPath());
            if (!matcher.lookingAt()) {
                sink.error(new IllegalArgumentException("Response location does not contain schema group or schema name. Location: " + location.getPath()));
                return;
            }
            String schemaGroup = matcher.group("schemaGroup");
            String schemaName = matcher.group("schemaName");
            byte[] schemaBytes = response.getValue();
            SchemaProperties schemaObject = new SchemaProperties(schemaId, serializationType, schemaName, schemaBytes);
            String schemaCacheKey = SchemaRegistryAsyncClient.getSchemaStringCacheKey(schemaGroup, schemaName, new String(schemaBytes, SCHEMA_REGISTRY_SERVICE_ENCODING));
            this.putInCache(this.schemaStringCache, schemaCacheKey, schemaObject);
            this.putInCache(this.idCache, schemaId, schemaObject);
            this.logger.verbose("Cached schema object. Path: '{}'", schemaId);
            SimpleResponse<SchemaProperties> schemaResponse = new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), schemaObject);
            sink.next(schemaResponse);
        });
    }

    /**
     * Emits the id of the schema identified by group, name and schema text, from the schema-string
     * cache when present and from the service otherwise.
     *
     * @param schemaGroup schema group the schema belongs to.
     * @param schemaName name of the schema.
     * @param schemaString schema text.
     * @param serializationType serialization type recorded on the cached properties after a service lookup.
     * @return a {@link Mono} emitting the schema id.
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<String> getSchemaId(String schemaGroup, String schemaName, String schemaString, SerializationType serializationType) {
        String schemaStringCacheKey = SchemaRegistryAsyncClient.getSchemaStringCacheKey(schemaGroup, schemaName, schemaString);
        // Capture the entry once; re-reading the map at subscription time could hit a cleared cache
        // and dereference null.
        SchemaProperties cached = this.schemaStringCache.get(schemaStringCacheKey);
        if (cached != null) {
            return Mono.fromCallable(() -> {
                this.logger.verbose("Cache hit schema string. Group: '{}', name: '{}'", schemaGroup, schemaName);
                return cached.getSchemaId();
            });
        }
        return this.getSchemaIdWithResponse(schemaGroup, schemaName, schemaString, serializationType).map(Response::getValue);
    }

    /**
     * Queries the service for the id of the given schema, bypassing the cache lookup.
     *
     * @return a {@link Mono} emitting the response whose value is the schema id.
     */
    Mono<Response<String>> getSchemaIdWithResponse(String schemaGroup, String schemaName, String schemaString, SerializationType serializationType) {
        return FluxUtil.withContext(context -> this.getSchemaIdWithResponse(schemaGroup, schemaName, schemaString, serializationType, context));
    }

    /**
     * Queries the service for the id of the given schema, caches the resulting properties under both
     * cache keys and emits the id wrapped in a {@link SimpleResponse}.
     *
     * <p>NOTE(review): as with registration, the service call always sends the implementation-level
     * {@code AVRO} type and {@code context} is not forwarded.</p>
     */
    Mono<Response<String>> getSchemaIdWithResponse(String schemaGroup, String schemaName, String schemaString, SerializationType serializationType, Context context) {
        return this.restService.getSchemas().queryIdByContentWithResponseAsync(schemaGroup, schemaName, com.azure.data.schemaregistry.implementation.models.SerializationType.AVRO, schemaString).handle((response, sink) -> {
            SchemaId schemaId = response.getValue();
            SchemaProperties properties = new SchemaProperties(schemaId.getId(), serializationType, schemaName, schemaString.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING));
            this.putInCache(this.schemaStringCache, SchemaRegistryAsyncClient.getSchemaStringCacheKey(schemaGroup, schemaName, schemaString), properties);
            this.putInCache(this.idCache, schemaId.getId(), properties);
            this.logger.verbose("Cached schema string. Group: '{}', name: '{}'", schemaGroup, schemaName);
            SimpleResponse<String> schemaIdResponse = new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), schemaId.getId());
            sink.next(schemaIdResponse);
        });
    }

    /**
     * Empties both schema caches and the type parser map.
     *
     * <p>NOTE(review): the type parser map is the instance handed to the constructor, so clearing it
     * is visible to whoever else holds that map — confirm this is intended.</p>
     */
    void clearCache() {
        this.idCache.clear();
        this.schemaStringCache.clear();
        this.typeParserMap.clear();
    }

    /**
     * Stores {@code value} under {@code key} unless the key is already present, first emptying the
     * cache when it has reached {@code maxSchemaMapSize} and the key would be a new entry.
     *
     * <p>The size check and the insert are not atomic, so under concurrent inserts the bound is
     * best-effort. A non-positive {@code maxSchemaMapSize} disables the bound.</p>
     */
    private void putInCache(Map<String, SchemaProperties> cache, String key, SchemaProperties value) {
        if (this.maxSchemaMapSize > 0 && cache.size() >= this.maxSchemaMapSize && !cache.containsKey(key)) {
            cache.clear();
            this.logger.verbose("Cleared schema cache after reaching the maximum size of '{}'", this.maxSchemaMapSize);
        }
        cache.putIfAbsent(key, value);
    }

    /**
     * Builds the schema-string cache key for a (group, name, schema text) triple.
     *
     * <p>Group and name are each prefixed with their length so that different triples can never
     * produce the same key; plain concatenation would map ("ab", "c") and ("a", "bc") to one key.
     * Null arguments are rendered as the text {@code "null"}.</p>
     */
    private static String getSchemaStringCacheKey(String schemaGroup, String schemaName, String schemaString) {
        String group = String.valueOf(schemaGroup);
        String name = String.valueOf(schemaName);
        return group.length() + ":" + group + name.length() + ":" + name + schemaString;
    }
}

