/*
 * Decompiled with CFR 0.152.
 */
package io.kiw.speedy.marshaller;

import io.kiw.speedy.PublisherBucket;
import io.kiw.speedy.SpeedyConnection;
import io.kiw.speedy.channel.PublisherSequenceState;
import io.kiw.speedy.marshaller.EventMarshaller;
import io.kiw.speedy.marshaller.PacketHandler;
import io.kiw.speedy.publisher.PacketFlusher;
import io.kiw.speedy.subscriber.SpeedyMessageHandler;
import io.kiw.speedy.wiring.SpeedyWiring;
import java.nio.ByteBuffer;

public class PacketHandlerImpl
implements PacketHandler {
    public static final int PACKET_HEADER_SIZE = 16;
    public static final int MESSAGE_HEADER_SIZE = 8;
    public static final int MAX_EVENT_DATA_SIZE_PER_PACKET = 5896;
    public static final int MAX_EVENT_SIZE = 188672;
    public static final int EVENT_HEADER_SIZE = 8;
    private final PacketFlusher packetflusher;
    private PublisherBucket[] publisherBuckets;
    private final SpeedyWiring wiring;
    private final EventMarshaller eventMarshaller;

    public PacketHandlerImpl(PacketFlusher packetflusher, EventMarshaller eventMarshaller, PublisherBucket[] publisherBuckets, SpeedyWiring wiring) {
        this.packetflusher = packetflusher;
        this.eventMarshaller = eventMarshaller;
        this.publisherBuckets = publisherBuckets;
        this.wiring = wiring;
    }

    @Override
    public synchronized void handleEvent(String key, byte[] eventData, int dataLength, boolean isManagementKey) {
        for (PublisherBucket publisherBucket : this.publisherBuckets) {
            if (!publisherBucket.hasKey(key)) continue;
            PublisherSequenceState packetChannelState = publisherBucket.getChannelState();
            SpeedyConnection[] connectionsToPublishTo = publisherBucket.getRemoteConnections();
            this.eventMarshaller.push(key, eventData, dataLength, packetChannelState, packetChannelState.getAndIncrementEventSequenceNumber(), connectionsToPublishTo, isManagementKey);
        }
    }

    @Override
    public synchronized void handleEventAndResponseHandler(String key, byte[] eventData, int dataLength, SpeedyMessageHandler responseHandler, boolean isManagementKey) {
        for (PublisherBucket publisherBucket : this.publisherBuckets) {
            if (!publisherBucket.hasKey(key)) continue;
            PublisherSequenceState packetChannelState = publisherBucket.getChannelState();
            SpeedyConnection[] connectionsToPublishTo = publisherBucket.getRemoteConnections();
            long eventSequenceNumber = packetChannelState.getAndIncrementEventSequenceNumber();
            publisherBucket.getSubscriberChannelState().addResponseHandler(responseHandler, connectionsToPublishTo.length, eventSequenceNumber);
            this.eventMarshaller.push(key, eventData, dataLength, packetChannelState, eventSequenceNumber, connectionsToPublishTo, isManagementKey);
        }
    }

    @Override
    public synchronized void resendPackets(PublisherBucket publisherBucket, long firstPacketKeyMissing, long lastPacketKeyMissing) {
        for (long i = firstPacketKeyMissing; i <= lastPacketKeyMissing; ++i) {
            int eventId = this.wiring.startEvent();
            ByteBuffer message = publisherBucket.getChannelState().getMessage(i);
            this.packetflusher.flushResendMessage(message, publisherBucket.getRemoteConnections());
            this.wiring.completeEvent(eventId);
        }
    }

    @Override
    public synchronized void flushBucketsIfRequired() {
        for (PublisherBucket publisherBucket : this.publisherBuckets) {
            ByteBuffer packetBuffer = publisherBucket.getPacketBuffer();
            if (packetBuffer.position() <= 16) continue;
            this.packetflusher.flushNewMessage(packetBuffer, publisherBucket.getRemoteConnections(), publisherBucket.getChannelState());
        }
    }
}

