/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.align;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.source.FileStoreSourceReader;
import org.apache.paimon.flink.source.FileStoreSourceReaderTest;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource;
import org.apache.paimon.flink.source.align.AlignedSourceReader;
import org.apache.paimon.flink.source.align.CheckpointEvent;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ReflectionUtils;
import org.assertj.core.api.Assertions;
import org.junit.Ignore;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Tests for {@link AlignedSourceReader}.
 *
 * <p>Extends {@link FileStoreSourceReaderTest} so the inherited reader cases run against the
 * aligned reader produced by {@link #createReader(TestingReaderContext, TableRead)}; cases whose
 * expectations differ for the aligned reader are overridden here.
 */
public class AlignedSourceReaderTest extends FileStoreSourceReaderTest {
    private static final String COMMIT_USER = "commit_user";
    /** Name of the private field on the stream task that holds its main operator. */
    private static final String MAIN_OPERATOR_FIELD = "mainOperator";
    /** Name of the private field on the stream task that holds its input processor. */
    private static final String INPUT_PROCESSOR_FIELD = "inputProcessor";

    /** Table opened from the schema stored under {@code tempDir}; rebuilt before every test. */
    private FileStoreTable table;

    /**
     * Runs the parent set-up, then opens the table located at {@code tempDir} using the latest
     * schema found there.
     *
     * @throws IllegalStateException if no schema exists under {@code tempDir}
     */
    @Override
    @BeforeEach
    public void beforeEach() throws Exception {
        super.beforeEach();
        LocalFileIO fileIO = LocalFileIO.create();
        Path tablePath = new Path(this.tempDir.toUri());
        // NOTE(review): the schema is presumably written by the parent set-up - confirm.
        TableSchema tableSchema =
                new SchemaManager(fileIO, tablePath)
                        .latest()
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                "No table schema found under " + tablePath));
        this.table = FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
    }

    /**
     * Verifies when the aligned reader asks for more splits: once on start, not again when the
     * assigned splits are drained, and once more after a {@link CheckpointEvent} has been handled
     * and the pending checkpoint has been queried.
     */
    @Override
    @Test
    @Timeout(value = 300L)
    public void testAddMultipleSplits() throws Exception {
        TestingReaderContext context = new TestingReaderContext();
        AlignedSourceReader reader = (AlignedSourceReader) this.createReader(context);
        reader.start();
        Assertions.assertThat(context.getNumSplitRequests()).isEqualTo(1);

        reader.addSplits(Arrays.asList(createTestFileSplit("id1"), createTestFileSplit("id2")));
        TestingReaderOutput<RowData> output = new TestingReaderOutput<>();
        // Poll until every assigned split is finished; bounded by the @Timeout above.
        while (reader.getNumberOfCurrentlyAssignedSplits() > 0) {
            reader.pollNext(output);
            Thread.sleep(10L);
        }
        // Finishing the splits alone must not have produced another split request.
        Assertions.assertThat(context.getNumSplitRequests()).isEqualTo(1);

        reader.handleSourceEvents(new CheckpointEvent(1L));
        Assertions.assertThat(reader.shouldTriggerCheckpoint()).isEqualTo(Optional.of(1L));
        Assertions.assertThat(context.getNumSplitRequests()).isEqualTo(2);
    }

    /**
     * Runs the aligned source inside a mailbox test harness and checks that the single written
     * row is emitted before the checkpoint barrier.
     *
     * @param rpcFirst when {@code true}, the checkpoint event is delivered and the checkpoint is
     *     triggered before any input is processed; when {@code false}, both happen only after the
     *     assigned splits have been drained
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    @Timeout(value = 300L)
    public void testCheckpointTrigger(boolean rpcFirst) throws Exception {
        this.writeTable();
        AlignedContinuousFileStoreSource alignedSource =
                new AlignedContinuousFileStoreSource(
                        this.table.newReadBuilder(), this.table.options(), null, false, null);
        SourceOperatorFactory<RowData> sourceOperatorFactory =
                new SourceOperatorFactory<>(alignedSource, WatermarkStrategy.noWatermarks());
        StreamTaskMailboxTestHarnessBuilder<RowData> builder =
                new StreamTaskMailboxTestHarnessBuilder<>(
                                SourceOperatorStreamTask::new,
                                InternalTypeInfo.of(
                                        LogicalTypeConversion.toLogicalType(this.table.rowType())))
                        .setCollectNetworkEvents()
                        .setupOutputForSingletonOperatorChain(sourceOperatorFactory);

        try (StreamTaskMailboxTestHarness<RowData> testHarness = builder.build()) {
            StreamTask<?, ?> streamTask = testHarness.getStreamTask();
            // The operator and input processor are only reachable through private task fields.
            SourceOperator<?, ?> sourceOperator =
                    (SourceOperator<?, ?>)
                            ReflectionUtils.getPrivateFieldValue(streamTask, MAIN_OPERATOR_FIELD);
            AlignedSourceReader sourceReader =
                    (AlignedSourceReader) sourceOperator.getSourceReader();
            StreamInputProcessor inputProcessor =
                    (StreamInputProcessor)
                            ReflectionUtils.getPrivateFieldValue(streamTask, INPUT_PROCESSOR_FIELD);
            Queue<Object> output = testHarness.getOutput();

            CheckpointMetaData checkpointMetaData = new CheckpointMetaData(1L, 2L);
            CheckpointOptions checkpointOptions =
                    CheckpointOptions.alignedNoTimeout(
                            CheckpointType.CHECKPOINT,
                            CheckpointStorageLocationReference.getDefault());

            // Hand the splits of the current stream-scan plan directly to the operator.
            List<FileStoreSourceSplit> splits =
                    new FileStoreSourceSplitGenerator()
                            .createSplits(this.table.newStreamScan().plan());
            sourceOperator.handleOperatorEvent(
                    new AddSplitEvent<>(splits, alignedSource.getSplitSerializer()));

            SourceEventWrapper checkpointEvent = new SourceEventWrapper(new CheckpointEvent(1L));
            if (rpcFirst) {
                sourceOperator.handleOperatorEvent(checkpointEvent);
                streamTask.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
            }

            // Drain the task until no split is assigned and no input is available.
            while (sourceReader.getNumberOfCurrentlyAssignedSplits() > 0
                    || inputProcessor.isAvailable()) {
                testHarness.processAll();
            }

            if (!rpcFirst) {
                sourceOperator.handleOperatorEvent(checkpointEvent);
                testHarness.processAll();
                CompletableFuture<?> triggerCheckpointAsync =
                        streamTask.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
                // Step the mailbox until the trigger request has completed.
                while (!triggerCheckpointAsync.isDone()) {
                    testHarness.processSingleStep();
                }
            }

            // Exactly two elements are expected: the written row first, then the barrier.
            Assertions.assertThat(output.size()).isEqualTo(2);
            Assertions.assertThat(output.peek()).isInstanceOf(StreamRecord.class);
            RowData record = (RowData) ((StreamRecord<?>) output.poll()).getValue();
            Assertions.assertThat(record)
                    .matches(
                            rowData ->
                                    !rowData.isNullAt(0)
                                            && !rowData.isNullAt(1)
                                            && !rowData.isNullAt(2));
            Assertions.assertThat(record)
                    .matches(
                            rowData ->
                                    rowData.getLong(0) == 3L
                                            && rowData.getLong(1) == 33L
                                            && rowData.getInt(2) == 303);
            Assertions.assertThat(output.peek()).isInstanceOf(CheckpointBarrier.class);
            Assertions.assertThat(output.poll())
                    .isEqualTo(
                            new CheckpointBarrier(
                                    checkpointMetaData.getCheckpointId(),
                                    checkpointMetaData.getTimestamp(),
                                    checkpointOptions));
        }
    }

    /**
     * Overrides the inherited case with an empty, disabled body so it is skipped for the aligned
     * reader.
     */
    // NOTE(review): presumably skipped because the aligned reader does not request a split when
    // one finishes (see testAddMultipleSplits) - confirm against the parent test.
    @Override
    @Test
    @Disabled("Inherited case is intentionally skipped for AlignedSourceReader")
    public void testReaderOnSplitFinished() throws Exception {
    }

    /**
     * Creates the {@link AlignedSourceReader} under test.
     *
     * @param context reader context handed to the reader
     * @param tableRead table read handed to the reader
     * @return a reader using a dummy metric group, an {@link IOManager} rooted at {@code tempDir}
     *     and {@code null} for its two trailing constructor arguments
     */
    @Override
    protected FileStoreSourceReader createReader(TestingReaderContext context, TableRead tableRead) {
        return new AlignedSourceReader(
                context,
                tableRead,
                new FileStoreSourceReaderMetrics(new FileStoreSourceReaderTest.DummyMetricGroup()),
                IOManager.create(this.tempDir.toString()),
                null,
                null);
    }

    /** Writes the single row {@code (3L, 33L, 303)} into bucket 0 of the table and commits it. */
    private void writeTable() throws Exception {
        try (TableWriteImpl<?> write = this.table.newWrite(COMMIT_USER);
                TableCommitImpl commit = this.table.newCommit(COMMIT_USER)) {
            write.write(GenericRow.of(3L, 33L, 303), 0);
            commit.commit(1L, write.prepareCommit(true, 0L));
        }
    }
}

