/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.assertion.sink;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.assertion.excecutor.AssertExecutor;
import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException;
import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertTableRule;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;

/**
 * Sink writer that validates incoming rows against configured assertion rules.
 *
 * <p>On every {@link #write(SeaTunnelRow)} the row's table id is recorded, a per-table row
 * counter is incremented and, when field rules apply, the row is checked against them.
 * On {@link #close()} the accumulated row counts are checked against the row-count rules
 * and the set of recorded table ids is compared with the expected table names.
 *
 * <p>The row counters and the recorded table ids are held in {@code static} fields, so they
 * are shared by every instance of this class in the same class loader and are never cleared
 * by this class.
 */
public class AssertSinkWriter
        extends AbstractSinkWriter<SeaTunnelRow, Void>
        implements SupportMultiTableSinkWriter<Void> {
    private final SeaTunnelRowType seaTunnelRowType;
    /** Field-level rules keyed by table name. */
    private final Map<String, List<AssertFieldRule>> assertFieldRules;
    /** Row-count rules keyed by table name. */
    private final Map<String, List<AssertFieldRule.AssertRule>> assertRowRules;
    private final AssertTableRule assertTableRule;
    private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
    /** Number of rows written per table name; shared by all writer instances. */
    private static final Map<String, LongAccumulator> LONG_ACCUMULATOR = new ConcurrentHashMap<>();
    /** Table ids of every row written so far; shared by all writer instances. */
    private static final Set<String> TABLE_NAMES = new CopyOnWriteArraySet<>();
    /** Fallback table name used when the row's own table id is not used for rule lookup. */
    private final String catalogTableName;

    /**
     * Creates a writer bound to the given row type and rule sets.
     *
     * @param seaTunnelRowType row type passed to the assert executor when checking field rules
     * @param assertFieldRules field rules keyed by table name
     * @param assertRowRules row-count rules keyed by table name
     * @param assertTableRule rule holding the expected table names
     * @param catalogTableName fallback table name for counting and field-rule lookup
     */
    public AssertSinkWriter(
            SeaTunnelRowType seaTunnelRowType,
            Map<String, List<AssertFieldRule>> assertFieldRules,
            Map<String, List<AssertFieldRule.AssertRule>> assertRowRules,
            AssertTableRule assertTableRule,
            String catalogTableName) {
        this.seaTunnelRowType = seaTunnelRowType;
        this.assertFieldRules = assertFieldRules;
        this.assertRowRules = assertRowRules;
        this.assertTableRule = assertTableRule;
        this.catalogTableName = catalogTableName;
    }

    /**
     * Records the row's table id, counts the row and validates it against the field rules.
     *
     * @param element the row to check
     * @throws AssertConnectorException if the assert executor reports a failed field rule
     */
    public void write(SeaTunnelRow element) {
        TABLE_NAMES.add(element.getTableId());
        String tableName = resolveTableName(element);

        // With exactly one configured rule set it applies to every row regardless of table name;
        // otherwise (or if that single entry is null) look the rules up by the resolved name.
        List<AssertFieldRule> fieldRules = null;
        if (this.assertFieldRules.size() == 1) {
            fieldRules = this.assertFieldRules.values().iterator().next();
        }
        if (Objects.isNull(fieldRules)) {
            fieldRules = this.assertFieldRules.get(tableName);
        }

        LONG_ACCUMULATOR
                .computeIfAbsent(tableName, k -> new LongAccumulator(Long::sum, 0L))
                .accumulate(1L);

        if (Objects.nonNull(fieldRules)) {
            ASSERT_EXECUTOR
                    .fail(element, this.seaTunnelRowType, fieldRules)
                    .ifPresent(failRule -> {
                        throw new AssertConnectorException(
                                AssertConnectorErrorCode.RULE_VALIDATION_FAILED,
                                "row :" + element + " fail rule: " + failRule);
                    });
        }
    }

    /**
     * Verifies the row-count rules and the expected table names against what was written.
     *
     * @throws AssertConnectorException if a MAX_ROW/MIN_ROW rule is violated, or if expected
     *     table names are configured and differ from the recorded table ids
     */
    public void close() {
        for (Map.Entry<String, List<AssertFieldRule.AssertRule>> entry : this.assertRowRules.entrySet()) {
            List<AssertFieldRule.AssertRule> rowRules = entry.getValue();
            if (rowRules == null || rowRules.isEmpty()) {
                continue;
            }
            // Read the counter once so the value reported on failure is the value that was checked.
            long count = rowCount(entry.getKey());
            for (AssertFieldRule.AssertRule rowRule : rowRules) {
                if (isRowCountViolated(rowRule, count)) {
                    throw new AssertConnectorException(
                            AssertConnectorErrorCode.RULE_VALIDATION_FAILED,
                            "row num :" + count + " fail rule: " + rowRule);
                }
            }
        }

        Set<String> expectedTables = new HashSet<>(this.assertTableRule.getTableNames());
        if (!expectedTables.isEmpty() && !expectedTables.equals(TABLE_NAMES)) {
            throw new AssertConnectorException(
                    AssertConnectorErrorCode.RULE_VALIDATION_FAILED,
                    "table names: " + TABLE_NAMES + " is not equal to " + this.assertTableRule.getTableNames());
        }
    }

    /**
     * Picks the table name used for counting and field-rule lookup: the row's own table id when
     * it is non-empty and there is no single, non-empty-named row-rule table; otherwise the
     * catalog table name.
     */
    private String resolveTableName(SeaTunnelRow element) {
        // NOTE(review): the single row-rule key only acts as a switch here; its value is never
        // used as the table name. Kept as-is to preserve existing behaviour.
        String singleRowRuleTable = null;
        if (this.assertRowRules.size() == 1) {
            singleRowRuleTable = this.assertRowRules.keySet().iterator().next();
        }
        if (StringUtils.isEmpty(singleRowRuleTable) && StringUtils.isNotEmpty(element.getTableId())) {
            return element.getTableId();
        }
        return this.catalogTableName;
    }

    /** Returns the number of rows counted so far for the table, or 0 if none were counted. */
    private static long rowCount(String tableName) {
        LongAccumulator accumulator = LONG_ACCUMULATOR.get(tableName);
        return accumulator == null ? 0L : accumulator.longValue();
    }

    /** Returns true when the count breaks a MAX_ROW or MIN_ROW rule; other rule types never fail. */
    private static boolean isRowCountViolated(AssertFieldRule.AssertRule rule, long count) {
        switch (rule.getRuleType()) {
            case MAX_ROW:
                return !((double) count <= rule.getRuleValue());
            case MIN_ROW:
                return !((double) count >= rule.getRuleValue());
            default:
                return false;
        }
    }
}

