/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.kafka.KafkaTools;
import io.deephaven.kafka.ingest.JsonNodeChunkAdapter;
import io.deephaven.kafka.ingest.JsonNodeUtil;
import io.deephaven.kafka.ingest.KeyOrValueProcessor;
import io.deephaven.kafka.publish.JsonKeyOrValueSerializer;
import io.deephaven.kafka.publish.KeyOrValueSerializer;
import io.deephaven.stream.StreamChunkUtils;
import io.deephaven.util.mutable.MutableInt;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

class JsonImpl {
    /** Package-private no-arg constructor; this class declares no instance fields. */
    JsonImpl() {}

    /**
     * Builds the function that turns one raw consumed object into whatever
     * {@link JsonNodeUtil#makeJsonNode} produces for it.
     *
     * <p>The returned function casts its argument to {@link String} and hands it, together with
     * {@code mapper}, to {@code JsonNodeUtil.makeJsonNode}. A {@code null} argument passes the cast and is
     * forwarded as-is.
     *
     * @param mapper the mapper forwarded to {@code makeJsonNode}; may be {@code null} (how a null mapper is
     *        treated is decided by {@code makeJsonNode}, which is not visible here)
     * @return a function from raw object to parsed JSON node
     */
    private static Function<Object, Object> jsonToObjectChunkMapper(@Nullable ObjectMapper mapper) {
        return raw -> JsonNodeUtil.makeJsonNode(mapper, asJsonString(raw));
    }

    /**
     * Casts {@code raw} to {@link String}, translating a failed cast into an
     * {@link UncheckedDeephavenException} that keeps the original {@link ClassCastException} as its cause.
     *
     * @param raw the object to cast; {@code null} is returned unchanged
     * @return {@code raw} as a {@code String}
     * @throws UncheckedDeephavenException if {@code raw} is non-null and not a {@code String}
     */
    private static String asJsonString(Object raw) {
        try {
            return (String) raw;
        } catch (ClassCastException cce) {
            throw new UncheckedDeephavenException("Could not convert input to json string", cce);
        }
    }

    /**
     * Key-or-value spec for publishing table rows to Kafka as JSON strings.
     *
     * <p>The Kafka-level serializer is a plain {@link StringSerializer}; the row-to-JSON conversion is
     * delegated to {@link JsonKeyOrValueSerializer}. Column selection is driven by either an explicit
     * include list or an exclusion predicate, never both.
     */
    static final class JsonProduce extends KafkaTools.Produce.KeyOrValueSpec {
        private final String[] includeColumns;
        private final Predicate<String> excludeColumns;
        private final Map<String, String> columnNameToFieldName;
        private final String nestedObjectDelimiter;
        private final boolean outputNulls;
        private final String timestampFieldName;

        /**
         * Stores the configuration as given; no validation or copying happens here. The include/exclude
         * conflict is only detected later, in {@link #getColumnNames}.
         *
         * @param includeColumns explicit column names to publish, or {@code null}
         * @param excludeColumns predicate selecting column names to omit, or {@code null}
         * @param columnNameToFieldName column name to JSON field name overrides, or {@code null} to use the
         *        column names unchanged
         * @param nestedObjectDelimiter forwarded to {@link JsonKeyOrValueSerializer}
         * @param outputNulls forwarded to {@link JsonKeyOrValueSerializer}
         * @param timestampFieldName forwarded to {@link JsonKeyOrValueSerializer}
         */
        JsonProduce(String[] includeColumns, Predicate<String> excludeColumns, Map<String, String> columnNameToFieldName, String nestedObjectDelimiter, boolean outputNulls, String timestampFieldName) {
            this.includeColumns = includeColumns;
            this.excludeColumns = excludeColumns;
            this.columnNameToFieldName = columnNameToFieldName;
            this.nestedObjectDelimiter = nestedObjectDelimiter;
            this.outputNulls = outputNulls;
            this.timestampFieldName = timestampFieldName;
        }

        /** @return always empty; this spec supplies no schema provider */
        @Override
        public Optional<SchemaProvider> getSchemaProvider() {
            return Optional.empty();
        }

        /**
         * @param schemaRegistryClient not used by this implementation
         * @param definition not used by this implementation
         * @return a new {@link StringSerializer}
         */
        @Override
        Serializer<?> getSerializer(SchemaRegistryClient schemaRegistryClient, TableDefinition definition) {
            return new StringSerializer();
        }

