/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.twister2.translation.wrappers;

import edu.iu.dsc.tws.api.tset.TSetContext;
import edu.iu.dsc.tws.api.tset.fn.BaseSourceFunc;
import java.io.IOException;
import java.io.ObjectStreamException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.logging.Logger;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.twister2.Twister2TranslationContext;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.Pure;
import org.joda.time.Instant;

public class Twister2BoundedSource<@UnknownKeyFor T>
extends BaseSourceFunc<WindowedValue<T>> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = Logger.getLogger(Twister2BoundedSource.class.getName());
    private transient @UnknownKeyFor @NonNull @Initialized BoundedSource<T> source;
    private @UnknownKeyFor @NonNull @Initialized int numPartitions;
    private @UnknownKeyFor @NonNull @Initialized long splitSize = 100L;
    private @UnknownKeyFor @NonNull @Initialized List<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Source<T>> partitionedSources;
    private @UnknownKeyFor @NonNull @Initialized Source<T> localPartition;
    private transient @UnknownKeyFor @NonNull @Initialized PipelineOptions options;
    private @UnknownKeyFor @NonNull @Initialized String serializedOptions;
    private transient @UnknownKeyFor @NonNull @Initialized Iterator<@UnknownKeyFor @NonNull @Initialized WindowedValue<T>> readerIterator;
    private static final @UnknownKeyFor @NonNull @Initialized long DEFAULT_BUNDLE_SIZE = 0x4000000L;
    private @UnknownKeyFor @NonNull @Initialized boolean isInitialized = false;
    private @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] sourceBytes;

    private Twister2BoundedSource() {
        this.isInitialized = false;
    }

    public Twister2BoundedSource(@UnknownKeyFor @NonNull @Initialized BoundedSource<T> boundedSource, @UnknownKeyFor @NonNull @Initialized Twister2TranslationContext context, @UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        this.source = boundedSource;
        this.options = options;
        this.serializedOptions = new SerializablePipelineOptions(options).toString();
        SdkComponents components = SdkComponents.create();
        components.registerEnvironment(Environments.createOrGetDefaultEnvironment((PortablePipelineOptions)((PortablePipelineOptions)options.as(PortablePipelineOptions.class))));
        RunnerApi.FunctionSpec sourceProto = ReadTranslation.toProto(this.source);
        this.sourceBytes = sourceProto.getPayload().toByteArray();
    }

    public void prepare(@UnknownKeyFor @NonNull @Initialized TSetContext context) {
        this.initTransient();
        this.numPartitions = context.getParallelism();
        try {
            this.splitSize = this.source.getEstimatedSizeBytes(this.options) / (long)this.numPartitions;
        }
        catch (Exception e) {
            LOG.warning(String.format("Failed to get estimated bundle size for source %s, using default bundle size of %d bytes.", this.source.toString(), 0x4000000L));
        }
        int index = context.getIndex();
        ArrayList<Object> partitionList = new ArrayList<Object>();
        try {
            this.partitionedSources = this.source.split(this.splitSize, this.options);
            if (this.partitionedSources.size() == 0) {
                partitionList.add(this.source);
            } else if (this.numPartitions == this.partitionedSources.size()) {
                this.localPartition = this.partitionedSources.get(index);
                partitionList.add(this.localPartition);
            } else {
                int startIndex;
                int q = (int)Math.floor((double)this.partitionedSources.size() / (double)this.numPartitions);
                int r = this.partitionedSources.size() % this.numPartitions;
                int n = q + (index < r ? 1 : 0);
                for (int i = startIndex = q * index + (index < r ? index : r); i < startIndex + n; ++i) {
                    partitionList.add(this.partitionedSources.get(i));
                }
            }
            ArrayList readers = new ArrayList();
            for (Source source : partitionList) {
                BoundedSource.BoundedReader<T> reader = this.createReader(source);
                readers.add(reader);
            }
            this.readerIterator = new ReaderToIteratorAdapter(readers);
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to create partitions for source " + this.source.getClass().getSimpleName(), e);
        }
    }

    public @UnknownKeyFor @NonNull @Initialized boolean hasNext() {
        return this.readerIterator.hasNext();
    }

    public @UnknownKeyFor @NonNull @Initialized WindowedValue<T> next() {
        return this.readerIterator.next();
    }

    private // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized BoundedSource.BoundedReader<T> createReader(@UnknownKeyFor @NonNull @Initialized Source<T> partition) {
        try {
            return ((BoundedSource)partition).createReader(this.options);
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to create reader from a BoundedSource.", e);
        }
    }

    private void initTransient() {
        if (this.isInitialized) {
            return;
        }
        this.options = new SerializablePipelineOptions(this.serializedOptions).get();
        this.source = (BoundedSource)SerializableUtils.deserializeFromByteArray((byte[])this.sourceBytes, (String)"WindowFn");
        this.isInitialized = true;
    }

    protected @UnknownKeyFor @NonNull @Initialized Object readResolve() throws @UnknownKeyFor @NonNull @Initialized ObjectStreamException {
        return this;
    }

    static class ReaderToIteratorAdapter<@UnknownKeyFor T>
    implements Iterator<WindowedValue<T>> {
        private static final @UnknownKeyFor @NonNull @Initialized boolean FAILED_TO_OBTAIN_NEXT = false;
        private static final @UnknownKeyFor @NonNull @Initialized boolean SUCCESSFULLY_OBTAINED_NEXT = true;
        private final @UnknownKeyFor @NonNull @Initialized List<// Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized Source.Reader<T>> readers;
        // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized Source.Reader<T> reader;
        private @UnknownKeyFor @NonNull @Initialized int readerIndex = 0;
        private @UnknownKeyFor @NonNull @Initialized boolean started = false;
        private @UnknownKeyFor @NonNull @Initialized boolean closed = false;
        private @UnknownKeyFor @NonNull @Initialized WindowedValue<T> next = null;

        public ReaderToIteratorAdapter(@UnknownKeyFor @NonNull @Initialized List<// Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized Source.Reader<T>> readers) {
            this.readers = readers;
            this.readerIndex = 0;
            this.reader = readers.get(this.readerIndex);
        }

        private @UnknownKeyFor @NonNull @Initialized boolean tryProduceNext() {
            try {
                if (this.seekNext()) {
                    this.next = WindowedValue.timestampedValueInGlobalWindow((Object)this.reader.getCurrent(), (Instant)this.reader.getCurrentTimestamp());
                    return true;
                }
                return false;
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to read data.", e);
            }
        }

        private void close() {
            try {
                this.reader.close();
                ++this.readerIndex;
                if (this.readerIndex == this.readers.size()) {
                    this.closed = true;
                } else {
                    this.reader = this.readers.get(this.readerIndex);
                    this.started = false;
                }
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        private @UnknownKeyFor @NonNull @Initialized boolean seekNext() throws @UnknownKeyFor @NonNull @Initialized IOException {
            if (!this.started) {
                this.started = true;
                return this.reader.start() || this.advance(true);
            }
            return !this.closed && this.advance(false);
        }

        private @UnknownKeyFor @NonNull @Initialized boolean advance(@UnknownKeyFor @NonNull @Initialized boolean calledAfterStart) throws @UnknownKeyFor @NonNull @Initialized IOException {
            if (!calledAfterStart && this.reader.advance()) {
                return true;
            }
            this.close();
            return this.seekNext();
        }

        private @UnknownKeyFor @NonNull @Initialized WindowedValue<T> consumeCurrent() {
            if (this.next == null) {
                throw new NoSuchElementException();
            }
            WindowedValue<T> current = this.next;
            this.next = null;
            return current;
        }

        private @UnknownKeyFor @NonNull @Initialized WindowedValue<T> consumeNext() {
            if (this.next == null) {
                this.tryProduceNext();
            }
            return this.consumeCurrent();
        }

        @Override
        @Pure
        public @UnknownKeyFor @NonNull @Initialized boolean hasNext() {
            return this.next != null || this.tryProduceNext();
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized WindowedValue<T> next() {
            return this.consumeNext();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
}

