/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.execution;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.queue.DisruptorExecutor;
import org.apache.hudi.common.util.queue.DisruptorMessageQueue;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.common.util.queue.HoodieMessageQueue;
import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.common.util.queue.WaitStrategyFactory;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.ExceptionUtil;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.HoodieLazyInsertIterable;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Tests that push generated Hudi insert records through {@link DisruptorExecutor} and
 * {@link DisruptorMessageQueue} and verify what the registered consumer observes.
 */
public class TestDisruptorMessageQueue extends HoodieSparkClientTestHarness {
    /** Instant time passed to the data generator for every batch of inserts. */
    private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
    /** Write config selecting the DISRUPTOR executor type with the write-buffer limit set to 16. */
    private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withExecutorType(ExecutorType.DISRUPTOR.name()).withWriteExecutorDisruptorWriteBufferLimitBytes(16L).build(false);

    /** Initializes the test data generator and a two-thread executor service before each test. */
    @BeforeEach
    public void setUp() throws Exception {
        this.initTestDataGenerator();
        this.initExecutorServiceWithFixedThreadPool(2);
    }

    /** Releases the resources held by the test harness after each test. */
    @AfterEach
    public void tearDown() throws Exception {
        this.cleanupResources();
    }

    /**
     * Captures the {@link TaskContext} of the calling thread and returns a runnable that
     * installs that same context on whichever thread eventually runs it.
     */
    private Runnable getPreExecuteRunnable() {
        final TaskContext taskContext = TaskContext.get();
        return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
    }

    /**
     * Feeds 100 generated records through a {@link DisruptorExecutor} backed by a single
     * iterator and checks that the consumer saw exactly the same records, and the same
     * Avro insert values, in the same order.
     */
    // Raw Hudi types are kept where the decompiled source had them; only the element types
    // that the code itself demonstrates are restored.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Test
    @Timeout(value=60L)
    public void testRecordReading() throws Exception {
        final int numRecords = 100;
        final List<HoodieRecord> hoodieRecords = this.dataGen.generateInserts(this.instantTime, Integer.valueOf(numRecords));
        final List<HoodieAvroRecord> beforeRecord = new ArrayList<>();
        final List<IndexedRecord> beforeIndexedRecord = new ArrayList<>();
        final List<HoodieAvroRecord> afterRecord = new ArrayList<>();
        final List<IndexedRecord> afterIndexedRecord = new ArrayList<>();
        // Read failures inside the consumer are collected here and asserted on afterwards,
        // instead of being silently dropped.
        final List<IOException> consumeFailures = new ArrayList<>();

        // An IOException while reading the expected values propagates and fails the test:
        // skipping the record would leave the expectation incomplete.
        for (HoodieRecord record : hoodieRecords) {
            HoodieAvroRecord originalRecord = (HoodieAvroRecord) record;
            beforeRecord.add(originalRecord);
            beforeIndexedRecord.add((IndexedRecord) originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get());
        }

        HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer = new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>(){
            private int count = 0;

            @Override
            public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
                ++this.count;
                HoodieAvroRecord consumed = (HoodieAvroRecord) record.getResult();
                afterRecord.add(consumed);
                try {
                    afterIndexedRecord.add((IndexedRecord) consumed.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get());
                }
                catch (IOException e) {
                    consumeFailures.add(e);
                }
            }

            @Override
            public Integer finish() {
                return this.count;
            }
        };

