/*
 * Decompiled with CFR 0.152.
 */
package io.floodplain.streams.debezium;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.floodplain.immutable.api.ImmutableMessage;
import io.floodplain.immutable.factory.ImmutableFactory;
import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.replication.factory.ReplicationFactory;
import io.floodplain.streams.debezium.DebeziumParseException;
import io.floodplain.streams.debezium.KeyValue;
import io.floodplain.streams.debezium.TableIdentifier;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.StreamSupport;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that turn Debezium-style JSON envelopes (a {@code schema} plus a
 * {@code payload}) into {@link ReplicationMessage} / {@link ImmutableMessage} instances.
 *
 * <p>The field list is taken from the {@code schema.fields} array; for value messages the
 * field list of the first child of that array is used. Values are read from
 * {@code payload.after} when it is present and non-null, otherwise from {@code payload} itself.
 */
public class JSONToReplicationMessage {
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static final Logger logger = LoggerFactory.getLogger(JSONToReplicationMessage.class);

    private static final String PAYLOAD = "payload";
    private static final String SCHEMA = "schema";
    private static final String FIELDS = "fields";
    private static final String AFTER = "after";
    private static final String TABLE_IDENTIFIER_FIELD = "__dbz__physicalTableIdentifier";

    /**
     * Parses a key/value pair into a {@link KeyValue} holding the combined key and the
     * serialized replication message.
     *
     * <p>A {@code null} value (tombstone) or a value without a non-null {@code payload} is
     * converted into an empty message with operation DELETE.
     *
     * @param keyInput the JSON text of the key; must be a JSON object
     * @param data     the JSON bytes of the value, may be {@code null}
     * @return the converted pair, or {@code null} when the JSON could not be read (the error is logged)
     * @throws ClassCastException when the key is not a JSON object
     */
    public static KeyValue parse(String keyInput, byte[] data) {
        try {
            JsonNode mapped = objectMapper.readTree(keyInput);
            if (!(mapped instanceof ObjectNode)) {
                // Null-safe rendering: neither a missing key node nor a null value may mask the real problem.
                String keyType = mapped == null ? "null" : String.valueOf(mapped.getClass());
                String rendered = data == null ? "null" : new String(data, StandardCharsets.UTF_8);
                throw new ClassCastException("Expected debezium style key. Not type: " + keyType + " data: " + rendered);
            }
            TableIdentifier key = JSONToReplicationMessage.processDebeziumKey((ObjectNode) mapped);
            if (data == null) {
                // Tombstone record: there is no body to parse, treat it like a missing payload.
                return new KeyValue(key.combinedKey, ReplicationFactory.getInstance().serialize(deleteMessage()));
            }
            ObjectNode valuenode = (ObjectNode) objectMapper.readTree(data);
            if (!valuenode.hasNonNull(PAYLOAD)) {
                return new KeyValue(key.combinedKey, ReplicationFactory.getInstance().serialize(deleteMessage()));
            }
            ReplicationMessage converted = JSONToReplicationMessage.convertToReplication(false, valuenode, Optional.empty());
            byte[] serialized = ReplicationFactory.getInstance().serialize(converted);
            return new KeyValue(key.combinedKey, serialized);
        }
        catch (IOException e) {
            // Existing contract: parse problems are logged and reported as null to the caller.
            logger.error("Error: ", e);
            return null;
        }
    }

    /**
     * Creates a Kafka {@link Deserializer} that delegates to {@link #parseConnectMessage(byte[])}.
     *
     * @return a deserializer producing replication messages
     */
    public static Deserializer<ReplicationMessage> replicationFromConnect() {
        return (topic, data) -> JSONToReplicationMessage.parseConnectMessage(data);
    }

    /**
     * Parses the JSON bytes of a message value into a {@link ReplicationMessage}.
     *
     * @param data the JSON bytes of the value
     * @return the converted message, or {@code null} when there is no non-null {@code payload}
     * @throws RuntimeException when the bytes cannot be read as JSON
     */
    public static ReplicationMessage parseConnectMessage(byte[] data) {
        try {
            ObjectNode valuenode = (ObjectNode) objectMapper.readTree(data);
            if (!valuenode.hasNonNull(PAYLOAD)) {
                return null;
            }
            return JSONToReplicationMessage.convertToReplication(false, valuenode, Optional.empty());
        }
        catch (IOException e) {
            throw new RuntimeException("JSON parse issue while parsing expected json to replication message: " + new String(data, StandardCharsets.UTF_8), e);
        }
    }

