/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.RowDataChannelComputer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Randomized tests for {@link RowDataChannelComputer}.
 *
 * <p>Each test creates a single-bucket primary-key table schema in a temporary directory,
 * generates a random batch of rows and then checks three properties of the channel computer:
 *
 * <ol>
 *   <li>{@code channel(row)} agrees with {@code channel(partition, bucket)} for the partition and
 *       bucket extracted from that row;
 *   <li>for a fixed partition, consecutive bucket ids are spread over the channels so that the
 *       busiest and the idlest channel differ by at most one bucket;
 *   <li>when the computer is created with {@code hasLogSink == true}, all rows that share a
 *       bucket are sent to one single channel.
 * </ol>
 *
 * <p>All random parameters are attached to the assertion descriptions so that a failing run can
 * be understood from its message.
 */
public class RowDataChannelComputerTest {
    /** Upper bound (inclusive) on the number of random rows generated per test. */
    private static final int MAX_NUM_INPUTS = 1000;

    /** Per-test temporary directory injected by JUnit; passed to {@link SchemaManager} as path. */
    @TempDir
    java.nio.file.Path tempDir;

    /** Runs the checks against a table with columns (pt, k, v), partitioned by {@code pt}. */
    @Test
    public void testSchemaWithPartition() throws Exception {
        RowType rowType = RowType.of(
                new DataType[] {DataTypes.INT(), DataTypes.BIGINT(), DataTypes.DOUBLE()},
                new String[] {"pt", "k", "v"});
        TableSchema schema =
                createSchema(rowType, Collections.singletonList("pt"), Arrays.asList("pt", "k"));

        ThreadLocalRandom random = ThreadLocalRandom.current();
        int numInputs = random.nextInt(MAX_NUM_INPUTS) + 1;
        List<InternalRow> input = new ArrayList<>(numInputs);
        for (int i = 0; i < numInputs; i++) {
            // Partition values are drawn from 1..10 so that partitions repeat across rows.
            input.add(GenericRow.of(random.nextInt(10) + 1, random.nextLong(), random.nextDouble()));
        }

        testImpl(schema, input);
    }

    /** Runs the checks against an unpartitioned table with columns (k, v). */
    @Test
    public void testSchemaNoPartition() throws Exception {
        RowType rowType = RowType.of(
                new DataType[] {DataTypes.BIGINT(), DataTypes.DOUBLE()},
                new String[] {"k", "v"});
        TableSchema schema =
                createSchema(rowType, Collections.emptyList(), Collections.singletonList("k"));

        ThreadLocalRandom random = ThreadLocalRandom.current();
        int numInputs = random.nextInt(MAX_NUM_INPUTS) + 1;
        List<InternalRow> input = new ArrayList<>(numInputs);
        for (int i = 0; i < numInputs; i++) {
            input.add(GenericRow.of(random.nextLong(), random.nextDouble()));
        }

        testImpl(schema, input);
    }

