/*
 * Decompiled with CFR 0.152.
 */
package io.kiw.speedy.subscriber;

import io.kiw.speedy.SpeedyConnection;
import io.kiw.speedy.channel.NackSchedulerJob;
import io.kiw.speedy.helper.ImmutableIntMap;
import io.kiw.speedy.marshaller.MessageUnMarshaller;
import io.kiw.speedy.publisher.PublishPromise;
import io.kiw.speedy.publisher.SchedulerThread;
import io.kiw.speedy.subscriber.GenericHandler;
import io.kiw.speedy.subscriber.OnMessageErrorHandler;
import io.kiw.speedy.subscriber.SubscriberChannelState;
import io.kiw.speedy.wiring.SpeedyWiring;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

/**
 * Receives packets from a {@link SpeedyWiring}, checks each packet's sequence number against
 * the per-channel state of the sending publisher, and hands the contained messages to the
 * {@link MessageUnMarshaller}.
 *
 * <p>Each packet starts with a header that is read as: a {@code long} packet sequence number,
 * an {@code int} publisher identifier and an {@code int} channel identifier. Everything after
 * the header is consumed by the unmarshaller until the buffer is exhausted.
 *
 * <p>A single receive buffer is reused for every packet, so instances are not designed to be
 * driven by more than one thread at a time.
 */
public class SpeedyMessagingSubscriber implements Runnable {
    /** Capacity, in bytes, of the direct buffer every incoming packet is received into. */
    private static final int RECEIVE_BUFFER_CAPACITY = 5920;

    /**
     * 100 microseconds expressed in nanoseconds; first argument of the NACK scheduled job
     * (presumably its pulse interval - confirm against {@code SchedulerThread.ScheduledJob}).
     */
    private static final long NACK_JOB_NANOS = TimeUnit.MICROSECONDS.toNanos(100L);

    private final MessageUnMarshaller messageUnmarshaller;
    private final ImmutableIntMap<SpeedyConnection> remoteConnections;
    private final PublishPromise publishPromise;
    private final ByteBuffer buffer;
    private final OnMessageErrorHandler onMessageErrorHandler;
    private final SpeedyWiring wiring;
    private final SchedulerThread schedulerThread;
    private final ImmutableIntMap<GenericHandler> subscriptions;
    private final NackSchedulerJob nackSchedulerJob;

    /**
     * Creates the subscriber and registers its fragment handler with {@code wiring}.
     *
     * @param messageUnMarshaller   decodes the messages contained in a packet and dispatches them
     * @param remoteConnections     connections looked up by the publisher identifier of a packet
     * @param publishPromise        passed through to the unmarshaller
     * @param onMessageErrorHandler passed through to the unmarshaller
     * @param wiring                source of incoming packets
     * @param nackSchedulerJob      notified when a gap in packet sequence numbers is detected
     * @param schedulerThread       scheduler used by {@link #addNackScheduledJob()}
     * @param subscriptions         handlers passed through to the unmarshaller
     */
    public SpeedyMessagingSubscriber(MessageUnMarshaller messageUnMarshaller, ImmutableIntMap<SpeedyConnection> remoteConnections, PublishPromise publishPromise, OnMessageErrorHandler onMessageErrorHandler, SpeedyWiring wiring, NackSchedulerJob nackSchedulerJob, SchedulerThread schedulerThread, ImmutableIntMap<GenericHandler> subscriptions) {
        this.messageUnmarshaller = messageUnMarshaller;
        this.remoteConnections = remoteConnections;
        this.publishPromise = publishPromise;
        this.onMessageErrorHandler = onMessageErrorHandler;
        this.wiring = wiring;
        this.schedulerThread = schedulerThread;
        this.subscriptions = subscriptions;
        this.nackSchedulerJob = nackSchedulerJob;
        this.buffer = ByteBuffer.allocateDirect(RECEIVE_BUFFER_CAPACITY);
        // Publish `this` only after every field is assigned: the handler reaches
        // handlePacket(), which dereferences nackSchedulerJob, so registering earlier could
        // expose a half-constructed instance to the wiring.
        this.wiring.registerFragmentHandler(this::handleFragment);
    }

    /**
     * Handles fragments in a loop until the current thread is interrupted.
     *
     * <p>{@link Thread#interrupted()} clears the interrupt flag when it returns {@code true}.
     */
    @Override
    public void run() {
        while (!Thread.interrupted()) {
            this.handleFragment();
        }
    }