    /**
     * Converts a schema/payload envelope into an {@link ImmutableMessage}.
     *
     * <p>For a non-key DELETE with a known table, only a message containing the {@code table}
     * field is returned. Otherwise every field listed in the schema is typed via
     * {@link #resolveType} and its value is resolved via {@link #resolveValue}.
     *
     * @param node              the envelope containing {@code schema} and {@code payload}
     * @param callbackFieldList invoked with the name of every field, in schema order
     * @param isKey             {@code true} when the envelope is a message key; for values the
     *                          field list of the first schema child is used
     * @param o                 the operation of the message, if known
     * @param table             the table name, if known
     * @return the converted message; an empty message when the envelope could not be rendered to JSON
     * @throws NullPointerException             when the schema or its field list is missing
     * @throws java.util.NoSuchElementException when fields are declared but the payload is null or absent
     */
    public static ImmutableMessage convert(ObjectNode node, Consumer<String> callbackFieldList, boolean isKey, Optional<ReplicationMessage.Operation> o, Optional<String> table) {
        if (!isKey && o.filter(ReplicationMessage.Operation.DELETE::equals).isPresent() && table.isPresent()) {
            return ImmutableFactory.empty().with("table", table.get(), ImmutableMessage.ValueType.STRING);
        }
        try {
            JsonNode payloadNode = node.get(PAYLOAD);
            Optional<ObjectNode> payload = payloadNode == null || payloadNode.isNull()
                    ? Optional.empty()
                    : Optional.of((ObjectNode) payloadNode);

            JsonNode schema = node.get(SCHEMA);
            if (schema == null || schema.isNull()) {
                String rendered = objectMapper.writeValueAsString(node);
                logger.error("WRITING FAILED: {}", rendered);
                throw new NullPointerException("Message has no schema: " + rendered);
            }
            ArrayNode fields = (ArrayNode) schema.get(FIELDS);
            if (fields == null) {
                throw new NullPointerException("Schema has no '" + FIELDS + "' array: " + schema);
            }
            if (!isKey) {
                // Value envelopes nest the row fields inside the first schema child.
                Optional<JsonNode> firstChild = JSONToReplicationMessage.findFirstChild(fields, e -> true);
                if (firstChild.isPresent()) {
                    fields = (ArrayNode) firstChild.get().get(FIELDS);
                    if (fields == null) {
                        throw new NullPointerException("First schema child has no '" + FIELDS + "' array: " + firstChild.get());
                    }
                }
            }

            // Resolved once instead of per field: prefer a non-null 'after' image, else the payload itself.
            Optional<ObjectNode> valueSource = payload.map(p -> p.hasNonNull(AFTER) ? (ObjectNode) p.get(AFTER) : p);

            HashMap<String, ImmutableMessage.ValueType> types = new HashMap<>();
            HashMap<String, Object> jsonValues = new HashMap<>();
            for (JsonNode fieldSchema : fields) {
                String field = fieldSchema.get("field").asText();
                callbackFieldList.accept(field);
                Optional<String> typeName = Optional.ofNullable(fieldSchema.get("name")).map(JsonNode::asText);
                Optional<JsonNode> typeParameters = Optional.ofNullable(fieldSchema.get("parameters"));
                String rawType = fieldSchema.get("type").asText();
                types.put(field, JSONToReplicationMessage.resolveType(rawType, typeName, typeParameters));
                ObjectNode source = valueSource.orElseThrow(
                        () -> new java.util.NoSuchElementException("No payload available to resolve field: " + field));
                jsonValues.put(field, JSONToReplicationMessage.resolveValue(source, field, rawType, typeName, fieldSchema));
            }
            return ImmutableFactory.create(jsonValues, types);
        }
        catch (JsonProcessingException e1) {
            logger.error("Error: ", e1);
            return ImmutableFactory.empty();
        }
    }

