/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.TestCdcEventParser;
import org.apache.paimon.flink.sink.cdc.TestCdcSource;
import org.apache.paimon.flink.sink.cdc.TestTable;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FailingFileIO;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

public class FlinkCdcSyncTableSinkITCase
extends AbstractTestBase {
    private static final String DATABASE_NAME = "test";
    private static final String TABLE_NAME = "test_tbl";
    @TempDir
    java.nio.file.Path tempDir;

    @Test
    @Timeout(value=120L)
    public void testRandomCdcEvents() throws Exception {
        this.innerTestRandomCdcEvents(ThreadLocalRandom.current().nextInt(5) + 1, false, false);
    }

    @Test
    @Timeout(value=120L)
    public void testRandomCdcEventsDynamicBucket() throws Exception {
        this.innerTestRandomCdcEvents(-1, false, false);
    }

    @Disabled
    @Test
    @Timeout(value=120L)
    public void testRandomCdcEventsGlobalDynamicBucket() throws Exception {
        this.innerTestRandomCdcEvents(-1, true, false);
    }

    @Test
    @Timeout(value=120L)
    public void testRandomCdcEventsUnawareBucket() throws Exception {
        this.innerTestRandomCdcEvents(-1, false, true);
    }

    private void innerTestRandomCdcEvents(int numBucket, boolean globalIndex, boolean unawareBucketMode) throws Exception {
        FailingFileIO fileIO;
        Path tablePath;
        ThreadLocalRandom random = ThreadLocalRandom.current();
        int numEvents = random.nextInt(1500) + 1;
        int numSchemaChanges = Math.min(numEvents / 2, random.nextInt(10) + 1);
        int numPartitions = random.nextInt(3) + 1;
        int numKeys = random.nextInt(150) + 1;
        boolean enableFailure = random.nextBoolean();
        TestTable testTable = new TestTable(TABLE_NAME, numEvents, numSchemaChanges, numPartitions, numKeys, unawareBucketMode);
        String failingName = UUID.randomUUID().toString();
        if (enableFailure) {
            tablePath = new Path(FailingFileIO.getFailingPath((String)failingName, (String)CatalogUtils.stringifyPath((String)this.tempDir.toString(), (String)DATABASE_NAME, (String)TABLE_NAME)));
            fileIO = new FailingFileIO();
        } else {
            tablePath = new Path("traceable://" + CatalogUtils.stringifyPath((String)this.tempDir.toString(), (String)DATABASE_NAME, (String)TABLE_NAME));
            fileIO = LocalFileIO.create();
        }
        FailingFileIO.reset((String)failingName, (int)0, (int)1);
        List<String> primaryKeys = unawareBucketMode ? Collections.emptyList() : (globalIndex ? Collections.singletonList("k") : Arrays.asList("pt", "k"));
        FileStoreTable table = this.createFileStoreTable(tablePath, (FileIO)fileIO, testTable.initialRowType(), Collections.singletonList("pt"), primaryKeys, numBucket);
        StreamExecutionEnvironment env = this.streamExecutionEnvironmentBuilder().streamingMode().checkpointIntervalMs(100).allowRestart(enableFailure).build();
        TestCdcSource testCdcSource = new TestCdcSource(testTable.events());
        DataStreamSource source = env.fromSource((Source)testCdcSource, WatermarkStrategy.noWatermarks(), "TestCdcSource");
        source.setParallelism(2);
        Options catalogOptions = new Options();
        catalogOptions.set("warehouse", this.tempDir.toString());
        CatalogLoader & Serializable catalogLoader = (CatalogLoader & Serializable)() -> FlinkCatalogFactory.createPaimonCatalog((Options)catalogOptions);
        new CdcSinkBuilder().withInput((DataStream)source).withParserFactory(TestCdcEventParser::new).withTable((Table)table).withParallelism(Integer.valueOf(3)).withIdentifier(Identifier.create((String)DATABASE_NAME, (String)TABLE_NAME)).withCatalogLoader((CatalogLoader)catalogLoader).build();
        FailingFileIO.reset((String)failingName, (int)10, (int)10000);
        env.execute();
        FailingFileIO.reset((String)failingName, (int)0, (int)1);
        table = table.copyWithLatestSchema();
        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
        TableSchema schema = (TableSchema)schemaManager.latest().get();
        ReadBuilder readBuilder = table.newReadBuilder();
        TableScan.Plan plan = readBuilder.newScan().plan();
        try (RecordReaderIterator it = new RecordReaderIterator(readBuilder.newRead().createReader(plan));){
            testTable.assertResult(schema, (Iterator<InternalRow>)it);
        }
    }

    private FileStoreTable createFileStoreTable(Path tablePath, FileIO fileIO, RowType rowType, List<String> partitions, List<String> primaryKeys, int numBucket) throws Exception {
        Options conf = new Options();
        conf.set(CoreOptions.BUCKET, (Object)numBucket);
        conf.set(CoreOptions.DYNAMIC_BUCKET_TARGET_ROW_NUM, (Object)100L);
        conf.set(CoreOptions.WRITE_BUFFER_SIZE, (Object)new MemorySize(12288L));
        conf.set(CoreOptions.PAGE_SIZE, (Object)new MemorySize(4096L));
        if (primaryKeys.isEmpty() && numBucket == -1) {
            conf.set(CoreOptions.WRITE_ONLY, (Object)true);
        }
        TableSchema tableSchema = SchemaUtils.forceCommit((SchemaManager)new SchemaManager(fileIO, tablePath), (Schema)new Schema(rowType.getFields(), partitions, primaryKeys, conf.toMap(), ""));
        return FileStoreTableFactory.create((FileIO)fileIO, (Path)tablePath, (TableSchema)tableSchema);
    }
}