        final DisruptorExecutor exec = new DisruptorExecutor(this.writeConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, HoodieLazyInsertIterable.getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, this.writeConfig), WaitStrategyFactory.DEFAULT_STRATEGY, this.getPreExecuteRunnable());
        try {
            int result = (Integer) exec.execute();
            Assertions.assertEquals(numRecords, result);
            Assertions.assertFalse(exec.isRunning());
            Assertions.assertTrue(consumeFailures.isEmpty(), "consumer failed to read insert values: " + consumeFailures);
            Assertions.assertEquals(beforeRecord, afterRecord);
            Assertions.assertEquals(beforeIndexedRecord, afterIndexedRecord);
        }
        finally {
            exec.shutdownNow();
        }
    }

    /**
     * Runs 40 producers of 1000 records each against one {@link DisruptorMessageQueue},
     * alternating between iterator-based and function-based producers, and checks that
     * the consumer sees every producer's records in that producer's own order and that
     * every producer's full record count arrives.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Test
    @Timeout(value=60L)
    public void testCompositeProducerRecordReading() throws Exception {
        final int numRecords = 1000;
        final int numProducers = 40;
        final List<List<HoodieRecord>> recs = new ArrayList<>();
        final DisruptorMessageQueue queue = new DisruptorMessageQueue(1024, HoodieLazyInsertIterable.getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, this.writeConfig), "BLOCKING_WAIT", numProducers, () -> {
            // no pre-execute work is needed for this test
        });

        // Record key -> (producer index, position of the record within that producer).
        final Map<String, Pair<Integer, Integer>> keyToProducerAndIndexMap = new HashMap<>();
        for (int i = 0; i < numProducers; ++i) {
            List<HoodieRecord> pRecs = this.dataGen.generateInserts(this.instantTime, Integer.valueOf(numRecords));
            int j = 0;
            for (HoodieRecord r : pRecs) {
                // Keys must be unique across producers for the lookup in the consumer to work.
                Assertions.assertFalse(keyToProducerAndIndexMap.containsKey(r.getRecordKey()));
                keyToProducerAndIndexMap.put(r.getRecordKey(), Pair.of(i, j));
                ++j;
            }
            recs.add(pRecs);
        }

        // Even-indexed batches use the iterator-based producer, odd-indexed ones the
        // function-based producer, so both implementations are exercised.
        final List<HoodieProducer<HoodieRecord>> producers = new ArrayList<>();
        for (int i = 0; i < recs.size(); ++i) {
            final List<HoodieRecord> batch = recs.get(i);
            if (i % 2 == 0) {
                producers.add(new IteratorBasedQueueProducer<HoodieRecord>(batch.iterator()));
            } else {
                producers.add(new FunctionBasedQueueProducer<HoodieRecord>(buf -> {
                    Iterator<HoodieRecord> itr = batch.iterator();
                    while (itr.hasNext()) {
                        try {
                            buf.insertRecord(itr.next());
                        }
                        catch (Exception e) {
                            throw new HoodieException(e);
                        }
                    }
                    return true;
                }));
            }
        }

        // Per producer: position of the last record seen (-1 = none yet) and records seen so far.
        final Map<Integer, Integer> lastSeenMap = IntStream.range(0, numProducers).boxed().collect(Collectors.toMap(Function.identity(), x -> -1));
        final Map<Integer, Integer> countMap = IntStream.range(0, numProducers).boxed().collect(Collectors.toMap(Function.identity(), x -> 0));
        HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer = new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>(){

            @Override
            public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> payload) {
                HoodieRecord rec = payload.getResult();
                Pair<Integer, Integer> producerPos = keyToProducerAndIndexMap.get(rec.getRecordKey());
                Integer lastSeenPos = lastSeenMap.get(producerPos.getLeft());
                countMap.put(producerPos.getLeft(), countMap.get(producerPos.getLeft()) + 1);
                lastSeenMap.put(producerPos.getLeft(), lastSeenPos + 1);
                // Each producer's records must arrive in that producer's original order.
                Assertions.assertEquals(lastSeenPos + 1, (int) producerPos.getRight());
            }

            @Override
            public Integer finish() {
                return 0;
            }
        };

        // setHandlers/start are looked up reflectively and made accessible before invocation.
        Method setHandlersFunc = queue.getClass().getDeclaredMethod("setHandlers", HoodieConsumer.class);
        setHandlersFunc.setAccessible(true);
        setHandlersFunc.invoke(queue, consumer);
        Method startFunc = queue.getClass().getDeclaredMethod("start");
        startFunc.setAccessible(true);
        startFunc.invoke(queue);

        try {
            CompletableFuture<Void> producerFuture = CompletableFuture.allOf(producers.stream().map(producer -> CompletableFuture.supplyAsync(() -> {
                try {
                    producer.produce(queue);
                }
                catch (Throwable e) {
                    throw new HoodieException("Error producing records in disruptor executor", e);
                }
                return true;
            }, this.executorService)).toArray(CompletableFuture[]::new));
            producerFuture.get();
        }
        finally {
            // Close the started queue even when a producer fails, so it is not left running.
            queue.close();
        }

        for (int i = 0; i < numProducers; ++i) {
            // Both arguments are Integer on purpose: mixing int and Integer here can make
            // the assertEquals overload resolution ambiguous.
            Assertions.assertEquals(Integer.valueOf(numRecords), countMap.get(i));
        }
    }

    /**
     * Runs a {@link DisruptorExecutor} with one iterator-based producer and one producer
     * that always throws, and checks that {@code execute()} fails with a
     * {@link HoodieException} whose root cause carries the producer's message.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Test
    @Timeout(value=60L)
    public void testException() throws Exception {
        final int numRecords = 1000;
        final List<HoodieRecord> pRecs = this.dataGen.generateInserts(this.instantTime, Integer.valueOf(numRecords));
        final List<HoodieProducer<HoodieRecord>> producers = new ArrayList<>();
        producers.add(new IteratorBasedQueueProducer<HoodieRecord>(pRecs.iterator()));
        producers.add(new FunctionBasedQueueProducer<HoodieRecord>(buf -> {
            throw new HoodieException("Exception when produce records!!!");
        }));

        HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer = new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>(){
            int count = 0;

            @Override
            public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> payload) {
                ++this.count;
            }

            @Override
            public Integer finish() {
                return this.count;
            }
        };

        final DisruptorExecutor exec = new DisruptorExecutor(1024, producers, consumer, HoodieLazyInsertIterable.getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, this.writeConfig), WaitStrategyFactory.DEFAULT_STRATEGY, this.getPreExecuteRunnable());
        try {
            Throwable thrown = Assertions.assertThrows(HoodieException.class, () -> exec.execute(), "exception is expected");
            Assertions.assertEquals("Exception when produce records!!!", ExceptionUtil.getRootCause(thrown).getMessage());
        }
        finally {
            // Shut the executor down after the expected failure, as testRecordReading does.
            exec.shutdownNow();
        }
    }
}