    /**
     * Converts an envelope into a {@link ReplicationMessage}, taking the timestamp from
     * {@code payload.ts_ms} and the operation from {@code payload.op}.
     *
     * @param isKey whether the envelope is a message key
     * @param node  the envelope; its {@code payload} must be a JSON object containing {@code ts_ms} and {@code op}
     * @param table the table name, if known
     * @return the replication message with operation and timestamp applied
     */
    public static ReplicationMessage convertToReplication(boolean isKey, ObjectNode node, Optional<String> table) {
        ObjectNode payload = (ObjectNode) node.get(PAYLOAD);
        long millis = payload.get("ts_ms").asLong();
        ReplicationMessage.Operation operation = JSONToReplicationMessage.resolveOperation(payload, payload.get("op").asText());
        ImmutableMessage core = JSONToReplicationMessage.convert(node, s -> {}, isKey, Optional.of(operation), table);
        return ReplicationFactory.standardMessage(core).withOperation(operation).atTime(millis);
    }

    /** Returns the first element of {@code node} that matches {@code pred}, if any. */
    private static Optional<JsonNode> findFirstChild(ArrayNode node, Predicate<JsonNode> pred) {
        return StreamSupport.stream(node.spliterator(), false).filter(pred).findFirst();
    }

    /** Builds the empty DELETE message used when a value has no usable payload. */
    private static ReplicationMessage deleteMessage() {
        return ReplicationFactory.empty().withOperation(ReplicationMessage.Operation.DELETE);
    }

    /**
     * Looks up {@code field} in {@code source} and resolves its value; any failure is wrapped
     * in a {@link RuntimeException} that names the field and its type.
     */
    private static Object resolveValue(ObjectNode source, String field, String type, Optional<String> typeName, JsonNode currentType) {
        try {
            JsonNode node = source.get(field);
            if (node == null) {
                throw new NullPointerException("Missing node for field: " + field + " type: " + type + " typeName: " + typeName);
            }
            return JSONToReplicationMessage.resolveValue(type, typeName, node, currentType);
        }
        catch (RuntimeException e) {
            throw new RuntimeException("Error resolving value: " + field + " with type: " + type + " named type: " + typeName, e);
        }
    }

    /**
     * Maps a schema type, optionally refined by a semantic (named) type, to a message value type.
     *
     * @param type       the raw schema type, e.g. {@code int32}
     * @param namedType  the semantic type name from the schema, if any
     * @param parameters the schema parameters of the field, if any; {@code scale} is consulted for decimals
     * @return the value type to use for the field
     * @throws RuntimeException when neither the named nor the raw type is known
     */
    public static ImmutableMessage.ValueType resolveType(String type, Optional<String> namedType, Optional<JsonNode> parameters) {
        if (namedType.isEmpty()) {
            return JSONToReplicationMessage.resolveSimpleType(type);
        }
        switch (namedType.get()) {
            case "io.debezium.time.Date":
                return ImmutableMessage.ValueType.DATE;
            case "io.debezium.time.MicroTimestamp":
            case "io.debezium.time.ZonedTimestamp":
            case "io.debezium.time.NanoTimestamp":
                return ImmutableMessage.ValueType.TIMESTAMP;
            case "io.debezium.data.VariableScaleDecimal":
                return ImmutableMessage.ValueType.LONG;
            case "org.apache.kafka.connect.data.Decimal": {
                JsonNode scaleNode = parameters.map(p -> p.get("scale")).orElse(null);
                if (scaleNode != null && Integer.parseInt(scaleNode.asText()) > 0) {
                    return ImmutableMessage.ValueType.DECIMAL;
                }
                // No scale, or a scale of zero or less: the value is handled as a long.
                return ImmutableMessage.ValueType.LONG;
            }
            case "io.debezium.data.Enum":
                return ImmutableMessage.ValueType.ENUM;
            case "io.debezium.data.Uuid":
                return ImmutableMessage.ValueType.STRING;
            default:
                logger.warn("Unknown type with name, this will probably fail: {}", namedType.get());
                return JSONToReplicationMessage.resolveSimpleType(type);
        }
    }

