/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.driver;

import io.aeron.driver.DriverConductorProxy;
import io.aeron.driver.PublicationImage;
import io.aeron.driver.Receiver;
import io.aeron.driver.exceptions.UnknownSubscriptionException;
import io.aeron.driver.media.ReceiveChannelEndpoint;
import io.aeron.exceptions.AeronEvent;
import io.aeron.protocol.DataHeaderFlyweight;
import io.aeron.protocol.RttMeasurementFlyweight;
import io.aeron.protocol.SetupFlyweight;
import java.net.InetSocketAddress;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.IntHashSet;
import org.agrona.concurrent.UnsafeBuffer;

/**
 * Dispatches inbound data, setup and RTT-measurement frames to the {@link PublicationImage} registered for the
 * frame's stream id and session id, and tracks which streams and sessions currently have subscription interest.
 * <p>
 * Interest is held per stream id in a {@link StreamInterest}. A stream can be subscribed for all sessions
 * (wildcard) and/or for individual session ids. Sessions without an active image are tracked with a
 * {@link SessionState} describing why no image is receiving their frames.
 */
public final class DataPacketDispatcher {
    /**
     * Bit tested in the RTT measurement flags: when set, the dispatcher answers the frame itself instead of
     * handing it to the image.
     * NOTE(review): looks like the protocol's "reply requested" flag - confirm against RttMeasurementFlyweight.
     */
    private static final int RTT_REPLY_FLAG = 0x80;

    private final Int2ObjectHashMap<StreamInterest> streamInterestByIdMap = new Int2ObjectHashMap<>();
    private final DriverConductorProxy conductorProxy;
    private final Receiver receiver;
    private final int streamSessionLimit;

    /**
     * @param conductorProxy     used to request creation of new publication images.
     * @param receiver           used to register pending setup messages after eliciting a setup frame.
     * @param streamSessionLimit number of active images per stream at which further setup messages are rejected.
     */
    DataPacketDispatcher(DriverConductorProxy conductorProxy, Receiver receiver, int streamSessionLimit) {
        this.conductorProxy = conductorProxy;
        this.receiver = receiver;
        this.streamSessionLimit = streamSessionLimit;
    }

    /**
     * Register interest in all sessions of a stream.
     * <p>
     * If the stream was previously only subscribed for specific sessions, any sessions remembered as
     * {@link SessionState#NO_INTEREST} are forgotten so that they are re-evaluated on their next frame.
     *
     * @param streamId of the stream to subscribe to.
     */
    public void addSubscription(int streamId) {
        StreamInterest streamInterest = streamInterestByIdMap.get(streamId);
        if (null == streamInterest) {
            streamInterestByIdMap.put(streamId, new StreamInterest(true));
            return;
        }

        if (!streamInterest.isAllSessions) {
            streamInterest.isAllSessions = true;

            Int2ObjectHashMap<SessionState>.ValueIterator iterator =
                streamInterest.sessionInterestByIdMap.values().iterator();
            while (iterator.hasNext()) {
                if (SessionState.NO_INTEREST == iterator.next()) {
                    iterator.remove();
                }
            }
        }
    }

    /**
     * Register interest in a single session of a stream.
     * <p>
     * A session previously remembered as {@link SessionState#NO_INTEREST} is forgotten so that it is
     * re-evaluated on its next frame.
     *
     * @param streamId  of the stream to subscribe to.
     * @param sessionId of the session to subscribe to.
     */
    public void addSubscription(int streamId, int sessionId) {
        StreamInterest streamInterest = streamInterestByIdMap.get(streamId);
        if (null == streamInterest) {
            streamInterest = new StreamInterest(false);
            streamInterestByIdMap.put(streamId, streamInterest);
        }

        streamInterest.subscribedSessionIds.add(sessionId);

        // A missing entry yields null, which never equals NO_INTEREST, so no separate null check is needed.
        if (SessionState.NO_INTEREST == streamInterest.sessionInterestByIdMap.get(sessionId)) {
            streamInterest.sessionInterestByIdMap.remove(sessionId);
        }
    }

