/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;
import java.net.InetAddress;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executor;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.streaming.StreamEventHandler;
import org.apache.cassandra.streaming.StreamManagerMBean;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.streaming.management.StreamEventJMXNotifier;
import org.apache.cassandra.streaming.management.StreamStateCompositeData;
import org.cliffc.high_scale_lib.NonBlockingHashMap;

public class StreamManager
implements StreamManagerMBean {
    public static final StreamManager instance = new StreamManager();
    private final StreamEventJMXNotifier notifier = new StreamEventJMXNotifier();
    private final Map<UUID, StreamResultFuture> initiatedStreams = new NonBlockingHashMap();
    private final Map<UUID, StreamResultFuture> receivingStreams = new NonBlockingHashMap();

    public static StreamRateLimiter getRateLimiter(InetAddress inetAddress) {
        return new StreamRateLimiter(inetAddress);
    }

    public Set<CompositeData> getCurrentStreams() {
        return Sets.newHashSet((Iterable)Iterables.transform((Iterable)Iterables.concat(this.initiatedStreams.values(), this.receivingStreams.values()), (Function)new Function<StreamResultFuture, CompositeData>(){

            public CompositeData apply(StreamResultFuture streamResultFuture) {
                return StreamStateCompositeData.toCompositeData((StreamState)streamResultFuture.getCurrentState());
            }
        }));
    }

    public void register(final StreamResultFuture streamResultFuture) {
        streamResultFuture.addEventListener((StreamEventHandler)this.notifier);
        streamResultFuture.addListener(new Runnable(){

            @Override
            public void run() {
                StreamManager.this.initiatedStreams.remove(streamResultFuture.planId);
            }
        }, (Executor)MoreExecutors.sameThreadExecutor());
        this.initiatedStreams.put(streamResultFuture.planId, streamResultFuture);
    }

    public void registerReceiving(final StreamResultFuture streamResultFuture) {
        streamResultFuture.addEventListener((StreamEventHandler)this.notifier);
        streamResultFuture.addListener(new Runnable(){

            @Override
            public void run() {
                StreamManager.this.receivingStreams.remove(streamResultFuture.planId);
            }
        }, (Executor)MoreExecutors.sameThreadExecutor());
        this.receivingStreams.put(streamResultFuture.planId, streamResultFuture);
    }

    public StreamResultFuture getReceivingStream(UUID uUID) {
        return this.receivingStreams.get(uUID);
    }

    public void addNotificationListener(NotificationListener notificationListener, NotificationFilter notificationFilter, Object object) {
        this.notifier.addNotificationListener(notificationListener, notificationFilter, object);
    }

    public void removeNotificationListener(NotificationListener notificationListener) throws ListenerNotFoundException {
        this.notifier.removeNotificationListener(notificationListener);
    }

    public void removeNotificationListener(NotificationListener notificationListener, NotificationFilter notificationFilter, Object object) throws ListenerNotFoundException {
        this.notifier.removeNotificationListener(notificationListener, notificationFilter, object);
    }

    public MBeanNotificationInfo[] getNotificationInfo() {
        return this.notifier.getNotificationInfo();
    }

    public static class StreamRateLimiter {
        private static final double BYTES_PER_MEGABIT = 131072.0;
        private static final RateLimiter limiter = RateLimiter.create((double)Double.MAX_VALUE);
        private static final RateLimiter interDCLimiter = RateLimiter.create((double)Double.MAX_VALUE);
        private final boolean isLocalDC;

        public StreamRateLimiter(InetAddress inetAddress) {
            double d = (double)DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * 131072.0;
            this.mayUpdateThroughput(d, limiter);
            double d2 = (double)DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec() * 131072.0;
            this.mayUpdateThroughput(d2, interDCLimiter);
            this.isLocalDC = DatabaseDescriptor.getLocalDataCenter() != null && DatabaseDescriptor.getEndpointSnitch() != null ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(inetAddress)) : true;
        }

        private void mayUpdateThroughput(double d, RateLimiter rateLimiter) {
            if (d == 0.0) {
                d = Double.MAX_VALUE;
            }
            if (rateLimiter.getRate() != d) {
                rateLimiter.setRate(d);
            }
        }

        public void acquire(int n) {
            limiter.acquire(n);
            if (!this.isLocalDC) {
                interDCLimiter.acquire(n);
            }
        }
    }
}

