/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
import org.apache.paimon.flink.sink.cdc.TestCdcEvent;
import org.apache.paimon.flink.sink.cdc.TestCdcEventParser;
import org.apache.paimon.flink.sink.cdc.TestCdcSourceFunction;
import org.apache.paimon.flink.sink.cdc.TestTable;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FailingFileIO;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

public class FlinkCdcSyncDatabaseSinkITCase
extends AbstractTestBase {
    private static final String DATABASE_NAME = "test";
    private static final String TABLE_NAME = "test_tbl";
    @TempDir
    java.nio.file.Path tempDir;

    @Test
    @Timeout(value=120L)
    public void testRandomCdcEvents() throws Exception {
        this.innerTestRandomCdcEvents(() -> ThreadLocalRandom.current().nextInt(5) + 1);
    }

    @Test
    @Timeout(value=120L)
    public void testRandomCdcEventsDynamicBucket() throws Exception {
        this.innerTestRandomCdcEvents(() -> -1);
    }

    private void innerTestRandomCdcEvents(Supplier<Integer> bucket) throws Exception {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        int numTables = random.nextInt(3) + 1;
        boolean enableFailure = random.nextBoolean();
        int maxEvents = 1000;
        int maxSchemaChanges = 10;
        int maxPartitions = 3;
        int maxKeys = 150;
        String failingName = UUID.randomUUID().toString();
        ArrayList<TestTable> testTables = new ArrayList<TestTable>();
        ArrayList<FileStoreTable> fileStoreTables = new ArrayList<FileStoreTable>();
        for (int i = 0; i < numTables; ++i) {
            FailingFileIO fileIO;
            Path tablePath;
            String tableName = TABLE_NAME + i;
            TestTable testTable = new TestTable(tableName, random.nextInt(maxEvents) + 1, random.nextInt(maxSchemaChanges) + 1, random.nextInt(maxPartitions) + 1, random.nextInt(maxKeys) + 1);
            testTables.add(testTable);
            if (enableFailure) {
                tablePath = new Path(FailingFileIO.getFailingPath((String)failingName, (String)CatalogUtils.stringifyPath((String)this.tempDir.toString(), (String)DATABASE_NAME, (String)tableName)));
                fileIO = new FailingFileIO();
            } else {
                tablePath = new Path("traceable://" + CatalogUtils.stringifyPath((String)this.tempDir.toString(), (String)DATABASE_NAME, (String)tableName));
                fileIO = LocalFileIO.create();
            }
            FailingFileIO.reset((String)failingName, (int)0, (int)1);
            FileStoreTable fileStoreTable = this.createFileStoreTable(tablePath, (FileIO)fileIO, testTable.initialRowType(), Collections.singletonList("pt"), Arrays.asList("pt", "k"), bucket.get());
            fileStoreTables.add(fileStoreTable);
        }
        List<TestCdcEvent> events = this.mergeTestTableEvents(testTables);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getCheckpointConfig().setCheckpointInterval(100L);
        if (!enableFailure) {
            env.setRestartStrategy(RestartStrategies.noRestart());
        }
        TestCdcSourceFunction sourceFunction = new TestCdcSourceFunction(events);
        DataStreamSource source = env.addSource((SourceFunction)sourceFunction);
        source.setParallelism(2);
        Options catalogOptions = new Options();
        catalogOptions.set("warehouse", this.tempDir.toString());
        Catalog.Loader & Serializable catalogLoader = (Catalog.Loader & Serializable)() -> FlinkCatalogFactory.createPaimonCatalog((Options)catalogOptions);
        new FlinkCdcSyncDatabaseSinkBuilder().withInput((DataStream)source).withParserFactory(TestCdcEventParser::new).withTables(fileStoreTables).withTableOptions(Collections.singletonMap(FlinkConnectorOptions.SINK_PARALLELISM.key(), "2")).withDatabase(DATABASE_NAME).withCatalogLoader((Catalog.Loader)catalogLoader).build();
        FailingFileIO.reset((String)failingName, (int)10, (int)10000);
        env.execute();
        FailingFileIO.reset((String)failingName, (int)0, (int)1);
        for (int i = 0; i < numTables; ++i) {
            FileStoreTable table = ((FileStoreTable)fileStoreTables.get(i)).copyWithLatestSchema();
            SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
            TableSchema schema = (TableSchema)schemaManager.latest().get();
            ReadBuilder readBuilder = table.newReadBuilder();
            TableScan.Plan plan = readBuilder.newScan().plan();
            try (RecordReaderIterator it = new RecordReaderIterator(readBuilder.newRead().createReader(plan));){
                ((TestTable)testTables.get(i)).assertResult(schema, (Iterator<InternalRow>)it);
                continue;
            }
        }
    }

    private FileStoreTable createFileStoreTable(Path tablePath, FileIO fileIO, RowType rowType, List<String> partitions, List<String> primaryKeys, int numBucket) throws Exception {
        Options conf = new Options();
        conf.set(CoreOptions.BUCKET, (Object)numBucket);
        conf.set(CoreOptions.DYNAMIC_BUCKET_TARGET_ROW_NUM, (Object)100L);
        conf.set(CoreOptions.WRITE_BUFFER_SIZE, (Object)new MemorySize(12288L));
        conf.set(CoreOptions.PAGE_SIZE, (Object)new MemorySize(4096L));
        TableSchema tableSchema = SchemaUtils.forceCommit((SchemaManager)new SchemaManager(fileIO, tablePath), (Schema)new Schema(rowType.getFields(), partitions, primaryKeys, conf.toMap(), ""));
        return FileStoreTableFactory.create((FileIO)fileIO, (Path)tablePath, (TableSchema)tableSchema);
    }

    private List<TestCdcEvent> mergeTestTableEvents(List<TestTable> testTables) {
        ArrayList<Integer> toShuffle = new ArrayList<Integer>();
        for (int i = 0; i < testTables.size(); ++i) {
            for (int j = 0; j < testTables.get(i).events().size(); ++j) {
                toShuffle.add(i);
            }
        }
        Collections.shuffle(toShuffle);
        ArrayList<TestCdcEvent> events = new ArrayList<TestCdcEvent>();
        Iterator iterator = toShuffle.iterator();
        while (iterator.hasNext()) {
            int idx = (Integer)iterator.next();
            events.add(testTables.get(idx).events().poll());
        }
        return events;
    }
}

