/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordChannelComputer;
import org.apache.paimon.flink.sink.cdc.CdcRecordKeyAndBucketExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link CdcRecordChannelComputer}.
 *
 * <p>Each test creates a table schema in a temporary directory, generates random records as
 * field-name to string-value maps, and checks two properties in {@code testImpl}:
 *
 * <ul>
 *   <li>the channel computed for a {@link CdcRecord} equals the channel computed from the
 *       partition and bucket that {@link CdcRecordKeyAndBucketExtractor} extracts for it;
 *   <li>for a fixed partition, consecutive bucket numbers are spread over the channels so that
 *       the per-channel bucket counts differ by at most one.
 * </ul>
 */
public class CdcRecordChannelComputerTest {
    @TempDir
    java.nio.file.Path tempDir;

    /** Runs the checks on a table partitioned by {@code pt} with primary key {@code (pt, k)}. */
    @Test
    public void testSchemaWithPartition() throws Exception {
        RowType rowType = RowType.of(
                new DataType[] {DataTypes.INT(), DataTypes.BIGINT(), DataTypes.DOUBLE()},
                new String[] {"pt", "k", "v"});
        TableSchema schema =
                createTable(rowType, Collections.singletonList("pt"), Arrays.asList("pt", "k"));
        testImpl(schema, randomInput(true));
    }

    /** Runs the checks on an unpartitioned table with primary key {@code k}. */
    @Test
    public void testSchemaNoPartition() throws Exception {
        RowType rowType = RowType.of(
                new DataType[] {DataTypes.BIGINT(), DataTypes.DOUBLE()},
                new String[] {"k", "v"});
        TableSchema schema =
                createTable(rowType, Collections.emptyList(), Collections.singletonList("k"));
        testImpl(schema, randomInput(false));
    }

    /**
     * Creates a table under {@link #tempDir} on the local file system, with no options and an
     * empty comment.
     *
     * @param rowType row type whose fields become the table's fields
     * @param partitionKeys names of the partition columns, possibly empty
     * @param primaryKeys names of the primary key columns
     * @return the schema returned by {@link SchemaManager#createTable}
     */
    private TableSchema createTable(
            RowType rowType, List<String> partitionKeys, List<String> primaryKeys)
            throws Exception {
        FileIO fileIO = LocalFileIO.create();
        SchemaManager schemaManager = new SchemaManager(fileIO, new Path(tempDir.toString()));
        return schemaManager.createTable(
                new Schema(rowType.getFields(), partitionKeys, primaryKeys, new HashMap<>(), ""));
    }

    /**
     * Generates between 1 and 1000 random records as field-name to string-value maps.
     *
     * @param withPartition whether to include a {@code pt} field (random value in 1..10) in
     *     addition to the {@code k} (random long) and {@code v} (random double) fields
     * @return a non-empty list of generated records
     */
    private static List<Map<String, String>> randomInput(boolean withPartition) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        int numInputs = random.nextInt(1000) + 1;
        List<Map<String, String>> input = new ArrayList<>(numInputs);
        for (int i = 0; i < numInputs; i++) {
            Map<String, String> fields = new HashMap<>();
            if (withPartition) {
                fields.put("pt", String.valueOf(random.nextInt(10) + 1));
            }
            fields.put("k", String.valueOf(random.nextLong()));
            fields.put("v", String.valueOf(random.nextDouble()));
            input.add(fields);
        }
        return input;
    }

    /**
     * Checks the channel computer against the given schema and records, using a random number of
     * channels between 1 and 10.
     *
     * @param schema schema used to build both the extractor and the channel computer
     * @param input records as field-name to string-value maps; must not be empty
     */
    private void testImpl(TableSchema schema, List<Map<String, String>> input) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        CdcRecordKeyAndBucketExtractor extractor = new CdcRecordKeyAndBucketExtractor(schema);

        int numChannels = random.nextInt(10) + 1;
        CdcRecordChannelComputer channelComputer = new CdcRecordChannelComputer(schema);
        channelComputer.setup(numChannels);

        // An INSERT and a DELETE carrying the same fields must land on the channel derived from
        // the extracted partition and bucket, whichever of the two the extractor was fed.
        for (Map<String, String> fields : input) {
            CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, fields);
            CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, fields);

            extractor.setRecord(random.nextBoolean() ? insertRecord : deleteRecord);
            BinaryRow partition = extractor.partition();
            int bucket = extractor.bucket();

            Assertions.assertThat(channelComputer.channel(insertRecord))
                    .isEqualTo(channelComputer.channel(partition, bucket));
            Assertions.assertThat(channelComputer.channel(deleteRecord))
                    .isEqualTo(channelComputer.channel(partition, bucket));
        }

        // Within one partition, buckets 0..numBuckets-1 must be spread evenly over the channels.
        int numTests = random.nextInt(10) + 1;
        for (int test = 0; test < numTests; test++) {
            // Start every channel at zero so that channels receiving no bucket still count
            // towards the minimum.
            Map<Integer, Integer> bucketsPerChannel = new HashMap<>();
            for (int channel = 0; channel < numChannels; channel++) {
                bucketsPerChannel.put(channel, 0);
            }

            Map<String, String> fields = input.get(random.nextInt(input.size()));
            extractor.setRecord(new CdcRecord(RowKind.INSERT, fields));
            BinaryRow partition = extractor.partition();

            int numBuckets = random.nextInt(numChannels * 4) + 1;
            for (int bucket = 0; bucket < numBuckets; bucket++) {
                int channel = channelComputer.channel(partition, bucket);
                // Fail with a readable message instead of a NullPointerException when the
                // computer returns a channel outside [0, numChannels).
                Assertions.assertThat(channel).isBetween(0, numChannels - 1);
                bucketsPerChannel.merge(channel, 1, Integer::sum);
            }

            int max = Collections.max(bucketsPerChannel.values());
            int min = Collections.min(bucketsPerChannel.values());
            Assertions.assertThat(max - min).isLessThanOrEqualTo(1);
        }
    }
}

