/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.table.read;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.reader.DataGenerationPlan;
import org.apache.hudi.common.testutils.reader.HoodieFileGroupReaderTestHarness;
import org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils;
import org.apache.hudi.common.testutils.reader.HoodieRecordTestPayload;
import org.apache.hudi.common.testutils.reader.HoodieTestReaderContext;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

public class TestCustomMerger
extends HoodieFileGroupReaderTestHarness {
    @Override
    protected Properties getMetaProps() {
        Properties metaProps = super.getMetaProps();
        metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name());
        metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), "KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY");
        return metaProps;
    }

    @BeforeAll
    public static void setUp() throws IOException {
        readerContext = new HoodieTestReaderContext(Option.of((Object)new CustomAvroMerger()), Option.of((Object)HoodieRecordTestPayload.class.getName()));
        properties.setProperty("hoodie.write.record.merge.mode", RecordMergeMode.CUSTOM.name());
        keyRanges = Arrays.asList(new HoodieFileSliceTestUtils.KeyRange(1, 10), new HoodieFileSliceTestUtils.KeyRange(1, 5), new HoodieFileSliceTestUtils.KeyRange(1, 3), new HoodieFileSliceTestUtils.KeyRange(6, 8), new HoodieFileSliceTestUtils.KeyRange(1, 10));
        timestamps = Arrays.asList(2L, 3L, 4L, 1L, 9L);
        operationTypes = Arrays.asList(DataGenerationPlan.OperationType.INSERT, DataGenerationPlan.OperationType.DELETE, DataGenerationPlan.OperationType.UPDATE, DataGenerationPlan.OperationType.DELETE, DataGenerationPlan.OperationType.UPDATE);
        instantTimes = Arrays.asList("001", "002", "003", "004", "005");
        shouldWritePositions = Arrays.asList(false, false, false, false, false);
    }

    @BeforeEach
    public void initialize() throws Exception {
        this.setTableName(TestCustomMerger.class.getName());
        this.initPath(this.tableName);
        this.initMetaClient();
        this.initTestDataGenerator(new String[]{"any-partition-path"});
        testTable = HoodieTestTable.of(this.metaClient);
        this.setUpMockCommits();
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testWithOneLogFile(boolean useRecordPositions) throws IOException, InterruptedException {
        shouldWritePositions = Arrays.asList(useRecordPositions, useRecordPositions);
        ClosableIterator<IndexedRecord> iterator = this.getFileGroupIterator(2, useRecordPositions);
        List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
        ArrayList<String> leftKeysActual = new ArrayList<String>();
        while (iterator.hasNext()) {
            leftKeysActual.add(((IndexedRecord)iterator.next()).get(HoodieTestDataGenerator.AVRO_SCHEMA.getField("_row_key").pos()).toString());
        }
        Assertions.assertEquals(leftKeysExpected, leftKeysActual);
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testWithTwoLogFiles(boolean useRecordPositions) throws IOException, InterruptedException {
        shouldWritePositions = Arrays.asList(useRecordPositions, useRecordPositions, useRecordPositions);
        ClosableIterator<IndexedRecord> iterator = this.getFileGroupIterator(3, useRecordPositions);
        List<String> leftKeysExpected = Arrays.asList("1", "3", "6", "7", "8", "9", "10");
        ArrayList<String> leftKeysActual = new ArrayList<String>();
        while (iterator.hasNext()) {
            leftKeysActual.add(((IndexedRecord)iterator.next()).get(HoodieTestDataGenerator.AVRO_SCHEMA.getField("_row_key").pos()).toString());
        }
        Assertions.assertEquals(leftKeysExpected, leftKeysActual);
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testWithThreeLogFiles(boolean useRecordPositions) throws IOException, InterruptedException {
        shouldWritePositions = Arrays.asList(useRecordPositions, useRecordPositions, useRecordPositions, useRecordPositions);
        ClosableIterator<IndexedRecord> iterator = this.getFileGroupIterator(4, useRecordPositions);
        List<String> leftKeysExpected = Arrays.asList("1", "3", "7", "9", "10");
        ArrayList<String> leftKeysActual = new ArrayList<String>();
        while (iterator.hasNext()) {
            leftKeysActual.add(((IndexedRecord)iterator.next()).get(HoodieTestDataGenerator.AVRO_SCHEMA.getField("_row_key").pos()).toString());
        }
        Assertions.assertEquals(leftKeysExpected, leftKeysActual);
    }

    @Test
    public void testWithFourLogFiles() throws IOException, InterruptedException {
        ClosableIterator<IndexedRecord> iterator = this.getFileGroupIterator(5);
        List<String> leftKeysExpected = Arrays.asList("1", "3", "5", "7", "9");
        ArrayList<String> leftKeysActual = new ArrayList<String>();
        while (iterator.hasNext()) {
            leftKeysActual.add(((IndexedRecord)iterator.next()).get(HoodieTestDataGenerator.AVRO_SCHEMA.getField("_row_key").pos()).toString());
        }
        Assertions.assertEquals(leftKeysExpected, leftKeysActual);
    }

    @ParameterizedTest
    @MethodSource(value={"testArgs"})
    public void testPositionMergeFallback(boolean log1haspositions, boolean log2haspositions, boolean log3haspositions, boolean log4haspositions) throws IOException, InterruptedException {
        shouldWritePositions = Arrays.asList(true, log1haspositions, log2haspositions, log3haspositions, log4haspositions);
        ClosableIterator<IndexedRecord> iterator = this.getFileGroupIterator(5, true);
        List<String> leftKeysExpected = Arrays.asList("1", "3", "5", "7", "9");
        ArrayList<String> leftKeysActual = new ArrayList<String>();
        while (iterator.hasNext()) {
            leftKeysActual.add(((IndexedRecord)iterator.next()).get(HoodieTestDataGenerator.AVRO_SCHEMA.getField("_row_key").pos()).toString());
        }
        Assertions.assertEquals(leftKeysExpected, leftKeysActual);
    }

    private static Stream<Arguments> testArgs() {
        Stream.Builder<Arguments> b = Stream.builder();
        for (int i = 0; i < 16; ++i) {
            b.add(Arguments.of((Object[])new Object[]{i % 2 == 0, i / 2 % 2 == 0, i / 4 % 2 == 0, i / 8 % 2 == 0}));
        }
        return b.build();
    }

    public static class CustomAvroMerger
    implements HoodieRecordMerger {
        public static final String KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY = "KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY";
        public static final String TIMESTAMP = "timestamp";

        public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
            if (newer.getOrderingValue(newSchema, (Properties)props).compareTo(older.getOrderingValue(oldSchema, (Properties)props)) >= 0) {
                if (newer.isDelete(newSchema, (Properties)props)) {
                    return Option.empty();
                }
                int id = Integer.parseInt(((IndexedRecord)((HoodieAvroIndexedRecord)newer).getData()).get(newSchema.getField("_row_key").pos()).toString());
                if ((long)(id % 2) == 1L) {
                    return Option.of((Object)Pair.of((Object)newer, (Object)newSchema));
                }
            } else {
                if (older.isDelete(oldSchema, (Properties)props)) {
                    return Option.empty();
                }
                int id = Integer.parseInt(((IndexedRecord)((HoodieAvroIndexedRecord)older).getData()).get(oldSchema.getField("_row_key").pos()).toString());
                if ((long)(id % 2) == 1L) {
                    return Option.of((Object)Pair.of((Object)older, (Object)oldSchema));
                }
            }
            return Option.empty();
        }

        public boolean shouldFlush(HoodieRecord record, Schema schema, TypedProperties props) {
            long timestamp = (Long)((IndexedRecord)((HoodieAvroIndexedRecord)record).getData()).get(schema.getField(TIMESTAMP).pos());
            return timestamp % 3L == 0L;
        }

        public HoodieRecord.HoodieRecordType getRecordType() {
            return HoodieRecord.HoodieRecordType.AVRO;
        }

        public String getMergingStrategy() {
            return KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY;
        }
    }
}

