/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.flink.bigquery.table.restrictions;

import com.google.cloud.flink.bigquery.common.exceptions.BigQueryConnectorException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;

@Internal
public class BigQueryRestriction {
    private static final Pattern STARTS_WITH_PATTERN = Pattern.compile("([^%]+)%");
    private static final Map<FunctionDefinition, Operation> FILTERS = BigQueryRestriction.initializeOperationMapper();

    private BigQueryRestriction() {
    }

    private static Map<FunctionDefinition, Operation> initializeOperationMapper() {
        HashMap<FunctionDefinition, Operation> mapping = new HashMap<FunctionDefinition, Operation>();
        mapping.put((FunctionDefinition)BuiltInFunctionDefinitions.EQUALS, Operation.EQ);
        mapping.put((FunctionDefinition)BuiltInFunctionDefinitions.NOT_EQUALS, Operation.NOT_EQ);
        mapping.put((FunctionDefinition)BuiltInFunctionDefinitions.GREATER_THAN, Operation.GT);
        mapping.put((FunctionDefinition)BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, Operation.GT_EQ);
        mapping.put((FunctionDefinition)BuiltInFunctionDefinitions.LESS_THAN, Operation.LT);
        mapping.put((FunctionDefinition)BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, Operation.LT_EQ);
        mapping.put((FunctionDefinition)BuiltInFunctionDefinitions.IS_NULL, Operation.IS_NULL);
        mapping.put((FunctionDefinition)BuiltInFunctionDefinitions.IS_NOT_NULL, Operation.NOT_NULL);
        mapping.put((FunctionDefinition)BuiltInFunctionDefinitions.AND, Operation.AND);
        mapping.put((FunctionDefinition)BuiltInFunctionDefinitions.OR, Operation.OR);
        mapping.put((FunctionDefinition)BuiltInFunctionDefinitions.NOT, Operation.NOT);
        mapping.put((FunctionDefinition)BuiltInFunctionDefinitions.LIKE, Operation.STARTS_WITH);
        return mapping;
    }

    public static Optional<String> convert(Expression flinkExpression) {
        if (!(flinkExpression instanceof CallExpression)) {
            return Optional.empty();
        }
        CallExpression call = (CallExpression)flinkExpression;
        Operation op = FILTERS.get(call.getFunctionDefinition());
        switch (op) {
            case IS_NULL: {
                return BigQueryRestriction.onlyChildAs(call, FieldReferenceExpression.class).map(FieldReferenceExpression::getName).map(field -> field + " IS NULL");
            }
            case NOT_NULL: {
                return BigQueryRestriction.onlyChildAs(call, FieldReferenceExpression.class).map(FieldReferenceExpression::getName).map(field -> "NOT " + field + " IS NULL");
            }
            case LT: {
                return BigQueryRestriction.convertOperationPartsWithItsSymbol("<", call);
            }
            case LT_EQ: {
                return BigQueryRestriction.convertOperationPartsWithItsSymbol("<=", call);
            }
            case GT: {
                return BigQueryRestriction.convertOperationPartsWithItsSymbol(">", call);
            }
            case GT_EQ: {
                return BigQueryRestriction.convertOperationPartsWithItsSymbol(">=", call);
            }
            case EQ: {
                return BigQueryRestriction.convertOperationPartsWithItsSymbol("=", call);
            }
            case NOT_EQ: {
                return BigQueryRestriction.convertOperationPartsWithItsSymbol("<>", call);
            }
            case NOT: {
                return BigQueryRestriction.onlyChildAs(call, CallExpression.class).flatMap(BigQueryRestriction::convert).map(field -> "NOT " + field);
            }
            case AND: {
                return BigQueryRestriction.convertLogicExpressionWithOperandsSymbol("AND", call);
            }
            case OR: {
                return BigQueryRestriction.convertLogicExpressionWithOperandsSymbol("OR", call);
            }
            case STARTS_WITH: {
                return BigQueryRestriction.convertLike(call);
            }
        }
        throw new BigQueryConnectorException(String.format("The provided Flink expression is not supported %s.", call.getFunctionName()));
    }

