/*
 * Decompiled with CFR 0.152.
 */
package io.kiw.speedy;

import io.kiw.speedy.PublisherBucket;
import io.kiw.speedy.SpeedyConnection;
import io.kiw.speedy.SpeedyHost;
import io.kiw.speedy.SpeedyMessaging;
import io.kiw.speedy.helper.ImmutableIntMap;
import io.kiw.speedy.management.HostRegistrationMessage;
import io.kiw.speedy.marshaller.MessageUnMarshaller;
import io.kiw.speedy.marshaller.PacketHandler;
import io.kiw.speedy.publisher.SchedulerThread;
import io.kiw.speedy.publisher.SpeedyMessagingPublisher;
import io.kiw.speedy.subscriber.GenericHandler;
import io.kiw.speedy.subscriber.OnMessageErrorHandler;
import io.kiw.speedy.subscriber.SpeedyMessageHandler;
import io.kiw.speedy.subscriber.SpeedyMessageReplyHandler;
import io.kiw.speedy.subscriber.SpeedyMessagingSubscriber;
import io.kiw.speedy.subscriber.SpeedyMessagingSubscriberFactory;
import io.kiw.speedy.subscriber.SubscriberChannelState;
import io.kiw.speedy.wiring.SpeedyWiring;
import io.kiw.speedy.wiring.thread.ThreadHandler;
import java.io.IOException;
import java.util.Map;

public class SpeedyMessagingImpl
implements SpeedyMessaging {
    public static final int DATAGRAM_LENGTH = 5920;
    private SpeedyMessagingSubscriber speedyMessagingSubscriber;
    private final SpeedyMessagingPublisher speedyMessagingPublisher;
    private final SpeedyHost localhost;
    private final ImmutableIntMap<PublisherBucket> publisherBuckets;
    private final ImmutableIntMap<SpeedyConnection> remoteConnections;
    private final Map<Integer, SubscriberChannelState> channelStates;
    private final SchedulerThread schedulerThread;
    private final SpeedyWiring wiring;
    private SpeedyMessagingSubscriberFactory speedyMessagingSubscriberFactory;

    public SpeedyMessagingImpl(SpeedyHost localhost, OnMessageErrorHandler subcribeErrorHandler, SpeedyWiring wiring, ImmutableIntMap<PublisherBucket> publisherBuckets, ImmutableIntMap<SpeedyConnection> remoteConnections, Map<Integer, SubscriberChannelState> channelStates, PacketHandler packetHandler, SchedulerThread schedulerThread) {
        this.wiring = wiring;
        this.localhost = localhost;
        this.publisherBuckets = publisherBuckets;
        this.remoteConnections = remoteConnections;
        this.channelStates = channelStates;
        this.schedulerThread = schedulerThread;
        this.speedyMessagingPublisher = new SpeedyMessagingPublisher(packetHandler, wiring, schedulerThread);
        this.speedyMessagingSubscriberFactory = new SpeedyMessagingSubscriberFactory(localhost, new MessageUnMarshaller(), remoteConnections, this.speedyMessagingPublisher, subcribeErrorHandler, wiring, publisherBuckets, schedulerThread, channelStates);
    }

    public void start() {
        this.subscribeToAllResponsesHandlers();
        this.speedyMessagingSubscriber = this.speedyMessagingSubscriberFactory.build();
        this.assertAllKeysHaveBeenSubscribedTo();
        this.wiring.start(this.speedyMessagingPublisher, this.speedyMessagingSubscriber, this.schedulerThread, this.remoteConnections);
        this.connectToRemoteHosts();
    }

    @Override
    public void subscribe(String key, SpeedyMessageHandler messageHandler) {
        this.speedyMessagingSubscriberFactory.addSubscriptionHandler(key, new GenericHandler(messageHandler));
    }

    @Override
    public void subscribe(String key, SpeedyMessageReplyHandler messageHandler) {
        this.speedyMessagingSubscriberFactory.addSubscriptionHandler(key, new GenericHandler(messageHandler));
    }

    @Override
    public void publish(String key, byte[] bytes) {
        this.speedyMessagingPublisher.publish(key, bytes);
    }

    @Override
    public void publish(String key, byte[] bytes, int dataLength) {
        this.speedyMessagingPublisher.publish(key, bytes, dataLength);
    }

    @Override
    public void request(String key, byte[] bytes, SpeedyMessageHandler responseHandler) {
        this.speedyMessagingPublisher.request(key, bytes, responseHandler);
    }

    @Override
    public void close() throws IOException {
        this.speedyMessagingPublisher.flush();
        if (this.speedyMessagingSubscriber != null) {
            this.speedyMessagingSubscriber.close();
        }
        this.speedyMessagingPublisher.close();
    }

    private void connectToRemoteHosts() {
        ThreadHandler threadHandler = this.wiring.getThreadHandler();
        for (SpeedyConnection remoteHost : this.remoteConnections.values().toArray(new SpeedyConnection[this.remoteConnections.size()])) {
            this.speedyMessagingPublisher.hostRegistration(new HostRegistrationMessage(this.localhost, remoteHost.getHost()), remoteHost);
        }
        threadHandler.join();
        this.wiring.connectIfSingleConnection(this.remoteConnections);
    }

    private void assertAllKeysHaveBeenSubscribedTo() {
        this.speedyMessagingSubscriberFactory.assertAllKeysHaveBeenSubscribedTo();
    }

    private void subscribeToAllResponsesHandlers() {
        for (SubscriberChannelState subscriberSequenceState : this.channelStates.values()) {
            if (subscriberSequenceState.getResponseKey() == null) continue;
            this.subscribe(subscriberSequenceState.getResponseKey(), subscriberSequenceState::handleResponse);
        }
    }
}

