/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.dyno.demo.redis;

import com.netflix.dyno.demo.redis.DynoJedisDemo;
import com.netflix.dyno.recipes.counter.DynoCounter;
import com.netflix.dyno.recipes.counter.DynoJedisBatchCounter;
import com.netflix.dyno.recipes.counter.DynoJedisCounter;
import com.netflix.dyno.recipes.counter.DynoJedisPipelineCounter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class DynoDistributedCounterDemo
extends DynoJedisDemo {
    private final List<DynoCounter> counters = new ArrayList<DynoCounter>();

    public DynoDistributedCounterDemo(String clusterName, String localDC) {
        super(clusterName, localDC);
    }

    private void runMultiThreadedCounter(int numCounters) throws Exception {
        for (int i = 0; i < numCounters; ++i) {
            this.counters.add((DynoCounter)new DynoJedisCounter(this.client.getConnPool().getName() + "-counter", this.client));
        }
        this.runMultiThreaded(5000, false, 1, 2);
    }

    private void runMultiThreadedPipelineCounter(int numCounters) throws Exception {
        for (int i = 0; i < numCounters; ++i) {
            DynoJedisPipelineCounter counter = new DynoJedisPipelineCounter(this.client.getConnPool().getName() + "-async-counter-" + i, this.client);
            counter.initialize();
            this.counters.add((DynoCounter)counter);
        }
        this.runMultiThreaded(50000, false, 1, 2);
    }

    private void runSingleThreadedPipelineCounter() throws Exception {
        DynoJedisPipelineCounter counter = new DynoJedisPipelineCounter("demo-single-async", this.client);
        counter.initialize();
        System.out.println("counter is currently set at " + counter.get());
        for (int i = 0; i < 10000; ++i) {
            counter.incr();
        }
        counter.sync();
        System.out.println("Total count => " + counter.get());
        counter.close();
        System.out.println("Cleaning up keys");
        this.cleanup((DynoCounter)counter);
    }

    private void runMultiThreadedBatchCounter(int numCounters) throws Exception {
        for (int i = 0; i < numCounters; ++i) {
            DynoJedisBatchCounter counter = new DynoJedisBatchCounter(this.client.getConnPool().getName() + "-batch-counter", this.client, Long.valueOf(500L));
            counter.initialize();
            this.counters.add((DynoCounter)counter);
        }
        this.runMultiThreaded(-1, false, 1, 2);
    }

    @Override
    protected void startWrites(final int ops, int numWriters, ExecutorService threadPool, final AtomicBoolean stop, final CountDownLatch latch, final AtomicInteger success, final AtomicInteger failure) {
        final Random random = new Random(System.currentTimeMillis());
        final int numCounters = this.counters.size() > 1 ? this.counters.size() - 1 : 1;
        for (int i = 0; i < numWriters; ++i) {
            threadPool.submit(new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    int localCount = 0;
                    while (!(stop.get() || localCount >= ops && ops != -1)) {
                        try {
                            int index = random.nextInt(numCounters);
                            DynoCounter counter = (DynoCounter)DynoDistributedCounterDemo.this.counters.get(index);
                            counter.incr();
                            success.incrementAndGet();
                            if (++localCount % 10000 != 0 || !(counter instanceof DynoJedisPipelineCounter)) continue;
                            System.out.println("WRITE - sync() " + Thread.currentThread().getName());
                            ((DynoJedisPipelineCounter)counter).sync();
                        }
                        catch (Exception e) {
                            System.out.println("WRITE FAILURE: " + Thread.currentThread().getName() + ": " + e.getMessage());
                            e.printStackTrace();
                            failure.incrementAndGet();
                        }
                    }
                    for (DynoCounter counter : DynoDistributedCounterDemo.this.counters) {
                        if (!(counter instanceof DynoJedisPipelineCounter)) continue;
                        System.out.println(Thread.currentThread().getName() + " => localCount = " + localCount);
                        ((DynoJedisPipelineCounter)counter).sync();
                    }
                    latch.countDown();
                    System.out.println(Thread.currentThread().getName() + " => Done writes");
                    return null;
                }
            });
        }
    }

    @Override
    protected void startReads(int nKeys, int numReaders, ExecutorService threadPool, final AtomicBoolean stop, final CountDownLatch latch, AtomicInteger success, AtomicInteger failure, AtomicInteger emptyReads) {
        threadPool.submit(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                if (DynoDistributedCounterDemo.this.counters != null && DynoDistributedCounterDemo.this.counters.get(0) instanceof DynoJedisPipelineCounter) {
                    latch.countDown();
                    return null;
                }
                while (!stop.get()) {
                    long result = 0L;
                    for (DynoCounter counter : DynoDistributedCounterDemo.this.counters) {
                        result += counter.get().longValue();
                    }
                    System.out.println("counter value ==> " + result);
                    Thread.sleep(1000L);
                }
                latch.countDown();
                return null;
            }
        });
    }

    @Override
    protected void executePostRunActions() {
        long result = 0L;
        for (DynoCounter counter : this.counters) {
            if (counter instanceof DynoJedisPipelineCounter) {
                ((DynoJedisPipelineCounter)counter).sync();
            }
            result += counter.get().longValue();
        }
        System.out.println("COUNTER value ==> " + result);
    }

    @Override
    public void cleanup(int nKeys) {
        try {
            for (DynoCounter counter : this.counters) {
                this.cleanup(counter);
            }
            for (int i = 0; i < this.counters.size(); ++i) {
                this.counters.remove(i);
            }
        }
        catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public void cleanup(DynoCounter counter) throws Exception {
        for (String key : counter.getGeneratedKeys()) {
            System.out.println("deleting key: " + key);
            this.client.del(key);
            counter.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) throws IOException {
        String clusterName = args[0];
        DynoDistributedCounterDemo demo = new DynoDistributedCounterDemo(clusterName, "us-east-1e");
        int numCounters = args.length == 2 ? Integer.valueOf(args[1]) : 1;
        Properties props = new Properties();
        props.load(DynoDistributedCounterDemo.class.getResourceAsStream("/demo.properties"));
        for (String name : props.stringPropertyNames()) {
            System.setProperty(name, props.getProperty(name));
        }
        try {
            demo.initWithRemoteClusterFromEurekaUrl(args[0], 8102, false);
            demo.runMultiThreadedBatchCounter(numCounters);
        }
        catch (Exception ex) {
            ex.printStackTrace();
        }
        finally {
            demo.stop();
            System.out.println("Done");
        }
    }
}