    private static <T extends ResolvedExpression> Optional<T> onlyChildAs(CallExpression call, Class<T> expectedChildClass) {
        List children = call.getResolvedChildren();
        if (children.size() != 1) {
            return Optional.empty();
        }
        ResolvedExpression child = (ResolvedExpression)children.get(0);
        if (!expectedChildClass.isInstance(child)) {
            return Optional.empty();
        }
        return Optional.of(expectedChildClass.cast(child));
    }

    private static Optional<String> convertLike(CallExpression call) {
        List args = call.getResolvedChildren();
        if (args.size() != 2) {
            return Optional.empty();
        }
        Expression left = (Expression)args.get(0);
        Expression right = (Expression)args.get(1);
        if (left instanceof FieldReferenceExpression && right instanceof ValueLiteralExpression) {
            String name = ((FieldReferenceExpression)left).getName();
            return BigQueryRestriction.convertLiteral((ValueLiteralExpression)right).flatMap(lit -> {
                if (lit instanceof String) {
                    String pattern = (String)lit;
                    Matcher matcher = STARTS_WITH_PATTERN.matcher(pattern);
                    if (!pattern.contains("_") && matcher.matches()) {
                        return Optional.of(name + " LIKE '" + matcher.group(1) + "'");
                    }
                }
                return Optional.empty();
            });
        }
        return Optional.empty();
    }

    private static Optional<String> convertLogicExpressionWithOperandsSymbol(String operandsSymbol, CallExpression call) {
        List args = call.getResolvedChildren();
        if (args == null || args.size() != 2) {
            return Optional.empty();
        }
        Optional<String> left = BigQueryRestriction.convert((Expression)args.get(0));
        Optional<String> right = BigQueryRestriction.convert((Expression)args.get(1));
        if (left.isPresent() && right.isPresent()) {
            return Optional.of(String.format("(%s %s %s)", left.get(), operandsSymbol, right.get()));
        }
        return Optional.empty();
    }

    private static String addSingleQuotes(String input) {
        return "'" + input + "'";
    }

    private static Optional<Object> convertLiteral(ValueLiteralExpression expression) {
        Optional value = expression.getValueAs(expression.getOutputDataType().getLogicalType().getDefaultConversion());
        return value.map(o -> {
            if (o instanceof LocalDateTime) {
                return BigQueryRestriction.addSingleQuotes(((LocalDateTime)o).toString());
            }
            if (o instanceof Instant) {
                return BigQueryRestriction.addSingleQuotes(((Instant)o).toString());
            }
            if (o instanceof LocalTime) {
                return BigQueryRestriction.addSingleQuotes(((LocalTime)o).toString());
            }
            if (o instanceof LocalDate) {
                return BigQueryRestriction.addSingleQuotes(((LocalDate)o).toString());
            }
            if (o instanceof String) {
                return BigQueryRestriction.addSingleQuotes((String)o);
            }
            return o;
        });
    }

    private static Optional<String> convertOperationPartsWithItsSymbol(String operationSymbol, CallExpression call) {
        List args = call.getResolvedChildren();
        if (args.size() != 2) {
            return Optional.empty();
        }
        Expression left = (Expression)args.get(0);
        Expression right = (Expression)args.get(1);
        Optional<Object> leftOption = Optional.empty();
        Optional<Object> rightOption = Optional.empty();
        if (left instanceof FieldReferenceExpression && right instanceof ValueLiteralExpression) {
            leftOption = Optional.of(((FieldReferenceExpression)left).getName());
            rightOption = BigQueryRestriction.convertLiteral((ValueLiteralExpression)right);
        } else if (left instanceof ValueLiteralExpression && right instanceof FieldReferenceExpression) {
            leftOption = BigQueryRestriction.convertLiteral((ValueLiteralExpression)left);
            rightOption = Optional.of(((FieldReferenceExpression)right).getName());
        }
        if (leftOption.isPresent() && rightOption.isPresent()) {
            return Optional.of(String.format("(%s %s %s)", leftOption.get(), operationSymbol, rightOption.get()));
        }
        return Optional.empty();
    }

    static enum Operation {
        EQ,
        NOT_EQ,
        GT,
        GT_EQ,
        LT,
        LT_EQ,
        IS_NULL,
        NOT_NULL,
        AND,
        OR,
        NOT,
        STARTS_WITH;

    }
}

