/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.reader;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.reader.ReaderUtil;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class TestColumnStatsWatermarkExtractor {
    public static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.required((int)1, (String)"timestamp_column", (Type)Types.TimestampType.withoutZone()), Types.NestedField.required((int)2, (String)"timestamptz_column", (Type)Types.TimestampType.withZone()), Types.NestedField.required((int)3, (String)"long_column", (Type)Types.LongType.get()), Types.NestedField.required((int)4, (String)"string_column", (Type)Types.StringType.get())});
    private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);
    private static final List<List<Record>> TEST_RECORDS = ImmutableList.of((Object)RandomGenericData.generate((Schema)SCHEMA, (int)3, (long)2L), (Object)RandomGenericData.generate((Schema)SCHEMA, (int)3, (long)19L));
    private static final List<Map<String, Long>> MIN_VALUES = ImmutableList.of((Object)Maps.newHashMapWithExpectedSize((int)3), (Object)Maps.newHashMapWithExpectedSize((int)3));
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    @Rule
    public final HadoopTableResource sourceTableResource = new HadoopTableResource(TEMPORARY_FOLDER, "default", "t", SCHEMA);
    private final String columnName;

    @BeforeClass
    public static void updateMinValue() {
        for (int i = 0; i < TEST_RECORDS.size(); ++i) {
            for (Record r : TEST_RECORDS.get(i)) {
                Map<String, Long> minValues = MIN_VALUES.get(i);
                LocalDateTime localDateTime = (LocalDateTime)r.get(0);
                minValues.merge("timestamp_column", localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli(), Math::min);
                OffsetDateTime offsetDateTime = (OffsetDateTime)r.get(1);
                minValues.merge("timestamptz_column", offsetDateTime.toInstant().toEpochMilli(), Math::min);
                minValues.merge("long_column", (Long)r.get(2), Math::min);
            }
        }
    }

    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> data() {
        return ImmutableList.of((Object)new Object[]{"timestamp_column"}, (Object)new Object[]{"timestamptz_column"}, (Object)new Object[]{"long_column"});
    }

    public TestColumnStatsWatermarkExtractor(String columnName) {
        this.columnName = columnName;
    }

    @Test
    public void testSingle() throws IOException {
        ColumnStatsWatermarkExtractor extractor = new ColumnStatsWatermarkExtractor(SCHEMA, this.columnName, TimeUnit.MILLISECONDS);
        Assert.assertEquals((long)MIN_VALUES.get(0).get(this.columnName), (long)extractor.extractWatermark(this.split(0)));
    }

    @Test
    public void testTimeUnit() throws IOException {
        Assume.assumeTrue((String)"Run only for long column", (boolean)this.columnName.equals("long_column"));
        ColumnStatsWatermarkExtractor extractor = new ColumnStatsWatermarkExtractor(SCHEMA, this.columnName, TimeUnit.MICROSECONDS);
        Assert.assertEquals((long)(MIN_VALUES.get(0).get(this.columnName) / 1000L), (long)extractor.extractWatermark(this.split(0)));
    }

    @Test
    public void testMultipleFiles() throws IOException {
        Assume.assumeTrue((String)"Run only for the timestamp column", (boolean)this.columnName.equals("timestamp_column"));
        IcebergSourceSplit combinedSplit = IcebergSourceSplit.fromCombinedScanTask((CombinedScanTask)ReaderUtil.createCombinedScanTask(TEST_RECORDS, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY));
        ColumnStatsWatermarkExtractor extractor = new ColumnStatsWatermarkExtractor(SCHEMA, this.columnName, null);
        Assert.assertEquals((long)MIN_VALUES.get(0).get(this.columnName), (long)extractor.extractWatermark(this.split(0)));
        Assert.assertEquals((long)MIN_VALUES.get(1).get(this.columnName), (long)extractor.extractWatermark(this.split(1)));
        Assert.assertEquals((long)Math.min(MIN_VALUES.get(0).get(this.columnName), MIN_VALUES.get(1).get(this.columnName)), (long)extractor.extractWatermark(combinedSplit));
    }

    @Test
    public void testWrongColumn() {
        Assume.assumeTrue((String)"Run only for string column", (boolean)this.columnName.equals("string_column"));
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> new ColumnStatsWatermarkExtractor(SCHEMA, this.columnName, null)).isInstanceOf(IllegalArgumentException.class)).hasMessageContaining("Found STRING, expected a LONG or TIMESTAMP column for watermark generation.");
    }

    @Test
    public void testEmptyStatistics() throws IOException {
        Assume.assumeTrue((String)"Run only for timestamp column", (boolean)this.columnName.equals("timestamp_column"));
        ColumnStatsWatermarkExtractor extractor = new ColumnStatsWatermarkExtractor(10, "missing_field");
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> extractor.extractWatermark(this.split(0))).isInstanceOf(IllegalArgumentException.class)).hasMessageContaining("Missing statistics for column");
    }

    private IcebergSourceSplit split(int id) throws IOException {
        return IcebergSourceSplit.fromCombinedScanTask((CombinedScanTask)ReaderUtil.createCombinedScanTask((List<List<Record>>)ImmutableList.of(TEST_RECORDS.get(id)), TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY));
    }
}

