/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.lib.state.managed;

import com.datatorrent.api.Context;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.netlet.util.Slice;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Queues;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.managed.Bucket;
import org.apache.apex.malhar.lib.state.managed.ManagedStateComponent;
import org.apache.apex.malhar.lib.state.managed.ManagedStateContext;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IncrementalCheckpointManager
extends FSWindowDataManager
implements ManagedStateComponent {
    private static final String WAL_RELATIVE_PATH = "managed_state";
    private final transient Map<Long, Map<Long, Map<Slice, Bucket.BucketedValue>>> savedWindows = new ConcurrentSkipListMap<Long, Map<Long, Map<Slice, Bucket.BucketedValue>>>();
    private transient ExecutorService writerService;
    private volatile transient boolean transfer;
    private final transient LinkedBlockingQueue<Long> windowsToTransfer = Queues.newLinkedBlockingQueue();
    private final transient AtomicReference<Throwable> throwable = new AtomicReference();
    protected transient ManagedStateContext managedStateContext;
    private final transient AtomicLong latestExpiredTimeBucket = new AtomicLong(-1L);
    private transient int waitMillis;
    private volatile long lastTransferredWindow = -1L;
    private transient long largestWindowAddedToTransferQueue = -1L;
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalCheckpointManager.class);

    public IncrementalCheckpointManager() {
        this.setRecoveryPath(WAL_RELATIVE_PATH);
    }

    @Override
    public void setup(Context.OperatorContext context) {
        throw new UnsupportedOperationException("not supported");
    }

    @Override
    public void setup(final @NotNull ManagedStateContext managedStateContext) {
        this.managedStateContext = (ManagedStateContext)Preconditions.checkNotNull((Object)managedStateContext, (Object)"managed state context");
        this.waitMillis = (Integer)managedStateContext.getOperatorContext().getValue(Context.OperatorContext.SPIN_MILLIS);
        super.setup(managedStateContext.getOperatorContext());
        this.writerService = Executors.newSingleThreadExecutor((ThreadFactory)new NameableThreadFactory("managed-state-writer"));
        this.transfer = true;
        this.writerService.submit(new Runnable(){

            @Override
            public void run() {
                while (IncrementalCheckpointManager.this.transfer) {
                    IncrementalCheckpointManager.this.transferWindowFiles();
                    if (IncrementalCheckpointManager.this.latestExpiredTimeBucket.get() <= -1L) continue;
                    try {
                        managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(IncrementalCheckpointManager.this.latestExpiredTimeBucket.getAndSet(-1L));
                    }
                    catch (IOException e) {
                        IncrementalCheckpointManager.this.throwable.set(e);
                        LOG.debug("delete files", (Throwable)e);
                        Throwables.propagate((Throwable)e);
                    }
                }
            }
        });
    }

    protected void transferWindowFiles() {
        block6: {
            try {
                Long windowId = this.windowsToTransfer.poll();
                if (windowId != null) {
                    try {
                        LOG.debug("transfer window {}", (Object)windowId);
                        Map<Long, Map<Slice, Bucket.BucketedValue>> buckets = this.savedWindows.remove(windowId);
                        for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> singleBucket : buckets.entrySet()) {
                            this.managedStateContext.getBucketsFileSystem().writeBucketData(windowId, singleBucket.getKey(), singleBucket.getValue());
                        }
                        this.storageAgent.delete(this.managedStateContext.getOperatorContext().getId(), windowId.longValue());
                    }
                    catch (Throwable t) {
                        this.throwable.set(t);
                        LOG.debug("transfer window {}", (Object)windowId, (Object)t);
                        Throwables.propagate((Throwable)t);
                    }
                    this.lastTransferredWindow = windowId;
                    break block6;
                }
                Thread.sleep(this.waitMillis);
            }
            catch (InterruptedException ex) {
                LOG.debug("interrupted", (Throwable)ex);
            }
        }
    }

    @Override
    public void save(Object object, int operatorId, long windowId) throws IOException {
        throw new UnsupportedOperationException("doesn't support saving any object");
    }

    public void save(Map<Long, Map<Slice, Bucket.BucketedValue>> unsavedData, int operatorId, long windowId, boolean skipWriteToWindowFile) throws IOException {
        Throwable lthrowable = this.throwable.get();
        if (lthrowable != null) {
            LOG.error("Error while transferring");
            Throwables.propagate((Throwable)lthrowable);
        }
        this.savedWindows.put(windowId, unsavedData);
        if (!skipWriteToWindowFile) {
            super.save(unsavedData, operatorId, windowId);
        }
    }

    protected void committed(int operatorId, long windowId) throws IOException, InterruptedException {
        LOG.debug("data manager committed {}", (Object)windowId);
        for (Long currentWindow : this.savedWindows.keySet()) {
            if (currentWindow <= this.largestWindowAddedToTransferQueue) continue;
            if (currentWindow > windowId) break;
            LOG.debug("to transfer {}", (Object)currentWindow);
            this.largestWindowAddedToTransferQueue = currentWindow;
            this.windowsToTransfer.add(currentWindow);
        }
    }

    @Override
    public void teardown() {
        super.teardown();
        this.transfer = false;
        this.writerService.shutdownNow();
    }

    public void setLatestExpiredTimeBucket(long timeBucket) {
        this.latestExpiredTimeBucket.set(timeBucket);
    }

    public long getLastTransferredWindow() {
        return this.lastTransferredWindow;
    }
}

