/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.lib.state.managed;

import com.datatorrent.lib.fileaccess.FileAccess;
import com.datatorrent.netlet.util.Slice;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.BucketedState;
import org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl;
import org.apache.apex.malhar.lib.state.managed.Bucket;
import org.apache.apex.malhar.lib.state.managed.BucketsFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ManagedTimeUnifiedStateImpl
extends AbstractManagedStateImpl
implements BucketedState {
    private final transient LinkedBlockingQueue<Long> purgedTimeBuckets = Queues.newLinkedBlockingQueue();
    private final transient Set<Bucket> bucketsForTeardown = Sets.newHashSet();
    private static transient Logger LOG = LoggerFactory.getLogger(ManagedTimeUnifiedStateImpl.class);

    public ManagedTimeUnifiedStateImpl() {
        this.bucketsFileSystem = new TimeUnifiedBucketsFileSystem();
    }

    @Override
    public int getNumBuckets() {
        return this.timeBucketAssigner.getNumBuckets();
    }

    @Override
    public void put(long time, @NotNull Slice key, @NotNull Slice value) {
        long timeBucket = this.timeBucketAssigner.getTimeBucketFor(time);
        this.putInBucket(timeBucket, timeBucket, key, value);
    }

    @Override
    public Slice getSync(long time, @NotNull Slice key) {
        long timeBucket = this.timeBucketAssigner.getTimeBucketFor(time);
        if (timeBucket == -1L) {
            return BucketedState.EXPIRED;
        }
        return this.getValueFromBucketSync(timeBucket, timeBucket, key);
    }

    @Override
    public Future<Slice> getAsync(long time, @NotNull Slice key) {
        long timeBucket = this.timeBucketAssigner.getTimeBucketFor(time);
        if (timeBucket == -1L) {
            return Futures.immediateFuture((Object)BucketedState.EXPIRED);
        }
        return this.getValueFromBucketAsync(timeBucket, timeBucket, key);
    }

    @Override
    public void endWindow() {
        Long purgedTimeBucket;
        super.endWindow();
        while (null != (purgedTimeBucket = this.purgedTimeBuckets.poll())) {
            int purgedTimeBucketIdx = this.getBucketIdx(purgedTimeBucket);
            if (this.buckets[purgedTimeBucketIdx] == null || this.buckets[purgedTimeBucketIdx].getBucketId() != purgedTimeBucket.longValue()) continue;
            this.bucketsForTeardown.add(this.buckets[purgedTimeBucketIdx]);
            this.buckets[purgedTimeBucketIdx] = null;
        }
        Iterator<Bucket> bucketIterator = this.bucketsForTeardown.iterator();
        while (bucketIterator.hasNext()) {
            Bucket bucket = bucketIterator.next();
            if (this.tasksPerBucketId.containsKey((Object)bucket.getBucketId())) continue;
            bucket.teardown();
            bucketIterator.remove();
        }
    }

    @Override
    protected void handleBucketConflict(int bucketIdx, long newBucketId) {
        Preconditions.checkArgument((this.buckets[bucketIdx].getBucketId() < newBucketId ? 1 : 0) != 0, (Object)"new time bucket should have a value greater than the old time bucket");
        this.bucketsForTeardown.add(this.buckets[bucketIdx]);
        this.buckets[bucketIdx] = this.newBucket(newBucketId);
        this.buckets[bucketIdx].setup(this);
    }

    @Override
    public void purgeTimeBucketsLessThanEqualTo(long timeBucket) {
        this.purgedTimeBuckets.add(timeBucket);
        super.purgeTimeBucketsLessThanEqualTo(timeBucket);
    }

    private static class TimeUnifiedBucketsFileSystem
    extends BucketsFileSystem {
        private TimeUnifiedBucketsFileSystem() {
        }

        @Override
        protected FileAccess.FileWriter getWriter(long bucketId, String fileName) throws IOException {
            return this.managedStateContext.getFileAccess().getWriter(this.managedStateContext.getOperatorContext().getId(), fileName);
        }

        @Override
        protected FileAccess.FileReader getReader(long bucketId, String fileName) throws IOException {
            return this.managedStateContext.getFileAccess().getReader(this.managedStateContext.getOperatorContext().getId(), fileName);
        }

        @Override
        protected void rename(long bucketId, String fromName, String toName) throws IOException {
            this.managedStateContext.getFileAccess().rename(this.managedStateContext.getOperatorContext().getId(), fromName, toName);
        }

        @Override
        protected DataOutputStream getOutputStream(long bucketId, String fileName) throws IOException {
            return this.managedStateContext.getFileAccess().getOutputStream(this.managedStateContext.getOperatorContext().getId(), fileName);
        }

        @Override
        protected DataInputStream getInputStream(long bucketId, String fileName) throws IOException {
            return this.managedStateContext.getFileAccess().getInputStream(this.managedStateContext.getOperatorContext().getId(), fileName);
        }

        @Override
        protected boolean exists(long bucketId, String fileName) throws IOException {
            return this.managedStateContext.getFileAccess().exists(this.managedStateContext.getOperatorContext().getId(), fileName);
        }

        @Override
        protected RemoteIterator<LocatedFileStatus> listFiles(long bucketId) throws IOException {
            return this.managedStateContext.getFileAccess().listFiles(this.managedStateContext.getOperatorContext().getId());
        }

        @Override
        protected void delete(long bucketId, String fileName) throws IOException {
            this.managedStateContext.getFileAccess().delete(this.managedStateContext.getOperatorContext().getId(), fileName);
        }

        @Override
        protected void deleteBucket(long bucketId) throws IOException {
            this.managedStateContext.getFileAccess().deleteBucket(this.managedStateContext.getOperatorContext().getId());
        }

        @Override
        protected void addBucketName(long bucketId) {
            long operatorId = this.managedStateContext.getOperatorContext().getId();
            if (!this.bucketNamesOnFS.contains(operatorId)) {
                this.bucketNamesOnFS.add(operatorId);
            }
        }
    }
}

