/*
 * Decompiled with CFR 0.152.
 */
package one.microstream.storage.types;

import java.util.concurrent.atomic.AtomicBoolean;
import one.microstream.X;
import one.microstream.collections.XArrays;
import one.microstream.collections.types.XGettingEnum;
import one.microstream.concurrency.XThreads;
import one.microstream.persistence.binary.types.Binary;
import one.microstream.storage.exceptions.StorageException;
import one.microstream.storage.exceptions.StorageExceptionImportFailed;
import one.microstream.storage.types.DisruptionCollectorExecuting;
import one.microstream.storage.types.StorageChannel;
import one.microstream.storage.types.StorageChannelImportBatch;
import one.microstream.storage.types.StorageChannelImportEntity;
import one.microstream.storage.types.StorageChannelSynchronizingTask;
import one.microstream.storage.types.StorageChannelTaskStoreEntities;
import one.microstream.storage.types.StorageEntityCache;
import one.microstream.storage.types.StorageEntityType;
import one.microstream.storage.types.StorageImportSource;
import one.microstream.storage.types.StorageObjectIdRangeEvaluator;
import one.microstream.storage.types.StorageOperationController;
import one.microstream.storage.types.StorageRequestTask;

public interface StorageRequestTaskImportData<S>
extends StorageRequestTask {

    public static abstract class Abstract<S>
    extends StorageChannelSynchronizingTask.AbstractCompletingTask<Void>
    implements StorageRequestTaskImportData<S>,
    StorageChannelTaskStoreEntities {
        private static final int SOURCE_WAIT_TIME_MS = 100;
        private final XGettingEnum<S> sources;
        private final StorageEntityCache.Default[] entityCaches;
        private final StorageObjectIdRangeEvaluator objectIdRangeEvaluator;
        private final StorageImportSource.Abstract[] sourceHeads;
        private final StorageImportSource.Abstract[] sourceTails;
        private final AtomicBoolean complete = new AtomicBoolean();
        private volatile long maxObjectId;
        private Thread readThread;

        Abstract(long timestamp, int channelCount, StorageOperationController controller, StorageObjectIdRangeEvaluator objectIdRangeEvaluator, XGettingEnum<S> sources) {
            super(timestamp, channelCount, controller);
            this.sources = sources;
            this.objectIdRangeEvaluator = objectIdRangeEvaluator;
            this.entityCaches = new StorageEntityCache.Default[channelCount];
            this.sourceTails = this.createImportSources(channelCount);
            this.sourceHeads = (StorageImportSource.Abstract[])this.sourceTails.clone();
        }

        private StorageImportSource.Abstract[] createImportSources(int channelCount) {
            StorageImportSource.Abstract[] inputSources = new StorageImportSource.Abstract[channelCount];
            for (int i = 0; i < channelCount; ++i) {
                inputSources[i] = this.createImportSource(i, null, null);
            }
            return inputSources;
        }

        protected abstract StorageImportSource.Abstract createImportSource(int var1, S var2, StorageChannelImportBatch.Default var3);

        private boolean entityCacheCollectionNotComplete() {
            for (StorageEntityCache.Default entityCache : this.entityCaches) {
                if (entityCache != null) continue;
                return true;
            }
            return false;
        }

        private synchronized void ensureReaderThread() {
            if (this.readThread != null || this.entityCacheCollectionNotComplete()) {
                return;
            }
            this.readThread = XThreads.start(this::readSources);
        }

        final void readSources() {
            ItemReader itemReader = new ItemReader(this.entityCaches, this.sourceHeads);
            for (Object source : this.sources) {
                try {
                    itemReader.setSource(source);
                    this.iterateSource(source, itemReader);
                    itemReader.completeCurrentSource();
                }
                catch (Exception e) {
                    throw new StorageExceptionImportFailed("Exception while reading import source " + source, e);
                }
            }
            this.complete.set(true);
        }

        protected abstract void iterateSource(S var1, ItemAcceptor var2);

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected final Void internalProcessBy(StorageChannel channel) {
            StorageEntityCache.Default[] defaultArray = this.entityCaches;
            synchronized (this.entityCaches) {
                block11: {
                    this.entityCaches[channel.channelIndex()] = channel.prepareImportData();
                    // ** MonitorExit[var2_2] (shouldn't be in output)
                    this.ensureReaderThread();
                    StorageImportSource.Abstract currentSource = this.sourceTails[channel.channelIndex()];
                    try {
                        while (true) {
                            StorageImportSource.Abstract abstract_ = currentSource;
                            synchronized (abstract_) {
                                while (currentSource.next == null) {
                                    if (this.complete.get()) {
                                        break block11;
                                    }
                                    currentSource.wait(100L);
                                }
                                currentSource = currentSource.next;
                            }
                            channel.importData(currentSource);
                        }
                    }
                    catch (InterruptedException e) {
                        throw new StorageException(e);
                    }
                }
                return null;
            }
        }

        @Override
        protected final void succeed(StorageChannel channel, Void result) {
            this.objectIdRangeEvaluator.evaluateObjectIdRange(0L, this.maxObjectId);
            channel.commitImportData(this.timestamp());
        }

        @Override
        protected void postCompletionSuccess(StorageChannel channel, Void result) throws InterruptedException {
            this.cleanUpResources();
        }

        @Override
        protected final void fail(StorageChannel channel, Void result) {
            this.cleanUpResources();
            channel.rollbackImportData(this.problemForChannel(channel));
        }

        private void cleanUpResources() {
            DisruptionCollectorExecuting<StorageImportSource.Abstract> closer = DisruptionCollectorExecuting.New(StorageImportSource::close);
            StorageImportSource.Abstract[] abstractArray = this.sourceTails;
            int n = abstractArray.length;
            for (int i = 0; i < n; ++i) {
                StorageImportSource.Abstract tail;
                StorageImportSource.Abstract source = tail = abstractArray[i];
                while ((source = source.next) != null) {
                    closer.executeOn(source);
                }
            }
            if (closer.hasDisruptions()) {
                throw new StorageException((Throwable)closer.toMultiCauseException());
            }
        }

        static final class ChannelItem {
            final StorageChannelImportBatch.Default headBatch = new StorageChannelImportBatch.Default();
            StorageChannelImportBatch.Default tailBatch;
            StorageChannelImportEntity.Default tailEntity;

            ChannelItem() {
            }

            ChannelItem resetChains() {
                this.tailBatch = this.headBatch;
                this.headBatch.next = null;
                this.headBatch.batchNext = null;
                this.tailEntity = null;
                return this;
            }
        }

        private final class ItemReader
        implements ItemAcceptor {
            private final StorageEntityCache.Default[] entityCaches;
            private final StorageImportSource.Abstract[] sourceHeads;
            private final ChannelItem[] channelItems;
            private final int channelHash;
            private S source;
            private int currentBatchChannel;
            private long currentSourcePosition;

            public ItemReader(StorageEntityCache.Default[] entityCaches, StorageImportSource.Abstract[] sourceHeads) {
                this.entityCaches = entityCaches;
                this.sourceHeads = sourceHeads;
                this.channelHash = sourceHeads.length - 1;
                this.channelItems = (ChannelItem[])XArrays.fill((Object[])new ChannelItem[sourceHeads.length], () -> new ChannelItem().resetChains());
            }

            @Override
            public boolean accept(long address, long availableItemLength) {
                long length = Binary.getEntityLengthRawValue((long)address);
                if (length < 0L) {
                    this.currentSourcePosition += (long)X.checkArrayRange((long)(-length));
                    this.currentBatchChannel = -1;
                    return true;
                }
                if (availableItemLength < (long)Binary.entityHeaderLength()) {
                    return false;
                }
                int intLength = X.checkArrayRange((long)length);
                long objectId = Binary.getEntityObjectIdRawValue((long)address);
                int channelIndex = (int)objectId & this.channelHash;
                StorageEntityType.Default type = this.entityCaches[channelIndex].validateEntity(intLength, Binary.getEntityTypeIdRawValue((long)address), objectId);
                if (channelIndex != this.currentBatchChannel) {
                    this.currentBatchChannel = channelIndex;
                    this.startNewBatch(intLength, objectId, type);
                } else {
                    this.addToCurrentBatch(intLength, objectId, type);
                }
                if (objectId >= Abstract.this.maxObjectId) {
                    Abstract.this.maxObjectId = objectId;
                }
                this.currentSourcePosition += (long)intLength;
                return true;
            }

            private void startNewBatch(int length, long objectId, StorageEntityType.Default type) {
                ChannelItem item = this.channelItems[this.currentBatchChannel];
                item.tailBatch = item.tailBatch.batchNext = new StorageChannelImportBatch.Default(this.currentSourcePosition, length, objectId, type);
                item.tailEntity = item.tailBatch.batchNext;
            }

            private void addToCurrentBatch(int length, long objectId, StorageEntityType.Default type) {
                ChannelItem item = this.channelItems[this.currentBatchChannel];
                item.tailEntity = item.tailEntity.next = new StorageChannelImportEntity.Default(length, objectId, type);
                item.tailBatch.batchLength += (long)length;
            }

            final void setSource(S source) {
                this.currentBatchChannel = -1;
                this.currentSourcePosition = 0L;
                this.source = source;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            final void completeCurrentSource() {
                StorageImportSource.Abstract[] sourceHeads = this.sourceHeads;
                ChannelItem[] channelItems = this.channelItems;
                for (int i = 0; i < sourceHeads.length; ++i) {
                    StorageImportSource.Abstract oldSourceHead = sourceHeads[i];
                    ChannelItem currentItem = channelItems[i];
                    sourceHeads[i] = sourceHeads[i].next = Abstract.this.createImportSource(i, this.source, currentItem.headBatch.batchNext);
                    currentItem.resetChains();
                    StorageImportSource.Abstract abstract_ = oldSourceHead;
                    synchronized (abstract_) {
                        oldSourceHead.notifyAll();
                        continue;
                    }
                }
            }
        }

        @FunctionalInterface
        public static interface ItemAcceptor {
            public boolean accept(long var1, long var3);
        }
    }
}

