/*
 * Decompiled with CFR 0.152.
 */
package com.github.paganini2008.devtools.time;

import com.github.paganini2008.devtools.collection.ConcurrentSortedBoundedMap;
import com.github.paganini2008.devtools.collection.MapUtils;
import com.github.paganini2008.devtools.multithreads.ThreadUtils;
import com.github.paganini2008.devtools.time.AppendableTimeSlotMap;
import com.github.paganini2008.devtools.time.TimeSlot;
import com.github.paganini2008.devtools.time.TimeSlotMap;
import com.github.paganini2008.devtools.time.TimeWindowListener;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Groups offered payloads into per-time-slot lists and hands each list to a
 * {@link TimeWindowListener} once it has grown to {@code batchSize} elements,
 * or when {@link #flush()} is called explicitly.
 *
 * <p>Keys are resolved through an {@link AppendableTimeSlotMap} that is built
 * around this map with the given span and {@link TimeSlot}.
 *
 * @param <V> type of the payloads collected per time slot
 */
public class TimeWindowMap<V>
extends ConcurrentSortedBoundedMap<Instant, List<V>> {
    private static final long serialVersionUID = 2633466884704076710L;
    private final TimeSlotMap<List<V>> timeSlotMap;
    private final int batchSize;
    private final TimeWindowListener<V> timeWindowListener;

    /**
     * Creates a time window map backed by a {@link ConcurrentHashMap} with a maximum size of 1.
     *
     * @param span               span passed to the underlying {@link AppendableTimeSlotMap}
     * @param timeSlot           time unit passed to the underlying {@link AppendableTimeSlotMap}
     * @param batchSize          number of elements in one slot list at which the listener is triggered
     * @param timeWindowListener receiver of the evicted batches
     */
    public TimeWindowMap(int span, TimeSlot timeSlot, int batchSize, TimeWindowListener<V> timeWindowListener) {
        super(new ConcurrentHashMap<>(), 1);
        this.timeSlotMap = new AppendableTimeSlotMap<>(this, span, timeSlot);
        this.batchSize = batchSize;
        this.timeWindowListener = timeWindowListener;
    }

    /**
     * Convenience overload of {@link #offer(Instant, Object)} taking epoch milliseconds.
     *
     * @param timeInMs timestamp in milliseconds since the epoch
     * @param payload  value to append
     * @return the list the payload was appended to
     */
    public List<V> offer(long timeInMs, V payload) {
        return this.offer(Instant.ofEpochMilli(timeInMs), payload);
    }

    /**
     * Appends {@code payload} to the list stored for {@code time}, creating a
     * {@link CopyOnWriteArrayList} when none exists yet, and then asks
     * {@code ThreadUtils.forUpdate} to run the eviction once the list holds at
     * least {@code batchSize} elements.
     *
     * @param time    timestamp of the payload
     * @param payload value to append
     * @return the list the payload was appended to
     */
    public List<V> offer(Instant time, V payload) {
        List<V> values = MapUtils.get(this.timeSlotMap, time, () -> new CopyOnWriteArrayList<>());
        values.add(payload);
        // mutate(time) presumably maps the timestamp to the key of its slot -- confirm in TimeSlotMap.
        ThreadUtils.forUpdate(values, () -> values.size() >= this.batchSize, () -> this.onEviction(this.timeSlotMap.mutate(time), values));
        return values;
    }

    /**
     * Clears the underlying time slot map.
     */
    // NOTE(review): timeSlotMap is constructed around this map; confirm that its
    // clear()/size() do not call back into these overrides.
    @Override
    public void clear() {
        this.timeSlotMap.clear();
    }

    /**
     * @return the size reported by the underlying time slot map
     */
    @Override
    public int size() {
        return this.timeSlotMap.size();
    }

    /**
     * @return always 1, the same bound handed to the superclass constructor
     */
    @Override
    public int getMaxSize() {
        return 1;
    }

    /**
     * Hands the content of the last entry to the listener regardless of the
     * batch size. Does nothing when there is no entry to flush.
     */
    public void flush() {
        Map.Entry<Instant, List<V>> entry = MapUtils.getLastEntry(this.timeSlotMap);
        if (entry == null) {
            // Nothing has been offered yet (or everything was cleared): avoid a NullPointerException.
            return;
        }
        this.onEviction(entry.getKey(), entry.getValue());
    }

    /**
     * Passes a snapshot of {@code values} to the listener and then removes the
     * snapshot's elements from {@code values}.
     *
     * @param ins    time key the values belong to
     * @param values live list of values for that key
     */
    @Override
    public final void onEviction(Instant ins, List<V> values) {
        // Work on a snapshot so the listener sees a stable list.
        ArrayList<V> copy = new ArrayList<V>(values);
        this.timeWindowListener.saveCheckPoint(ins, copy);
        // NOTE(review): removeAll drops every element equal to one in the snapshot, so an
        // equal payload appended after the snapshot was taken would be removed without
        // having been passed to the listener -- confirm whether callers can offer duplicates.
        values.removeAll(copy);
    }

    /**
     * Manual demo: offers random UUID strings through {@code ThreadUtils.benchmark},
     * waits for input on stdin, flushes the remainder and prints the accumulated count.
     */
    public static void main(String[] args) throws Exception {
        AtomicInteger counter = new AtomicInteger();
        TimeWindowMap<String> timeWindowMap = new TimeWindowMap<>(1, TimeSlot.MINUTE, 10000, (time, values) -> System.out.println("Time: " + time + "\t Size: " + counter.getAndAdd(values.size())));
        ThreadUtils.benchmark(10, 10, 100000, i -> timeWindowMap.offer(System.currentTimeMillis(), UUID.randomUUID().toString()));
        System.out.println("Completed");
        System.in.read();
        timeWindowMap.flush();
        System.out.println(counter.get());
    }
}

