/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.reader;

import java.io.IOException;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopTableExtension;
import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.reader.ReaderUtil;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={ParameterizedTestExtension.class})
public class TestColumnStatsWatermarkExtractor {
    public static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.required((int)1, (String)"timestamp_column", (Type)Types.TimestampType.withoutZone()), Types.NestedField.required((int)2, (String)"timestamptz_column", (Type)Types.TimestampType.withZone()), Types.NestedField.required((int)3, (String)"long_column", (Type)Types.LongType.get()), Types.NestedField.required((int)4, (String)"string_column", (Type)Types.StringType.get())});
    private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);
    private static final List<List<Record>> TEST_RECORDS = ImmutableList.of((Object)RandomGenericData.generate((Schema)SCHEMA, (int)3, (long)2L), (Object)RandomGenericData.generate((Schema)SCHEMA, (int)3, (long)19L));
    private static final List<Map<String, Long>> MIN_VALUES = ImmutableList.of((Object)Maps.newHashMapWithExpectedSize((int)3), (Object)Maps.newHashMapWithExpectedSize((int)3));
    @TempDir
    protected Path temporaryFolder;
    @RegisterExtension
    private static final HadoopTableExtension SOURCE_TABLE_EXTENSION = new HadoopTableExtension("default", "t", SCHEMA);
    @Parameter(index=0)
    private String columnName;

    @BeforeAll
    public static void updateMinValue() {
        for (int i = 0; i < TEST_RECORDS.size(); ++i) {
            for (Record r : TEST_RECORDS.get(i)) {
                Map<String, Long> minValues = MIN_VALUES.get(i);
                LocalDateTime localDateTime = (LocalDateTime)r.get(0);
                minValues.merge("timestamp_column", localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli(), Math::min);
                OffsetDateTime offsetDateTime = (OffsetDateTime)r.get(1);
                minValues.merge("timestamptz_column", offsetDateTime.toInstant().toEpochMilli(), Math::min);
                minValues.merge("long_column", (Long)r.get(2), Math::min);
            }
        }
    }

    @Parameters(name="columnName = {0}")
    public static Collection<Object[]> data() {
        return ImmutableList.of((Object)new Object[]{"timestamp_column"}, (Object)new Object[]{"timestamptz_column"}, (Object)new Object[]{"long_column"});
    }

    @TestTemplate
    public void testSingle() throws IOException {
        ColumnStatsWatermarkExtractor extractor = new ColumnStatsWatermarkExtractor(SCHEMA, this.columnName, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)extractor.extractWatermark(this.split(0))).isEqualTo(MIN_VALUES.get(0).get(this.columnName).longValue());
    }

    @TestTemplate
    public void testTimeUnit() throws IOException {
        Assumptions.assumeThat((String)this.columnName).isEqualTo("long_column");
        ColumnStatsWatermarkExtractor extractor = new ColumnStatsWatermarkExtractor(SCHEMA, this.columnName, TimeUnit.MICROSECONDS);
        Assertions.assertThat((long)extractor.extractWatermark(this.split(0))).isEqualTo(MIN_VALUES.get(0).get(this.columnName) / 1000L);
    }

    @TestTemplate
    public void testMultipleFiles() throws IOException {
        Assumptions.assumeThat((String)this.columnName).isEqualTo("timestamp_column");
        IcebergSourceSplit combinedSplit = IcebergSourceSplit.fromCombinedScanTask((CombinedScanTask)ReaderUtil.createCombinedScanTask(TEST_RECORDS, this.temporaryFolder, FileFormat.PARQUET, APPENDER_FACTORY));
        ColumnStatsWatermarkExtractor extractor = new ColumnStatsWatermarkExtractor(SCHEMA, this.columnName, null);
        Assertions.assertThat((long)extractor.extractWatermark(this.split(0))).isEqualTo(MIN_VALUES.get(0).get(this.columnName).longValue());
        Assertions.assertThat((long)extractor.extractWatermark(this.split(1))).isEqualTo(MIN_VALUES.get(1).get(this.columnName).longValue());
        Assertions.assertThat((long)extractor.extractWatermark(combinedSplit)).isEqualTo(Math.min(MIN_VALUES.get(0).get(this.columnName), MIN_VALUES.get(1).get(this.columnName)));
    }

    @TestTemplate
    public void testWrongColumn() {
        Assumptions.assumeThat((String)this.columnName).isEqualTo("string_column");
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> new ColumnStatsWatermarkExtractor(SCHEMA, this.columnName, null)).isInstanceOf(IllegalArgumentException.class)).hasMessageContaining("Found STRING, expected a LONG or TIMESTAMP column for watermark generation.");
    }

    @TestTemplate
    public void testEmptyStatistics() throws IOException {
        Assumptions.assumeThat((String)this.columnName).isEqualTo("timestamp_column");
        ColumnStatsWatermarkExtractor extractor = new ColumnStatsWatermarkExtractor(10, "missing_field");
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> extractor.extractWatermark(this.split(0))).isInstanceOf(IllegalArgumentException.class)).hasMessageContaining("Missing statistics for column");
    }

    private IcebergSourceSplit split(int id) throws IOException {
        return IcebergSourceSplit.fromCombinedScanTask((CombinedScanTask)ReaderUtil.createCombinedScanTask((List<List<Record>>)ImmutableList.of(TEST_RECORDS.get(id)), this.temporaryFolder, FileFormat.PARQUET, APPENDER_FACTORY));
    }
}

