/*
 * Decompiled with CFR 0.152.
 */
package ai.superstream;

import ai.superstream.Superstream;
import com.github.luben.zstd.Zstd;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.Serializer;

/**
 * Kafka {@link Serializer} that delegates to a user-supplied serializer and, when a ready
 * {@link Superstream} connection is present, post-processes the serialized bytes.
 *
 * <p>Post-processing consists of two optional steps driven by flags on the connection:
 * <ul>
 *   <li>reduction: the payload is passed to {@code Superstream.jsonToProto} and, on success,
 *       replaced by the returned bytes and tagged with a {@code superstream_schema} header;</li>
 *   <li>compression: the payload is zstd-compressed and tagged with a {@code compression}
 *       header, but only when the producer itself has no {@code compression.type} configured.</li>
 * </ul>
 *
 * <p>Configuration keys read in {@link #configure(Map, boolean)}: {@code original.serializer}
 * (class name or {@link Class}), {@code superstream.connection} and {@code compression.type}.
 *
 * @param <T> type accepted by the wrapped serializer
 */
public class SuperstreamSerializer<T> implements Serializer<T> {
    private static final String COMPRESSION_NONE = "none";
    private static final String COMPRESSION_ZSTD = "zstd";

    private Serializer<T> originalSerializer;
    private Superstream superstreamConnection;
    // Written by configure(), by the connection's compression callback and by serialize().
    private volatile String compressionType = COMPRESSION_NONE;
    private boolean producerCompressionEnabled = false;