    /**
     * Remove the all-sessions interest in a stream.
     * <p>
     * Images and session state not covered by a session-specific subscription are dropped (images are
     * deactivated). The stream entry itself is removed once no session-specific subscriptions remain.
     *
     * @param streamId of the stream to unsubscribe from.
     * @throws UnknownSubscriptionException if there is no interest registered for the stream.
     */
    public void removeSubscription(int streamId) {
        StreamInterest streamInterest = streamInterestByIdMap.get(streamId);
        if (null == streamInterest) {
            throw new UnknownSubscriptionException("no subscription for stream " + streamId);
        }

        streamInterest.removeNonSessionSpecificInterest();
        streamInterest.isAllSessions = false;

        if (streamInterest.subscribedSessionIds.isEmpty()) {
            streamInterestByIdMap.remove(streamId);
        }
    }

    /**
     * Remove the interest in a single session of a stream.
     * <p>
     * When the stream is not also subscribed for all sessions, the session's active image (if any) is
     * deactivated and its session state forgotten; the stream entry is removed once no session-specific
     * subscriptions remain.
     *
     * @param streamId  of the stream to unsubscribe from.
     * @param sessionId of the session to unsubscribe from.
     * @throws UnknownSubscriptionException if there is no interest registered for the stream.
     */
    public void removeSubscription(int streamId, int sessionId) {
        StreamInterest streamInterest = streamInterestByIdMap.get(streamId);
        if (null == streamInterest) {
            throw new UnknownSubscriptionException("no subscription for stream " + streamId);
        }

        if (!streamInterest.isAllSessions) {
            PublicationImage publicationImage = streamInterest.activeImageByIdMap.remove(sessionId);
            if (null != publicationImage) {
                publicationImage.deactivate();
            }
            streamInterest.sessionInterestByIdMap.remove(sessionId);
        }

        streamInterest.subscribedSessionIds.remove(sessionId);

        if (!streamInterest.isAllSessions && streamInterest.subscribedSessionIds.isEmpty()) {
            streamInterestByIdMap.remove(streamId);
        }
    }

    /**
     * Register an image as the active receiver for its stream and session, and activate it.
     * <p>
     * Any tracked {@link SessionState} for the session is cleared. If there is no interest registered for the
     * image's stream, nothing happens and the image is not activated.
     *
     * @param image to register.
     */
    public void addPublicationImage(PublicationImage image) {
        StreamInterest streamInterest = streamInterestByIdMap.get(image.streamId());
        if (null != streamInterest) {
            int sessionId = image.sessionId();
            streamInterest.sessionInterestByIdMap.remove(sessionId);
            streamInterest.activeImageByIdMap.put(sessionId, image);
            image.activate();
        }
    }

    /**
     * Unregister an image and deactivate it.
     * <p>
     * The image is only unregistered if the currently active image for the session has the same correlation
     * id. When it is unregistered and has not reached end of stream, the session is marked
     * {@link SessionState#ON_COOL_DOWN}. The supplied image is always deactivated, registered or not.
     *
     * @param image to unregister.
     */
    public void removePublicationImage(PublicationImage image) {
        StreamInterest streamInterest = streamInterestByIdMap.get(image.streamId());
        if (null != streamInterest) {
            int sessionId = image.sessionId();
            PublicationImage activeImage = streamInterest.activeImageByIdMap.get(sessionId);

            // Guard against removing a newer image that has replaced this one for the same session.
            if (null != activeImage && activeImage.correlationId() == image.correlationId()) {
                streamInterest.activeImageByIdMap.remove(sessionId);
                if (!image.isEndOfStream()) {
                    streamInterest.sessionInterestByIdMap.put(sessionId, SessionState.ON_COOL_DOWN);
                }
            }
        }

        image.deactivate();
    }

    /**
     * Forget a session that is still in {@link SessionState#PENDING_SETUP_FRAME}.
     *
     * @param sessionId of the session.
     * @param streamId  of the stream.
     */
    public void removePendingSetup(int sessionId, int streamId) {
        removeByState(sessionId, streamId, SessionState.PENDING_SETUP_FRAME);
    }

