/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.join;

import com.datatorrent.lib.join.Bucket;
import com.datatorrent.lib.join.TimeEvent;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import javax.validation.constraints.Min;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Unstable
public class TimeBasedStore<T extends TimeEvent> {
    private static final Logger logger = LoggerFactory.getLogger(TimeBasedStore.class);
    private final transient Lock lock;
    @Min(value=1L)
    protected int noOfBuckets;
    protected Bucket<T>[] buckets;
    @Min(value=1L)
    protected long expiryTimeInMillis;
    @Min(value=1L)
    protected long spanTimeInMillis;
    protected int bucketSpanInMillis;
    protected long startOfBucketsInMillis;
    protected long endOBucketsInMillis;
    protected transient Map<Long, Bucket> dirtyBuckets = new HashMap<Long, Bucket>();
    private boolean isOuter = false;
    private List<T> unmatchedEvents = new ArrayList<T>();
    private Map<Object, Set<Long>> key2Buckets = new ConcurrentHashMap<Object, Set<Long>>();
    private transient Timer bucketSlidingTimer;

    public TimeBasedStore() {
        this.lock = new Lock();
    }

    private void recomputeNumBuckets() {
        Calendar calendar = Calendar.getInstance();
        long now = calendar.getTimeInMillis();
        this.expiryTimeInMillis = this.startOfBucketsInMillis = now - this.spanTimeInMillis;
        this.endOBucketsInMillis = now;
        this.noOfBuckets = (int)Math.ceil((double)(now - this.startOfBucketsInMillis) / ((double)this.bucketSpanInMillis * 1.0));
        this.buckets = (Bucket[])Array.newInstance(Bucket.class, this.noOfBuckets);
    }

    public void setup() {
        this.setBucketSpanInMillis((int)(this.spanTimeInMillis > (long)this.bucketSpanInMillis ? (long)this.bucketSpanInMillis : this.spanTimeInMillis));
        if (this.buckets == null) {
            this.recomputeNumBuckets();
        }
        this.startService();
    }

    public List<TimeEvent> getValidTuples(T tuple) {
        Object key = tuple.getEventKey();
        Set<Long> keyBuckets = this.key2Buckets.get(key);
        if (keyBuckets == null) {
            return null;
        }
        ArrayList<TimeEvent> validTuples = new ArrayList<TimeEvent>();
        for (Long idx : keyBuckets) {
            List<T> events;
            int bucketIdx = (int)(idx % (long)this.noOfBuckets);
            Bucket<T> tb = this.buckets[bucketIdx];
            if (tb == null || tb.bucketKey != idx || (events = tb.get(key)) == null) continue;
            validTuples.addAll(events);
        }
        return validTuples;
    }

    public boolean put(T tuple) {
        long bucketKey = this.getBucketKeyFor(tuple);
        if (bucketKey < 0L) {
            return false;
        }
        this.newEvent(bucketKey, tuple);
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long getBucketKeyFor(T event) {
        long eventTime = event.getTime();
        if (eventTime < this.expiryTimeInMillis) {
            return -1L;
        }
        long diffFromStart = eventTime - this.startOfBucketsInMillis;
        long key = diffFromStart / (long)this.bucketSpanInMillis;
        Lock lock = this.lock;
        synchronized (lock) {
            if (eventTime > this.endOBucketsInMillis) {
                long move = ((eventTime - this.endOBucketsInMillis) / (long)this.bucketSpanInMillis + 1L) * (long)this.bucketSpanInMillis;
                this.expiryTimeInMillis += move;
                this.endOBucketsInMillis += move;
            }
        }
        return key;
    }

    public void newEvent(long bucketKey, T event) {
        Object key;
        Set<Long> keyBuckets;
        int bucketIdx = (int)(bucketKey % (long)this.noOfBuckets);
        Bucket<T> bucket = this.buckets[bucketIdx];
        if (bucket == null || bucket.bucketKey != bucketKey) {
            if (bucket != null) {
                this.dirtyBuckets.put(bucket.bucketKey, bucket);
            }
            bucket = this.createBucket(bucketKey);
            this.buckets[bucketIdx] = bucket;
        }
        if ((keyBuckets = this.key2Buckets.get(key = event.getEventKey())) == null) {
            keyBuckets = new HashSet<Long>();
            keyBuckets.add(bucketKey);
            this.key2Buckets.put(key, keyBuckets);
        } else {
            keyBuckets.add(bucketKey);
        }
        bucket.addNewEvent(key, event);
    }

    public void startService() {
        this.bucketSlidingTimer = new Timer();
        this.endOBucketsInMillis = this.expiryTimeInMillis + (long)(this.noOfBuckets * this.bucketSpanInMillis);
        logger.debug("bucket properties {}, {}", (Object)this.spanTimeInMillis, (Object)this.bucketSpanInMillis);
        logger.debug("bucket time params: start {}, end {}", (Object)this.startOfBucketsInMillis, (Object)this.endOBucketsInMillis);
        this.bucketSlidingTimer.scheduleAtFixedRate(new TimerTask(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                long time = 0L;
                Lock lock = TimeBasedStore.this.lock;
                synchronized (lock) {
                    time = TimeBasedStore.this.expiryTimeInMillis += (long)TimeBasedStore.this.bucketSpanInMillis;
                    TimeBasedStore.this.endOBucketsInMillis += (long)TimeBasedStore.this.bucketSpanInMillis;
                }
                TimeBasedStore.this.deleteExpiredBuckets(time);
            }
        }, this.bucketSpanInMillis, (long)this.bucketSpanInMillis);
    }

    void deleteExpiredBuckets(long time) {
        Iterator<Long> iterator = this.dirtyBuckets.keySet().iterator();
        while (iterator.hasNext()) {
            long key = iterator.next();
            Bucket t = this.dirtyBuckets.get(key);
            if (this.startOfBucketsInMillis + t.bucketKey * (long)this.bucketSpanInMillis >= time) continue;
            this.deleteBucket(t);
            iterator.remove();
        }
    }

    public List<T> getUnmatchedEvents() {
        ArrayList<T> copyEvents = new ArrayList<T>(this.unmatchedEvents);
        this.unmatchedEvents.clear();
        return copyEvents;
    }

    private void deleteBucket(Bucket bucket) {
        if (bucket == null) {
            return;
        }
        Map writtens = bucket.getEvents();
        if (writtens == null) {
            return;
        }
        for (Map.Entry e : writtens.entrySet()) {
            if (this.isOuter) {
                for (TimeEvent event : e.getValue()) {
                    if (event.isMatch()) continue;
                    this.unmatchedEvents.add(event);
                }
            }
            this.key2Buckets.get(e.getKey()).remove(bucket.bucketKey);
            if (this.key2Buckets.get(e.getKey()).size() != 0) continue;
            this.key2Buckets.remove(e.getKey());
        }
    }

    protected Bucket<T> createBucket(long bucketKey) {
        return new Bucket(bucketKey);
    }

    public void shutdown() {
        this.bucketSlidingTimer.cancel();
    }

    public void isOuterJoin(boolean isOuter) {
        this.isOuter = isOuter;
    }

    public long getSpanTimeInMillis() {
        return this.spanTimeInMillis;
    }

    public void setSpanTimeInMillis(long spanTimeInMillis) {
        this.spanTimeInMillis = spanTimeInMillis;
    }

    public int getBucketSpanInMillis() {
        return this.bucketSpanInMillis;
    }

    public void setBucketSpanInMillis(int bucketSpanInMillis) {
        this.bucketSpanInMillis = bucketSpanInMillis;
    }

    private static class Lock {
        private Lock() {
        }
    }
}