    /**
     * Instantiates and configures the wrapped serializer and picks up the Superstream connection
     * and compression settings from {@code configs}.
     *
     * <p>Any failure is reported (to the connection if one was already obtained, and to stdout)
     * instead of being thrown. If the wrapped serializer could not be created, both
     * {@code serialize} overloads return {@code null}.
     *
     * @param configs producer configuration map
     * @param isKey   forwarded unchanged to the wrapped serializer
     */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        try {
            Object originalSerializerObj = configs.get("original.serializer");
            if (originalSerializerObj == null) {
                throw new IllegalArgumentException("Original serializer is required");
            }
            Class<?> originalSerializerClass;
            if (originalSerializerObj instanceof String) {
                originalSerializerClass = Class.forName((String) originalSerializerObj);
            } else if (originalSerializerObj instanceof Class) {
                originalSerializerClass = (Class<?>) originalSerializerObj;
            } else {
                throw new IllegalArgumentException("Invalid type for original serializer");
            }
            // The configured class is trusted to be a Serializer for T; a wrong class fails the
            // runtime cast to Serializer and is reported by the catch block below.
            @SuppressWarnings("unchecked")
            Serializer<T> delegate =
                    (Serializer<T>) originalSerializerClass.getDeclaredConstructor().newInstance();
            this.originalSerializer = delegate;
            this.originalSerializer.configure(configs, isKey);

            Superstream superstreamConn = (Superstream) configs.get("superstream.connection");
            if (superstreamConn == null) {
                System.out.println("Failed to connect to Superstream");
            } else {
                this.superstreamConnection = superstreamConn;
            }

            String configuredCompressionType = (String) configs.get("compression.type");
            this.producerCompressionEnabled =
                    configuredCompressionType != null && !COMPRESSION_NONE.equals(configuredCompressionType);

            if (this.superstreamConnection != null) {
                this.superstreamConnection.setCompressionUpdateCallback(this::onCompressionUpdate);
            }

            // Producer-level compression wins. Otherwise take the initial value from the
            // connection. Previously the connection-derived value was always overwritten.
            if (this.producerCompressionEnabled) {
                this.compressionType = configuredCompressionType;
            } else if (this.superstreamConnection != null
                    && Boolean.TRUE.equals(this.superstreamConnection.compressionEnabled)) {
                this.compressionType = COMPRESSION_ZSTD;
            } else {
                this.compressionType = COMPRESSION_NONE;
            }
        } catch (Exception e) {
            String errMsg = String.format("Superstream: Error initializing serializer: %s", e.getMessage());
            if (this.superstreamConnection != null) {
                this.superstreamConnection.handleError(errMsg);
            }
            System.out.println(errMsg);
        }
    }

    /**
     * Callback registered on the Superstream connection. Updates the tracked compression type
     * unless the producer has its own compression configured.
     *
     * @param enabled whether compression is enabled
     * @param type    compression type to record when {@code enabled} is true
     */
    private void onCompressionUpdate(boolean enabled, String type) {
        if (!this.producerCompressionEnabled) {
            this.compressionType = enabled ? type : COMPRESSION_NONE;
        }
    }

    /**
     * Serializes {@code data} with the wrapped serializer only; no Superstream processing is
     * applied because there are no headers to tag.
     *
     * @return the wrapped serializer's output, or {@code null} if no wrapped serializer is
     *         available (same contract as the headers overload)
     */
    @Override
    public byte[] serialize(String topic, T data) {
        if (this.originalSerializer == null) {
            return null;
        }
        return this.originalSerializer.serialize(topic, data);
    }

    /**
     * Serializes {@code data} with the wrapped serializer and, if the Superstream connection is
     * present and ready, applies reduction and/or compression and records byte counters.
     *
     * @param topic   topic the record is produced to
     * @param headers record headers; receives {@code superstream_schema} and/or
     *                {@code compression} entries when the payload was transformed
     * @param data    value to serialize
     * @return the (possibly transformed) payload, or {@code null} when no wrapped serializer is
     *         available or it produced {@code null}
     */
    @Override
    public byte[] serialize(String topic, Headers headers, T data) {
        if (this.originalSerializer == null) {
            return null;
        }
        byte[] serializedData = this.originalSerializer.serialize(topic, headers, data);
        if (serializedData == null) {
            return null;
        }
        Superstream conn = this.superstreamConnection;
        if (conn == null || !conn.superstreamReady.booleanValue()) {
            return serializedData;
        }

        byte[] serializedResult = serializedData;
        conn.clientCounters.incrementTotalBytesBeforeReduction(serializedData.length);

        if (conn.reductionEnabled.booleanValue() && conn.descriptor != null) {
            serializedResult = reduce(conn, headers, serializedData);
        } else if (conn.reductionEnabled.booleanValue()) {
            // No descriptor yet: feed learning messages until the learning factor is exceeded,
            // then request schema registration if no request has been sent.
            if (conn.learningFactorCounter <= conn.learningFactor) {
                conn.sendLearningMessage(serializedData);
                ++conn.learningFactorCounter;
            } else if (!conn.learningRequestSent) {
                conn.sendRegisterSchemaReq();
            }
        }

        if (conn.compressionEnabled.booleanValue() && !this.producerCompressionEnabled) {
            // As before, Superstream-side compression is forced to zstd here. A local is used for
            // both compression and the header so a concurrent callback cannot make them disagree.
            final String type = COMPRESSION_ZSTD;
            this.compressionType = type;
            byte[] compressed = compressData(conn, serializedResult, type);
            // Tag the record only when the payload was really compressed; a failed compression
            // leaves the payload untouched and must not advertise a compression header.
            if (compressed != null) {
                serializedResult = compressed;
                headers.add(new RecordHeader("compression", type.getBytes(StandardCharsets.UTF_8)));
            }
        }

        conn.clientCounters.incrementTotalBytesAfterReduction(serializedResult.length);
        return serializedResult;
    }

    /**
     * Attempts the JSON-to-protobuf reduction of {@code serializedData}.
     *
     * @return the reduced bytes when conversion succeeded and the schema header was added,
     *         otherwise {@code serializedData} unchanged
     */
    private byte[] reduce(Superstream conn, Headers headers, byte[] serializedData) {
        byte[] result = serializedData;
        try {
            Superstream.JsonToProtoResult jsonToProtoResult = conn.jsonToProto(serializedData);
            if (jsonToProtoResult.isSuccess()) {
                byte[] reduced = jsonToProtoResult.getMessageBytes();
                // Build and attach the header before switching payloads so a failure here cannot
                // leave reduced bytes without the schema header that identifies them.
                RecordHeader header = new RecordHeader(
                        "superstream_schema", conn.ProducerSchemaID.getBytes(StandardCharsets.UTF_8));
                headers.add(header);
                result = reduced;
                conn.clientCounters.incrementTotalMessagesSuccessfullyProduce();
            }
        } catch (Exception e) {
            conn.handleError(String.format("error serializing data: %s", e.getMessage()));
            conn.clientCounters.incrementTotalMessagesFailedProduce();
        }
        return result;
    }

    /**
     * Compresses {@code data} with the given algorithm.
     *
     * @return the compressed bytes, or {@code null} when nothing was compressed: {@code type} is
     *         {@code "none"}, unsupported (reported to the connection), or compression threw
     *         (also reported)
     */
    private byte[] compressData(Superstream conn, byte[] data, String type) {
        if (COMPRESSION_NONE.equals(type)) {
            return null;
        }
        if (!COMPRESSION_ZSTD.equals(type)) {
            conn.handleError("Unsupported compression type: " + type);
            return null;
        }
        try {
            return compressZstd(data);
        } catch (Exception e) {
            conn.handleError(String.format("Error compressing data: %s", e.getMessage()));
            return null;
        }
    }

    /** Compresses {@code data} with zstd using the library's default settings. */
    private byte[] compressZstd(byte[] data) {
        return Zstd.compress(data);
    }

    /** Closes the wrapped serializer and the Superstream connection, whichever are present. */
    @Override
    public void close() {
        if (this.originalSerializer != null) {
            this.originalSerializer.close();
        }
        if (this.superstreamConnection != null) {
            this.superstreamConnection.close();
        }
    }
}

