/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.execution;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.common.util.queue.SimpleExecutor;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class TestSimpleExecutionInSpark
extends HoodieSparkClientTestHarness {
    private final String instantTime = InProcessTimeGenerator.createNewInstantTime();

    @BeforeEach
    public void setUp() throws Exception {
        this.initTestDataGenerator();
        this.initExecutorServiceWithFixedThreadPool(2);
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.cleanupResources();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testExecutor() {
        List hoodieRecords = this.dataGen.generateInserts(this.instantTime, Integer.valueOf(128));
        final ArrayList consumedRecords = new ArrayList();
        HoodieConsumer<HoodieRecord, Integer> consumer = new HoodieConsumer<HoodieRecord, Integer>(){
            private int count = 0;

            public void consume(HoodieRecord record) throws Exception {
                consumedRecords.add(record);
                ++this.count;
            }

            public Integer finish() {
                return this.count;
            }
        };
        SimpleExecutor exec = null;
        try {
            exec = new SimpleExecutor(hoodieRecords.iterator(), (HoodieConsumer)consumer, Function.identity());
            int result = (Integer)exec.execute();
            Assertions.assertEquals((int)128, (int)result);
            Assertions.assertEquals((Object)hoodieRecords, consumedRecords);
        }
        finally {
            if (exec != null) {
                exec.shutdownNow();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Timeout(value=60L)
    public void testRecordReading() {
        List hoodieRecords = this.dataGen.generateInserts(this.instantTime, Integer.valueOf(100));
        ArrayList beforeRecord = new ArrayList();
        ArrayList beforeIndexedRecord = new ArrayList();
        final ArrayList afterRecord = new ArrayList();
        final ArrayList afterIndexedRecord = new ArrayList();
        hoodieRecords.forEach(record -> {
            HoodieAvroRecord originalRecord = (HoodieAvroRecord)record;
            beforeRecord.add(originalRecord);
            try {
                Option originalInsertValue = originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
                beforeIndexedRecord.add(originalInsertValue.get());
            }
            catch (IOException iOException) {
                // empty catch block
            }
        });
        HoodieConsumer<HoodieRecord, Integer> consumer = new HoodieConsumer<HoodieRecord, Integer>(){
            private int count = 0;

            public void consume(HoodieRecord record) throws Exception {
                ++this.count;
                afterRecord.add((HoodieAvroRecord)record);
                try {
                    IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord)record).getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
                    afterIndexedRecord.add(indexedRecord);
                }
                catch (IOException iOException) {
                    // empty catch block
                }
            }

            public Integer finish() {
                return this.count;
            }
        };
        SimpleExecutor exec = null;
        try {
            exec = new SimpleExecutor(hoodieRecords.iterator(), (HoodieConsumer)consumer, Function.identity());
            int result = (Integer)exec.execute();
            Assertions.assertEquals((int)100, (int)result);
            Assertions.assertEquals(beforeRecord, afterRecord);
            Assertions.assertEquals(beforeIndexedRecord, afterIndexedRecord);
        }
        finally {
            if (exec != null) {
                exec.shutdownNow();
            }
        }
    }

    @Test
    @Timeout(value=60L)
    public void testException() {
        int numRecords = 1000;
        String errorMessage = "Exception when iterating records!!!";
        List pRecs = this.dataGen.generateInserts(this.instantTime, Integer.valueOf(1000));
        InnerIterator iterator = new InnerIterator(pRecs.iterator(), "Exception when iterating records!!!", 100);
        HoodieConsumer<HoodieRecord, Integer> consumer = new HoodieConsumer<HoodieRecord, Integer>(){
            int count = 0;

            public void consume(HoodieRecord payload) throws Exception {
                ++this.count;
            }

            public Integer finish() {
                return this.count;
            }
        };
        SimpleExecutor exec = new SimpleExecutor((Iterator)iterator, (HoodieConsumer)consumer, Function.identity());
        Throwable thrown = Assertions.assertThrows(HoodieException.class, () -> ((SimpleExecutor)exec).execute(), (String)"exception is expected");
        Assertions.assertTrue((boolean)thrown.getMessage().contains("Exception when iterating records!!!"));
    }

    class InnerIterator
    implements Iterator<HoodieRecord> {
        private Iterator<HoodieRecord> iterator;
        private AtomicInteger count = new AtomicInteger(0);
        private String errorMessage;
        private int errorMessageCount;

        public InnerIterator(Iterator<HoodieRecord> iterator, String errorMessage, int errorMessageCount) {
            this.iterator = iterator;
            this.errorMessage = errorMessage;
            this.errorMessageCount = errorMessageCount;
        }

        @Override
        public boolean hasNext() {
            return this.iterator.hasNext();
        }

        @Override
        public HoodieRecord next() {
            if (this.count.get() == this.errorMessageCount) {
                throw new HoodieException(this.errorMessage);
            }
            this.count.incrementAndGet();
            return this.iterator.next();
        }
    }
}