    /**
     * Forget a session that is still in {@link SessionState#ON_COOL_DOWN}.
     *
     * @param sessionId of the session.
     * @param streamId  of the stream.
     */
    public void removeCoolDown(int sessionId, int streamId) {
        removeByState(sessionId, streamId, SessionState.ON_COOL_DOWN);
    }

    /**
     * Handle a data frame.
     * <p>
     * If an image is active for the session the frame is inserted into it. Otherwise, for a session with no
     * tracked state and a frame that is not end-of-stream, the session is either marked
     * {@link SessionState#PENDING_SETUP_FRAME} and a setup frame is elicited from the source (when subscribed),
     * or marked {@link SessionState#NO_INTEREST}.
     *
     * @param channelEndpoint on which the frame was received.
     * @param header          of the data frame.
     * @param buffer          containing the frame.
     * @param length          of the frame.
     * @param srcAddress      the frame came from.
     * @param transportIndex  of the transport the frame was received on.
     * @return the result of {@code PublicationImage.insertPacket} when an image is active, otherwise 0.
     */
    public int onDataPacket(ReceiveChannelEndpoint channelEndpoint, DataHeaderFlyweight header, UnsafeBuffer buffer, int length, InetSocketAddress srcAddress, int transportIndex) {
        int streamId = header.streamId();
        StreamInterest streamInterest = streamInterestByIdMap.get(streamId);
        if (null == streamInterest) {
            return 0;
        }

        int sessionId = header.sessionId();
        PublicationImage image = streamInterest.findActive(sessionId);
        if (null != image) {
            return image.insertPacket(header.termId(), header.termOffset(), buffer, length, transportIndex, srcAddress);
        }

        // Only sessions with no tracked state are classified; end-of-stream frames never start tracking.
        if (!DataHeaderFlyweight.isEndOfStream(buffer) && !streamInterest.sessionInterestByIdMap.containsKey(sessionId)) {
            if (streamInterest.isAllSessions || streamInterest.subscribedSessionIds.contains(sessionId)) {
                streamInterest.sessionInterestByIdMap.put(sessionId, SessionState.PENDING_SETUP_FRAME);
                elicitSetupMessageFromSource(channelEndpoint, transportIndex, srcAddress, streamId, sessionId);
            } else {
                streamInterest.sessionInterestByIdMap.put(sessionId, SessionState.NO_INTEREST);
            }
        }

        return 0;
    }

    /**
     * Handle a setup frame.
     * <p>
     * For a stream with registered interest: an already active image learns about the source as a destination
     * connection; a session in {@link SessionState#PENDING_SETUP_FRAME}, or an untracked but subscribed session,
     * moves to {@link SessionState#INIT_IN_PROGRESS} and image creation is requested; an untracked and
     * unsubscribed session is marked {@link SessionState#NO_INTEREST}. Sessions in any other tracked state are
     * left untouched.
     *
     * @param channelEndpoint on which the frame was received.
     * @param msg             the setup frame.
     * @param srcAddress      the frame came from.
     * @param transportIndex  of the transport the frame was received on.
     * @throws AeronEvent if the stream already has at least {@code streamSessionLimit} active images. The check
     *                    runs before the active-image lookup, so it also applies to setups from sessions that
     *                    already have an active image.
     */
    public void onSetupMessage(ReceiveChannelEndpoint channelEndpoint, SetupFlyweight msg, InetSocketAddress srcAddress, int transportIndex) {
        int streamId = msg.streamId();
        StreamInterest streamInterest = streamInterestByIdMap.get(streamId);
        if (null == streamInterest) {
            return;
        }

        int sessionId = msg.sessionId();
        if (streamInterest.isSessionLimitExceeded(streamSessionLimit)) {
            throw new AeronEvent("exceeded session limit, streamId=" + streamId + " sourceAddress=" + srcAddress);
        }

        PublicationImage image = streamInterest.findActive(sessionId);
        SessionState sessionInterest = streamInterest.sessionInterestByIdMap.get(sessionId);

        if (null != image) {
            image.addDestinationConnectionIfUnknown(transportIndex, srcAddress);
        } else if (null != sessionInterest) {
            if (SessionState.PENDING_SETUP_FRAME == sessionInterest) {
                streamInterest.sessionInterestByIdMap.put(sessionId, SessionState.INIT_IN_PROGRESS);
                createPublicationImage(channelEndpoint, transportIndex, srcAddress, streamId, sessionId, msg.initialTermId(), msg.activeTermId(), msg.termOffset(), msg.termLength(), msg.mtuLength(), msg.ttl(), msg.flags());
            }
        } else if (streamInterest.isAllSessions || streamInterest.subscribedSessionIds.contains(sessionId)) {
            streamInterest.sessionInterestByIdMap.put(sessionId, SessionState.INIT_IN_PROGRESS);
            createPublicationImage(channelEndpoint, transportIndex, srcAddress, streamId, sessionId, msg.initialTermId(), msg.activeTermId(), msg.termOffset(), msg.termLength(), msg.mtuLength(), msg.ttl(), msg.flags());
        } else {
            streamInterest.sessionInterestByIdMap.put(sessionId, SessionState.NO_INTEREST);
        }
    }

