/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.io.storage.row;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.row.HoodieRowCreateHandle;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.hudi.testutils.SparkDatasetTestUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Unit tests for {@link HoodieRowCreateHandle}.
 *
 * <p>Each test writes randomly generated rows through a handle and then checks the returned
 * {@link WriteStatus} as well as the parquet data that can be read back from the base path.
 */
public class TestHoodieRowCreateHandle extends HoodieSparkClientTestHarness {
    private static final Random RANDOM = new Random();

    /** Initializes the Spark contexts, base path, storage, data generator, meta client and timeline service. */
    @BeforeEach
    public void setUp() throws Exception {
        this.initSparkContexts("TestHoodieRowCreateHandle");
        this.initPath();
        this.initHoodieStorage();
        this.initTestDataGenerator();
        this.initMetaClient();
        this.initTimelineService();
    }

    /** Releases everything acquired in {@link #setUp()}. */
    @AfterEach
    public void tearDown() throws Exception {
        this.cleanupResources();
    }

    /**
     * Writes five batches of random rows, each through a fresh handle, and validates the write
     * status and the accumulated parquet output after every batch.
     *
     * @param populateMetaFields whether the write config asks for meta fields to be populated
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testRowCreateHandle(boolean populateMetaFields) throws Exception {
        HoodieWriteConfig config = SparkDatasetTestUtils.getConfigBuilder(this.basePath, timelineServicePort)
                .withPopulateMetaFields(populateMetaFields)
                .build();
        HoodieTable table = HoodieSparkTable.create(config, this.context, this.metaClient);
        List<String> fileNames = new ArrayList<>();
        List<String> fileAbsPaths = new ArrayList<>();
        Dataset<Row> totalInputRows = null;
        for (int i = 0; i < 5; ++i) {
            String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[i % 3];
            String fileId = UUID.randomUUID().toString();
            String instantTime = "000";
            HoodieRowCreateHandle handle = new HoodieRowCreateHandle(table, config, partitionPath, fileId, instantTime,
                    RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
            int size = 10 + RANDOM.nextInt(1000);
            Dataset<Row> inputRows = SparkDatasetTestUtils.getRandomRows(this.sqlContext, size, partitionPath, false);
            // Accumulate every batch: the read-back below covers all files written so far.
            totalInputRows = totalInputRows == null ? inputRows : totalInputRows.union(inputRows);
            WriteStatus writeStatus = this.writeAndGetWriteStatus(inputRows, handle);
            fileAbsPaths.add(this.basePath + "/" + writeStatus.getStat().getPath());
            fileNames.add(handle.getFileName());
            this.assertOutput(writeStatus, size, fileId, partitionPath, instantTime, totalInputRows, fileNames,
                    fileAbsPaths, populateMetaFields);
        }
    }

    /**
     * Feeds the handle a batch of valid rows followed by malformed rows and more valid rows,
     * expects the write loop to throw, and verifies that the global error is recorded on the
     * {@link WriteStatus} while the first batch of valid rows is still readable.
     */
    @Test
    public void testGlobalFailure() throws Exception {
        HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(this.basePath, timelineServicePort).build();
        HoodieTable table = HoodieSparkTable.create(cfg, this.context, this.metaClient);
        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
        String fileId = UUID.randomUUID().toString();
        String instantTime = "000";
        HoodieRowCreateHandle handle = new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime,
                RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
        int size = 10 + RANDOM.nextInt(1000);
        int totalFailures = 5;

        // Layout of the input: [size/2 valid rows] [totalFailures bad rows] [size/2 valid rows].
        Dataset<Row> inputRows = SparkDatasetTestUtils.getRandomRows(this.sqlContext, size / 2, partitionPath, false);
        List<InternalRow> internalRows = SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER);
        for (int i = 0; i < totalFailures; ++i) {
            internalRows.add(SparkDatasetTestUtils.getInternalRowWithError(partitionPath));
        }
        Dataset<Row> inputRows2 = SparkDatasetTestUtils.getRandomRows(this.sqlContext, size / 2, partitionPath, false);
        internalRows.addAll(SparkDatasetTestUtils.toInternalRows(inputRows2, SparkDatasetTestUtils.ENCODER));

        // assertThrows (rather than try { ...; fail(); } catch (Throwable)) so that a write loop
        // which unexpectedly succeeds actually fails the test instead of being swallowed.
        Assertions.assertThrows(Throwable.class, () -> {
            for (InternalRow internalRow : internalRows) {
                handle.write(internalRow);
            }
        }, "Should have failed");

        WriteStatus writeStatus = handle.close();
        List<String> fileNames = new ArrayList<>();
        fileNames.add(handle.getFileName());

