/*
 * Decompiled with CFR 0.152.
 */
package io.kiw.speedy.publisher;

import io.kiw.speedy.PublisherBucket;
import io.kiw.speedy.SpeedyConnection;
import io.kiw.speedy.exception.SpeedyMessagingNotInitiatedException;
import io.kiw.speedy.management.HostRegistrationMessage;
import io.kiw.speedy.management.ManagementKey;
import io.kiw.speedy.marshaller.PacketHandler;
import io.kiw.speedy.publisher.PublishPromise;
import io.kiw.speedy.publisher.SchedulerThread;
import io.kiw.speedy.subscriber.SpeedyMessageHandler;
import io.kiw.speedy.wiring.SpeedyWiring;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * Publishing entry point that hands keyed byte payloads to a {@link PacketHandler}.
 *
 * <p>Every send ({@link #publish(String, byte[], int)} and
 * {@link #request(String, byte[], SpeedyMessageHandler)}) is bracketed by
 * {@link SpeedyWiring#startEvent()} / {@code completeEvent(int)}. The publisher must be
 * switched on with {@link #enable()} (or {@link #start()}) before anything can be sent.
 */
public class SpeedyMessagingPublisher implements PublishPromise {
    private final PacketHandler packetHandler;
    private final SpeedyWiring wiring;
    private final SchedulerThread schedulerThread;

    /** Set to {@code true} by {@link #enable()}; this class never resets it. */
    private volatile boolean initialised;

    /**
     * Creates a publisher that is not yet enabled.
     *
     * @param packetHandler   receives events, flush requests and resend requests
     * @param wiring          supplies event bracketing, the clock and connection set-up
     * @param schedulerThread runs the periodic flush job registered by {@link #enable()}
     */
    public SpeedyMessagingPublisher(PacketHandler packetHandler, SpeedyWiring wiring, SchedulerThread schedulerThread) {
        this.packetHandler = packetHandler;
        this.wiring = wiring;
        this.schedulerThread = schedulerThread;
    }

    /**
     * Asks the wiring to connect to a remote host, supplying a callback that publishes the
     * marshalled registration message under the host-registration management key.
     *
     * <p>The message is marshalled eagerly, before the wiring is called. When (and whether)
     * the callback runs is decided by {@code SpeedyWiring.connectToRemoteHost}.
     *
     * @param message          registration message to send
     * @param speedyConnection connection passed through to the wiring
     */
    public void hostRegistration(HostRegistrationMessage message, SpeedyConnection speedyConnection) {
        String registrationKey = ManagementKey.HOST_REGISTRATION.getKey();
        byte[] marshalledRegistration = message.getMarshalled();
        this.wiring.connectToRemoteHost(speedyConnection, () -> this.publish(registrationKey, marshalledRegistration));
    }

    /**
     * Publishes the whole of {@code data} under {@code key}.
     *
     * @param key  key to publish under
     * @param data payload; its full length is used
     * @throws SpeedyMessagingNotInitiatedException if the publisher has not been enabled
     */
    @Override
    public void publish(String key, byte[] data) {
        this.publish(key, data, data.length);
    }

    /**
     * Publishes {@code data} under {@code key}, passing {@code dataLength} to the packet
     * handler as the payload length.
     *
     * @param key        key to publish under
     * @param data       payload buffer
     * @param dataLength length forwarded to the packet handler (not validated here)
     * @throws SpeedyMessagingNotInitiatedException if the publisher has not been enabled
     */
    public void publish(String key, byte[] data, int dataLength) {
        this.ensureInitialised();
        int eventId = this.wiring.startEvent();
        try {
            boolean isManagementKey = ManagementKey.isManagementKey(key);
            this.packetHandler.handleEvent(key, data, dataLength, isManagementKey);
        } finally {
            // Always pair startEvent with completeEvent, even if the handler throws,
            // so a failed send cannot leave an event permanently open.
            this.wiring.completeEvent(eventId);
        }
    }

    /**
     * Sends {@code data} under {@code key} and registers {@code responseHandler} with the
     * packet handler for the response.
     *
     * @param key             key to send under
     * @param data            payload; its full length is used
     * @param responseHandler handler passed to the packet handler alongside the event
     * @throws SpeedyMessagingNotInitiatedException if the publisher has not been enabled
     */
    public void request(String key, byte[] data, SpeedyMessageHandler responseHandler) {
        // Same precondition as publish(): nothing may be sent before enable().
        this.ensureInitialised();
        int eventId = this.wiring.startEvent();
        try {
            boolean isManagementKey = ManagementKey.isManagementKey(key);
            this.packetHandler.handleEventAndResponseHandler(key, data, data.length, responseHandler, isManagementKey);
        } finally {
            // See publish(): the event must be completed on every exit path.
            this.wiring.completeEvent(eventId);
        }
    }

    /** Equivalent to {@link #enable()}. */
    public void start() {
        this.enable();
    }

    /**
     * Marks the publisher as initialised and registers a flush job with the scheduler thread.
     *
     * <p>Each call registers another job; this method does not de-duplicate.
     */
    public void enable() {
        this.initialised = true;
        // NOTE(review): the first two ScheduledJob arguments look like a 1 ms period (in
        // nanoseconds) and the current wiring time as the starting point - confirm against
        // SchedulerThread.ScheduledJob.
        long oneMillisecondInNanos = TimeUnit.MILLISECONDS.toNanos(1L);
        long currentNanoTime = this.wiring.getNanoTime();
        this.schedulerThread.addJob(
                new SchedulerThread.ScheduledJob(oneMillisecondInNanos, currentNanoTime, this::flushBucketsIfRequired));
    }

    /**
     * Scheduler callback; the supplied time is not used.
     *
     * @param nanoTime time value handed over by the scheduler (ignored)
     */
    private void flushBucketsIfRequired(long nanoTime) {
        this.flush();
    }

    /**
     * Forwards a resend request for a range of packet keys to the packet handler.
     *
     * @param sequenceBucket        bucket the missing packets belong to
     * @param firstPacketKeyMissing first missing packet key, passed through unchanged
     * @param lastPacketKeyMissing  last missing packet key, passed through unchanged
     */
    @Override
    public void resend(PublisherBucket sequenceBucket, long firstPacketKeyMissing, long lastPacketKeyMissing) {
        this.packetHandler.resendPackets(sequenceBucket, firstPacketKeyMissing, lastPacketKeyMissing);
    }

    /**
     * Stops the scheduler thread, then closes the publisher side of the wiring.
     *
     * <p>The initialised flag is left as it is.
     *
     * @throws IOException declared for callers; presumably raised by the wiring on close
     */
    public synchronized void close() throws IOException {
        this.schedulerThread.stop();
        this.wiring.closePublisher();
    }

    /** Asks the packet handler to flush its buckets if it decides that is required. */
    public void flush() {
        this.packetHandler.flushBucketsIfRequired();
    }

    /** Rejects any send attempted before {@link #enable()} has been called. */
    private void ensureInitialised() {
        if (!this.initialised) {
            throw new SpeedyMessagingNotInitiatedException("Publisher Non initiated");
        }
    }
}

