/*
 * Decompiled with CFR 0.152.
 */
package ai.superstream;

import ai.superstream.Superstream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.shaded.com.google.protobuf.Descriptors;

/**
 * Kafka {@link Deserializer} that wraps a user-supplied ("original") deserializer.
 *
 * <p>When a record carries a {@code superstream_schema} header, the payload is first
 * converted with {@code Superstream.protoToJson} using the descriptor registered for that
 * schema id, and the converted bytes are handed to the original deserializer. Records
 * without that header are passed to the original deserializer unchanged.
 *
 * <p>Configuration keys read in {@link #configure(Map, boolean)}:
 * <ul>
 *   <li>{@code original.deserializer} - class name ({@code String}) or {@code Class} of the
 *       wrapped deserializer; it must have a no-argument constructor.</li>
 *   <li>{@code superstream.connection} - the {@code Superstream} instance to use.</li>
 * </ul>
 *
 * @param <T> type produced by the wrapped deserializer
 */
public class SuperstreamDeserializer<T>
implements Deserializer<T> {
    /** Upper bound, in seconds, on how long a record waits for the connection to become ready. */
    private static final int READY_TOTAL_WAIT_SECONDS = 60;
    /** Polling interval, in seconds, while waiting for the connection to become ready. */
    private static final int READY_CHECK_INTERVAL_SECONDS = 5;

    private Deserializer<T> originalDeserializer;
    private Superstream superstreamConnection;

    /**
     * Instantiates and configures the wrapped deserializer and picks up the Superstream
     * connection from {@code configs}.
     *
     * <p>Failures are not propagated: they are printed, and reported through
     * {@code Superstream.handleError} when a connection is available.
     *
     * @param configs consumer configuration
     * @param isKey   whether this instance deserializes keys; forwarded to the wrapped deserializer
     */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Resolve the connection first so that a failure while building the wrapped
        // deserializer can still be reported through it.
        Object connectionObj = configs.get("superstream.connection");
        if (connectionObj instanceof Superstream) {
            this.superstreamConnection = (Superstream) connectionObj;
        } else {
            System.out.println("Failed to connect to Superstream");
        }
        try {
            Object originalDeserializerObj = configs.get("original.deserializer");
            if (originalDeserializerObj == null) {
                throw new IllegalArgumentException("original deserializer is required");
            }
            Class<?> originalDeserializerClass;
            if (originalDeserializerObj instanceof String) {
                originalDeserializerClass = Class.forName((String) originalDeserializerObj);
            } else if (originalDeserializerObj instanceof Class) {
                originalDeserializerClass = (Class<?>) originalDeserializerObj;
            } else {
                throw new IllegalArgumentException("Invalid type for original deserializer");
            }
            // The configured class is trusted to be a Deserializer<T>; a mismatch surfaces
            // as a ClassCastException that is handled by the catch below.
            @SuppressWarnings("unchecked")
            Deserializer<T> instance =
                    (Deserializer<T>) originalDeserializerClass.getDeclaredConstructor().newInstance();
            this.originalDeserializer = instance;
            this.originalDeserializer.configure(configs, isKey);
        }
        catch (Exception e) {
            String errMsg = String.format("superstream: error initializing superstream: %s", e.getMessage());
            if (this.superstreamConnection != null) {
                this.superstreamConnection.handleError(errMsg);
            }
            System.out.println(errMsg);
        }
    }

    /**
     * Delegates straight to the wrapped deserializer.
     *
     * @return the wrapped deserializer's result, or {@code null} when none is configured
     */
    @Override
    public T deserialize(String topic, byte[] data) {
        if (this.originalDeserializer == null) {
            return null;
        }
        return this.originalDeserializer.deserialize(topic, data);
    }

    /**
     * Deserializes a record, converting the payload first when it carries a
     * {@code superstream_schema} header.
     *
     * @param topic   topic the record came from
     * @param headers record headers; the last {@code superstream_schema} header names the schema id
     * @param data    raw payload, may be {@code null}
     * @return the wrapped deserializer's result, or {@code null} when no wrapped deserializer is
     *         configured, the connection is missing or never becomes ready, the schema cannot be
     *         found, or the conversion fails
     */
    @Override
    public T deserialize(String topic, Headers headers, byte[] data) {
        if (this.originalDeserializer == null) {
            return null;
        }
        if (data == null) {
            // Nothing to convert or count; let the wrapped deserializer decide what null means.
            return this.originalDeserializer.deserialize(topic, headers, data);
        }

        String schemaId = null;
        Header header = headers == null ? null : headers.lastHeader("superstream_schema");
        if (header != null && header.value() != null) {
            schemaId = new String(header.value(), StandardCharsets.UTF_8);
        }

        if (this.superstreamConnection != null) {
            this.superstreamConnection.updateClientCounters(counters -> counters.incrementTotalBytesAfterReduction(data.length));
        }

        byte[] dataToDeserialize = data;
        if (schemaId != null) {
            if (this.superstreamConnection == null || !awaitSuperstreamReady()) {
                System.out.println("superstream: cannot connect with superstream and consume message that was modified by superstream");
                return null;
            }
            Descriptors.Descriptor desc = this.superstreamConnection.SchemaIDMap.get(schemaId);
            if (desc == null) {
                // Request the schema, then look it up again.
                this.superstreamConnection.sendGetSchemaRequest(schemaId);
                desc = this.superstreamConnection.SchemaIDMap.get(schemaId);
                if (desc == null) {
                    this.superstreamConnection.handleError("error getting schema with id: " + schemaId);
                    System.out.println("superstream: shcema not found");
                    return null;
                }
            }
            try {
                byte[] converted = this.superstreamConnection.protoToJson(data, desc);
                dataToDeserialize = converted;
                this.superstreamConnection.updateClientCounters(counters -> {
                    counters.incrementTotalBytesBeforeReduction(converted.length);
                    counters.incrementTotalMessagesSuccessfullyConsumed();
                });
            }
            catch (Exception e) {
                this.superstreamConnection.handleError(String.format("error deserializing data: %s", e.getMessage()));
                return null;
            }
        } else if (this.superstreamConnection != null) {
            // No schema header: the payload is passed through as-is. Counted only on this
            // path so a converted record is not recorded as both succeeded and failed.
            this.superstreamConnection.updateClientCounters(counters -> {
                counters.incrementTotalBytesBeforeReduction(data.length);
                counters.incrementTotalMessagesFailedConsume();
            });
        }
        return this.originalDeserializer.deserialize(topic, headers, dataToDeserialize);
    }

    /**
     * Polls {@code superstreamReady} until it is true or the wait budget is used up.
     * Callers must ensure {@code superstreamConnection} is non-null.
     *
     * @return the value of {@code superstreamReady} when the wait ends
     */
    private boolean awaitSuperstreamReady() {
        try {
            for (int waited = 0;
                    waited < READY_TOTAL_WAIT_SECONDS && !this.superstreamConnection.superstreamReady.booleanValue();
                    waited += READY_CHECK_INTERVAL_SECONDS) {
                Thread.sleep(READY_CHECK_INTERVAL_SECONDS * 1000L);
            }
        }
        catch (InterruptedException e) {
            // Stop waiting, but keep the interrupt visible to the calling thread.
            Thread.currentThread().interrupt();
        }
        return this.superstreamConnection.superstreamReady.booleanValue();
    }

    /** Closes the wrapped deserializer and the Superstream connection, when present. */
    @Override
    public void close() {
        if (this.originalDeserializer != null) {
            this.originalDeserializer.close();
        }
        if (this.superstreamConnection != null) {
            this.superstreamConnection.close();
        }
    }
}

