/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.util.queue;

import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.common.util.queue.HoodieMessageQueue;
import org.apache.hudi.common.util.queue.WaitStrategyFactory;
import org.apache.hudi.exception.HoodieException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DisruptorMessageQueue<I, O>
implements HoodieMessageQueue<I, O> {
    private static final Logger LOG = LoggerFactory.getLogger(DisruptorMessageQueue.class);
    private final Disruptor<HoodieDisruptorEvent> queue;
    private final Function<I, O> transformFunction;
    private final RingBuffer<HoodieDisruptorEvent> ringBuffer;
    private AtomicReference<Throwable> throwable = new AtomicReference<Object>(null);
    private boolean isShutdown = false;
    private boolean isStarted = false;
    private static final long TIMEOUT_WAITING_SECS = 10L;

    public DisruptorMessageQueue(int bufferSize, Function<I, O> transformFunction, String waitStrategyId, int totalProducers, Runnable preExecuteRunnable) {
        WaitStrategy waitStrategy = WaitStrategyFactory.build(waitStrategyId);
        CustomizedThreadFactory threadFactory = new CustomizedThreadFactory("disruptor", true, preExecuteRunnable);
        this.queue = new Disruptor<HoodieDisruptorEvent>(() -> new HoodieDisruptorEvent(), bufferSize, threadFactory, totalProducers > 1 ? ProducerType.MULTI : ProducerType.SINGLE, waitStrategy);
        this.ringBuffer = this.queue.getRingBuffer();
        this.transformFunction = transformFunction;
    }

    @Override
    public long size() {
        return (long)this.ringBuffer.getBufferSize() - this.ringBuffer.remainingCapacity();
    }

    @Override
    public void insertRecord(I value) throws Exception {
        if (!this.isStarted) {
            throw new HoodieException("Can't insert into the queue since the queue is not started yet");
        }
        if (this.isShutdown) {
            throw new HoodieException("Can't insert into the queue after it had already been closed");
        }
        Object applied = this.transformFunction.apply(value);
        EventTranslator<HoodieDisruptorEvent> translator = (event, sequence) -> event.set(applied);
        this.queue.getRingBuffer().publishEvent(translator);
    }

    @Override
    public Option<O> readNextRecord() {
        throw new UnsupportedOperationException("Should not call readNextRecord here. And let DisruptorMessageHandler to handle consuming logic");
    }

    @Override
    public void markAsFailed(Throwable e) {
        this.throwable.compareAndSet(null, e);
    }

    @Override
    public Throwable getThrowable() {
        return this.throwable.get();
    }

    @Override
    public boolean isEmpty() {
        return (long)this.ringBuffer.getBufferSize() == this.ringBuffer.remainingCapacity();
    }

    @Override
    public void seal() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        DisruptorMessageQueue disruptorMessageQueue = this;
        synchronized (disruptorMessageQueue) {
            if (!this.isShutdown) {
                this.isShutdown = true;
                this.isStarted = false;
                if (Thread.currentThread().isInterrupted()) {
                    LOG.error("Disruptor Queue has been interrupted! Shutdown now.");
                    try {
                        this.queue.shutdown(10L, TimeUnit.SECONDS);
                    }
                    catch (TimeoutException e) {
                        LOG.error("Disruptor queue shutdown timeout: " + e);
                        throw new HoodieException(e);
                    }
                    throw new HoodieException("Disruptor Queue has been interrupted! Shutdown now.");
                }
                this.queue.shutdown();
            }
        }
    }

    protected void setHandlers(HoodieConsumer<O, ?> consumer) {
        this.queue.handleEventsWith((event, sequence, endOfBatch) -> {
            try {
                consumer.consume(event.get());
            }
            catch (Exception e) {
                LOG.error("Failed consuming records", (Throwable)e);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void start() {
        DisruptorMessageQueue disruptorMessageQueue = this;
        synchronized (disruptorMessageQueue) {
            if (!this.isStarted) {
                this.queue.start();
                this.isStarted = true;
            }
        }
    }

    class HoodieDisruptorEvent {
        private O value;

        HoodieDisruptorEvent() {
        }

        public void set(O value) {
            this.value = value;
        }

        public O get() {
            return this.value;
        }
    }
}

