/*
 * Decompiled with CFR 0.152.
 */
package cacheperf.comparisons.newWan;

import cacheperf.CachePerfClient;
import cacheperf.CachePerfPrms;
import cacheperf.comparisons.newWan.NewWanPerfBlackboard;
import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.internal.cache.RegionQueue;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import hydra.GatewaySenderHelper;
import hydra.HydraThreadLocal;
import hydra.Log;
import hydra.MasterController;
import hydra.RemoteTestModule;
import hydra.blackboard.SharedMap;
import java.util.Set;

/**
 * Cache-performance client for the "new WAN" comparison tests.
 *
 * <p>Provides a put task that pushes batches of entries through the cache
 * (and therefore through any configured gateway senders), a helper that
 * blocks until the sender queues have drained, and a task that starts a
 * background thread publishing this VM's total sender queue size to the
 * {@link NewWanPerfBlackboard} shared map.
 */
public class NewWanPSTClient
extends CachePerfClient {
    /** Prefix shared by every blackboard key that holds a published queue size. */
    private static final String EVENT_QUEUE_KEY_PREFIX = "EVENT_QUEUE";

    /** Poll interval (ms) while waiting on locally visible sender queues. */
    private static final int LOCAL_DRAIN_POLL_MS = 1;

    /** Poll interval (ms) while waiting on queue sizes published to the blackboard. */
    private static final int BLACKBOARD_DRAIN_POLL_MS = 100;

    /** Blackboard-reported total at or below which the queues count as drained. */
    private static final long BLACKBOARD_DRAIN_THRESHOLD = 500L;

    /** Interval (ms) between queue-size publications by the monitor thread. */
    private static final int MONITOR_POLL_MS = 5;

    /**
     * Records whether the queue monitor has already been started.
     * NOTE(review): presumably scoped per hydra logical thread, as the type
     * name suggests -- confirm against HydraThreadLocal.
     */
    private static final HydraThreadLocal queueMonitorStarted = new HydraThreadLocal();

    /**
     * Task entry point: creates a client, initializes it and runs one batch
     * of gateway-sender puts.
     */
    public static void putDataGWSenderTask() {
        NewWanPSTClient c = new NewWanPSTClient();
        // NOTE(review): 3 is an inlined constant from the decompiler; presumably
        // one of the CachePerfClient operation/trim-interval ids -- confirm.
        c.initialize(3);
        c.putDataGWSender();
    }

    /**
     * Performs puts in chunks of {@code CachePerfPrms.getGatewayQueueEntries()}
     * keys until the batch terminator reports that the batch is complete.
     * The task and warmup terminators are consulted before every chunk.
     */
    private void putDataGWSender() {
        if (this.useTransactions) {
            this.begin();
        }
        int gatewayQueueEntries = CachePerfPrms.getGatewayQueueEntries();
        do {
            this.executeTaskTerminator();
            this.executeWarmupTerminator();
            for (int i = 0; i < gatewayQueueEntries; ++i) {
                int key = this.getNextKey();
                this.put(key);
                ++this.batchCount;
                ++this.count;
                ++this.keyCount;
                ++this.iterationsSinceTxEnd;
            }
        } while (!this.executeBatchTerminator());
    }

    /**
     * Blocks until the gateway sender queues are considered drained and
     * records the elapsed time through the statistics object.
     *
     * <p>When this VM has gateway senders, it polls them directly and returns
     * once the total queued entries no longer exceed the sum of the senders'
     * batch sizes. Otherwise it polls the blackboard entries whose keys start
     * with {@code "EVENT_QUEUE"} and returns once their sum is at most 500.
     */
    private void waitForGWSenderQueuesToDrain() {
        long start = this.statistics.startGatewayQueueDrain();
        Set<GatewaySender> senders = GatewaySenderHelper.getGatewaySenders();
        if (senders != null) {
            long queued;
            long totalBatchSize;
            do {
                MasterController.sleepForMs(LOCAL_DRAIN_POLL_MS);
                totalBatchSize = 0L;
                for (GatewaySender sender : senders) {
                    totalBatchSize += sender.getBatchSize();
                }
                queued = totalQueueSize(senders);
            } while (queued > totalBatchSize);
        } else {
            SharedMap sharedMap = NewWanPerfBlackboard.getInstance().getSharedMap();
            long queued;
            do {
                MasterController.sleepForMs(BLACKBOARD_DRAIN_POLL_MS);
                queued = 0L;
                for (Object rawKey : sharedMap.getMap().keySet()) {
                    if (!(rawKey instanceof String)) {
                        continue;
                    }
                    String k = (String)rawKey;
                    if (!k.startsWith(EVENT_QUEUE_KEY_PREFIX)) {
                        continue;
                    }
                    Long value = (Long)sharedMap.get(k);
                    // The entry may have been removed between keySet() and get().
                    if (value != null) {
                        queued += value.longValue();
                    }
                }
            } while (queued > BLACKBOARD_DRAIN_THRESHOLD);
        }
        this.statistics.endGatewayQueueDrain(start, 1, this.isMainWorkload, this.histogram);
    }

    /**
     * Sums the sizes of all region queues belonging to the given senders.
     *
     * @param senders the gateway senders to inspect; may be {@code null}
     * @return the total number of queued entries, or 0 when {@code senders}
     *         is {@code null} or empty
     */
    private static long totalQueueSize(Set<GatewaySender> senders) {
        long total = 0L;
        if (senders == null) {
            return total;
        }
        for (GatewaySender sender : senders) {
            Set<?> queues = ((AbstractGatewaySender)sender).getQueues();
            // NOTE(review): defensive guard; confirm whether getQueues() can
            // return null (e.g. for a sender that has not been started).
            if (queues == null) {
                continue;
            }
            for (Object queue : queues) {
                total += ((RegionQueue)queue).size();
            }
        }
        return total;
    }

    /**
     * Task entry point: initializes a client and starts the queue monitor
     * unless {@link #queueMonitorStarted} already holds {@code true}.
     */
    public static synchronized void startQueueMonitorTask() {
        NewWanPSTClient c = new NewWanPSTClient();
        c.initialize();
        if (!Boolean.TRUE.equals(queueMonitorStarted.get())) {
            c.startQueueMonitor();
            queueMonitorStarted.set(Boolean.TRUE);
        }
    }

    /**
     * Starts a daemon thread named "Sender Queue Monitor" that repeatedly
     * writes this VM's total gateway sender queue size to the blackboard
     * shared map under a key derived from the VM id.
     */
    protected void startQueueMonitor() {
        final String key = "EVENT_QUEUE_SIZE: for vm_" + RemoteTestModule.getMyVmid();
        Log.getLogWriter().info("Started event queue monitor with key: " + key);
        final SharedMap bb = NewWanPerfBlackboard.getInstance().getSharedMap();
        Thread queueMonitor = new Thread(new Runnable(){

            @Override
            public void run() {
                while (true) {
                    long size = totalQueueSize(GatewaySenderHelper.getGatewaySenders());
                    try {
                        bb.put(key, Long.valueOf(size));
                    }
                    catch (Exception e) {
                        // Best effort: a failed publication must not stop the monitor.
                        Log.getLogWriter().warning("Failed to publish event queue size for key: " + key, e);
                    }
                    MasterController.sleepForMs(MONITOR_POLL_MS);
                }
            }

            /** Removes the published entry when this runnable is finalized. */
            @Override
            protected void finalize() throws Throwable {
                try {
                    bb.remove(key);
                }
                finally {
                    super.finalize();
                }
            }
        }, "Sender Queue Monitor");
        queueMonitor.setDaemon(true);
        queueMonitor.start();
    }
}