    /**
     * Converts a JSON value into the Java object matching its (named) schema type.
     *
     * @param type           the raw schema type
     * @param namedType      the semantic type name, if any
     * @param value          the JSON value to convert
     * @param typeParameters the schema node of the field; its {@code parameters} child is read for decimals
     * @return the converted value, or {@code null} for a JSON null
     * @throws RuntimeException when the type is unknown
     */
    public static Object resolveValue(String type, Optional<String> namedType, JsonNode value, JsonNode typeParameters) {
        if (value.isNull()) {
            return null;
        }
        if (namedType.isEmpty()) {
            return JSONToReplicationMessage.resolveSimple(type, value);
        }
        switch (namedType.get()) {
            case "io.debezium.time.Date":
                return LocalDate.ofEpochDay(value.asInt());
            case "io.debezium.time.ZonedTimestamp":
                return ZonedDateTime.ofInstant(zonedTimestampInstant(value), ZoneId.systemDefault());
            case "io.debezium.time.NanoTimestamp": {
                long nanos = value.asLong();
                Instant instant = Instant.ofEpochMilli(nanos / 1000000L).plusNanos(nanos % 1000000L);
                return LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            }
            case "io.debezium.time.MicroTimestamp": {
                long micros = value.asLong();
                Instant instant = Instant.ofEpochMilli(micros / 1000L).plusNanos((micros % 1000L) * 1000L);
                return LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            }
            case "io.debezium.data.VariableScaleDecimal": {
                ObjectNode node = (ObjectNode) value;
                int scale = node.get("scale").asInt();
                byte[] binary = Base64.getDecoder().decode(node.get("value").asText());
                BigDecimal decoded = new BigDecimal(new BigInteger(binary), scale);
                // Debug level: this runs for every decoded value.
                logger.debug("VariableScale: {} -> decoded length: {}", decoded, binary.length);
                return decoded.longValue();
            }
            case "org.apache.kafka.connect.data.Decimal": {
                Optional<ObjectNode> typeParams = Optional.ofNullable((ObjectNode) typeParameters.get("parameters"));
                return JSONToReplicationMessage.parseDecimal(Base64.getDecoder().decode(value.asText()), typeParams);
            }
            case "io.debezium.data.Enum":
            case "io.debezium.data.Uuid":
                return value.asText();
            default:
                return JSONToReplicationMessage.resolveSimple(type, value);
        }
    }

    /**
     * Reads the instant of a zoned timestamp value. Textual values are parsed as ISO-8601
     * date-times with an offset; numeric values, and text that is not ISO formatted, are
     * interpreted as epoch milliseconds.
     */
    private static Instant zonedTimestampInstant(JsonNode value) {
        if (value.isTextual()) {
            try {
                return ZonedDateTime.parse(value.asText()).toInstant();
            }
            catch (java.time.format.DateTimeParseException ignored) {
                // Not ISO text: fall back to the numeric interpretation below.
            }
        }
        return Instant.ofEpochMilli(value.asLong());
    }

    /**
     * Decodes a big-endian two's-complement unscaled value, applying the {@code scale} parameter
     * when present. Returns a {@link BigDecimal} when the resulting scale is positive, otherwise a {@code long}.
     */
    private static Object parseDecimal(byte[] bytes, Optional<ObjectNode> typeParams) {
        BigInteger unscaled = new BigInteger(bytes);
        BigDecimal decoded = typeParams
                .map(params -> params.get("scale"))
                .filter(scaleNode -> !scaleNode.isNull())
                .map(scaleNode -> new BigDecimal(unscaled, Integer.parseInt(scaleNode.asText())))
                .orElseGet(() -> new BigDecimal(unscaled));
        if (decoded.scale() > 0) {
            return decoded;
        }
        return decoded.longValue();
    }

    /**
     * Converts a JSON value of a plain (unnamed) schema type. Accepts the same type names as
     * {@link #resolveSimpleType(String)}.
     */
    private static Object resolveSimple(String type, JsonNode value) {
        switch (type) {
            case "int8":
            case "int16":
            case "integer":
            case "int32":
                return value.asInt();
            case "int64":
            case "long":
                return value.asLong();
            case "string":
                return value.asText();
            case "boolean":
                return value.asBoolean();
            case "float":
            case "double":
                return value.asDouble();
            case "binary":
            case "bytes":
                return Base64.getDecoder().decode(value.asText());
            case "list":
            case "array": {
                ArrayList<String> elements = new ArrayList<>();
                for (JsonNode element : value) {
                    elements.add(element.asText());
                }
                return Collections.unmodifiableList(elements);
            }
            default:
                throw new RuntimeException("Unknown type: " + type);
        }
    }

