/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.data.input.impl;

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.joda.time.DateTime;

public class TimedShutoffInputSourceReader
implements InputSourceReader {
    private static final Logger LOG = new Logger(TimedShutoffInputSourceReader.class);
    private final InputSourceReader delegate;
    private final DateTime shutoffTime;

    public TimedShutoffInputSourceReader(InputSourceReader delegate, DateTime shutoffTime) {
        this.delegate = delegate;
        this.shutoffTime = shutoffTime;
    }

    @Override
    public CloseableIterator<InputRow> read(InputStats inputStats) throws IOException {
        ScheduledExecutorService shutdownExec = Execs.scheduledSingleThreaded("timed-shutoff-reader-%d");
        CloseableIterator<InputRow> delegateIterator = this.delegate.read(inputStats);
        return this.decorateShutdownTimeout(shutdownExec, delegateIterator);
    }

    @Override
    public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException {
        ScheduledExecutorService shutdownExec = Execs.scheduledSingleThreaded("timed-shutoff-reader-%d");
        CloseableIterator<InputRowListPlusRawValues> delegateIterator = this.delegate.sample();
        return this.decorateShutdownTimeout(shutdownExec, delegateIterator);
    }

    private <T> CloseableIterator<T> decorateShutdownTimeout(ScheduledExecutorService exec, final CloseableIterator<T> delegateIterator) {
        final Closer closer = Closer.create();
        closer.register(delegateIterator);
        closer.register(exec::shutdownNow);
        CloseableIterator wrappingIterator = new CloseableIterator<T>(){
            volatile boolean closed;
            T next = null;

            @Override
            public boolean hasNext() {
                if (this.next != null) {
                    return true;
                }
                if (!this.closed && delegateIterator.hasNext()) {
                    this.next = delegateIterator.next();
                    return true;
                }
                return false;
            }

            @Override
            public T next() {
                if (this.next != null) {
                    Object returnValue = this.next;
                    this.next = null;
                    return returnValue;
                }
                throw new NoSuchElementException();
            }

            @Override
            public void close() throws IOException {
                this.closed = true;
                closer.close();
            }
        };
        exec.schedule(() -> {
            LOG.info("Closing delegate inputSource.", new Object[0]);
            try {
                wrappingIterator.close();
            }
            catch (IOException e) {
                LOG.warn(e, "Failed to close delegate inputSource, ignoring.", new Object[0]);
            }
        }, this.shutoffTime.getMillis() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        return wrappingIterator;
    }
}

