/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.lib.state.managed;

import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.lib.fileaccess.FileAccess;
import com.datatorrent.lib.fileaccess.TFileImpl;
import com.datatorrent.lib.util.comparator.SliceComparator;
import com.datatorrent.netlet.util.Slice;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.Futures;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.managed.Bucket;
import org.apache.apex.malhar.lib.state.managed.BucketsFileSystem;
import org.apache.apex.malhar.lib.state.managed.IncrementalCheckpointManager;
import org.apache.apex.malhar.lib.state.managed.ManagedState;
import org.apache.apex.malhar.lib.state.managed.ManagedStateContext;
import org.apache.apex.malhar.lib.state.managed.StateTracker;
import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Skeleton of a bucketed {@link ManagedState}.
 *
 * <p>The class owns a fixed-length array of {@link Bucket}s, lazily creates a bucket the first time its
 * id is used, forwards checkpoint data to an {@link IncrementalCheckpointManager} and offers both a
 * blocking and a {@link Future}-based read path. Concrete subclasses decide how many buckets exist via
 * {@link #getNumBuckets()}.
 *
 * <p>Reads and checkpoint/commit/teardown calls on a bucket are performed while holding that bucket's
 * monitor; {@link #putInBucket(long, long, Slice, Slice)} does not take the monitor itself.
 */
public abstract class AbstractManagedStateImpl
    implements ManagedState,
    Component<Context.OperatorContext>,
    Operator.CheckpointNotificationListener,
    ManagedStateContext,
    TimeBucketAssigner.PurgeListener {

    /** Memory limit in bytes. This class only stores and returns the value. */
    private long maxMemorySize;

    /** Length of {@link #buckets}; taken from {@link #getNumBuckets()} when the array is first created. */
    protected int numBuckets;

    @NotNull
    private FileAccess fileAccess = new TFileImpl.DTFileImpl();

    @NotNull
    protected TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();

    /** Bucket slots addressed through {@link #getBucketIdx(long)}; a slot is null until first prepared. */
    protected Bucket[] buckets;

    /** Size of the thread pool that executes asynchronous reads. */
    @Min(value = 1L)
    private int numReaders = 1;

    /** Executor for {@link ValueFetchTask}s; created at the end of {@link #setup}. */
    @NotNull
    protected transient ExecutorService readerService;

    @NotNull
    protected IncrementalCheckpointManager checkpointManager = new IncrementalCheckpointManager();

    @NotNull
    protected BucketsFileSystem bucketsFileSystem = new BucketsFileSystem();

    protected transient Context.OperatorContext operatorContext;

    @NotNull
    protected Comparator<Slice> keyComparator = new SliceComparator();

    /** Failure recorded by a {@link ValueFetchTask}; rethrown from {@link #beginWindow(long)}. */
    protected final transient AtomicReference<Throwable> throwable = new AtomicReference<>();

    // Only stored and exposed here; presumably consumed by StateTracker - confirm against that class.
    @NotNull
    @FieldSerializer.Bind(value = JavaSerializer.class)
    private Duration checkStateSizeInterval;

    // Only stored and exposed here; presumably consumed by StateTracker - confirm against that class.
    @FieldSerializer.Bind(value = JavaSerializer.class)
    private Duration durationPreventingFreeingSpace;

    private transient StateTracker stateTracker;

    /** Monitor held for the whole of {@link #committed(long)}. */
    final transient Object commitLock;

    /**
     * Asynchronous fetch tasks keyed by bucket id. A task is added when it is submitted and removed by
     * the task itself once its read has returned normally.
     */
    protected final transient ListMultimap<Long, ValueFetchTask> tasksPerBucketId;

    private static final Logger LOG = LoggerFactory.getLogger(AbstractManagedStateImpl.class);

    /**
     * Initialises the transient collaborators and defaults the state-size check interval to the default
     * streaming window size multiplied by the default application window count.
     */
    public AbstractManagedStateImpl() {
        final int streamingWindowMillis = Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS.defaultValue;
        final int applicationWindowCount = Context.OperatorContext.APPLICATION_WINDOW_COUNT.defaultValue;
        // Widen before multiplying so the product cannot wrap around in int arithmetic.
        this.checkStateSizeInterval = Duration.millis((long)streamingWindowMillis * applicationWindowCount);
        this.stateTracker = new StateTracker();
        this.commitLock = new Object();
        this.tasksPerBucketId = Multimaps.synchronizedListMultimap(
            ArrayListMultimap.<Long, ValueFetchTask>create());
    }

    /**
     * Wires up all collaborators, sets up any buckets that already exist, replays checkpointed windows
     * when the operator has an activation window, and finally creates the reader thread pool.
     *
     * @param context operator context; retained and later returned by {@link #getOperatorContext()}
     * @throws RuntimeException wrapping an {@link IOException} raised while replaying checkpointed windows
     */
    public void setup(Context.OperatorContext context) {
        operatorContext = context;
        fileAccess.init();

        timeBucketAssigner.setPurgeListener(this);
        timeBucketAssigner.setup(this);
        checkpointManager.setup(this);
        bucketsFileSystem.setup(this);

        // The array is only allocated when none is present yet (e.g. it was not restored with this object).
        if (buckets == null) {
            numBuckets = getNumBuckets();
            buckets = new Bucket[numBuckets];
        }
        for (Bucket bucket : buckets) {
            if (bucket != null) {
                bucket.setup(this);
            }
        }
        stateTracker.setup(this);

        long activationWindow = context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID);
        if (activationWindow != -1L) {
            restoreCheckpointedWindows(activationWindow);
        }

        readerService = Executors.newFixedThreadPool(numReaders,
            new NameableThreadFactory("managedStateReaders"));
    }

    /**
     * Replays every window known to the checkpoint manager: windows up to and including the activation
     * window are loaded into their buckets and saved again with the last {@code save} argument set to
     * {@code true}; later windows are deleted.
     */
    private void restoreCheckpointedWindows(long activationWindow) {
        final int operatorId = operatorContext.getId();
        try {
            for (long recoveredWindow : checkpointManager.getWindowIds(operatorId)) {
                if (recoveredWindow > activationWindow) {
                    checkpointManager.delete(operatorId, recoveredWindow);
                    continue;
                }

                // The manager hands the data back untyped; it is the map written by beforeCheckpoint().
                @SuppressWarnings("unchecked")
                Map<Long, Map<Slice, Bucket.BucketedValue>> recoveredData =
                    (Map<Long, Map<Slice, Bucket.BucketedValue>>)checkpointManager.load(operatorId, recoveredWindow);

                if (recoveredData != null && !recoveredData.isEmpty()) {
                    for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : recoveredData.entrySet()) {
                        int bucketIdx = prepareBucket(entry.getKey());
                        buckets[bucketIdx].recoveredData(recoveredWindow, entry.getValue());
                    }
                }
                // Saved even when nothing was recovered, exactly as the data came back from load().
                checkpointManager.save(recoveredData, operatorId, recoveredWindow, true);
            }
        } catch (IOException e) {
            throw new RuntimeException("recovering", e);
        }
    }

    /**
     * @return the number of bucket slots to allocate; read once when the bucket array is created
     */
    public abstract int getNumBuckets();

    /**
     * Rethrows any failure recorded by an asynchronous read, then notifies the time-bucket assigner.
     *
     * @param windowId id of the window that is starting
     */
    public void beginWindow(long windowId) {
        Throwable failure = throwable.get();
        if (failure != null) {
            throw Throwables.propagate(failure);
        }
        timeBucketAssigner.beginWindow(windowId);
    }

    /**
     * Records the access with the state tracker and makes sure a bucket for {@code bucketId} occupies
     * its slot, creating and setting it up when the slot is empty.
     *
     * @param bucketId id of the bucket about to be used
     * @return index of the bucket inside {@link #buckets}
     * @throws IllegalArgumentException from {@link #handleBucketConflict(int, long)} (unless overridden)
     *         when the slot already holds a bucket with a different id
     */
    protected int prepareBucket(long bucketId) {
        stateTracker.bucketAccessed(bucketId);
        final int bucketIdx = getBucketIdx(bucketId);
        final Bucket existing = buckets[bucketIdx];
        if (existing == null) {
            Bucket created = newBucket(bucketId);
            created.setup(this);
            buckets[bucketIdx] = created;
        } else if (existing.getBucketId() != bucketId) {
            handleBucketConflict(bucketIdx, bucketId);
        }
        return bucketIdx;
    }

    /**
     * Stores a key/value pair in the given bucket. Nothing is stored, and no bucket is prepared, when
     * {@code timeBucket} is {@code -1}.
     *
     * @param bucketId   id of the target bucket
     * @param timeBucket time bucket of the entry, or {@code -1} to skip the put
     * @param key        key, must not be null
     * @param value      value, must not be null
     * @throws NullPointerException if {@code key} or {@code value} is null
     */
    protected void putInBucket(long bucketId, long timeBucket, @NotNull Slice key, @NotNull Slice value) {
        Preconditions.checkNotNull(key, "key");
        Preconditions.checkNotNull(value, "value");
        if (timeBucket == -1L) {
            return;
        }
        buckets[prepareBucket(bucketId)].put(key, timeBucket, value);
    }

    /**
     * Reads a value on the calling thread using {@link Bucket.ReadSource#ALL}, holding the bucket's
     * monitor for the duration of the read.
     *
     * @param bucketId   id of the bucket to read from
     * @param timeBucket time bucket passed through to the bucket
     * @param key        key, must not be null
     * @return whatever {@link Bucket#get} returns for the key
     * @throws NullPointerException if {@code key} is null
     */
    protected Slice getValueFromBucketSync(long bucketId, long timeBucket, @NotNull Slice key) {
        Preconditions.checkNotNull(key, "key");
        final Bucket bucket = buckets[prepareBucket(bucketId)];
        synchronized (bucket) {
            return bucket.get(key, timeBucket, Bucket.ReadSource.ALL);
        }
    }

    /**
     * Reads a value without blocking on anything but memory: a non-null result from
     * {@link Bucket.ReadSource#MEMORY} is returned as an already-completed future, otherwise a
     * {@link ValueFetchTask} is registered in {@link #tasksPerBucketId} and submitted to the reader pool.
     *
     * @param bucketId   id of the bucket to read from
     * @param timeBucket time bucket passed through to the bucket
     * @param key        key, must not be null
     * @return future producing the value
     * @throws NullPointerException if {@code key} is null
     */
    protected Future<Slice> getValueFromBucketAsync(long bucketId, long timeBucket, @NotNull Slice key) {
        Preconditions.checkNotNull(key, "key");
        final Bucket bucket = buckets[prepareBucket(bucketId)];
        synchronized (bucket) {
            Slice cachedVal = bucket.get(key, timeBucket, Bucket.ReadSource.MEMORY);
            if (cachedVal != null) {
                return Futures.immediateFuture(cachedVal);
            }
            ValueFetchTask valueFetchTask = new ValueFetchTask(bucket, key, timeBucket, this);
            tasksPerBucketId.put(bucket.getBucketId(), valueFetchTask);
            return readerService.submit(valueFetchTask);
        }
    }

    /**
     * Called when the slot for a bucket id is already occupied by a bucket with another id. This
     * implementation always fails.
     *
     * @param bucketIdx   index of the contested slot
     * @param newBucketId id that was requested
     * @throws IllegalArgumentException always
     */
    protected void handleBucketConflict(int bucketIdx, long newBucketId) {
        long occupantId = buckets[bucketIdx].getBucketId();
        throw new IllegalArgumentException("bucket conflict " + occupantId + " " + newBucketId);
    }

    /**
     * Maps a bucket id onto a slot of {@link #buckets}.
     *
     * @param bucketId bucket id, may be negative
     * @return an index in {@code [0, numBuckets)}
     */
    protected int getBucketIdx(long bucketId) {
        // The remainder carries the sign of bucketId; its magnitude is always below numBuckets, so
        // taking the absolute value keeps negative ids inside the array instead of indexing below zero.
        return (int)Math.abs(bucketId % numBuckets);
    }

    /**
     * @param bucketId bucket id
     * @return the bucket currently stored in the slot for {@code bucketId}, or null if the slot is empty;
     *         no check is made that the stored bucket actually has this id
     */
    Bucket getBucket(long bucketId) {
        return buckets[getBucketIdx(bucketId)];
    }

    /**
     * Factory for buckets created by {@link #prepareBucket(long)}.
     *
     * @param bucketId id of the new bucket
     * @return a new {@link Bucket.DefaultBucket}
     */
    protected Bucket newBucket(long bucketId) {
        return new Bucket.DefaultBucket(bucketId);
    }

    /** Forwards the end-of-window notification to the time-bucket assigner. */
    public void endWindow() {
        timeBucketAssigner.endWindow();
    }

    /**
     * Collects the checkpoint data of every existing bucket (under that bucket's monitor) and, when at
     * least one bucket returned a non-empty map, hands the combined map to the checkpoint manager.
     *
     * @param windowId id of the window being checkpointed
     * @throws RuntimeException wrapping an {@link IOException} raised while saving
     */
    public void beforeCheckpoint(long windowId) {
        Map<Long, Map<Slice, Bucket.BucketedValue>> flashData = Maps.newHashMap();
        for (Bucket bucket : buckets) {
            if (bucket == null) {
                continue;
            }
            synchronized (bucket) {
                Map<Slice, Bucket.BucketedValue> flashDataForBucket = bucket.checkpoint(windowId);
                if (!flashDataForBucket.isEmpty()) {
                    flashData.put(bucket.getBucketId(), flashDataForBucket);
                }
            }
        }
        if (flashData.isEmpty()) {
            return;
        }
        try {
            checkpointManager.save(flashData, operatorContext.getId(), windowId, false);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * No-op.
     *
     * @param windowId id of the checkpointed window (unused)
     */
    public void checkpointed(long windowId) {
    }

    /**
     * Notifies every existing bucket (under that bucket's monitor) and then the checkpoint manager that
     * a window was committed. The whole call runs while holding {@link #commitLock}.
     *
     * @param windowId id of the committed window
     * @throws RuntimeException wrapping an {@link IOException} or {@link InterruptedException}; in the
     *         latter case the calling thread's interrupt flag is set again before throwing
     */
    public void committed(long windowId) {
        synchronized (commitLock) {
            try {
                for (Bucket bucket : buckets) {
                    if (bucket == null) {
                        continue;
                    }
                    synchronized (bucket) {
                        bucket.committed(windowId);
                    }
                }
                checkpointManager.committed(operatorContext.getId(), windowId);
            } catch (IOException e) {
                throw new RuntimeException("committing " + windowId, e);
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the flag; restore it so callers can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException("committing " + windowId, e);
            }
        }
    }

    /**
     * Tears down the collaborators, stops the reader pool with {@code shutdownNow()}, tears down every
     * existing bucket (under that bucket's monitor) and finally the state tracker.
     */
    public void teardown() {
        checkpointManager.teardown();
        bucketsFileSystem.teardown();
        timeBucketAssigner.teardown();
        readerService.shutdownNow();
        for (Bucket bucket : buckets) {
            if (bucket == null) {
                continue;
            }
            synchronized (bucket) {
                bucket.teardown();
            }
        }
        stateTracker.teardown();
    }

    /**
     * Passes the latest expired time bucket on to the checkpoint manager.
     *
     * @param timeBucket the time bucket reported by the purge notification
     */
    @Override
    public void purgeTimeBucketsLessThanEqualTo(long timeBucket) {
        checkpointManager.setLatestExpiredTimeBucket(timeBucket);
    }

    /** @return the context received in {@link #setup}, or null before setup */
    @Override
    public Context.OperatorContext getOperatorContext() {
        return operatorContext;
    }

    /** @param bytes memory limit in bytes */
    @Override
    public void setMaxMemorySize(long bytes) {
        maxMemorySize = bytes;
    }

    /** @return the configured memory limit in bytes */
    public long getMaxMemorySize() {
        return maxMemorySize;
    }

    /**
     * @param fileAccess file access implementation, must not be null
     * @throws NullPointerException if {@code fileAccess} is null
     */
    public void setFileAccess(@NotNull FileAccess fileAccess) {
        this.fileAccess = Preconditions.checkNotNull(fileAccess);
    }

    /** @return the file access implementation */
    @Override
    public FileAccess getFileAccess() {
        return fileAccess;
    }

    /**
     * @param timeBucketAssigner assigner to use, must not be null
     * @throws NullPointerException if {@code timeBucketAssigner} is null
     */
    public void setTimeBucketAssigner(@NotNull TimeBucketAssigner timeBucketAssigner) {
        this.timeBucketAssigner = Preconditions.checkNotNull(timeBucketAssigner);
    }

    /** @return the time-bucket assigner */
    @Override
    public TimeBucketAssigner getTimeBucketAssigner() {
        return timeBucketAssigner;
    }

    /** @return the comparator used for keys */
    @Override
    public Comparator<Slice> getKeyComparator() {
        return keyComparator;
    }

    /**
     * @param keyComparator comparator for keys, must not be null
     * @throws NullPointerException if {@code keyComparator} is null
     */
    public void setKeyComparator(@NotNull Comparator<Slice> keyComparator) {
        this.keyComparator = Preconditions.checkNotNull(keyComparator);
    }

    /** @return the buckets file system */
    @Override
    public BucketsFileSystem getBucketsFileSystem() {
        return bucketsFileSystem;
    }

    /** @return number of threads used for the reader pool */
    public int getNumReaders() {
        return numReaders;
    }

    /** @param numReaders number of threads for the reader pool; read when {@link #setup} creates the pool */
    public void setNumReaders(int numReaders) {
        this.numReaders = numReaders;
    }

    /** @return the state-size check interval */
    public Duration getCheckStateSizeInterval() {
        return checkStateSizeInterval;
    }

    /**
     * @param checkStateSizeInterval state-size check interval, must not be null
     * @throws NullPointerException if {@code checkStateSizeInterval} is null
     */
    public void setCheckStateSizeInterval(@NotNull Duration checkStateSizeInterval) {
        this.checkStateSizeInterval = Preconditions.checkNotNull(checkStateSizeInterval);
    }

    /** @return the configured duration, or null when none was set */
    public Duration getDurationPreventingFreeingSpace() {
        return durationPreventingFreeingSpace;
    }

    /** @param durationPreventingFreeingSpace duration to store; null is accepted */
    public void setDurationPreventingFreeingSpace(Duration durationPreventingFreeingSpace) {
        this.durationPreventingFreeingSpace = durationPreventingFreeingSpace;
    }

    /**
     * @param stateTracker replacement state tracker, must not be null
     * @throws NullPointerException if {@code stateTracker} is null
     */
    @VisibleForTesting
    void setStateTracker(@NotNull StateTracker stateTracker) {
        this.stateTracker = Preconditions.checkNotNull(stateTracker, "state tracker");
    }

    /**
     * @param bucketsFileSystem replacement buckets file system, must not be null
     * @throws NullPointerException if {@code bucketsFileSystem} is null
     */
    @VisibleForTesting
    void setBucketsFileSystem(@NotNull BucketsFileSystem bucketsFileSystem) {
        this.bucketsFileSystem = Preconditions.checkNotNull(bucketsFileSystem, "buckets file system");
    }

    /**
     * Asynchronous read of a single key from a bucket using {@link Bucket.ReadSource#ALL}.
     */
    static class ValueFetchTask
        implements Callable<Slice> {
        private final Bucket bucket;
        private final long timeBucketId;
        private final Slice key;
        private final AbstractManagedStateImpl managedState;

        /**
         * @param bucket       bucket to read from, must not be null
         * @param key          key to look up, must not be null
         * @param timeBucketId time bucket passed through to the bucket
         * @param managedState owning state, must not be null
         * @throws NullPointerException if {@code bucket}, {@code key} or {@code managedState} is null
         */
        ValueFetchTask(@NotNull Bucket bucket, @NotNull Slice key, long timeBucketId,
            AbstractManagedStateImpl managedState) {
            this.bucket = Preconditions.checkNotNull(bucket);
            this.timeBucketId = timeBucketId;
            this.key = Preconditions.checkNotNull(key);
            this.managedState = Preconditions.checkNotNull(managedState);
        }

        /**
         * Performs the read under the bucket's monitor and, when it returns normally, removes this task
         * from the owner's {@code tasksPerBucketId}. Any {@link Throwable} is stored in the owner's
         * {@code throwable} reference and then rethrown via {@link Throwables#propagate}; in that case
         * the task is not removed from the multimap.
         *
         * @return the value returned by the bucket
         */
        @Override
        public Slice call() throws Exception {
            try {
                synchronized (bucket) {
                    Slice value = bucket.get(key, timeBucketId, Bucket.ReadSource.ALL);
                    managedState.tasksPerBucketId.remove(bucket.getBucketId(), this);
                    return value;
                }
            } catch (Throwable t) {
                managedState.throwable.set(t);
                throw Throwables.propagate(t);
            }
        }
    }
}

