/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.execution;

import java.util.Iterator;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.HoodieLazyInsertIterable;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestBoundedInMemoryExecutorInSpark
extends HoodieSparkClientTestHarness {
    private final String instantTime = InProcessTimeGenerator.createNewInstantTime();
    private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withExecutorType(ExecutorType.BOUNDED_IN_MEMORY.name()).withWriteBufferLimitBytes(1024).build(false);

    @BeforeEach
    public void setUp() throws Exception {
        this.initTestDataGenerator();
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.cleanupResources();
    }

    private Runnable getPreExecuteRunnable() {
        TaskContext taskContext = TaskContext.get();
        return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testExecutor() {
        int recordNumber = 100;
        List hoodieRecords = this.dataGen.generateInserts(this.instantTime, Integer.valueOf(100));
        HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer = new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>(){
            private int count = 0;

            public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
                ++this.count;
            }

            public Integer finish() {
                return this.count;
            }
        };
        BoundedInMemoryExecutor executor = null;
        try {
            executor = new BoundedInMemoryExecutor((long)this.writeConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), (HoodieConsumer)consumer, HoodieLazyInsertIterable.getTransformerInternal((Schema)HoodieTestDataGenerator.AVRO_SCHEMA, (HoodieWriteConfig)this.writeConfig), this.getPreExecuteRunnable());
            int result = (Integer)executor.execute();
            Assertions.assertEquals((int)100, (int)result);
            Assertions.assertFalse((boolean)executor.isRunning());
        }
        finally {
            if (executor != null) {
                executor.shutdownNow();
                executor.awaitTermination();
            }
        }
    }

    @Test
    public void testInterruptExecutor() {
        List hoodieRecords = this.dataGen.generateInserts(this.instantTime, Integer.valueOf(100));
        HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer = new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>(){

            public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
                try {
                    while (true) {
                        Thread.sleep(1000L);
                    }
                }
                catch (InterruptedException ie) {
                    return;
                }
            }

            public Integer finish() {
                return 0;
            }
        };
        BoundedInMemoryExecutor executor = new BoundedInMemoryExecutor((long)this.writeConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), (HoodieConsumer)consumer, HoodieLazyInsertIterable.getTransformerInternal((Schema)HoodieTestDataGenerator.AVRO_SCHEMA, (HoodieWriteConfig)this.writeConfig), this.getPreExecuteRunnable());
        Thread.currentThread().interrupt();
        Assertions.assertThrows(HoodieException.class, () -> ((BoundedInMemoryExecutor)executor).execute());
        Assertions.assertTrue((boolean)Thread.interrupted());
        executor.shutdownNow();
        executor.awaitTermination();
    }

    @Test
    public void testExecutorTermination() {
        Iterator<HoodieRecord> unboundedRecordIter = new Iterator<HoodieRecord>(){

            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public HoodieRecord next() {
                return (HoodieRecord)TestBoundedInMemoryExecutorInSpark.this.dataGen.generateInserts(TestBoundedInMemoryExecutorInSpark.this.instantTime, Integer.valueOf(1)).get(0);
            }
        };
        HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer = new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>(){

            public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
            }

            public Integer finish() {
                return 0;
            }
        };
        BoundedInMemoryExecutor executor = new BoundedInMemoryExecutor((long)this.writeConfig.getWriteBufferLimitBytes(), (Iterator)unboundedRecordIter, (HoodieConsumer)consumer, HoodieLazyInsertIterable.getTransformerInternal((Schema)HoodieTestDataGenerator.AVRO_SCHEMA, (HoodieWriteConfig)this.writeConfig), this.getPreExecuteRunnable());
        executor.shutdownNow();
        boolean terminatedGracefully = executor.awaitTermination();
        Assertions.assertTrue((boolean)terminatedGracefully);
    }
}

