/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.cdc.CdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.CdcMultiplexRecordChannelComputer;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link CdcMultiplexRecordChannelComputer}.
 *
 * <p>Before each test a catalog is loaded for a warehouse located under a JUnit-managed
 * temporary directory, and two tables are created in database {@code test_db}:
 * {@code test_table1} (fields {@code pt}, {@code k}, {@code v}) and {@code test_table2}
 * (fields {@code k}, {@code v}). The tests then feed randomly generated records for one of
 * those tables through the channel computer and check that:
 *
 * <ul>
 *   <li>an INSERT and a DELETE carrying the same fields are assigned the same channel, and
 *   <li>every computed channel is non-negative.
 * </ul>
 */
public class CdcMultiplexRecordChannelComputerTest {
    /** Temporary directory injected by JUnit; the warehouse path is derived from it. */
    @TempDir
    java.nio.file.Path tempDir;
    /** Loader handed to the channel computer under test; also used to create {@link #catalog}. */
    private Catalog.Loader catalogLoader;
    /** Warehouse location, built as {@code "traceable://" + tempDir}. */
    private Path warehouse;
    private String databaseName;
    /** Identifier of {@code test_table1}, whose schema includes the {@code pt} field. */
    private Identifier tableWithPartition;
    /** Catalog instance used only during set-up to create the database and tables. */
    private Catalog catalog;
    /** Identifier of {@code test_table2}, whose schema has only {@code k} and {@code v}. */
    private Identifier tableWithoutPartition;

    /**
     * Creates the catalog, the test database and the two test tables.
     *
     * @throws Exception if any catalog operation fails
     */
    @BeforeEach
    public void before() throws Exception {
        this.warehouse = new Path("traceable://" + this.tempDir.toString());
        this.databaseName = "test_db";
        this.tableWithPartition = Identifier.create(this.databaseName, "test_table1");
        this.tableWithoutPartition = Identifier.create(this.databaseName, "test_table2");

        Options catalogOptions = new Options();
        catalogOptions.set(CatalogOptions.WAREHOUSE, this.warehouse.toString());
        catalogOptions.set(CatalogOptions.URI, "");

        // The intersection cast makes the lambda serializable regardless of whether
        // Catalog.Loader itself extends Serializable.
        this.catalogLoader =
                (Catalog.Loader & Serializable)
                        () -> CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
        this.catalog = this.catalogLoader.load();
        this.catalog.createDatabase(this.databaseName, true);

        // Table options shared by both tables.
        Options conf = new Options();
        conf.set(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME, Duration.ofMillis(10L));

        RowType rowTypeWithPartition =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT(), DataTypes.DOUBLE()},
                        new String[] {"pt", "k", "v"});
        RowType rowTypeWithoutPartition =
                RowType.of(
                        new DataType[] {DataTypes.BIGINT(), DataTypes.DOUBLE()},
                        new String[] {"k", "v"});

        // NOTE(review): the two key lists passed to Schema are presumably
        // (partition keys, primary keys) in that order - confirm against the Schema constructor.
        List<Tuple2<Identifier, Schema>> tables =
                Arrays.asList(
                        Tuple2.of(
                                this.tableWithPartition,
                                new Schema(
                                        rowTypeWithPartition.getFields(),
                                        Collections.singletonList("pt"),
                                        Arrays.asList("pt", "k"),
                                        conf.toMap(),
                                        "")),
                        Tuple2.of(
                                this.tableWithoutPartition,
                                new Schema(
                                        rowTypeWithoutPartition.getFields(),
                                        Collections.emptyList(),
                                        Collections.singletonList("k"),
                                        conf.toMap(),
                                        "")));
        for (Tuple2<Identifier, Schema> tableAndSchema : tables) {
            this.catalog.createTable(tableAndSchema.f0, tableAndSchema.f1, false);
        }
    }

    /** Runs the channel checks against {@code test_table1} with random {@code pt}/{@code k}/{@code v} rows. */
    @Test
    public void testSchemaWithPartition() throws Exception {
        this.testImpl(this.tableWithPartition, randomRows(true));
    }

    /** Runs the channel checks against {@code test_table2} with random {@code k}/{@code v} rows. */
    @Test
    public void testSchemaNoPartition() {
        this.testImpl(this.tableWithoutPartition, randomRows(false));
    }

    /**
     * Generates between 1 and 1000 random rows as field-name to string-value maps.
     *
     * @param withPartition whether each row also gets a {@code pt} value in the range 1..10
     * @return a non-empty list of rows containing {@code k}, {@code v} and optionally {@code pt}
     */
    private static List<Map<String, String>> randomRows(boolean withPartition) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        int numInputs = random.nextInt(1000) + 1;
        List<Map<String, String>> rows = new ArrayList<>(numInputs);
        for (int i = 0; i < numInputs; ++i) {
            Map<String, String> fields = new HashMap<>();
            if (withPartition) {
                fields.put("pt", String.valueOf(random.nextInt(10) + 1));
            }
            fields.put("k", String.valueOf(random.nextLong()));
            fields.put("v", String.valueOf(random.nextDouble()));
            rows.add(fields);
        }
        return rows;
    }

    /** Wraps {@code record} as a multiplex record for {@code tableId} and returns its computed channel. */
    private static int channelOf(
            CdcMultiplexRecordChannelComputer channelComputer,
            Identifier tableId,
            CdcRecord record) {
        return channelComputer.channel(
                CdcMultiplexRecord.fromCdcRecord(
                        tableId.getDatabaseName(), tableId.getObjectName(), record));
    }

    /**
     * Checks the channel computer for the given table using a random channel count (1..10).
     *
     * @param tableId table the records are attributed to
     * @param input rows to turn into CDC records; must be non-empty
     */
    private void testImpl(Identifier tableId, List<Map<String, String>> input) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        int numChannels = random.nextInt(10) + 1;
        CdcMultiplexRecordChannelComputer channelComputer =
                new CdcMultiplexRecordChannelComputer(this.catalogLoader, new HashMap<>());
        channelComputer.setup(numChannels);

        // An INSERT and a DELETE with identical fields must be assigned the same channel.
        for (Map<String, String> fields : input) {
            int insertChannel =
                    channelOf(channelComputer, tableId, new CdcRecord(RowKind.INSERT, fields));
            int deleteChannel =
                    channelOf(channelComputer, tableId, new CdcRecord(RowKind.DELETE, fields));
            Assertions.assertThat(insertChannel).isEqualTo(deleteChannel);
        }

        // Every computed channel must be non-negative.
        // NOTE(review): an upper bound (channel < numChannels) is probably also part of the
        // contract - confirm before tightening this assertion.
        int numTests = random.nextInt(10) + 1;
        for (int test = 0; test < numTests; ++test) {
            Map<String, String> fields = input.get(random.nextInt(input.size()));
            CdcRecord record = new CdcRecord(RowKind.INSERT, fields);
            int numBuckets = random.nextInt(numChannels * 4) + 1;
            for (int i = 0; i < numBuckets; ++i) {
                int channel = channelOf(channelComputer, tableId, record);
                Assertions.assertThat(channel).isGreaterThanOrEqualTo(0);
            }
        }
    }
}

