/*
 * Decompiled with CFR 0.152.
 */
package org.radarbase.producer.rest;

import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.avro.SchemaValidationException;
import org.json.JSONException;
import org.radarbase.data.AvroRecordData;
import org.radarbase.data.RecordData;
import org.radarbase.producer.AuthenticationException;
import org.radarbase.producer.KafkaTopicSender;
import org.radarbase.producer.rest.BinaryRecordRequest;
import org.radarbase.producer.rest.ConnectionState;
import org.radarbase.producer.rest.JsonRecordRequest;
import org.radarbase.producer.rest.ParsedSchemaMetadata;
import org.radarbase.producer.rest.RecordRequest;
import org.radarbase.producer.rest.RestSender;
import org.radarbase.producer.rest.SchemaRetriever;
import org.radarbase.producer.rest.TopicRequestBody;
import org.radarbase.producer.rest.UncheckedRequestException;
import org.radarbase.topic.AvroTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sends the records of a single {@link AvroTopic} to a REST endpoint through a shared
 * {@link RestSender}.
 *
 * <p>Records are encoded with a {@link BinaryRecordRequest} when the sender's request properties
 * ask for binary encoding and the topic is compatible with it, and with a
 * {@link JsonRecordRequest} otherwise. When the server answers with HTTP 415, the encoding is
 * downgraded one step and the request is sent again.
 *
 * <p>NOTE(review): {@code requestData} is replaced and reset without synchronization; presumably
 * an instance is only used from one thread at a time - confirm against callers.
 *
 * @param <K> key type of the topic
 * @param <V> value type of the topic
 */
