/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.table;

import java.io.Serializable;
import java.util.function.Predicate;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class KafkaRecordHeaderFilter
implements Serializable,
Predicate<ConsumerRecord<?, ?>> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordHeaderFilter.class);
    private final String filterExpression;
    private transient Predicate<ConsumerRecord<?, ?>> filterPredicate;

    public KafkaRecordHeaderFilter(String filterExpression) {
        this.filterExpression = filterExpression;
    }

    public void open() {
        try {
            this.filterPredicate = this.createPredicate(this.filterExpression);
        }
        catch (ParseException e) {
            throw new RuntimeException(String.format("Failed to parse filter expression \"%s\"", this.filterExpression), e);
        }
    }

    @Override
    public boolean test(ConsumerRecord<?, ?> consumerRecord) {
        return this.filterPredicate.test(consumerRecord);
    }

    private Predicate<ConsumerRecord<?, ?>> createPredicate(String expression) throws ParseException {
        String[] split = expression.split("(?=[&|])|(?<=[&|])");
        LOG.info("Parsing expression: {}", (Object)split);
        Predicate<ConsumerRecord<?, ?>> p = this.toPredicate(split[0]);
        String operator = null;
        for (int i = 1; i < split.length; ++i) {
            if (i % 2 == 0) {
                if (operator == null) {
                    throw new ParseException(String.format("Operator should not be null while processing key-value pair \"%s\"", split[i]));
                }
                p = this.combinePredicate(operator, p, this.toPredicate(split[i]));
                continue;
            }
            operator = split[i];
        }
        return p;
    }

    private Predicate<ConsumerRecord<?, ?>> toPredicate(String kvPair) throws ParseException {
        String[] split;
        boolean isNegating = false;
        if (kvPair.charAt(0) == '!') {
            isNegating = true;
            kvPair = kvPair.substring(1);
        }
        if ((split = kvPair.split(":")).length != 2) {
            throw new ParseException(String.format("Unable to split key-value pair \"%s\"", kvPair));
        }
        String key = split[0];
        if (key == null) {
            throw new ParseException(String.format("Key is parsed as null in key-value pair \"%s\"", kvPair));
        }
        String value = split[1];
        if (value == null) {
            throw new ParseException(String.format("Value is parsed as null in key-value pair \"%s\"", kvPair));
        }
        Predicate<ConsumerRecord<?, ?>> predicate = consumerRecord -> {
            Header header = consumerRecord.headers().lastHeader(key);
            return header != null && value.equals(new String(header.value()));
        };
        return isNegating ? predicate.negate() : predicate;
    }

    private <T> Predicate<T> combinePredicate(String operator, Predicate<T> left, Predicate<T> right) {
        switch (operator) {
            case "&": {
                return left.and(right);
            }
            case "|": {
                return left.or(right);
            }
        }
        throw new ParseException(String.format("Unsupported logical operator \"%s\"", operator));
    }
}