    /**
     * Creates a table schema under {@link #tempDir} with the option {@code bucket = 1} and an
     * empty comment.
     *
     * @param rowType row type whose fields become the table columns
     * @param partitionKeys names of the partition columns, possibly empty
     * @param primaryKeys names of the primary key columns
     * @return the schema returned by {@link SchemaManager#createTable}
     * @throws Exception if the schema manager fails to create the table
     */
    private TableSchema createSchema(
            RowType rowType, List<String> partitionKeys, List<String> primaryKeys)
            throws Exception {
        SchemaManager schemaManager =
                new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString()));
        return schemaManager.createTable(
                new Schema(
                        rowType.getFields(),
                        partitionKeys,
                        primaryKeys,
                        Collections.singletonMap("bucket", "1"),
                        ""));
    }

    /**
     * Builds a channel computer with a random channel count (1..10) and a random
     * {@code hasLogSink} flag, then runs all property checks on the given rows.
     *
     * @param schema table schema used for both the key extractor and the channel computer
     * @param input non-empty list of rows matching {@code schema}
     */
    private void testImpl(TableSchema schema, List<InternalRow> input) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        FixedBucketRowKeyExtractor extractor = new FixedBucketRowKeyExtractor(schema);
        int numChannels = random.nextInt(10) + 1;
        boolean hasLogSink = random.nextBoolean();
        RowDataChannelComputer channelComputer = new RowDataChannelComputer(schema, hasLogSink);
        channelComputer.setup(numChannels);

        // Shared context for failure messages; the inputs are random and cannot be replayed.
        String context = "numChannels=" + numChannels + ", hasLogSink=" + hasLogSink;

        assertRecordChannelMatchesPartitionBucketChannel(extractor, channelComputer, input, context);
        assertBucketsEvenlyDistributed(extractor, channelComputer, input, numChannels, context);
        if (hasLogSink) {
            assertSameBucketSameChannel(extractor, channelComputer, input, context);
        }
    }

    /** Checks that channel(row) equals channel(partition, bucket) for every input row. */
    private void assertRecordChannelMatchesPartitionBucketChannel(
            FixedBucketRowKeyExtractor extractor,
            RowDataChannelComputer channelComputer,
            List<InternalRow> input,
            String context) {
        for (InternalRow rowData : input) {
            extractor.setRecord(rowData);
            BinaryRow partition = extractor.partition();
            int bucket = extractor.bucket();
            Assertions.assertThat(channelComputer.channel(rowData))
                    .as("channel(row) vs channel(partition, bucket=%d) [%s]", bucket, context)
                    .isEqualTo(channelComputer.channel(partition, bucket));
        }
    }

    /**
     * Checks, for a few randomly picked partitions, that bucket ids 0..numBuckets-1 are spread
     * over the channels with a max/min difference of at most one.
     */
    private void assertBucketsEvenlyDistributed(
            FixedBucketRowKeyExtractor extractor,
            RowDataChannelComputer channelComputer,
            List<InternalRow> input,
            int numChannels,
            String context) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        int numTests = random.nextInt(10) + 1;
        for (int test = 0; test < numTests; test++) {
            extractor.setRecord(input.get(random.nextInt(input.size())));
            BinaryRow partition = extractor.partition();
            int numBuckets = random.nextInt(numChannels * 4) + 1;

            // Index = channel, value = number of buckets assigned to that channel.
            int[] bucketsPerChannel = new int[numChannels];
            for (int bucket = 0; bucket < numBuckets; bucket++) {
                int channel = channelComputer.channel(partition, bucket);
                // Fail with a readable message instead of an index/null error further down.
                Assertions.assertThat(channel)
                        .as("channel for bucket %d must be a valid channel index [%s]", bucket, context)
                        .isBetween(0, numChannels - 1);
                bucketsPerChannel[channel]++;
            }

            // numChannels >= 1, so the array is never empty and max()/min() are present.
            int max = Arrays.stream(bucketsPerChannel).max().getAsInt();
            int min = Arrays.stream(bucketsPerChannel).min().getAsInt();
            Assertions.assertThat(max - min)
                    .as(
                            "bucket count spread across channels, counts=%s, numBuckets=%d [%s]",
                            Arrays.toString(bucketsPerChannel), numBuckets, context)
                    .isLessThanOrEqualTo(1);
        }
    }

    /**
     * Checks that all rows with the same bucket are routed to exactly one channel. Rows are
     * grouped by bucket only, so rows from different partitions fall into the same group.
     */
    private void assertSameBucketSameChannel(
            FixedBucketRowKeyExtractor extractor,
            RowDataChannelComputer channelComputer,
            List<InternalRow> input,
            String context) {
        HashMap<Integer, Set<Integer>> channelsPerBucket = new HashMap<>();
        for (InternalRow rowData : input) {
            extractor.setRecord(rowData);
            int bucket = extractor.bucket();
            channelsPerBucket
                    .computeIfAbsent(bucket, k -> new HashSet<>())
                    .add(channelComputer.channel(rowData));
        }
        for (Set<Integer> channels : channelsPerBucket.values()) {
            Assertions.assertThat(channels)
                    .as("channels used by a single bucket [%s]", context)
                    .hasSize(1);
        }
    }
}

