/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.awaitility.Awaitility;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Integration tests that run an {@link IcebergSource} built with {@code watermarkColumn("ts")} on
 * a Flink mini cluster and observe event-time windowing and watermark-alignment behaviour.
 *
 * <p>The class is {@link Serializable} because {@link #testWindowing()} hands an anonymous inner
 * class (which is declared inside an instance method) to Flink as a window function.
 */
public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
    /** Number of slots offered by the single task manager of the mini cluster. */
    private static final int PARALLELISM = 4;

    /** Operator name given to the source; also the prefix used when looking up its metrics. */
    private static final String SOURCE_NAME = "IcebergSource";

    /**
     * Number of records written into the "file 2" batch.
     * NOTE(review): the name suggests this amount yields two splits with the configured split
     * size of 100 - confirm against the split planner.
     */
    private static final int RECORD_NUM_FOR_2_SPLITS = 200;

    /**
     * Window start (epoch millis) mapped to the number of records counted in that window. It is
     * static so that the window function can publish results without touching the test instance.
     */
    private static final ConcurrentMap<Long, Integer> WINDOWS = Maps.newConcurrentMap();

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    // Must be declared before miniClusterResource: that initializer reads this field.
    private final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();

    @Rule
    public final MiniClusterWithClientResource miniClusterResource =
        new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberTaskManagers(1)
                .setNumberSlotsPerTaskManager(PARALLELISM)
                .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
                .setConfiguration(
                    this.reporter.addToConfiguration(
                        MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG))
                .withHaLeadershipControl()
                .build());

    @Rule
    public final HadoopTableResource sourceTableResource =
        new HadoopTableResource(TEMPORARY_FOLDER, "default", "t", TestFixtures.TS_SCHEMA);

    /**
     * Checks that 5-minute tumbling event-time windows fire with the expected record counts.
     *
     * <p>Three batches are appended before the job starts:
     *
     * <ul>
     *   <li>file 1: 3 records with timestamps at minute 100, 101 and 103
     *   <li>file 2: {@link #RECORD_NUM_FOR_2_SPLITS} records with timestamps between minute 0 and 4
     *   <li>file 3: 2 records with timestamps at minute 6 and 7
     * </ul>
     *
     * <p>The job runs with parallelism 1. The test first waits until exactly the windows starting
     * at minute 0 (all file 2 records) and minute 5 (both file 3 records) have been reported. It
     * then appends one record at minute 1500 and waits until the window starting at minute 100
     * (the 3 file 1 records) is reported as well - presumably because the late record moves the
     * watermark past that window.
     *
     * @throws Exception if writing the test data or submitting the job fails
     */
    @Test
    public void testWindowing() throws Exception {
        GenericAppenderHelper dataAppender = appender();

        dataAppender.appendToTable(
            ImmutableList.of(
                generateRecord(100, "file_1-recordTs_100"),
                generateRecord(101, "file_1-recordTs_101"),
                generateRecord(103, "file_1-recordTs_103")));
        dataAppender.appendToTable(generateFile2Records());
        dataAppender.appendToTable(
            ImmutableList.of(
                generateRecord(6, "file_3-recordTs_6"),
                generateRecord(7, "file_3-recordTs_7")));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<RowData> stream =
            env.fromSource(
                source(),
                WatermarkStrategy.<RowData>noWatermarks()
                    .withTimestampAssigner(new RowDataTimestampAssigner()),
                SOURCE_NAME,
                TypeInformation.of(RowData.class));

        stream
            .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
            .apply(
                new AllWindowFunction<RowData, RowData, TimeWindow>() {
                    @Override
                    public void apply(
                            TimeWindow window, Iterable<RowData> values, Collector<RowData> out) {
                        // Emit the window start together with the number of records in the
                        // window, and publish the same pair through the static map.
                        int count = 0;
                        for (RowData ignored : values) {
                            count++;
                        }
                        out.collect(row(window.getStart(), count));
                        WINDOWS.put(window.getStart(), count);
                    }
                });

        // Results are gathered in a static map; reset it so earlier runs cannot leak in.
        WINDOWS.clear();
        env.executeAsync("Iceberg Source Windowing Test");

        // Wait for the windows holding the file 2 and file 3 records.
        Awaitility.await()
            .pollInterval(Duration.ofMillis(10))
            .atMost(30, TimeUnit.SECONDS)
            .until(
                () ->
                    WINDOWS.equals(
                        ImmutableMap.of(
                            0L, RECORD_NUM_FOR_2_SPLITS,
                            TimeUnit.MINUTES.toMillis(5), 2)));

        // Append a much later record so that the window holding the file 1 records gets reported.
        dataAppender.appendToTable(
            dataAppender.writeFile(ImmutableList.of(generateRecord(1500, "last-record"))));

        // Wait for the window holding the file 1 records.
        Awaitility.await()
            .pollInterval(Duration.ofMillis(10))
            .atMost(30, TimeUnit.SECONDS)
            .until(
                () ->
                    WINDOWS.equals(
                        ImmutableMap.of(
                            0L, RECORD_NUM_FOR_2_SPLITS,
                            TimeUnit.MINUTES.toMillis(5), 2,
                            TimeUnit.MINUTES.toMillis(100), 3)));
    }

    /**
     * Checks watermark alignment through the source's {@code watermarkAlignmentDrift} metric.
     *
     * <p>The source runs with parallelism 2 in alignment group {@code "iceberg"} with a maximum
     * allowed drift of 20 minutes and an update interval of 10 ms. The test then:
     *
     * <ol>
     *   <li>appends file 1 (minutes 100 and 103) and file 2 (minutes 0 to 4) and waits for a drift
     *       gauge reporting 80 minutes (presumably 100 - 20 - 0),
     *   <li>appends files 3 and 4 (minutes 15 to 17) and waits for that gauge to report 65 minutes
     *       (presumably 100 - 20 - 15),
     *   <li>appends file 5 (minutes 90 and 91) and waits for the gauge to drop below 20 minutes.
     * </ol>
     *
     * @throws Exception if writing the test data or submitting the job fails
     */
    @Test
    public void testThrottling() throws Exception {
        GenericAppenderHelper dataAppender = appender();

        // Generate all records up front; they are appended step by step while the job runs.
        List<Record> batch1 =
            ImmutableList.of(
                generateRecord(100, "file_1-recordTs_100"),
                generateRecord(103, "file_1-recordTs_103"));
        List<Record> batch2 = generateFile2Records();
        List<Record> batch3 =
            ImmutableList.of(
                generateRecord(15, "file_3-recordTs_15"),
                generateRecord(16, "file_3-recordTs_16"),
                generateRecord(17, "file_3-recordTs_17"));
        List<Record> batch4 =
            ImmutableList.of(
                generateRecord(15, "file_4-recordTs_15"),
                generateRecord(16, "file_4-recordTs_16"),
                generateRecord(17, "file_4-recordTs_17"));
        List<Record> batch5 =
            ImmutableList.of(
                generateRecord(90, "file_5-recordTs_90"),
                generateRecord(91, "file_5-recordTs_91"));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<RowData> stream =
            env.fromSource(
                source(),
                WatermarkStrategy.<RowData>noWatermarks()
                    .withWatermarkAlignment(
                        "iceberg", Duration.ofMinutes(20), Duration.ofMillis(10)),
                SOURCE_NAME,
                TypeInformation.of(RowData.class));

        // The iterator is never read; it is registered before the job is submitted and closed
        // when the test is done.
        try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) {
            JobClient jobClient = env.executeAsync("Iceberg Source Throttling Test");
            JobID jobId = jobClient.getJobID();
            CommonTestUtils.waitForAllTaskRunning(
                this.miniClusterResource.getMiniCluster(), jobId, false);

            // Insert the first data into the table.
            dataAppender.appendToTable(
                dataAppender.writeFile(batch1), dataAppender.writeFile(batch2));

            // Wait for a drift gauge to exist and to report 80 minutes.
            long initialDrift = TimeUnit.MINUTES.toMillis(80);
            Awaitility.await()
                .pollInterval(Duration.ofMillis(10))
                .atMost(30, TimeUnit.SECONDS)
                .until(() -> findAlignmentDriftMetric(jobId, initialDrift).isPresent());
            Gauge<Long> drift =
                findAlignmentDriftMetric(jobId, initialDrift)
                    .orElseThrow(
                        () ->
                            new AssertionError(
                                "Alignment drift metric changed right after reaching "
                                    + initialDrift
                                    + " ms"));

            // Add older records in two files.
            dataAppender.appendToTable(
                dataAppender.writeFile(batch3), dataAppender.writeFile(batch4));

            // Wait for the same gauge to report 65 minutes.
            Awaitility.await()
                .pollInterval(Duration.ofMillis(10))
                .atMost(30, TimeUnit.SECONDS)
                .until(() -> drift.getValue() == TimeUnit.MINUTES.toMillis(65));

            // Add newer records.
            dataAppender.appendToTable(batch5);

            // Wait for the drift to fall below the configured maximum of 20 minutes.
            Awaitility.await()
                .pollInterval(Duration.ofMillis(10))
                .atMost(30, TimeUnit.SECONDS)
                .until(() -> drift.getValue() < TimeUnit.MINUTES.toMillis(20));
        }
    }

    /**
     * Builds the streaming source under test: reads the test table, uses column {@code "ts"} as
     * watermark column, a split size of 100, a 10 ms monitor interval and the
     * {@code TABLE_SCAN_THEN_INCREMENTAL} starting strategy.
     *
     * @return a new source instance
     */
    protected IcebergSource<RowData> source() {
        return IcebergSource.<RowData>builder()
            .tableLoader(this.sourceTableResource.tableLoader())
            .watermarkColumn("ts")
            .project(TestFixtures.TS_SCHEMA)
            .splitSize(100L)
            .streaming(true)
            .monitorInterval(Duration.ofMillis(10))
            .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
            .build();
    }

    /**
     * Creates a record of {@code TestFixtures.TS_SCHEMA} whose {@code ts} field lies the given
     * number of minutes after the epoch (zone {@code "Z"}).
     *
     * @param minutes offset from the epoch, in minutes, stored in the {@code ts} field
     * @param str value stored in the {@code str} field
     * @return the populated record
     */
    protected Record generateRecord(int minutes, String str) {
        GenericRecord record = GenericRecord.create(TestFixtures.TS_SCHEMA);
        LocalDateTime ts =
            LocalDateTime.ofInstant(
                Instant.ofEpochMilli(TimeUnit.MINUTES.toMillis(minutes)), ZoneId.of("Z"));
        record.setField("ts", ts);
        record.setField("str", str);
        return record;
    }

    /**
     * Generates the "file 2" batch shared by both tests: {@link #RECORD_NUM_FOR_2_SPLITS} records
     * whose timestamps cycle through minute 4, 3, 2, 1, 0, i.e. are out of order but all below
     * minute 5.
     */
    private List<Record> generateFile2Records() {
        List<Record> records = Lists.newArrayListWithCapacity(RECORD_NUM_FOR_2_SPLITS);
        for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
            records.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
        }
        return records;
    }

    /**
     * Looks up a watermark alignment drift gauge of the source that currently reports the given
     * value.
     *
     * @param jobID job whose metrics are searched
     * @param withValue drift value, in milliseconds, the gauge has to report
     * @return the first matching gauge, or empty if none reports {@code withValue} right now
     */
    @SuppressWarnings("unchecked") // metrics matching this name pattern are treated as Gauge<Long>
    private Optional<Gauge<Long>> findAlignmentDriftMetric(JobID jobID, long withValue) {
        String metricsName = SOURCE_NAME + ".*watermarkAlignmentDrift";
        return this.reporter.findMetrics(jobID, metricsName).values().stream()
            .map(m -> (Gauge<Long>) m)
            .filter(m -> m.getValue() == withValue)
            .findFirst();
    }

    /**
     * Creates a Parquet appender for the test table. Page size and row group size are both set to
     * 64 bytes in the Hadoop configuration handed to the helper.
     */
    private GenericAppenderHelper appender() {
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("write.parquet.page-size-bytes", "64");
        hadoopConf.set("write.parquet.row-group-size-bytes", "64");
        return new GenericAppenderHelper(
            this.sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER, hadoopConf);
    }

    /**
     * Builds the two-field row emitted per window: the window start at position 0 and the record
     * count, rendered as a decimal string, at position 1.
     */
    private static RowData row(long time, long count) {
        GenericRowData result = new GenericRowData(2);
        result.setField(0, time);
        result.setField(1, String.valueOf(count));
        return result;
    }

    /** Uses the timestamp stored at position 0 of the row as the event time, in milliseconds. */
    private static class RowDataTimestampAssigner
            implements SerializableTimestampAssigner<RowData> {
        @Override
        public long extractTimestamp(RowData element, long recordTimestamp) {
            return element.getTimestamp(0, 0).getMillisecond();
        }
    }
}