class RestTopicSender<K, V> implements KafkaTopicSender<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(RestTopicSender.class);

    // NOTE(review): these two literals look like compiler-inlined constants. If RestSender exposes
    // equivalent Accept-header constants, reference those instead - confirm.
    /** Accept header value used with the latest Avro JSON encoding. */
    private static final String ACCEPT_LATEST =
            "application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json";
    /** Accept header value used with the legacy Avro JSON encoding. */
    private static final String ACCEPT_LEGACY =
            "application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json";

    private final AvroTopic<K, V> topic;
    /** Current record encoder; swapped for a JSON encoder when binary encoding is dropped. */
    private RecordRequest<K, V> requestData;
    private final RestSender sender;
    private final ConnectionState state;

    /**
     * Creates a sender for one topic.
     *
     * <p>Binary encoding is attempted only if the sender's current request properties enable it.
     * If the binary encoder rejects the topic with an {@link IllegalArgumentException}, a warning
     * is logged and JSON encoding is used instead.
     *
     * @param topic topic that all records of this sender belong to
     * @param sender REST sender providing the request context and schema retriever
     * @param state connection state that is updated with the outcome of each request
     * @throws SchemaValidationException declared for callers; presumably raised by the record
     *     request constructors - confirm
     */
    RestTopicSender(AvroTopic<K, V> topic, RestSender sender, ConnectionState state) throws SchemaValidationException {
        this.topic = topic;
        this.sender = sender;
        this.state = state;
        if (sender.getRequestContext().properties.binary) {
            try {
                this.requestData = new BinaryRecordRequest<>(topic);
            } catch (IllegalArgumentException ex) {
                logger.warn("Cannot use Binary encoding for incompatible topic {}: {}", topic, ex.toString());
            }
        }
        // Either binary was not requested or the topic is incompatible with it.
        if (this.requestData == null) {
            this.requestData = new JsonRecordRequest<>(topic);
        }
    }

    /**
     * Sends a single key/value pair by wrapping it in an {@link AvroRecordData} with one value.
     *
     * @param key record key
     * @param value record value
     * @throws IOException if the request fails
     * @throws SchemaValidationException if the record does not match the topic schema
     */
    @Override
    public void send(K key, V value) throws IOException, SchemaValidationException {
        this.send(new AvroRecordData<>(this.topic, key, Collections.singletonList(value)));
    }

    /**
     * Sends a set of records in one HTTP request and updates the connection state accordingly.
     *
     * <ul>
     *   <li>Successful response: the state is marked connected.</li>
     *   <li>401 or 403: the state is marked unauthorized.</li>
     *   <li>415: the encoding is downgraded via {@code downgradeConnection} and the records are
     *       sent again after this attempt has been cleaned up.</li>
     *   <li>Any other status, or an {@link IOException} during the call: the state is marked
     *       disconnected and the failure is passed to
     *       {@link UncheckedRequestException#rethrow()}.</li>
     * </ul>
     *
     * <p>The record request is reset after every attempt, whether it succeeded or not.
     *
     * @param records records to send
     * @throws IOException if the request fails; presumably this is what
     *     {@code UncheckedRequestException.rethrow()} raises - confirm
     * @throws SchemaValidationException if the records do not match the topic schema
     * @throws AuthenticationException if the connection state is
     *     {@link ConnectionState.State#UNAUTHORIZED} once the request has completed
     */
    @Override
    public void send(RecordData<K, V> records) throws IOException, SchemaValidationException {
        RestSender.RequestContext context = this.sender.getRequestContext();
        Request request = this.buildRequest(context, records);

        boolean doResend = false;
        try (Response response = context.client.request(request)) {
            if (response.isSuccessful()) {
                this.state.didConnect();
                logger.debug("Added message to topic {}", this.topic);
            } else if (response.code() == 401 || response.code() == 403) {
                this.state.wasUnauthorized();
            } else if (response.code() == 415) {
                // Unsupported media type: step down to an older encoding and retry below.
                this.downgradeConnection(request, response);
                doResend = true;
            } else {
                // Raised inside the try so that the catch below also marks the disconnect.
                throw UncheckedRequestException.fail(request, response, null);
            }
        } catch (IOException ex) {
            this.state.didDisconnect();
            UncheckedRequestException.fail(request, null, ex).rethrow();
        } catch (UncheckedRequestException ex) {
            this.state.didDisconnect();
            ex.rethrow();
        } finally {
            this.requestData.reset();
        }

        if (this.state.getState() == ConnectionState.State.UNAUTHORIZED) {
            throw new AuthenticationException("Request unauthorized");
        }
        if (doResend) {
            // The request is rebuilt from a fresh context, so the downgraded encoding is used.
            this.send(records);
        }
    }

    /**
     * Loads the key and value schema metadata of the topic and prepares the record request with
     * the given records.
     *
     * <p>If the context no longer asks for binary encoding while a binary record request is still
     * in use, the record request is first replaced by a JSON one.
     *
     * @param context request context whose properties select the encoding
     * @param records records to prepare
     * @throws IOException if the schema metadata cannot be retrieved, or if preparing fails
     * @throws SchemaValidationException declared for callers; presumably raised while preparing
     *     the records - confirm
     */
    private void updateRecords(RestSender.RequestContext context, RecordData<K, V> records) throws IOException, SchemaValidationException {
        if (!context.properties.binary && this.requestData instanceof BinaryRecordRequest) {
            this.requestData = new JsonRecordRequest<>(this.topic);
        }
        String sendTopic = this.topic.getName();
        SchemaRetriever retriever = this.sender.getSchemaRetriever();

        ParsedSchemaMetadata keyMetadata;
        ParsedSchemaMetadata valueMetadata;
        try {
            keyMetadata = retriever.getOrSetSchemaMetadata(sendTopic, false, this.topic.getKeySchema(), -1);
            valueMetadata = retriever.getOrSetSchemaMetadata(sendTopic, true, this.topic.getValueSchema(), -1);
        } catch (IOException | JSONException ex) {
            throw new IOException("Failed to get schemas for topic " + this.topic, ex);
        }
        this.requestData.prepare(keyMetadata, valueMetadata, records);
    }

    /**
     * Reacts to an HTTP 415 response by stepping down to the next encoding.
     *
     * <ol>
     *   <li>Binary record request in use: switch the sender and this instance to the latest
     *       JSON encoding.</li>
     *   <li>Request used the latest Accept header: switch the sender to the legacy
     *       encoding.</li>
     *   <li>Otherwise: if the request had no content type or already used the legacy content
     *       type, nothing is left to try and the request fails. Any other content type only
     *       logs a warning.</li>
     * </ol>
     *
     * <p>Every branch that does not throw marks the state as connected.
     *
     * @param request request that was rejected
     * @param response the 415 response
     * @throws IOException declared for callers; presumably raised by
     *     {@code RestSender.useLegacyEncoding} - confirm
     * @throws UncheckedRequestException if no further downgrade is possible
     */
    private void downgradeConnection(Request request, Response response) throws IOException {
        if (this.requestData instanceof BinaryRecordRequest) {
            this.state.didConnect();
            logger.warn("Binary Avro encoding is not supported. Switching to JSON encoding.");
            this.sender.useLegacyEncoding(ACCEPT_LATEST, RestSender.KAFKA_REST_AVRO_ENCODING, false);
            this.requestData = new JsonRecordRequest<>(this.topic);
        } else if (Objects.equals(request.header("Accept"), ACCEPT_LATEST)) {
            this.state.didConnect();
            logger.warn("Latest Avro encoding is not supported. Switching to legacy encoding.");
            this.sender.useLegacyEncoding(ACCEPT_LEGACY, RestSender.KAFKA_REST_AVRO_LEGACY_ENCODING, false);
        } else {
            RequestBody body = request.body();
            MediaType contentType = body != null ? body.contentType() : null;
            if (contentType == null || contentType.equals(RestSender.KAFKA_REST_AVRO_LEGACY_ENCODING)) {
                throw UncheckedRequestException.fail(request, response, new IOException("Content-Type " + contentType + " not accepted by server."));
            }
            this.state.didConnect();
            logger.warn("Content-Type changed during request");
        }
    }

    /**
     * Builds the POST request for the given records.
     *
     * <p>The URL is {@code topics/<topic name>} relative to the client's base URL. Headers and the
     * Accept value come from the context properties. If the context asks for the binary content
     * type while this instance is not using a binary record request, the Avro JSON content type
     * is used instead.
     *
     * @param context request context providing the client and request properties
     * @param records records to put in the request body
     * @return the request, ready to be executed
     * @throws IOException if the records cannot be prepared
     * @throws SchemaValidationException declared for callers; presumably raised while preparing
     *     the records - confirm
     */
    private Request buildRequest(RestSender.RequestContext context, RecordData<K, V> records) throws IOException, SchemaValidationException {
        this.updateRecords(context, records);

        HttpUrl sendToUrl = context.client.getRelativeUrl("topics/" + this.topic.getName());
        Request.Builder requestBuilder = new Request.Builder()
                .url(sendToUrl)
                .headers(context.properties.headers)
                .header("Accept", context.properties.acceptType);

        MediaType contentType = context.properties.contentType;
        if (contentType.equals(RestSender.KAFKA_REST_BINARY_ENCODING) && !(this.requestData instanceof BinaryRecordRequest)) {
            contentType = RestSender.KAFKA_REST_AVRO_ENCODING;
        }
        TopicRequestBody requestBody = new TopicRequestBody(this.requestData, contentType);
        return requestBuilder.post(requestBody).build();
    }

    /** No-op: this class keeps no queued records of its own. */
    @Override
    public void clear() {
    }

    /** No-op: records are sent within {@code send}, so nothing is left to flush here. */
    @Override
    public void flush() {
    }

    /** No-op: this class does not release any resources here. */
    @Override
    public void close() {
    }
}

