/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.dyno.recipes.counter;

import com.netflix.dyno.jedis.DynoJedisClient;
import com.netflix.dyno.jedis.DynoJedisPipeline;
import com.netflix.dyno.recipes.counter.DynoJedisCounter;
import com.netflix.dyno.recipes.util.Tuple;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DynoJedisCounter} whose increments are queued and applied through
 * {@link DynoJedisPipeline}s by a single background thread.
 *
 * <p>Callers enqueue commands with {@link #incr()}, {@link #sync()} and {@link #close()};
 * a single poller thread named {@code DynoJedisPipelineCounter-Poller} drains the queue in
 * FIFO order. Increments are only buffered on a pipeline until a {@link #sync()} command is
 * processed. {@link #close()} does not itself sync the pipelines, so call {@link #sync()}
 * before {@link #close()} if buffered increments must be sent.
 */
@ThreadSafe
public class DynoJedisPipelineCounter
extends DynoJedisCounter {
    private static final Logger logger = LoggerFactory.getLogger(DynoJedisPipelineCounter.class);

    /** Maximum time, in milliseconds, that {@link #close()} waits for the poller to stop. */
    private static final long CLOSE_TIMEOUT_MILLIS = 2000L;

    /** Commands handed from caller threads to the poller thread. */
    private final LinkedBlockingQueue<Command> queue = new LinkedBlockingQueue<Command>();
    private final ExecutorService counterThreadPool = Executors.newSingleThreadExecutor(new ThreadFactory(){

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "DynoJedisPipelineCounter-Poller");
        }
    });
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    /** Released by the poller thread when it terminates. */
    private final CountDownLatch latch = new CountDownLatch(1);

    // Must be declared after `queue`: field initializers run in declaration order.
    private final Consumer consumer = new Consumer(this.queue, this.generatedKeys);

    /**
     * Creates the counter; the poller thread is not started until {@link #initialize()}.
     *
     * @param key    counter key, passed to the superclass
     * @param client Dyno client, passed to the superclass and used to open pipelines
     */
    public DynoJedisPipelineCounter(String key, DynoJedisClient client) {
        super(key, client);
    }

    /**
     * Initializes the superclass and starts the poller thread. Only the first call has any
     * effect; subsequent calls are no-ops.
     */
    @Override
    public void initialize() {
        if (this.initialized.compareAndSet(false, true)) {
            super.initialize();
            this.counterThreadPool.submit(this.consumer);
        }
    }

    /**
     * Enqueues a single increment for the poller thread.
     *
     * @throws IllegalStateException if {@link #initialize()} has not been called
     */
    @Override
    public void incr() {
        this.ensureInitialized();
        this.queue.offer(Command.INCR);
    }

    /**
     * Enqueues a request to sync all pipelines. The sync happens asynchronously on the poller
     * thread, after every command enqueued before it.
     *
     * @throws IllegalStateException if {@link #initialize()} has not been called
     */
    public void sync() {
        this.ensureInitialized();
        logger.debug("sending SYNC offer");
        this.queue.offer(Command.SYNC);
    }

    /**
     * Asks the poller thread to stop and waits up to {@value #CLOSE_TIMEOUT_MILLIS} ms for it
     * to do so. If the calling thread is interrupted while waiting, the wait is abandoned and
     * the thread's interrupt status is restored.
     *
     * @throws IllegalStateException if {@link #initialize()} has not been called
     */
    @Override
    public void close() {
        this.ensureInitialized();
        this.queue.offer(Command.STOP);
        try {
            if (!this.latch.await(CLOSE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                logger.warn("Timed out after {} ms waiting for the counter poller to stop", CLOSE_TIMEOUT_MILLIS);
            }
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
        }
    }

    /** Rejects use of the counter before {@link #initialize()} has been called. */
    private void ensureInitialized() {
        if (!this.initialized.get()) {
            throw new IllegalStateException("Counter has not been initialized");
        }
    }

    /**
     * Poller task: takes commands off the queue one at a time and applies them to a set of
     * pipelines, one pipeline per generated key. All mutable state here is touched only by the
     * thread executing {@link #run()} (after construction).
     */
    class Consumer
    implements Runnable {
        private final LinkedBlockingQueue<Command> queue;
        private final List<String> keys;
        /** Number of SYNC commands received so far; used for log messages only. */
        private long syncCount = 0L;
        /** Increments buffered on the current pipelines since the last sync. */
        private int pipelineOps = 0;
        private List<Tuple<String, DynoJedisPipeline>> keysAndPipelines;

        /**
         * @param queue command queue to drain
         * @param keys  keys to increment; one pipeline is opened for each
         */
        public Consumer(LinkedBlockingQueue<Command> queue, List<String> keys) {
            this.queue = queue;
            this.keys = keys;
            this.keysAndPipelines = this.openPipelines();
        }

        /**
         * Processes commands until STOP is received, the thread is interrupted, or a command
         * fails. On every exit path the executor is shut down and the latch is released, so
         * {@link DynoJedisPipelineCounter#close()} never waits on a poller that is already gone.
         */
        @Override
        public void run() {
            try {
                Command cmd;
                do {
                    cmd = this.queue.take();
                    switch (cmd) {
                        case INCR: {
                            this.bufferIncrement();
                            break;
                        }
                        case SYNC: {
                            this.syncPipelines();
                            break;
                        }
                        case STOP:
                        default: {
                            // Nothing to apply; termination is handled in the finally block.
                            break;
                        }
                    }
                } while (cmd != Command.STOP);
            }
            catch (InterruptedException e) {
                // Treat interruption as a request to stop and keep the flag set for the executor.
                Thread.currentThread().interrupt();
                logger.warn("{} interrupted while waiting for commands; stopping", Thread.currentThread().getName());
            }
            catch (RuntimeException e) {
                // The Future returned by submit() is discarded, so log here or the failure is lost.
                logger.error("Counter poller terminated by an unexpected error", e);
                throw e;
            }
            finally {
                DynoJedisPipelineCounter.this.counterThreadPool.shutdownNow();
                DynoJedisPipelineCounter.this.latch.countDown();
            }
        }

        /** Buffers one increment on a pipeline chosen via {@code randomIntFrom0toN()}. */
        private void bufferIncrement() {
            Tuple<String, DynoJedisPipeline> target = this.keysAndPipelines.get(DynoJedisPipelineCounter.this.randomIntFrom0toN());
            target._2().incr(target._1());
            ++this.pipelineOps;
        }

        /**
         * Syncs every pipeline if any increments were buffered since the last sync, then
         * replaces the pipelines with freshly opened ones.
         */
        private void syncPipelines() {
            ++this.syncCount;
            logger.debug("{} - SYNC {} received", Thread.currentThread().getName(), this.syncCount);
            if (this.pipelineOps > 0) {
                for (Tuple<String, DynoJedisPipeline> entry : this.keysAndPipelines) {
                    entry._2().sync();
                }
                this.keysAndPipelines = this.openPipelines();
                this.pipelineOps = 0;
            }
            logger.debug("{} - SYNC {} done", Thread.currentThread().getName(), this.syncCount);
        }

        /** Opens one new pipeline per key and pairs it with that key. */
        private List<Tuple<String, DynoJedisPipeline>> openPipelines() {
            List<Tuple<String, DynoJedisPipeline>> pipelines = new ArrayList<Tuple<String, DynoJedisPipeline>>(this.keys.size());
            for (String key : this.keys) {
                pipelines.add(new Tuple<String, DynoJedisPipeline>(key, DynoJedisPipelineCounter.this.client.pipelined()));
            }
            return pipelines;
        }
    }

    /** Commands understood by the poller thread. */
    private enum Command {
        /** Buffer one increment on a pipeline. */
        INCR,
        /** Sync all pipelines if anything is buffered. */
        SYNC,
        /** Shut the poller down. */
        STOP
    }
}