        /**
         * Resolves which columns of {@code t} are published.
         *
         * <ul>
         * <li>Neither filter set: all of the table's column names.</li>
         * <li>{@code includeColumns} set: that same array, after verifying every name exists in the table.</li>
         * <li>{@code excludeColumns} set: the table's column names for which the predicate returns
         * {@code false}, in table order.</li>
         * </ul>
         *
         * @param t the table whose definition supplies the available column names
         * @param schemaRegistryClient not used by this implementation
         * @return the column names to publish
         * @throws IllegalArgumentException if both filters are set, or if {@code includeColumns} names a column
         *         the table does not have
         */
        @Override
        String[] getColumnNames(@NotNull Table t, SchemaRegistryClient schemaRegistryClient) {
            if (this.excludeColumns != null && this.includeColumns != null) {
                throw new IllegalArgumentException("Can't have both excludeColumns and includeColumns not null");
            }
            final String[] tableColumnNames = t.getDefinition().getColumnNamesArray();
            if (this.includeColumns != null) {
                // The lookup set is only needed to validate an explicit include list.
                final HashSet<String> tableColumnsSet = new HashSet<>(Arrays.asList(tableColumnNames));
                final List<String> missing = Arrays.stream(this.includeColumns)
                        .filter(cn -> !tableColumnsSet.contains(cn))
                        .collect(Collectors.toList());
                if (!missing.isEmpty()) {
                    throw new IllegalArgumentException("includeColumns contains names not found in table columns: " + missing);
                }
                return this.includeColumns;
            }
            if (this.excludeColumns == null) {
                return tableColumnNames;
            }
            return Arrays.stream(tableColumnNames)
                    .filter(cn -> !this.excludeColumns.test(cn))
                    .toArray(String[]::new);
        }

        /**
         * Creates the serializer that turns rows of {@code t} into JSON, using the field names derived from
         * {@code columnNames} via {@link #getFieldNames}.
         *
         * @param t the source table
         * @param columnNames the columns to serialize
         * @return a new {@link JsonKeyOrValueSerializer}
         */
        @Override
        KeyOrValueSerializer<?> getKeyOrValueSerializer(@NotNull Table t, @NotNull String[] columnNames) {
            final String[] fieldNames = this.getFieldNames(columnNames);
            return new JsonKeyOrValueSerializer(t, columnNames, fieldNames, this.timestampFieldName, this.nestedObjectDelimiter, this.outputNulls);
        }

        /**
         * Maps each column name to its JSON field name.
         *
         * @param columnNames the column names to translate
         * @return a new array, parallel to {@code columnNames}; an entry is the mapped name from
         *         {@code columnNameToFieldName} when one exists, otherwise the column name itself
         */
        String[] getFieldNames(String[] columnNames) {
            final String[] fieldNames = new String[columnNames.length];
            for (int i = 0; i < columnNames.length; ++i) {
                final String columnName = columnNames[i];
                fieldNames[i] = this.columnNameToFieldName == null
                        ? columnName
                        : this.columnNameToFieldName.getOrDefault(columnName, columnName);
            }
            return fieldNames;
        }
    }

    /**
     * Key-or-value spec for consuming Kafka records whose payload is a JSON string.
     *
     * <p>Records are deserialized with a plain {@link StringDeserializer}, parsed through
     * {@code JsonImpl.jsonToObjectChunkMapper}, and fields are located by JSON Pointer strings that map to
     * column names.
     */
    static final class JsonConsume extends KafkaTools.Consume.KeyOrValueSpec {
        @Nullable
        private final ObjectMapper objectMapper;
        private final ColumnDefinition<?>[] columnDefinitions;
        /** JSON Pointer string to column name; {@code null} when no explicit mapping was supplied. */
        @Nullable
        private final Map<String, String> fieldToColumnName;

        /**
         * @param columnDefinitions the columns this spec produces
         * @param fieldNameToColumnName optional explicit mapping from field name or JSON Pointer string to
         *        column name; keys not starting with {@code "/"} are converted to JSON Pointer strings
         * @param objectMapper optional mapper forwarded to the JSON parsing step
         */
        JsonConsume(@NotNull ColumnDefinition<?>[] columnDefinitions, @Nullable Map<String, String> fieldNameToColumnName, @Nullable ObjectMapper objectMapper) {
            this.columnDefinitions = columnDefinitions;
            this.fieldToColumnName = JsonConsume.mapNonPointers(fieldNameToColumnName);
            this.objectMapper = objectMapper;
        }

        /** @return always empty; this spec supplies no schema provider */
        @Override
        public Optional<SchemaProvider> getSchemaProvider() {
            return Optional.empty();
        }

