/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestIcebergSourceFailover {
    /**
     * Number of task slots offered by the mini cluster's single TaskManager; the tests run their
     * jobs at this parallelism.
     */
    private static final int PARALLELISM = 4;

    /** Class-wide temporary folder handed to the table resources and the data appender. */
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /**
     * Per-test Flink mini cluster with one TaskManager, dedicated RPC services and HA leadership
     * control enabled. The slot count is tied to {@link #PARALLELISM} rather than repeated as a
     * literal so the two cannot drift apart.
     */
    @Rule
    public final MiniClusterWithClientResource miniClusterResource =
        new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberTaskManagers(1)
                .setNumberSlotsPerTaskManager(PARALLELISM)
                .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
                .withHaLeadershipControl()
                .build());

    /**
     * Table the source reads from ({@code default.t}).
     *
     * <p>NOTE: {@link #schema()} is overridable and is invoked here during field initialization, so
     * overrides must not rely on subclass instance state.
     */
    @Rule
    public final HadoopTableResource sourceTableResource =
        new HadoopTableResource(TEMPORARY_FOLDER, "default", "t", schema());

    /** Table the sink writes to ({@code default.t_sink}); created with the same schema. */
    @Rule
    public final HadoopTableResource sinkTableResource =
        new HadoopTableResource(TEMPORARY_FOLDER, "default", "t_sink", schema());

    /**
     * Creates the base {@link IcebergSource} builder used by every test in this class.
     *
     * <p>The builder is bound to the source table's loader, uses a
     * {@link SimpleSplitAssignerFactory}, and carries a Flink configuration whose reader fetch batch
     * record count is set to 128. Callers may add further options before calling {@code build()}.
     *
     * @return a pre-configured, not yet built, source builder
     */
    protected IcebergSource.Builder<RowData> sourceBuilder() {
        Configuration flinkConf = new Configuration();
        flinkConf.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);

        return IcebergSource.forRowData()
            .tableLoader(sourceTableResource.tableLoader())
            .assignerFactory(new SimpleSplitAssignerFactory())
            .flinkConfig(flinkConf);
    }

    /**
     * Returns the schema used to create both the source and the sink table and to generate test
     * records. This implementation returns {@link TestFixtures#SCHEMA}.
     *
     * @return the table schema for this test
     */
    protected Schema schema() {
        return TestFixtures.SCHEMA;
    }

    /**
     * Generates random records that conform to {@link #schema()}.
     *
     * @param numRecords number of records to generate
     * @param seed seed passed to the random generator
     * @return the generated records
     */
    protected List<Record> generateRecords(int numRecords, long seed) {
        Schema recordSchema = schema();
        return RandomGenericData.generate(recordSchema, numRecords, seed);
    }

    /**
     * Verifies the content of {@code table} against {@code expectedRecords} by delegating to
     * {@link SimpleDataUtil#assertTableRecords}.
     *
     * @param table table whose content is checked
     * @param expectedRecords records the table is expected to contain
     * @param interval forwarded unchanged to the helper (presumably the wait between checks; confirm
     *     in SimpleDataUtil)
     * @param maxCount forwarded unchanged to the helper (presumably the maximum number of checks;
     *     confirm in SimpleDataUtil)
     * @throws Exception if the helper throws
     */
    protected void assertRecords(Table table, List<Record> expectedRecords, Duration interval, int maxCount) throws Exception {
        SimpleDataUtil.assertTableRecords(table, expectedRecords, interval, maxCount);
    }

    /** Runs the bounded-source scenario while restarting the TaskManager mid-read. */
    @Test
    public void testBoundedWithTaskManagerFailover() throws Exception {
        testBoundedIcebergSource(FailoverType.TM);
    }

    /** Runs the bounded-source scenario while revoking and re-granting JobMaster leadership mid-read. */
    @Test
    public void testBoundedWithJobManagerFailover() throws Exception {
        testBoundedIcebergSource(FailoverType.JM);
    }

    /**
     * Reads a bounded source table through a job that is forced to fail over halfway, then checks
     * that the sink table ends up with exactly the expected records.
     *
     * <p>Steps: append four batches of two random records to the source table; build a job that
     * reads the source, passes through {@link RecordCounterToFail} (which blocks once more than half
     * of the records have been seen) and writes to the sink table; wait for the blocking point;
     * trigger the requested failover and release the blocked operator; finally assert the sink
     * table content.
     *
     * @param failoverType which kind of failover to trigger while the job is blocked
     * @throws Exception if job submission, failover, or the assertion fails
     */
    private void testBoundedIcebergSource(FailoverType failoverType) throws Exception {
        List<Record> expectedRecords = Lists.newArrayList();
        GenericAppenderHelper dataAppender =
            new GenericAppenderHelper(sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
        for (int i = 0; i < 4; ++i) {
            List<Record> records = generateRecords(2, i);
            expectedRecords.addAll(records);
            dataAppender.appendToTable(records);
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        // Allow exactly one restart (no delay) so the job can recover from the injected failover.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));

        DataStream<RowData> stream =
            env.fromSource(
                sourceBuilder().build(),
                WatermarkStrategy.noWatermarks(),
                "IcebergSource",
                TypeInformation.of(RowData.class));

        DataStream<RowData> streamFailingInTheMiddleOfReading =
            RecordCounterToFail.wrapWithFailureAfter(stream, expectedRecords.size() / 2);

        FlinkSink.forRowData(streamFailingInTheMiddleOfReading)
            .table(sinkTableResource.table())
            .tableLoader(sinkTableResource.tableLoader())
            .append();

        JobClient jobClient = env.executeAsync("Bounded Iceberg Source Failover Test");
        JobID jobId = jobClient.getJobID();

        // Block until the pipeline has reached the fail point, then fail over and let it continue.
        RecordCounterToFail.waitToFail();
        triggerFailover(
            failoverType,
            jobId,
            RecordCounterToFail::continueProcessing,
            miniClusterResource.getMiniCluster());

        assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofMillis(10L), 12000);
    }

    /** Runs the continuous (streaming) source scenario with a TaskManager restart in the middle. */
    @Test
    public void testContinuousWithTaskManagerFailover() throws Exception {
        testContinuousIcebergSource(FailoverType.TM);
    }

    /** Runs the continuous (streaming) source scenario with a JobMaster leadership change in the middle. */
    @Test
    public void testContinuousWithJobManagerFailover() throws Exception {
        testContinuousIcebergSource(FailoverType.JM);
    }

    /**
     * Runs a streaming source job while new snapshots keep arriving, injects a failover part-way
     * through, and checks that the sink table ends up with exactly the expected records.
     *
     * <p>An initial batch of two records is appended before the job starts. The job uses
     * checkpointing and a streaming source that starts with a table scan and then reads
     * incrementally. Four more batches of two records are then appended, with a short sleep before
     * each; the failover is triggered right after the second of those batches.
     *
     * @param failoverType which kind of failover to trigger while the job is running
     * @throws Exception if job submission, failover, or the assertion fails
     */
    private void testContinuousIcebergSource(FailoverType failoverType) throws Exception {
        GenericAppenderHelper dataAppender =
            new GenericAppenderHelper(sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
        List<Record> expectedRecords = Lists.newArrayList();

        List<Record> batch = generateRecords(2, 0L);
        expectedRecords.addAll(batch);
        dataAppender.appendToTable(batch);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(10L);
        // The reader configuration is supplied by sourceBuilder(); no separate Configuration is needed here.

        DataStream<RowData> stream =
            env.fromSource(
                sourceBuilder()
                    .streaming(true)
                    .monitorInterval(Duration.ofMillis(10L))
                    .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
                    .build(),
                WatermarkStrategy.noWatermarks(),
                "IcebergSource",
                TypeInformation.of(RowData.class));

        FlinkSink.forRowData(stream)
            .table(sinkTableResource.table())
            .tableLoader(sinkTableResource.tableLoader())
            .append();

        JobClient jobClient = env.executeAsync("Continuous Iceberg Source Failover Test");
        JobID jobId = jobClient.getJobID();

        for (int i = 1; i < 5; ++i) {
            Thread.sleep(10L);
            List<Record> records = generateRecords(2, i);
            expectedRecords.addAll(records);
            dataAppender.appendToTable(records);
            if (i == 2) {
                // Fail over in the middle of the stream of appends; nothing needs releasing afterwards.
                triggerFailover(failoverType, jobId, () -> {}, miniClusterResource.getMiniCluster());
            }
        }

        assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofMillis(10L), 12000);
    }

    /**
     * Triggers the requested kind of failover on the mini cluster.
     *
     * <ul>
     *   <li>{@code NONE}: no failover; only {@code afterFailAction} is run.
     *   <li>{@code TM}: the TaskManager is restarted.
     *   <li>{@code JM}: JobMaster leadership for {@code jobId} is revoked and granted again.
     * </ul>
     *
     * @param type kind of failover to perform
     * @param jobId job whose JobMaster leadership is toggled for {@code JM}
     * @param afterFailAction callback run after the failure has been induced
     * @param miniCluster cluster to operate on
     * @throws IllegalArgumentException if {@code type} is not one of the handled constants
     * @throws Exception if the underlying cluster operation fails
     */
    private static void triggerFailover(FailoverType type, JobID jobId, Runnable afterFailAction, MiniCluster miniCluster) throws Exception {
        switch (type) {
            case NONE:
                afterFailAction.run();
                break;
            case TM:
                restartTaskManager(afterFailAction, miniCluster);
                break;
            case JM:
                triggerJobManagerFailover(jobId, afterFailAction, miniCluster);
                break;
            default:
                // Fail loudly instead of silently skipping the failover if a new type is added.
                throw new IllegalArgumentException("Unsupported failover type: " + type);
        }
    }

    /**
     * Simulates a JobManager failover by revoking JobMaster leadership for the job, running
     * {@code afterFailAction}, and then granting leadership again. Each leadership change is awaited
     * before the next step.
     *
     * @param jobId job whose JobMaster leadership is toggled
     * @param afterFailAction callback run between revoking and re-granting leadership
     * @param miniCluster cluster providing the HA leadership control
     * @throws IllegalStateException if the cluster does not expose HA leadership control
     * @throws Exception if revoking or granting leadership fails
     */
    private static void triggerJobManagerFailover(JobID jobId, Runnable afterFailAction, MiniCluster miniCluster) throws Exception {
        HaLeadershipControl haLeadershipControl =
            miniCluster
                .getHaLeadershipControl()
                .orElseThrow(
                    () ->
                        new IllegalStateException(
                            "MiniCluster does not expose HA leadership control;"
                                + " enable it with withHaLeadershipControl()"));
        haLeadershipControl.revokeJobMasterLeadership(jobId).get();
        afterFailAction.run();
        haLeadershipControl.grantJobMasterLeadership(jobId).get();
    }

    /**
     * Simulates a TaskManager failover: terminates TaskManager 0 and waits for the termination to
     * complete, runs {@code afterFailAction}, then starts a new TaskManager.
     *
     * @param afterFailAction callback run after the TaskManager is gone and before a new one starts
     * @param miniCluster cluster whose TaskManager is restarted
     * @throws Exception if terminating or starting the TaskManager fails
     */
    private static void restartTaskManager(Runnable afterFailAction, MiniCluster miniCluster) throws Exception {
        CompletableFuture<?> termination = miniCluster.terminateTaskManager(0);
        termination.get();

        afterFailAction.run();

        miniCluster.startTaskManager();
    }

    /**
     * Test helper that pauses a stream once a given number of records has passed through it, so the
     * test can inject a failover at a known point.
     *
     * <p>All state is static and is reset by every call to
     * {@link #wrapWithFailureAfter(DataStream, int)}, so only one wrapped pipeline can be in use at
     * a time. NOTE(review): this presumably relies on the job running in the same JVM as the test
     * (mini cluster) so that the operator and the test see the same static fields; confirm.
     */
    private static class RecordCounterToFail {
        /** Number of records seen by the wrapping map function since the last wrap. */
        private static AtomicInteger records;
        /** Completed by the map function when the fail point is reached. */
        private static CompletableFuture<Void> fail;
        /** Completed by the test to release a map function blocked at the fail point. */
        private static CompletableFuture<Void> continueProcessing;

        private RecordCounterToFail() {
        }

        /**
         * Appends an identity map to {@code stream} that signals the fail point and blocks once more
         * than {@code failAfter} records have been counted.
         *
         * @param stream stream to wrap
         * @param failAfter number of records to let through before signalling and blocking
         * @return the wrapped stream, emitting the same records as {@code stream}
         */
        private static <T> DataStream<T> wrapWithFailureAfter(DataStream<T> stream, int failAfter) {
            records = new AtomicInteger();
            fail = new CompletableFuture<>();
            continueProcessing = new CompletableFuture<>();
            return stream.map(record -> {
                boolean reachedFailPoint = records.incrementAndGet() > failAfter;
                boolean notFailedYet = !fail.isDone();
                if (notFailedYet && reachedFailPoint) {
                    // Tell the test the fail point was reached, then wait until it lets us go on.
                    fail.complete(null);
                    continueProcessing.get();
                }
                return record;
            });
        }

        /** Blocks until the wrapped stream has reached its fail point. */
        private static void waitToFail() throws ExecutionException, InterruptedException {
            fail.get();
        }

        /** Releases any map function blocked at the fail point. */
        private static void continueProcessing() {
            continueProcessing.complete(null);
        }
    }

    private static enum FailoverType {
        NONE,
        TM,
        JM;

    }
}

