/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.superstream;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.Subscription;
import io.nats.client.api.ServerInfo;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.superstream.Consts;
import org.apache.kafka.common.superstream.SuperstreamConsumerInterceptor;
import org.apache.kafka.common.superstream.SuperstreamCounters;
import org.apache.kafka.common.superstream.SuperstreamDeserializer;
import org.apache.kafka.common.superstream.SuperstreamProducerInterceptor;
import org.apache.kafka.common.superstream.SuperstreamSerializer;

public class Superstream {
    public Connection brokerConnection;
    public JetStream jetstream;
    public String superstreamJwt;
    public String superstreamNkey;
    public byte[] descriptorAsBytes;
    public Descriptors.Descriptor descriptor;
    public String natsConnectionID;
    public String clientHash;
    public String accountName;
    public int learningFactor = 20;
    public int learningFactorCounter = 0;
    public boolean learningRequestSent = false;
    private static final ObjectMapper objectMapper = new ObjectMapper();
    public String ProducerSchemaID = "0";
    public String ConsumerSchemaID = "0";
    public Map<String, Descriptors.Descriptor> SchemaIDMap = new HashMap<String, Descriptors.Descriptor>();
    public Map<String, Object> configs;
    private Map<String, Object> fullClientConfigs = new HashMap<String, Object>();
    private Map<String, ?> superstreamConfigs;
    public SuperstreamCounters clientCounters = new SuperstreamCounters();
    private Subscription updatesSubscription;
    private String host;
    private String token;
    public String type;
    public Boolean reductionEnabled;
    public Map<String, Set<Integer>> topicPartitions = new ConcurrentHashMap<String, Set<Integer>>();
    public ExecutorService executorService = Executors.newFixedThreadPool(3);
    private Integer kafkaConnectionID = 0;
    public Boolean superstreamReady = false;
    private String tags = "";
    public Boolean canStart = false;
    public Boolean compressionEnabled;
    public String compressionType = "zstd";
    public Boolean compressionTurnedOffBySuperstream = false;
    private String clientIp;
    private String clientHost;
    private static boolean isStdoutSuppressed = false;
    private static boolean isStderrSuppressed = false;
    private static PrintStream superstreamPrintStream;
    private static PrintStream superstreamErrStream;
    private static final PrintStream originalOut;
    private static final PrintStream originalErr;

    public Superstream(String token, String host, Integer learningFactor, Map<String, Object> configs, Boolean enableReduction, String type, String tags, Boolean enableCompression) {
        this.learningFactor = learningFactor;
        this.token = token;
        this.host = host;
        this.configs = this.deepCopyMap(configs);
        this.reductionEnabled = enableReduction;
        this.type = type;
        this.tags = tags;
        this.compressionEnabled = enableCompression;
        superstreamPrintStream = new PrintStream(new ClassOutputStream());
        superstreamErrStream = new PrintStream(new ClassErrorStream());
    }

    public Superstream(String token, String host, Integer learningFactor, Map<String, Object> configs, Boolean enableReduction, String type) {
        this(token, host, learningFactor, configs, enableReduction, type, "", false);
    }

    public void init() {
        this.executorService.submit(() -> {
            try {
                this.initializeNatsConnection(this.token, this.host);
                if (this.brokerConnection != null) {
                    this.registerClient(this.configs);
                    this.waitForStart();
                    if (!this.canStart.booleanValue()) {
                        throw new Exception("Could not start superstream");
                    }
                    superstreamPrintStream.println("Successfully connected to superstream");
                    this.subscribeToUpdates();
                    this.superstreamReady = true;
                    this.reportClientsUpdate();
                    this.sendClientTypeUpdateReq();
                }
            }
            catch (Exception e) {
                this.handleError(e.getMessage());
            }
        });
    }

    private static void checkStdoutEnvVar() {
        if (Boolean.parseBoolean(System.getenv("SUPERSTREAM_DEBUG"))) {
            isStdoutSuppressed = false;
            isStderrSuppressed = false;
        } else {
            isStdoutSuppressed = true;
            isStderrSuppressed = true;
        }
    }