        /**
         * All three parameters are unused by this implementation.
         *
         * @return a new {@link StringDeserializer}
         */
        @Override
        protected Deserializer<?> getDeserializer(KafkaTools.KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs) {
            return new StringDeserializer();
        }

        /**
         * Builds the ingest data: the object mapper function plus the JSON Pointer to column name map.
         *
         * <p>Explicit mappings are copied first. Every column whose name does not appear as a value of the
         * explicit mapping then gets a default pointer derived from its own name.
         * {@code keyOrValue}, {@code schemaRegistryClient}, {@code configs} and {@code nextColumnIndexMut} are
         * not read or modified here.
         *
         * @param columnDefinitionsOut receives all of this spec's column definitions, appended in order
         * @return the populated ingest data
         */
        @Override
        protected KafkaTools.KeyOrValueIngestData getIngestData(KafkaTools.KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs, MutableInt nextColumnIndexMut, List<ColumnDefinition<?>> columnDefinitionsOut) {
            final KafkaTools.KeyOrValueIngestData data = new KafkaTools.KeyOrValueIngestData();
            data.toObjectChunkMapper = JsonImpl.jsonToObjectChunkMapper(this.objectMapper);
            columnDefinitionsOut.addAll(Arrays.asList(this.columnDefinitions));
            data.fieldPathToColumnName = new HashMap<String, String>(this.columnDefinitions.length);
            final HashSet<String> coveredColumns = new HashSet<>(this.columnDefinitions.length);
            if (this.fieldToColumnName != null) {
                data.fieldPathToColumnName.putAll(this.fieldToColumnName);
                coveredColumns.addAll(this.fieldToColumnName.values());
            }
            for (final ColumnDefinition<?> colDef : this.columnDefinitions) {
                final String colName = colDef.getName();
                if (coveredColumns.contains(colName)) {
                    continue;
                }
                // NOTE(review): if an explicit mapping already uses this default pointer as its key (for a
                // different column), this put replaces it - confirm whether that is intended.
                data.fieldPathToColumnName.put(JsonConsume.mapFieldNameToJsonPointerStr(colName), colName);
            }
            return data;
        }

        /**
         * Creates the processor that extracts column values from parsed JSON nodes.
         *
         * @param tableDef the definition used to resolve the chunk type of each column index
         * @param data ingest data whose {@code fieldPathToColumnName} drives field lookup
         * @return the processor built by {@link JsonNodeChunkAdapter#make}
         */
        @Override
        protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KafkaTools.KeyOrValueIngestData data) {
            // The trailing `true` flag is interpreted by JsonNodeChunkAdapter.make; its meaning is not visible here.
            return JsonNodeChunkAdapter.make(tableDef, ci -> StreamChunkUtils.chunkTypeForColumnIndex(tableDef, ci), data.fieldPathToColumnName, true);
        }

        /**
         * Normalizes the keys of an explicit field mapping to JSON Pointer strings.
         *
         * @param fieldNameToColumnName the user-supplied mapping, or {@code null}
         * @return {@code null} for {@code null} input; the same map instance when every key already starts
         *         with {@code "/"}; otherwise a new map in which each key lacking the leading {@code "/"} has
         *         been converted with {@link #mapFieldNameToJsonPointerStr}
         */
        private static Map<String, String> mapNonPointers(Map<String, String> fieldNameToColumnName) {
            if (fieldNameToColumnName == null) {
                return null;
            }
            final boolean allPointers = fieldNameToColumnName.keySet().stream().allMatch(key -> key.startsWith("/"));
            if (allPointers) {
                return fieldNameToColumnName;
            }
            final HashMap<String, String> ans = new HashMap<>(fieldNameToColumnName.size());
            for (final Map.Entry<String, String> entry : fieldNameToColumnName.entrySet()) {
                final String key = entry.getKey();
                final String pointer = key.startsWith("/") ? key : JsonConsume.mapFieldNameToJsonPointerStr(key);
                ans.put(pointer, entry.getValue());
            }
            return ans;
        }

        /**
         * Converts a plain field name into a single-token JSON Pointer string.
         *
         * <p>{@code "~"} is escaped to {@code "~0"} before {@code "/"} is escaped to {@code "~1"}, so the
         * tildes introduced by the second replacement are not escaped again.
         *
         * @param key the field name
         * @return {@code "/"} followed by the escaped field name
         */
        public static String mapFieldNameToJsonPointerStr(String key) {
            return "/" + key.replace("~", "~0").replace("/", "~1");
        }
    }
}

