/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.execution;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.queue.DisruptorExecutor;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.common.util.queue.WaitStrategyFactory;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class TestDisruptorExecutionInSpark
extends HoodieSparkClientTestHarness {
    private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
    private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withExecutorType(ExecutorType.DISRUPTOR.name()).withWriteExecutorDisruptorWriteBufferLimitBytes(8L).build(false);

    @BeforeEach
    public void setUp() throws Exception {
        this.initTestDataGenerator();
        this.initExecutorServiceWithFixedThreadPool(2);
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.cleanupResources();
    }

    private Runnable getPreExecuteRunnable() {
        TaskContext taskContext = TaskContext.get();
        return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testExecutor() {
        List hoodieRecords = this.dataGen.generateInserts(this.instantTime, Integer.valueOf(128));
        final ArrayList consumedRecords = new ArrayList();
        HoodieConsumer<HoodieRecord, Integer> consumer = new HoodieConsumer<HoodieRecord, Integer>(){
            private int count = 0;

            public void consume(HoodieRecord record) {
                consumedRecords.add(record);
                ++this.count;
            }

            public Integer finish() {
                return this.count;
            }
        };
        DisruptorExecutor exec = null;
        try {
            exec = new DisruptorExecutor(this.writeConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), hoodieRecords.iterator(), (HoodieConsumer)consumer, Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, this.getPreExecuteRunnable());
            int result = (Integer)exec.execute();
            Assertions.assertEquals((int)128, (int)result);
            Assertions.assertFalse((boolean)exec.isRunning());
            Assertions.assertEquals((Object)hoodieRecords, consumedRecords);
            for (int i = 0; i < hoodieRecords.size(); ++i) {
                Assertions.assertEquals(hoodieRecords.get(i), consumedRecords.get(i));
            }
        }
        finally {
            if (exec != null) {
                exec.shutdownNow();
            }
        }
    }

    @Test
    @Timeout(value=60L)
    public void testInterruptExecutor() {
        List hoodieRecords = this.dataGen.generateInserts(this.instantTime, Integer.valueOf(100));
        HoodieConsumer<HoodieRecord, Integer> consumer = new HoodieConsumer<HoodieRecord, Integer>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void consume(HoodieRecord record) {
                try {
                    2 var2_2 = this;
                    synchronized (var2_2) {
                        this.wait();
                    }
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }

            public Integer finish() {
                return 0;
            }
        };
        DisruptorExecutor executor = new DisruptorExecutor(Integer.valueOf(1024), hoodieRecords.iterator(), (HoodieConsumer)consumer, Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, this.getPreExecuteRunnable());
        try {
            Thread.currentThread().interrupt();
            Assertions.assertThrows(HoodieException.class, () -> ((DisruptorExecutor)executor).execute());
            Assertions.assertTrue((boolean)Thread.interrupted());
        }
        catch (Exception exception) {
            // empty catch block
        }
    }
}