    public void close() {
        try {
            if (this.brokerConnection != null) {
                this.brokerConnection.close();
            }
            this.executorService.shutdown();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private void initializeNatsConnection(String token, String host) {
        try {
            Options options = new Options.Builder().server(host).userInfo("superstream_internal", token).maxReconnects(-1).connectionTimeout(Duration.ofSeconds(10L)).reconnectWait(Duration.ofSeconds(1L)).connectionListener(new ConnectionListener(){

                public void connectionEvent(Connection conn, ConnectionListener.Events type) {
                    if (type == ConnectionListener.Events.DISCONNECTED) {
                        Superstream.this.brokerConnection = null;
                        Superstream.this.superstreamReady = false;
                        superstreamPrintStream.println("superstream: disconnected from superstream");
                    } else if (type == ConnectionListener.Events.RECONNECTED) {
                        try {
                            Superstream.this.brokerConnection = conn;
                            if (Superstream.this.brokerConnection != null) {
                                Superstream.this.natsConnectionID = Superstream.this.generateNatsConnectionID();
                                HashMap<String, String> reqData = new HashMap<String, String>();
                                reqData.put("new_nats_connection_id", Superstream.this.natsConnectionID);
                                reqData.put("client_hash", Superstream.this.clientHash);
                                ObjectMapper mapper = new ObjectMapper();
                                byte[] reqBytes = mapper.writeValueAsBytes(reqData);
                                Superstream.this.brokerConnection.publish("internal_tasks.clientReconnectionUpdate", reqBytes);
                                Superstream.this.subscribeToUpdates();
                                Superstream.this.superstreamReady = true;
                                Superstream.this.reportClientsUpdate();
                            }
                        }
                        catch (Exception e) {
                            superstreamPrintStream.println("superstream: failed to reconnect: " + e.getMessage());
                        }
                        superstreamPrintStream.println("superstream: reconnected to superstream");
                    }
                }
            }).build();
            Connection nc = Nats.connect((Options)options);
            if (nc == null) {
                throw new Exception(String.format("Failed to connect to host: %s", host));
            }
            JetStream js = nc.jetStream();
            if (js == null) {
                throw new Exception(String.format("Failed to connect to host: %s", host));
            }
            this.brokerConnection = nc;
            this.jetstream = js;
            this.natsConnectionID = this.generateNatsConnectionID();
        }
        catch (Exception e) {
            superstreamPrintStream.println(String.format("superstream: %s", e.getMessage()));
        }
    }

    private String generateNatsConnectionID() {
        ServerInfo serverInfo = this.brokerConnection.getServerInfo();
        String connectedServerName = serverInfo.getServerName();
        int serverClientID = serverInfo.getClientId();
        return connectedServerName + ":" + serverClientID;
    }

    public void registerClient(Map<String, ?> configs) {
        try {
            String kafkaConnID = this.consumeConnectionID();
            if (kafkaConnID != null) {
                try {
                    this.kafkaConnectionID = Integer.parseInt(kafkaConnID);
                }
                catch (Exception e) {
                    this.kafkaConnectionID = 0;
                }
            }
            InetAddress localHost = InetAddress.getLocalHost();
            this.clientIp = localHost.getHostAddress();
            this.clientHost = localHost.getHostName();
            Map<String, Object> configToSend = this.populateConfigToSend(configs);
            HashMap<String, Object> reqData = new HashMap<String, Object>();
            reqData.put("nats_connection_id", this.natsConnectionID);
            reqData.put("language", "java");
            reqData.put("learning_factor", this.learningFactor);
            reqData.put("version", "3.5.116");
            reqData.put("config", configToSend);
            reqData.put("reduction_enabled", this.reductionEnabled);
            reqData.put("connection_id", this.kafkaConnectionID);
            reqData.put("tags", this.tags);
            reqData.put("client_ip", this.clientIp);
            reqData.put("client_host", this.clientHost);
            ObjectMapper mapper = new ObjectMapper();
            byte[] reqBytes = mapper.writeValueAsBytes(reqData);
            reqData.put("type", this.type);
            Message reply = this.brokerConnection.request("internal.registerClient", reqBytes, Duration.ofMinutes(5L));
            if (reply != null) {
                Map replyData = (Map)mapper.readValue(reply.getData(), Map.class);
                Object clientHashObject = replyData.get("client_hash");
                if (clientHashObject != null) {
                    this.clientHash = clientHashObject.toString();
                } else {
                    superstreamPrintStream.println("superstream: client_hash is not a valid string: " + clientHashObject);
                }
                Object accountNameObject = replyData.get("account_name");
                if (accountNameObject != null) {
                    this.accountName = accountNameObject.toString();
                } else {
                    superstreamPrintStream.println("superstream: account_name is not a valid string: " + accountNameObject);
                }
                Object learningFactorObject = replyData.get("learning_factor");
                if (learningFactorObject instanceof Integer) {
                    this.learningFactor = (Integer)learningFactorObject;
                } else if (learningFactorObject instanceof String) {
                    try {
                        this.learningFactor = Integer.parseInt((String)learningFactorObject);
                    }
                    catch (NumberFormatException e) {
                        superstreamPrintStream.println("superstream: learning_factor is not a valid integer: " + learningFactorObject);
                    }
                } else {
                    superstreamPrintStream.println("superstream: learning_factor is not a valid integer: " + learningFactorObject);
                }
            } else {
                String errMsg = "superstream: registering client: No reply received within the timeout period.";
                superstreamPrintStream.println(errMsg);
                this.handleError(errMsg);
            }
        }
        catch (Exception e) {
            superstreamPrintStream.println(String.format("superstream: %s", e.getMessage()));
        }
    }

    public Map<String, Object> deepCopyMap(Map<String, ?> originalMap) {
        HashMap<String, Object> copiedMap = new HashMap<String, Object>();
        for (Map.Entry<String, ?> entry : originalMap.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            Object copiedValue = this.deepCopyObject(value);
            copiedMap.put(key, copiedValue);
        }
        return copiedMap;
    }

    private <T> T deepCopyObject(Object object) {
        try {
            ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(byteOut);
            out.writeObject(object);
            ByteArrayInputStream byteIn = new ByteArrayInputStream(byteOut.toByteArray());
            ObjectInputStream in = new ObjectInputStream(byteIn);
            return (T)in.readObject();
        }
        catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Error during deep copy", e);
        }
    }

    private Map<String, Object> populateConfigToSend(Map<String, ?> configs) {
        HashMap<String, Object> configToSend = new HashMap<String, Object>();
        if (configs != null && !configs.isEmpty()) {
            for (Map.Entry<String, ?> entry : configs.entrySet()) {
                if ("superstream.connection".equalsIgnoreCase(entry.getKey())) continue;
                configToSend.put(entry.getKey(), entry.getValue());
            }
        }
        return configToSend;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void waitForStart() {
        Dispatcher dispatcher;
        block4: {
            CountDownLatch latch = new CountDownLatch(1);
            dispatcher = this.brokerConnection.createDispatcher(msg -> {
                try {
                    ObjectMapper mapper = new ObjectMapper();
                    Map messageData = (Map)mapper.readValue(msg.getData(), Map.class);
                    if (messageData.containsKey("start")) {
                        boolean start = (Boolean)messageData.get("start");
                        if (start) {
                            this.canStart = true;
                            if (messageData.containsKey("optimized_configuration")) {
                                this.superstreamConfigs = (Map)messageData.get("optimized_configuration");
                            }
                            latch.countDown();
                        } else {
                            String err = (String)messageData.get("error");
                            superstreamPrintStream.println("superstream: could not start: " + err);
                            Thread.currentThread().interrupt();
                        }
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            });
            dispatcher.subscribe(String.format("internal.startClient.%s", this.clientHash));
            try {
                if (latch.await(10L, TimeUnit.MINUTES)) break block4;
                superstreamPrintStream.println("superstream: unable not connect with superstream for 10 minutes");
            }
            catch (InterruptedException e) {
                try {
                    Thread.currentThread().interrupt();
                    superstreamPrintStream.println("superstream: Could not start superstream: " + e.getMessage());
                }
                catch (Throwable throwable) {
                    dispatcher.unsubscribe(String.format("internal.startClient.%s", this.clientHash));
                    throw throwable;
                }
                dispatcher.unsubscribe(String.format("internal.startClient.%s", this.clientHash));
            }
        }
        dispatcher.unsubscribe(String.format("internal.startClient.%s", this.clientHash));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private String consumeConnectionID() {
        Properties consumerProps = this.copyAuthConfig();
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());
        consumerProps.put("auto.offset.reset", "latest");
        consumerProps.put("superstream.inner.consumer", "true");
        consumerProps.put("max.poll.records", (Object)1);
        String connectionId = null;
        try (KafkaConsumer consumer = null;){
            consumer = new KafkaConsumer(consumerProps);
            List<PartitionInfo> partitions = consumer.partitionsFor("superstream.metadata", Duration.ofMillis(10000L));
            if (partitions == null || partitions.isEmpty()) {
                if (consumer != null) {
                    consumer.close();
                }
                String string = "0";
                return string;
            }
            TopicPartition topicPartition = new TopicPartition("superstream.metadata", 0);
            consumer.assign(Collections.singletonList(topicPartition));
            consumer.seekToEnd(Collections.singletonList(topicPartition));
            long endOffset = consumer.position(topicPartition);
            if (endOffset > 0L) {
                consumer.seek(topicPartition, endOffset - 1L);
                ConsumerRecords records = consumer.poll(Duration.ofMillis(10000L));
                if (!records.isEmpty()) {
                    connectionId = (String)records.iterator().next().value();
                }
            }
        }
        return connectionId != null ? connectionId : "0";
    }

    private Properties copyAuthConfig() {
        String[] relevantKeys = new String[]{"security.protocol", "ssl.truststore.location", "ssl.truststore.password", "ssl.keystore.location", "ssl.keystore.password", "ssl.key.password", "ssl.endpoint.identification.algorithm", "sasl.mechanism", "sasl.jaas.config", "sasl.kerberos.service.name", "bootstrap.servers", "client.dns.lookup", "connections.max.idle.ms", "request.timeout.ms", "metadata.max.age.ms", "reconnect.backoff.ms", "reconnect.backoff.max.ms"};
        Properties relevantProps = new Properties();
        for (String key : relevantKeys) {
            if (!this.configs.containsKey(key)) continue;
            if (key == "bootstrap.servers") {
                Object value = this.configs.get(key);
                if (value instanceof String[]) {
                    relevantProps.put(key, Arrays.toString((String[])value));
                    continue;
                }
                if (value instanceof ArrayList) {
                    ArrayList arrayList = (ArrayList)value;
                    relevantProps.put(key, String.join((CharSequence)", ", arrayList));
                    continue;
                }
                relevantProps.put(key, value);
                continue;
            }
            relevantProps.put(key, String.valueOf(this.configs.get(key)));
        }
        return relevantProps;
    }

    public void sendClientTypeUpdateReq() {
        if (this.type == "" || this.type == null) {
            return;
        }
        try {
            HashMap<String, String> reqData = new HashMap<String, String>();
            reqData.put("client_hash", this.clientHash);
            reqData.put("type", this.type);
            ObjectMapper mapper = new ObjectMapper();
            byte[] reqBytes = mapper.writeValueAsBytes(reqData);
            this.brokerConnection.publish("internal.clientTypeUpdate", reqBytes);
        }
        catch (Exception e) {
            this.handleError(String.format("sendClientTypeUpdateReq: %s", e.getMessage()));
        }
    }

    private void executeSendClientConfigUpdateReqWithWait() {
        new Thread(() -> {
            try {
                this.waitForCanStart();
                this.sendClientConfigUpdateReq();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("Thread was interrupted: " + e.getMessage());
            }
            catch (RuntimeException e) {
                System.err.println("Error: " + e.getMessage());
            }
        }).start();
    }

    private void waitForCanStart() throws InterruptedException {
        long remainingTime = 600000L;
        while (remainingTime > 0L && !this.canStart.booleanValue()) {
            Thread.sleep(3000L);
            if ((remainingTime -= 3000L) > 0L) continue;
            superstreamPrintStream.println("superstream could not start within the expected timeout period");
        }
    }

    public void waitForSuperstreamConfigs(AbstractConfig config) throws InterruptedException {
        long remainingTime;
        String timeoutEnv = System.getenv("SUPERSTREAM_RESPONSE_TIMEOUT");
        long l = remainingTime = timeoutEnv != null ? Long.parseLong(timeoutEnv) : 3000L;
        while (remainingTime > 0L) {
            if (this.superstreamConfigs != null) {
                config.getValues().putAll(this.getSuperstreamConfigs());
                break;
            }
            if ((remainingTime -= 30L) > 0L) {
                Thread.sleep(30L);
                continue;
            }
            superstreamPrintStream.println("superstream client configuration was not set within the expected timeout period");
        }
    }

    private void sendClientConfigUpdateReq() {
        if (this.fullClientConfigs != null && !this.fullClientConfigs.isEmpty()) {
            try {
                ObjectMapper mapper = new ObjectMapper();
                HashMap<String, Object> reqData = new HashMap<String, Object>();
                reqData.put("client_hash", this.clientHash);
                this.convertEntryValueWhenNoSerializer(this.fullClientConfigs, mapper);
                reqData.put("config", this.fullClientConfigs);
                byte[] reqBytes = mapper.writeValueAsBytes(reqData);
                this.brokerConnection.publish("internal.clientConfigUpdate", reqBytes);
            }
            catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
            catch (Exception e) {
                this.handleError(String.format("sendClientConfigUpdateReq: %s", e.getMessage()));
            }
        }
    }

    private void convertEntryValueWhenNoSerializer(Map<String, Object> config, ObjectMapper mapper) {
        if (config != null && !config.isEmpty()) {
            for (Map.Entry<String, Object> entry : config.entrySet()) {
                Object value = entry.getValue();
                String key = entry.getKey();
                if (key == "sasl.jaas.config") {
                    entry.setValue("[hidden]");
                    continue;
                }
                try {
                    mapper.writeValueAsBytes(value);
                }
                catch (JsonProcessingException e) {
                    entry.setValue(value.toString());
                }
            }
        }
    }

    public void subscribeToUpdates() {
        try {
            String subject = String.format("internal.updates.%s", this.clientHash);
            Dispatcher dispatcher = this.brokerConnection.createDispatcher(this.updatesHandler());
            this.updatesSubscription = dispatcher.subscribe(subject, this.updatesHandler());
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void reportClientsUpdate() {
        ScheduledExecutorService singleExecutorService = Executors.newSingleThreadScheduledExecutor();
        singleExecutorService.scheduleAtFixedRate(() -> {
            if (this.brokerConnection != null && this.superstreamReady.booleanValue()) {
                long backupReadBytes = this.clientCounters.getTotalReadBytesReduced();
                long backupWriteBytes = this.clientCounters.getTotalWriteBytesReduced();
                Double producerCompressionRate = this.clientCounters.getProducerCompressionRate();
                long calculatedWriteBytes = Math.round((double)backupWriteBytes * producerCompressionRate);
                Double consumerCompressionRate = this.clientCounters.getConsumerCompressionRate();
                long calculatedReadBytes = Math.round((double)backupReadBytes * consumerCompressionRate);
                this.clientCounters.reset();
                try {
                    HashMap<String, Number> countersMap = new HashMap<String, Number>();
                    countersMap.put("total_read_bytes_reduced", calculatedReadBytes);
                    countersMap.put("total_write_bytes_reduced", calculatedWriteBytes);
                    countersMap.put("connection_id", this.kafkaConnectionID);
                    byte[] byteCounters = objectMapper.writeValueAsBytes(countersMap);
                    this.brokerConnection.publish(String.format("internal_tasks.clientsUpdate.%s.%s", "counters", this.clientHash), byteCounters);
                }
                catch (Exception e) {
                    this.clientCounters.incrementTotalReadBytesReduced(backupReadBytes);
                    this.clientCounters.incrementTotalWriteBytesReduced(backupWriteBytes);
                    this.handleError("reportClientsUpdate config: " + e.getMessage());
                }
                try {
                    HashMap<String, Object> topicPartitionConfig = new HashMap<String, Object>();
                    if (!this.topicPartitions.isEmpty()) {
                        Map<String, Integer[]> topicPartitionsToSend = Superstream.convertMap(this.topicPartitions);
                        switch (this.type) {
                            case "producer": {
                                topicPartitionConfig.put("producer_topics_partitions", topicPartitionsToSend);
                                topicPartitionConfig.put("consumer_group_topics_partitions", new HashMap());
                                break;
                            }
                            case "consumer": {
                                topicPartitionConfig.put("producer_topics_partitions", new HashMap());
                                topicPartitionConfig.put("consumer_group_topics_partitions", topicPartitionsToSend);
                            }
                        }
                    }
                    topicPartitionConfig.put("connection_id", this.kafkaConnectionID);
                    byte[] byteConfig = objectMapper.writeValueAsBytes(topicPartitionConfig);
                    this.brokerConnection.publish(String.format("internal_tasks.clientsUpdate.%s.%s", "config", this.clientHash), byteConfig);
                }
                catch (Exception e) {
                    this.handleError("reportClientsUpdate config: " + e.getMessage());
                }
            }
        }, 0L, 10L, TimeUnit.MINUTES);
    }

    public static Map<String, Integer[]> convertMap(Map<String, Set<Integer>> topicPartitions) {
        HashMap<String, Integer[]> result = new HashMap<String, Integer[]>();
        for (Map.Entry<String, Set<Integer>> entry : topicPartitions.entrySet()) {
            Integer[] array = entry.getValue().toArray(new Integer[0]);
            result.put(entry.getKey(), array);
        }
        return result;
    }

    public void sendLearningMessage(byte[] msg) {
        try {
            this.brokerConnection.publish(String.format("internal.schema.learnSchema.%s", this.clientHash), msg);
        }
        catch (Exception e) {
            this.handleError("sendLearningMessage: " + e.getMessage());
        }
    }

    public void sendRegisterSchemaReq() {
        try {
            this.brokerConnection.publish(String.format("internal_tasks.schema.registerSchema.%s", this.clientHash), new byte[0]);
            this.learningRequestSent = true;
        }
        catch (Exception e) {
            this.handleError("sendLearningMessage: " + e.getMessage());
        }
    }

    public JsonToProtoResult jsonToProto(byte[] msgBytes) throws Exception {
        try {
            String jsonString = new String(msgBytes);
            if (!this.isJsonObject(jsonString)) {
                jsonString = Superstream.convertEscapedJsonString(jsonString);
            }
            if (jsonString == null || jsonString.isEmpty()) {
                return new JsonToProtoResult(false, msgBytes);
            }
            if (jsonString != null && jsonString.length() > 2 && jsonString.startsWith("\"{") && jsonString.endsWith("}\"")) {
                jsonString = jsonString.substring(1, jsonString.length() - 1);
            }
            DynamicMessage.Builder newMessageBuilder = DynamicMessage.newBuilder((Descriptors.Descriptor)this.descriptor);
            JsonFormat.parser().merge(jsonString, (Message.Builder)newMessageBuilder);
            DynamicMessage message = newMessageBuilder.build();
            return new JsonToProtoResult(true, message.toByteArray());
        }
        catch (Exception e) {
            return new JsonToProtoResult(false, msgBytes);
        }
    }

    private boolean isJsonObject(String jsonString) {
        try {
            JsonParser.parseString((String)jsonString).getAsJsonObject();
            return true;
        }
        catch (JsonSyntaxException | IllegalStateException e) {
            return false;
        }
    }

    private static String convertEscapedJsonString(String escapedJsonString) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode jsonNode = mapper.readTree(escapedJsonString);
        return mapper.writeValueAsString((Object)jsonNode).replace("\\\"", "\"").replace("\\\\", "\\");
    }

    public byte[] protoToJson(byte[] msgBytes, Descriptors.Descriptor desc) throws Exception {
        try {
            DynamicMessage message = DynamicMessage.parseFrom((Descriptors.Descriptor)desc, (byte[])msgBytes);
            String jsonString = JsonFormat.printer().omittingInsignificantWhitespace().print((MessageOrBuilder)message);
            return jsonString.getBytes(StandardCharsets.UTF_8);
        }
        catch (Exception e) {
            if (e.getMessage().contains("the input ended unexpectedly")) {
                return msgBytes;
            }
            throw e;
        }
    }

    private MessageHandler updatesHandler() {
        return msg -> {
            try {
                Map update = (Map)objectMapper.readValue(msg.getData(), Map.class);
                this.processUpdate(update);
            }
            catch (IOException e) {
                this.handleError("updatesHandler at json.Unmarshal: " + e.getMessage());
            }
        };
    }

    private void processUpdate(Map<String, Object> update) {
        String type = (String)update.get("type");
        try {
            String payloadBytesString = (String)update.get("payload");
            byte[] payloadBytes = Base64.getDecoder().decode(payloadBytesString);
            Map payload = (Map)objectMapper.readValue(payloadBytes, Map.class);
            Map<String, String> envVars = System.getenv();
            switch (type) {
                case "LearnedSchema": {
                    String schemaID;
                    String descriptorBytesString = (String)payload.get("desc");
                    String masterMsgName = (String)payload.get("master_msg_name");
                    String fileName = (String)payload.get("file_name");
                    this.descriptor = this.compileMsgDescriptor(descriptorBytesString, masterMsgName, fileName);
                    this.ProducerSchemaID = schemaID = (String)payload.get("schema_id");
                    break;
                }
                case "ToggleReduction": {
                    Boolean reductionEnabled;
                    String reductionEnabledString = envVars.get("SUPERSTREAM_REDUCTION_ENABLED");
                    if (reductionEnabledString != null && !(reductionEnabled = Boolean.valueOf(Boolean.parseBoolean(reductionEnabledString))).booleanValue()) {
                        this.reductionEnabled = false;
                        break;
                    }
                    Boolean enableReduction = (Boolean)payload.get("enable_reduction");
                    if (enableReduction.booleanValue()) {
                        this.reductionEnabled = true;
                        break;
                    }
                    this.reductionEnabled = false;
                    break;
                }
                case "CompressionUpdate": {
                    Boolean compressionEnabled;
                    String compressionEnabledString = envVars.get("SUPERSTREAM_COMPRESSION_ENABLED");
                    if (compressionEnabledString != null && !(compressionEnabled = Boolean.valueOf(Boolean.parseBoolean(compressionEnabledString))).booleanValue()) {
                        this.compressionEnabled = false;
                        break;
                    }
                    Boolean enableCompression = (Boolean)payload.get("enable_compression");
                    this.compressionTurnedOffBySuperstream = enableCompression != false ? Boolean.valueOf(false) : Boolean.valueOf(true);
                    this.compressionEnabled = enableCompression;
                    String compType = (String)payload.get("compression_type");
                    if (compType == null) break;
                    this.compressionType = compType;
                }
            }
        }
        catch (Exception e) {
            this.handleError("processUpdate: " + e.getMessage());
        }
    }

    public void sendGetSchemaRequest(String schemaID) {
        try {
            Descriptors.Descriptor respDescriptor;
            HashMap<String, String> reqData = new HashMap<String, String>();
            reqData.put("schema_id", schemaID);
            ObjectMapper mapper = new ObjectMapper();
            byte[] reqBytes = mapper.writeValueAsBytes(reqData);
            Message msg = this.brokerConnection.request(String.format("internal.schema.getSchema.%s", this.clientHash), reqBytes, Duration.ofSeconds(5L));
            if (msg == null) {
                throw new Exception("Could not get descriptor");
            }
            Map respMap = (Map)objectMapper.readValue(new String(msg.getData(), StandardCharsets.UTF_8), Map.class);
            if (respMap.containsKey("desc") && respMap.get("desc") instanceof String) {
                String fileName;
                String masterMsgName;
                String descriptorBytesString = (String)respMap.get("desc");
                respDescriptor = this.compileMsgDescriptor(descriptorBytesString, masterMsgName = (String)respMap.get("master_msg_name"), fileName = (String)respMap.get("file_name"));
                if (respDescriptor == null) {
                    throw new Exception("Error compiling schema.");
                }
            } else {
                throw new Exception("Response map does not contain expected keys.");
            }
            this.SchemaIDMap.put((String)respMap.get("schema_id"), respDescriptor);
        }
        catch (Exception e) {
            this.handleError(String.format("sendGetSchemaRequest: %s", e.getMessage()));
        }
    }

    private Descriptors.Descriptor compileMsgDescriptor(String descriptorBytesString, String masterMsgName, String fileName) {
        try {
            byte[] descriptorAsBytes = Base64.getDecoder().decode(descriptorBytesString);
            if (descriptorAsBytes == null) {
                throw new Exception("error decoding descriptor bytes");
            }
            DescriptorProtos.FileDescriptorSet descriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom((byte[])descriptorAsBytes);
            Descriptors.FileDescriptor fileDescriptor = null;
            for (DescriptorProtos.FileDescriptorProto fdp : descriptorSet.getFileList()) {
                if (!fdp.getName().equals(fileName)) continue;
                fileDescriptor = Descriptors.FileDescriptor.buildFrom((DescriptorProtos.FileDescriptorProto)fdp, (Descriptors.FileDescriptor[])new Descriptors.FileDescriptor[0]);
                break;
            }
            if (fileDescriptor == null) {
                throw new Exception("file not found");
            }
            for (Descriptors.Descriptor md : fileDescriptor.getMessageTypes()) {
                if (!md.getName().equals(masterMsgName)) continue;
                return md;
            }
        }
        catch (Exception e) {
            this.handleError(String.format("compileMsgDescriptor: %s", e.getMessage()));
        }
        return null;
    }

    public void handleError(String msg) {
        if (this.brokerConnection != null && this.superstreamReady.booleanValue()) {
            Map<String, String> envVars = System.getenv();
            String tags = envVars.get("SUPERSTREAM_TAGS");
            if (tags == null) {
                tags = "";
            }
            if (this.clientHash == "") {
                String message = String.format("[sdk: java][version: %s][tags: %s] %s", "3.5.116", tags, msg);
                this.brokerConnection.publish("internal.clientErrors", message.getBytes(StandardCharsets.UTF_8));
            } else {
                String message = String.format("[clientHash: %s][sdk: java][version: %s][tags: %s] %s", this.clientHash, "3.5.116", tags, msg);
                this.brokerConnection.publish("internal.clientErrors", message.getBytes(StandardCharsets.UTF_8));
            }
        }
    }

    public static Map<String, Object> initSuperstreamConfig(Map<String, Object> configs, String type) {
        String isInnerConsumer = (String)configs.get("superstream.inner.consumer");
        if (Boolean.parseBoolean(isInnerConsumer)) {
            return configs;
        }
        String interceptorToAdd = Superstream.getSuperstreamClientInterceptorName(type);
        try {
            Map<String, String> envVars;
            String superstreamHost;
            ArrayList<String> interceptors = null;
            Object existingInterceptors = configs.get("interceptor.classes");
            if (!interceptorToAdd.isEmpty()) {
                if (existingInterceptors != null) {
                    if (existingInterceptors instanceof List) {
                        interceptors = new ArrayList((List)existingInterceptors);
                    } else if (existingInterceptors instanceof String) {
                        interceptors = new ArrayList();
                        interceptors.add((String)existingInterceptors);
                    } else {
                        interceptors = new ArrayList();
                    }
                } else {
                    interceptors = new ArrayList<String>();
                }
            }
            if (!interceptorToAdd.isEmpty()) {
                interceptors.add(interceptorToAdd);
                configs.put("interceptor.classes", interceptors);
            }
            if ((superstreamHost = (envVars = System.getenv()).get("SUPERSTREAM_HOST")) == null) {
                throw new Exception("host is required");
            }
            configs.put("superstream.host", superstreamHost);
            String token = envVars.get("SUPERSTREAM_TOKEN");
            if (token == null) {
                token = "no-auth";
            }
            configs.put("superstream.token", token);
            String learningFactorString = envVars.get("SUPERSTREAM_LEARNING_FACTOR");
            Integer learningFactor = Consts.superstreamDefaultLearningFactor;
            if (learningFactorString != null) {
                learningFactor = Integer.parseInt(learningFactorString);
            }
            configs.put("superstream.learning.factor", learningFactor);
            boolean reductionEnabled = false;
            String reductionEnabledString = envVars.get("SUPERSTREAM_REDUCTION_ENABLED");
            if (reductionEnabledString != null) {
                reductionEnabled = Boolean.parseBoolean(reductionEnabledString);
            }
            configs.put("superstream.reduction.enabled", reductionEnabled);
            String tags = envVars.get("SUPERSTREAM_TAGS");
            if (tags == null) {
                tags = "";
            }
            boolean compressionEnabled = false;
            String compressionEnabledString = envVars.get("SUPERSTREAM_COMPRESSION_ENABLED");
            if (compressionEnabledString != null) {
                compressionEnabled = Boolean.parseBoolean(compressionEnabledString);
            }
            Superstream.checkStdoutEnvVar();
            Superstream superstreamConnection = new Superstream(token, superstreamHost, learningFactor, configs, reductionEnabled, type, tags, compressionEnabled);
            superstreamConnection.init();
            configs.put("superstream.connection", superstreamConnection);
        }
        catch (Exception e) {
            String errMsg = String.format("superstream: error initializing superstream: %s", e.getMessage());
            System.out.println(errMsg);
            Superstream.handleConfigsWhenErrorInitializeSuperstream(type, configs);
        }
        return configs;
    }

    private static void handleConfigsWhenErrorInitializeSuperstream(String type, Map<String, Object> configs) {
        switch (type) {
            case "producer": {
                if (!configs.containsKey("original.serializer")) break;
                configs.put("value.serializer", configs.get("original.serializer"));
                configs.remove("original.serializer");
                break;
            }
            case "consumer": {
                if (!configs.containsKey("original.deserializer")) break;
                configs.put("value.deserializer", configs.get("original.deserializer"));
                configs.remove("original.deserializer");
            }
        }
    }

    private static String getSuperstreamClientInterceptorName(String type) {
        switch (type) {
            case "producer": {
                Superstream.handleSerializerLogicForPayloadReduction();
                return SuperstreamProducerInterceptor.class.getName();
            }
            case "consumer": {
                Superstream.handleDeserializerLogicForPayloadReduction();
                return SuperstreamConsumerInterceptor.class.getName();
            }
        }
        return "";
    }

    private static void handleDeserializerLogicForPayloadReduction() {
    }

    private static void handleSerializerLogicForPayloadReduction() {
    }

    public static Properties initSuperstreamProps(Properties properties, String type) {
        String interceptors = (String)properties.get("interceptor.classes");
        switch (type) {
            case "producer": {
                interceptors = interceptors != null && !interceptors.isEmpty() ? interceptors + "," + SuperstreamProducerInterceptor.class.getName() : SuperstreamProducerInterceptor.class.getName();
                if (!properties.containsKey("value.serializer") || properties.containsKey("original.serializer")) break;
                properties.put("original.serializer", properties.get("value.serializer"));
                properties.put("value.serializer", SuperstreamSerializer.class.getName());
                break;
            }
            case "consumer": {
                interceptors = interceptors != null && !interceptors.isEmpty() ? interceptors + "," + SuperstreamConsumerInterceptor.class.getName() : SuperstreamConsumerInterceptor.class.getName();
                if (!properties.containsKey("value.deserializer") || properties.containsKey("original.deserializer")) break;
                properties.put("original.deserializer", properties.get("value.deserializer"));
                properties.put("value.deserializer", SuperstreamDeserializer.class.getName());
            }
        }
        if (interceptors != null) {
            properties.put("interceptor.classes", interceptors);
        }
        try {
            Map<String, String> envVars = System.getenv();
            String superstreamHost = envVars.get("SUPERSTREAM_HOST");
            if (superstreamHost == null) {
                throw new Exception("host is required");
            }
            properties.put("superstream.host", superstreamHost);
            String token = envVars.get("SUPERSTREAM_TOKEN");
            if (token == null) {
                token = "no-auth";
            }
            properties.put("superstream.token", token);
            String learningFactorString = envVars.get("SUPERSTREAM_LEARNING_FACTOR");
            Integer learningFactor = Consts.superstreamDefaultLearningFactor;
            if (learningFactorString != null) {
                learningFactor = Integer.parseInt(learningFactorString);
            }
            properties.put("superstream.learning.factor", learningFactor);
            Boolean reductionEnabled = false;
            String reductionEnabledString = envVars.get("SUPERSTREAM_REDUCTION_ENABLED");
            if (reductionEnabledString != null) {
                reductionEnabled = Boolean.parseBoolean(reductionEnabledString);
            }
            properties.put("superstream.reduction.enabled", reductionEnabled);
            String tags = envVars.get("SUPERSTREAM_TAGS");
            if (tags != null) {
                properties.put("superstream.tags", tags);
            }
            Map<String, Object> configs = Superstream.propertiesToMap(properties);
            Superstream superstreamConnection = new Superstream(token, superstreamHost, learningFactor, configs, reductionEnabled, type);
            superstreamConnection.init();
            properties.put("superstream.connection", superstreamConnection);
        }
        catch (Exception e) {
            String errMsg = String.format("superstream: error initializing superstream: %s", e.getMessage());
            superstreamPrintStream.println(errMsg);
        }
        return properties;
    }

    public static Map<String, Object> propertiesToMap(Properties properties) {
        return properties.entrySet().stream().collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> e.getValue()));
    }

    public void updateTopicPartitions(String topic, Integer partition) {
        Set partitions = this.topicPartitions.computeIfAbsent(topic, k -> new HashSet());
        partitions.add(partition);
    }

    public void setFullClientConfigs(Map<String, ?> configsUpdate) {
        for (Map.Entry<String, ?> entry : configsUpdate.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            Object copiedValue = this.configs.containsKey(key) ? this.deepCopyObject(this.configs.get(key)) : this.deepCopyObject(value);
            this.fullClientConfigs.put(key, copiedValue);
        }
        this.executeSendClientConfigUpdateReqWithWait();
    }

    public PrintStream getSuperstreamPrintStream() {
        return superstreamPrintStream;
    }

    public Map<String, ?> getSuperstreamConfigs() {
        return this.superstreamConfigs;
    }

    static {
        originalOut = System.out;
        originalErr = System.err;
    }

    private static class ClassOutputStream
    extends OutputStream {
        private ClassOutputStream() {
        }

        @Override
        public void write(int b) {
            if (!isStdoutSuppressed) {
                originalOut.write(b);
            }
        }

        @Override
        public void write(byte[] b, int off, int len) {
            if (!isStdoutSuppressed) {
                originalOut.write(b, off, len);
            }
        }
    }

    private static class ClassErrorStream
    extends OutputStream {
        private ClassErrorStream() {
        }

        @Override
        public void write(int b) {
            if (!isStderrSuppressed) {
                originalErr.write(b);
            }
        }

        @Override
        public void write(byte[] b, int off, int len) {
            if (!isStderrSuppressed) {
                originalErr.write(b, off, len);
            }
        }
    }

    public class JsonToProtoResult {
        private final boolean success;
        private final byte[] messageBytes;

        public JsonToProtoResult(boolean success, byte[] messageBytes) {
            this.success = success;
            this.messageBytes = messageBytes;
        }

        public boolean isSuccess() {
            return this.success;
        }

        public byte[] getMessageBytes() {
            return this.messageBytes;
        }
    }
}