        Assertions.assertNotNull(writeStatus.getGlobalError());
        // The ClassCastException wording changed in JDK 11 ("class X cannot be cast to class Y ...");
        // every later release keeps that wording, so compare with >= instead of listing versions.
        String expectedError = HoodieTestUtils.getJavaVersion() >= 11
                ? "class java.lang.String cannot be cast to class org.apache.spark.unsafe.types.UTF8String"
                : "java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String";
        String actualError = writeStatus.getGlobalError().getMessage();
        Assertions.assertTrue(actualError != null && actualError.contains(expectedError),
                "Expected error to contain: " + expectedError + ", the actual error message: " + actualError);
        Assertions.assertEquals(fileId, writeStatus.getFileId());
        Assertions.assertEquals(partitionPath, writeStatus.getPartitionPath());

        // Only the rows written before the first malformed row are compared against the output.
        Dataset<Row> result = this.sqlContext.read().parquet(this.basePath + "/" + partitionPath);
        this.assertRows(inputRows, result, instantTime, fileNames, true);
    }

    /**
     * Converts {@code inputRows} to internal rows, writes them all through {@code handle} and closes it.
     *
     * @param inputRows rows to write
     * @param handle    handle to write through; closed before returning
     * @return the write status returned by {@link HoodieRowCreateHandle#close()}
     */
    private WriteStatus writeAndGetWriteStatus(Dataset<Row> inputRows, HoodieRowCreateHandle handle) throws Exception {
        List<InternalRow> internalRows = SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER);
        for (InternalRow internalRow : internalRows) {
            handle.write(internalRow);
        }
        return handle.close();
    }

    /**
     * Asserts that {@code writeStatus} describes an error-free write of {@code size} inserts to the
     * given file and partition, then reads {@code fileAbsPaths} back and compares the rows.
     *
     * @param writeStatus        status returned when the handle was closed
     * @param size               number of rows written through that handle
     * @param fileId             file id the handle was created with
     * @param partitionPath      partition path the handle was created with
     * @param instantTime        instant time the handle was created with
     * @param inputRows          all rows expected to be present in {@code fileAbsPaths}
     * @param filenames          file names produced by the handles so far
     * @param fileAbsPaths       absolute paths of the files to read back
     * @param populateMetaFields whether meta fields are expected to be populated
     */
    private void assertOutput(WriteStatus writeStatus, int size, String fileId, String partitionPath, String instantTime, Dataset<Row> inputRows, List<String> filenames, List<String> fileAbsPaths, boolean populateMetaFields) {
        Assertions.assertEquals(partitionPath, writeStatus.getPartitionPath());
        Assertions.assertEquals((long) size, writeStatus.getTotalRecords());
        Assertions.assertEquals(0L, writeStatus.getTotalErrorRecords());
        Assertions.assertFalse(writeStatus.hasErrors());
        Assertions.assertNull(writeStatus.getGlobalError());
        Assertions.assertEquals(fileId, writeStatus.getFileId());

        HoodieWriteStat writeStat = writeStatus.getStat();
        Assertions.assertEquals((long) size, writeStat.getNumInserts());
        Assertions.assertEquals((long) size, writeStat.getNumWrites());
        Assertions.assertEquals(fileId, writeStat.getFileId());
        Assertions.assertEquals(partitionPath, writeStat.getPartitionPath());
        Assertions.assertEquals(0L, writeStat.getNumDeletes());
        Assertions.assertEquals(0L, writeStat.getNumUpdateWrites());
        Assertions.assertEquals(0L, writeStat.getTotalWriteErrors());

        Dataset<Row> result = this.sqlContext.read().parquet(fileAbsPaths.toArray(new String[0]));
        this.assertRows(inputRows, result, instantTime, filenames, populateMetaFields);
    }

    /**
     * Checks the commit-time, commit-seqno and file-name meta columns of every actual row, then
     * verifies that no actual row (with those three columns dropped) is missing from the expected rows.
     *
     * @param expectedRows       rows that were handed to the writer
     * @param actualRows         rows read back from storage
     * @param instantTime        commit time expected in the meta column when meta fields are populated
     * @param filenames          file names allowed in the file-name meta column when meta fields are populated
     * @param populateMetaFields {@code true} to expect populated meta columns, {@code false} to expect empty strings
     */
    private void assertRows(Dataset<Row> expectedRows, Dataset<Row> actualRows, String instantTime, List<String> filenames, boolean populateMetaFields) {
        int commitTimePos = HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
        int fileNamePos = HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD);
        int seqIdPos = HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD);

        for (Row entry : actualRows.collectAsList()) {
            String commitTime = entry.getString(commitTimePos);
            String fileName = entry.getString(fileNamePos);
            String seqId = entry.getString(seqIdPos);
            if (populateMetaFields) {
                Assertions.assertEquals(instantTime, commitTime);
                Assertions.assertFalse(StringUtils.isNullOrEmpty(seqId));
                Assertions.assertTrue(filenames.contains(fileName));
            } else {
                Assertions.assertEquals("", commitTime);
                Assertions.assertEquals("", seqId);
                Assertions.assertEquals("", fileName);
            }
        }

        // The three meta columns checked above are excluded from the data comparison.
        Dataset<Row> trimmedExpected = expectedRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
                HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
        Dataset<Row> trimmedActual = actualRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
                HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
        Assertions.assertEquals(0L, trimmedActual.except(trimmedExpected).count());
    }
}