    /**
     * Handle an RTT measurement frame for a session with an active image; frames for other sessions are ignored.
     * <p>
     * When the frame has the {@code 0x80} flag set, an RTT measurement carrying the frame's echo timestamp is
     * sent back via the channel endpoint. Otherwise the frame is passed to the image.
     *
     * @param channelEndpoint on which the frame was received.
     * @param msg             the RTT measurement frame.
     * @param srcAddress      the frame came from.
     * @param transportIndex  of the transport the frame was received on.
     */
    public void onRttMeasurement(ReceiveChannelEndpoint channelEndpoint, RttMeasurementFlyweight msg, InetSocketAddress srcAddress, int transportIndex) {
        int streamId = msg.streamId();
        StreamInterest streamInterest = streamInterestByIdMap.get(streamId);
        if (null == streamInterest) {
            return;
        }

        int sessionId = msg.sessionId();
        PublicationImage image = streamInterest.findActive(sessionId);
        if (null == image) {
            return;
        }

        if (RTT_REPLY_FLAG == (msg.flags() & RTT_REPLY_FLAG)) {
            InetSocketAddress controlAddress = resolveControlAddress(channelEndpoint, transportIndex, srcAddress);
            channelEndpoint.sendRttMeasurement(transportIndex, controlAddress, sessionId, streamId, msg.echoTimestampNs(), 0L, false);
        } else {
            image.onRttMeasurement(msg, transportIndex, srcAddress);
        }
    }

    /**
     * @return true if interest is registered for at least one stream.
     */
    public boolean shouldElicitSetupMessage() {
        return !streamInterestByIdMap.isEmpty();
    }

    /**
     * Forget the tracked state of a session, but only if it is currently exactly {@code state}.
     */
    private void removeByState(int sessionId, int streamId, SessionState state) {
        StreamInterest streamInterest = streamInterestByIdMap.get(streamId);
        if (null != streamInterest) {
            SessionState sessionState = streamInterest.sessionInterestByIdMap.get(sessionId);
            if (null != sessionState && state == sessionState) {
                streamInterest.sessionInterestByIdMap.remove(sessionId);
            }
        }
    }

    /**
     * Ask the source to send a setup frame and register the pending setup with the receiver.
     */
    private void elicitSetupMessageFromSource(ReceiveChannelEndpoint channelEndpoint, int transportIndex, InetSocketAddress srcAddress, int streamId, int sessionId) {
        InetSocketAddress controlAddress = resolveControlAddress(channelEndpoint, transportIndex, srcAddress);
        channelEndpoint.sendSetupElicitingStatusMessage(transportIndex, controlAddress, sessionId, streamId);
        receiver.addPendingSetupMessage(sessionId, streamId, transportIndex, channelEndpoint, false, controlAddress);
    }

