/*
 * Decompiled with CFR 0.152.
 */
package io.kiw.speedy.builder;

import io.kiw.speedy.PublisherBucket;
import io.kiw.speedy.SpeedyConnection;
import io.kiw.speedy.SpeedyHost;
import io.kiw.speedy.SpeedyMessagingImpl;
import io.kiw.speedy.builder.ChannelMode;
import io.kiw.speedy.builder.NoOpTemporaryResponseHandler;
import io.kiw.speedy.builder.PublishingChannel;
import io.kiw.speedy.builder.SubscribingChannel;
import io.kiw.speedy.channel.TemporaryResponseHandler;
import io.kiw.speedy.channel.TemporaryResponseHandlerImpl;
import io.kiw.speedy.helper.ImmutableIntMap;
import io.kiw.speedy.helper.ImmutableMapFactory;
import io.kiw.speedy.marshaller.EventMarshaller;
import io.kiw.speedy.marshaller.PacketHandlerImpl;
import io.kiw.speedy.publisher.PacketFlusher;
import io.kiw.speedy.publisher.SchedulerThread;
import io.kiw.speedy.subscriber.OnMessageErrorHandler;
import io.kiw.speedy.subscriber.SubscriberChannelState;
import io.kiw.speedy.subscriber.SubscriberConsumer;
import io.kiw.speedy.subscriber.SubscriberMultiThreadHandler;
import io.kiw.speedy.subscriber.SubscriberSameThreadHandler;
import io.kiw.speedy.subscriber.SubscriberThreadHandler;
import io.kiw.speedy.wiring.MultiThreadSubscriberHandler;
import io.kiw.speedy.wiring.SpeedyWiring;
import io.kiw.speedy.wiring.UdpSpeedyWiring;
import io.kiw.tetryon.EventHandlerGroup;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Builder that assembles a {@link SpeedyMessagingImpl} from a local host, the
 * publishing and subscribing channel definitions (both keyed by an integer
 * channel key) and the set of remote hosts.
 *
 * <p>The error handler and the wiring can optionally be replaced before
 * {@link #build()} is called.
 */
public class SpeedyMessagingBuilder {
    private final SpeedyHost localhost;
    /** Defaults to a handler that does nothing with the reported error. */
    private OnMessageErrorHandler onMessageErrorHandler = t -> {};
    /** Optional; {@link #build()} creates a {@link UdpSpeedyWiring} when this is still unset. */
    private SpeedyWiring wiring;
    private final Map<Integer, SubscribingChannel> subscribingChannels;
    private final Map<Integer, PublishingChannel> publishingChannels;
    private final Set<SpeedyHost> remoteHosts;

    /**
     * Creates a builder. The supplied collections are stored as-is (no defensive copy).
     *
     * @param localhost           the local host; must be non-null by the time {@link #build()} runs
     * @param publishingChannels  publishing channels keyed by channel key
     * @param subscribingChannels subscribing channels keyed by channel key
     * @param remoteHosts         the remote hosts a connection is created for
     */
    SpeedyMessagingBuilder(SpeedyHost localhost, Map<Integer, PublishingChannel> publishingChannels, Map<Integer, SubscribingChannel> subscribingChannels, Set<SpeedyHost> remoteHosts) {
        this.localhost = localhost;
        this.publishingChannels = publishingChannels;
        this.subscribingChannels = subscribingChannels;
        this.remoteHosts = remoteHosts;
    }

    /**
     * Replaces the default (no-op) message error handler.
     *
     * @param onMessageErrorHandler the handler passed on to the built messaging instance
     * @return this builder
     */
    public SpeedyMessagingBuilder withOnMessageErrorHandler(OnMessageErrorHandler onMessageErrorHandler) {
        this.onMessageErrorHandler = onMessageErrorHandler;
        return this;
    }

    /**
     * Sets the wiring to use instead of the default {@link UdpSpeedyWiring}.
     *
     * @param wiring the wiring used by {@link #build()}
     * @return this builder
     */
    public SpeedyMessagingBuilder withWiring(SpeedyWiring wiring) {
        this.wiring = wiring;
        return this;
    }

    /**
     * Builds the messaging instance: one {@link SpeedyConnection} per remote host,
     * one {@link PublisherBucket} per publishing channel, and the packet handling
     * chain on top of the wiring.
     *
     * <p>If no wiring was supplied, a {@link UdpSpeedyWiring} on the local host's
     * port is created and kept on this builder.
     *
     * @return the assembled messaging instance
     * @throws IllegalStateException if the local host is {@code null}, or if a
     *         subscribing channel has no publishing channel under the same key
     */
    public SpeedyMessagingImpl build() {
        if (this.localhost == null) {
            throw new IllegalStateException("Local speedy host must be defined.");
        }
        if (this.wiring == null) {
            this.wiring = new UdpSpeedyWiring(this.localhost.getPort());
        }
        SchedulerThread schedulerThread = new SchedulerThread(this.wiring::getNanoTime);
        this.wiring.addPulseHandler(schedulerThread::pulse);

        // Connections are keyed by the remote host's hashCode().
        HashMap<Integer, SpeedyConnection> remoteConnections = new HashMap<>();
        HashMap<Integer, SubscriberChannelState> globalChannelState = new HashMap<>();
        for (SpeedyHost remoteHost : this.remoteHosts) {
            SpeedyConnection connection = this.createConnection(remoteHost, globalChannelState);
            remoteConnections.put(remoteHost.hashCode(), connection);
        }

        HashMap<Integer, PublisherBucket> publisherBucketsMap = new HashMap<>();
        for (Map.Entry<Integer, PublishingChannel> publishingChannelEntry : this.publishingChannels.entrySet()) {
            this.createSequenceBucketAndMappings(publishingChannelEntry, publisherBucketsMap, remoteConnections, globalChannelState);
        }

        ImmutableIntMap<PublisherBucket> publisherBuckets = ImmutableMapFactory.initialiseIntMap(publisherBucketsMap);
        PacketFlusher packetFlusher = new PacketFlusher(this.wiring::sendPacket);
        ImmutableIntMap<SpeedyConnection> connections = ImmutableMapFactory.initialiseIntMap(remoteConnections);
        PublisherBucket[] bucketArray = publisherBuckets.values().toArray(new PublisherBucket[publisherBuckets.size()]);
        PacketHandlerImpl packetHandler = new PacketHandlerImpl(packetFlusher, new EventMarshaller(packetFlusher), bucketArray, this.wiring);
        return new SpeedyMessagingImpl(this.localhost, this.onMessageErrorHandler, this.wiring, publisherBuckets, connections, globalChannelState, this.wiring.wrapPacketHandler(packetHandler), schedulerThread);
    }

    /**
     * Creates the connection for one remote host, with a subscriber channel state
     * for every subscribing channel the host takes part in (as a publishing host of
     * the subscribing channel or a subscribing host of the matching publishing channel).
     *
     * @param remoteHost         the remote host to build the connection for
     * @param globalChannelState receives every created channel state by channel key;
     *                           an entry already stored for the same key is replaced
     * @return the connection for {@code remoteHost}
     * @throws IllegalStateException if a subscribing channel has no publishing channel
     *         registered under the same key
     */
    private SpeedyConnection createConnection(SpeedyHost remoteHost, Map<Integer, SubscriberChannelState> globalChannelState) {
        HashSet<Integer> subscriptionKeyHashes = new HashSet<>();
        HashMap<Integer, SubscriberChannelState> channelStates = new HashMap<>();
        for (Map.Entry<Integer, SubscribingChannel> subscribingChannelEntry : this.subscribingChannels.entrySet()) {
            Integer channelKey = subscribingChannelEntry.getKey();
            SubscribingChannel subscribingChannel = subscribingChannelEntry.getValue();
            PublishingChannel publishingChannel = this.publishingChannels.get(channelKey);
            if (publishingChannel == null) {
                // Fail with a clear message rather than an anonymous NullPointerException below.
                throw new IllegalStateException("No publishing channel is defined for subscribing channel key " + channelKey);
            }
            boolean hostPublishesToUs = subscribingChannel.getPublishingHosts().contains(remoteHost);
            if (!hostPublishesToUs && !publishingChannel.getSubscribingHosts().contains(remoteHost)) {
                continue;
            }
            for (String subscribingKey : subscribingChannel.getKeys()) {
                subscriptionKeyHashes.add(subscribingKey.hashCode());
            }
            TemporaryResponseHandler temporaryResponseHandler = this.createTemporaryResponseHandler(subscribingChannel, publishingChannel);
            SubscriberThreadHandler subscriberThreadHandler = this.createSubscriberThreadHandler(subscribingChannel);
            SubscriberChannelState channelState = new SubscriberChannelState(0L, subscribingChannel.getWindowSizeOfRoute(), subscriberThreadHandler, temporaryResponseHandler);
            channelStates.put(channelKey, channelState);
            // The global map holds one state per channel key, so the state of the
            // most recently processed host replaces any earlier one.
            globalChannelState.put(channelKey, channelState);
        }
        return new SpeedyConnection(remoteHost, subscriptionKeyHashes, ImmutableMapFactory.initialiseIntMap(channelStates));
    }

    /**
     * Chooses the temporary response handler for a channel: a no-op handler when the
     * publishing channel is in {@link ChannelMode#SUBSCRIBER_ONLY} mode, otherwise a
     * {@link TemporaryResponseHandlerImpl} whose key is derived from the channel
     * name's hash and the local host's hash.
     */
    private TemporaryResponseHandler createTemporaryResponseHandler(SubscribingChannel subscribingChannel, PublishingChannel publishingChannel) {
        if (publishingChannel.getChannelMode() == ChannelMode.SUBSCRIBER_ONLY) {
            return new NoOpTemporaryResponseHandler();
        }
        return new TemporaryResponseHandlerImpl(subscribingChannel.getName().hashCode() + "-" + this.localhost.hashCode() + "-RESPONSE-KEY-SPM");
    }

    /**
     * Chooses the subscriber thread handler for a channel: a same-thread handler when
     * the channel asks for at most one subscriber thread, otherwise a multi-thread
     * handler backed by an event handler group obtained through the wiring.
     */
    private SubscriberThreadHandler createSubscriberThreadHandler(SubscribingChannel subscribingChannel) {
        if (subscribingChannel.getSubscriberThreads() <= 1) {
            return new SubscriberSameThreadHandler(subscribingChannel.getName());
        }
        // NOTE(review): raw List kept because the element type returned by
        // EventHandlerGroup.createGroup is not visible from this file - replace with
        // the parameterized type once confirmed.
        List group = EventHandlerGroup.createGroup(() -> new SubscriberConsumer(subscribingChannel.getName()), (int)subscribingChannel.getSubscriberThreads());
        MultiThreadSubscriberHandler multiThreadSubscriberHandler = this.wiring.buildMultiThreadSubscriberHandler(group);
        return new SubscriberMultiThreadHandler(multiThreadSubscriberHandler::handleMessage);
    }

    /**
     * Creates the {@link PublisherBucket} for one publishing channel and stores it in
     * {@code publisherBuckets} under the channel key.
     *
     * @param publishingChannelEntry       channel key and publishing channel
     * @param publisherBuckets             receives the created bucket
     * @param allRemoteConnections         all remote connections keyed by host hashCode()
     * @param globalSubscriberChannelState channel states by channel key; the lookup for
     *                                     this channel may yield {@code null} when no
     *                                     state was registered under its key
     */
    private void createSequenceBucketAndMappings(Map.Entry<Integer, PublishingChannel> publishingChannelEntry, Map<Integer, PublisherBucket> publisherBuckets, Map<Integer, SpeedyConnection> allRemoteConnections, Map<Integer, SubscriberChannelState> globalSubscriberChannelState) {
        Integer channelKey = publishingChannelEntry.getKey();
        PublishingChannel publishingChannel = publishingChannelEntry.getValue();
        // Copy the keys so the bucket gets its own set.
        HashSet<String> keysToPublishTo = new HashSet<>(publishingChannel.getKeys());
        SpeedyConnection[] subscriberConnections = this.getFilteredSpeedyConnections(publishingChannel.getSubscribingHosts(), allRemoteConnections);
        PublisherBucket bucket = new PublisherBucket(keysToPublishTo, publishingChannel.getWindowSizeOfRoute(), this.localhost.hashCode(), channelKey, globalSubscriberChannelState.get(channelKey), subscriberConnections);
        publisherBuckets.put(channelKey, bucket);
    }

    /**
     * Looks up the connection of every given host (by the host's hashCode()) and
     * returns the distinct results as an array.
     *
     * <p>NOTE(review): a host without an entry in {@code allRemoteConnections}
     * contributes a {@code null} element to the result - confirm whether
     * {@link PublisherBucket} tolerates that or whether such hosts should be
     * rejected or skipped here.
     *
     * @param subscribingHosts     hosts whose connections are wanted
     * @param allRemoteConnections all remote connections keyed by host hashCode()
     * @return the distinct connections found, in no particular order
     */
    private SpeedyConnection[] getFilteredSpeedyConnections(Collection<SpeedyHost> subscribingHosts, Map<Integer, SpeedyConnection> allRemoteConnections) {
        Set<SpeedyConnection> distinctConnections = new HashSet<>();
        for (SpeedyHost subscribingHost : subscribingHosts) {
            distinctConnections.add(allRemoteConnections.get(subscribingHost.hashCode()));
        }
        return distinctConnections.toArray(new SpeedyConnection[distinctConnections.size()]);
    }
}