    /**
     * Receives one packet into the shared buffer, processes it, and, when it was applied in
     * order, drains any recovery messages the channel has queued.
     */
    private void handleFragment() {
        this.buffer.clear();
        this.wiring.receive(this.buffer);
        this.buffer.flip();
        SubscriberChannelState appliedChannel = this.handlePacket(this.buffer);
        if (appliedChannel == null) {
            return;
        }
        this.handleRecoveryMessages(appliedChannel);
    }

    /**
     * Replays recovery messages from {@code channelState} until it returns a buffer with
     * nothing remaining.
     *
     * @param channelState channel whose recovery messages are replayed
     */
    private void handleRecoveryMessages(SubscriberChannelState channelState) {
        ByteBuffer recovered = channelState.getNextRecoverMessage();
        while (recovered.hasRemaining()) {
            long packetSequenceNumber = recovered.getLong();
            int publisherIdentifier = recovered.getInt();
            // Third header field is skipped (presumably the channel identifier, which is
            // already implied by channelState).
            recovered.getInt();
            while (recovered.remaining() > 0) {
                this.messageUnmarshaller.unmarshallAndPotentiallyHandle(recovered, this.subscriptions, this.onMessageErrorHandler, channelState, publisherIdentifier, this.publishPromise);
            }
            channelState.increment(packetSequenceNumber);
            recovered = channelState.getNextRecoverMessage();
        }
    }

    /**
     * Processes one packet according to how its sequence number compares with the sequence
     * number the channel expects.
     *
     * <ul>
     *   <li>equal: the messages are unmarshalled and the channel state is incremented;</li>
     *   <li>greater, with an expected value of {@code 0}: the expected sequence number is set
     *       to one past this packet and the packet is dispatched again;</li>
     *   <li>greater otherwise: the packet is copied to the channel's recovery store and a NACK
     *       is raised for the range {@code [expected, received - 1]};</li>
     *   <li>smaller: the packet is ignored.</li>
     * </ul>
     *
     * @param buffer packet positioned at the start of its header
     * @return the channel state when the packet was applied in order, otherwise {@code null}
     */
    private SubscriberChannelState handlePacket(ByteBuffer buffer) {
        if (buffer.remaining() == 0) {
            return null;
        }
        // Header reads and lookups are kept in the same order as the wire layout.
        long packetSequenceNumber = buffer.getLong();
        int publisherIdentifier = buffer.getInt();
        SpeedyConnection speedyConnection = this.remoteConnections.get(publisherIdentifier);
        int channelIdentifier = buffer.getInt();
        SubscriberChannelState channelState = speedyConnection.getChannelSequenceState(channelIdentifier);
        long expectedPacketSequenceNumber = channelState.getSequenceNumber();

        if (packetSequenceNumber < expectedPacketSequenceNumber) {
            // Older than what the channel expects: nothing to do.
            return null;
        }
        if (packetSequenceNumber > expectedPacketSequenceNumber) {
            buffer.rewind();
            if (expectedPacketSequenceNumber == 0L) {
                // NOTE(review): after this the re-dispatched packet is presumably older than
                // the new expected value and therefore ignored - confirm this is intended.
                channelState.setSequenceNumber(packetSequenceNumber + 1L);
                return this.handlePacket(buffer);
            }
            channelState.copyToRecovery(buffer, packetSequenceNumber);
            this.nackSchedulerJob.onNack(channelIdentifier, publisherIdentifier, expectedPacketSequenceNumber, packetSequenceNumber - 1L);
            return null;
        }

        // In-order packet: consume every message it carries.
        while (buffer.remaining() > 0) {
            this.messageUnmarshaller.unmarshallAndPotentiallyHandle(buffer, this.subscriptions, this.onMessageErrorHandler, channelState, publisherIdentifier, this.publishPromise);
        }
        channelState.increment(packetSequenceNumber);
        buffer.clear();
        return channelState;
    }

    /**
     * Closes the subscriber side of the wiring.
     *
     * @throws IOException if the wiring fails to close
     */
    public void close() throws IOException {
        this.wiring.closeSubscriber();
    }

    /**
     * Adds a scheduled job to the scheduler thread that invokes the NACK scheduler job's
     * {@code onPulse}.
     */
    public void addNackScheduledJob() {
        this.schedulerThread.addJob(new SchedulerThread.ScheduledJob(NACK_JOB_NANOS, 0L, this.nackSchedulerJob::onPulse));
    }
}