    /**
     * Maps a plain (unnamed) schema type to a message value type. Accepts the same type names as
     * {@link #resolveSimple(String, JsonNode)}.
     */
    private static ImmutableMessage.ValueType resolveSimpleType(String type) {
        switch (type) {
            case "int8":
            case "int16":
            case "integer":
            case "int32":
                return ImmutableMessage.ValueType.INTEGER;
            case "int64":
            case "long":
                return ImmutableMessage.ValueType.LONG;
            case "string":
                return ImmutableMessage.ValueType.STRING;
            case "float":
            case "double":
                return ImmutableMessage.ValueType.DOUBLE;
            case "binary":
            case "bytes":
                return ImmutableMessage.ValueType.BINARY;
            case "list":
            case "array":
                return ImmutableMessage.ValueType.STRINGLIST;
            case "boolean":
                return ImmutableMessage.ValueType.BOOLEAN;
            default:
                throw new RuntimeException("Unknown type: " + type);
        }
    }

    /**
     * Derives the operation of a change event. A payload that has a {@code before} field but no
     * {@code after} field is always a DELETE; otherwise {@code u}, {@code r} and {@code c} map to
     * UPDATE, {@code d} maps to DELETE and anything else to NONE.
     */
    private static ReplicationMessage.Operation resolveOperation(ObjectNode payloadNode, String opName) {
        boolean beforeOnly = payloadNode.has("before") && !payloadNode.has(AFTER);
        if (beforeOnly) {
            if (!"d".equals(opName)) {
                logger.warn("Unexpected operation: {}", opName);
            }
            return ReplicationMessage.Operation.DELETE;
        }
        switch (opName) {
            case "u":
            case "r":
            case "c":
                return ReplicationMessage.Operation.UPDATE;
            case "d":
                return ReplicationMessage.Operation.DELETE;
            default:
                return ReplicationMessage.Operation.NONE;
        }
    }

    /**
     * Parses the JSON bytes of a message body into a {@link ReplicationMessage}.
     *
     * @param data  the JSON bytes of the body, may be {@code null}
     * @param table the table name, if known
     * @return {@code null} for {@code null} input, an empty DELETE message when there is no
     *         non-null {@code payload}, otherwise the converted message
     * @throws DebeziumParseException when the bytes cannot be read as JSON
     */
    public static ReplicationMessage processDebeziumBody(byte[] data, Optional<String> table) throws DebeziumParseException {
        if (data == null) {
            return null;
        }
        try {
            ObjectNode valueNode = (ObjectNode) objectMapper.readTree(data);
            if (!valueNode.hasNonNull(PAYLOAD)) {
                return deleteMessage();
            }
            return JSONToReplicationMessage.convertToReplication(false, valueNode, table);
        }
        catch (IOException e) {
            throw new DebeziumParseException("Error parsing debezium body", e);
        }
    }

    /**
     * Converts a key envelope into a {@link TableIdentifier}.
     *
     * <p>The table name is taken from the {@code __dbz__physicalTableIdentifier} key field when
     * present; only otherwise is {@code schema.name} consulted. That field is removed from the
     * list of key field names passed to the identifier.
     *
     * @param on the key envelope containing {@code schema} and {@code payload}
     * @return the table identifier built from the key
     */
    public static TableIdentifier processDebeziumKey(ObjectNode on) {
        ArrayList<String> fields = new ArrayList<String>();
        ImmutableMessage converted = JSONToReplicationMessage.convert(on, fields::add, true, Optional.empty(), Optional.empty());
        // orElseGet: the schema name is only read when the identifier field is absent.
        String tableName = converted.value(TABLE_IDENTIFIER_FIELD)
                .map(e -> (String) e)
                .orElseGet(() -> on.get(SCHEMA).get("name").asText());
        fields.remove(TABLE_IDENTIFIER_FIELD);
        return new TableIdentifier(tableName, converted, fields);
    }
}