    /**
     * Request creation of a publication image via the conductor proxy, first notifying the endpoint when a
     * multicast transport's TTL is lower than the TTL carried in the setup frame.
     */
    private void createPublicationImage(ReceiveChannelEndpoint channelEndpoint, int transportIndex, InetSocketAddress srcAddress, int streamId, int sessionId, int initialTermId, int activeTermId, int termOffset, int termLength, int mtuLength, int setupTtl, short flags) {
        InetSocketAddress controlAddress = resolveControlAddress(channelEndpoint, transportIndex, srcAddress);

        if (channelEndpoint.isMulticast(transportIndex) && channelEndpoint.multicastTtl(transportIndex) < setupTtl) {
            channelEndpoint.possibleTtlAsymmetryEncountered();
        }

        conductorProxy.createPublicationImage(sessionId, streamId, initialTermId, activeTermId, termOffset, termLength, mtuLength, transportIndex, flags, controlAddress, srcAddress, channelEndpoint);
    }

    /**
     * Pick the address control frames are sent to: the transport's remote control address for multicast,
     * otherwise the address the frame came from.
     */
    private static InetSocketAddress resolveControlAddress(ReceiveChannelEndpoint channelEndpoint, int transportIndex, InetSocketAddress srcAddress) {
        if (channelEndpoint.isMulticast(transportIndex)) {
            return channelEndpoint.udpChannel(transportIndex).remoteControl();
        }
        return srcAddress;
    }

    /**
     * Subscription interest and per-session bookkeeping for a single stream id.
     */
    static class StreamInterest {
        /** True while the stream is subscribed for all sessions. */
        boolean isAllSessions;
        /** State of sessions that are known but have no active image. */
        final Int2ObjectHashMap<SessionState> sessionInterestByIdMap = new Int2ObjectHashMap<>();
        /** Images currently receiving frames, keyed by session id. */
        final Int2ObjectHashMap<PublicationImage> activeImageByIdMap = new Int2ObjectHashMap<>();
        /** Session ids that have a session-specific subscription. */
        final IntHashSet subscribedSessionIds = new IntHashSet();

        StreamInterest(boolean isAllSessions) {
            this.isAllSessions = isAllSessions;
        }

        /**
         * @param sessionId to look up.
         * @return the active image for the session, or null if there is none.
         */
        PublicationImage findActive(int sessionId) {
            return activeImageByIdMap.get(sessionId);
        }

        /**
         * @param sessionLimit to compare against.
         * @return true if the number of active images is greater than or equal to {@code sessionLimit}.
         */
        public boolean isSessionLimitExceeded(int sessionLimit) {
            return sessionLimit <= activeImageByIdMap.size();
        }

        /**
         * Drop every active image (deactivating it) and every tracked session state whose session id does not
         * have a session-specific subscription.
         */
        void removeNonSessionSpecificInterest() {
            Int2ObjectHashMap<PublicationImage>.EntryIterator activeIterator =
                activeImageByIdMap.entrySet().iterator();
            while (activeIterator.hasNext()) {
                activeIterator.next();
                if (!subscribedSessionIds.contains(activeIterator.getIntKey())) {
                    activeIterator.getValue().deactivate();
                    activeIterator.remove();
                }
            }

            Int2ObjectHashMap<SessionState>.EntryIterator stateIterator =
                sessionInterestByIdMap.entrySet().iterator();
            while (stateIterator.hasNext()) {
                stateIterator.next();
                if (!subscribedSessionIds.contains(stateIterator.getIntKey())) {
                    stateIterator.remove();
                }
            }
        }
    }

    /**
     * Tracked state of a session within a stream.
     */
    enum SessionState {
        /** Not assigned anywhere in this class. */
        ACTIVE,
        /** A data frame arrived for a subscribed session and a setup frame has been elicited from the source. */
        PENDING_SETUP_FRAME,
        /** A setup frame was accepted and creation of the publication image has been requested. */
        INIT_IN_PROGRESS,
        /** The session's image was removed before reaching end of stream. */
        ON_COOL_DOWN,
        /** A frame arrived for a session that nothing is subscribed to. */
        NO_INTEREST;
    }
}

