/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.transport.impl.actor;

import io.zeebe.dispatcher.BlockPeek;
import io.zeebe.dispatcher.Subscription;
import io.zeebe.transport.Loggers;
import io.zeebe.transport.NotConnectedException;
import io.zeebe.transport.impl.ControlMessages;
import io.zeebe.transport.impl.SendFailureHandler;
import io.zeebe.transport.impl.TransportChannel;
import io.zeebe.transport.impl.TransportContext;
import io.zeebe.transport.impl.actor.ActorContext;
import io.zeebe.util.actor.Actor;
import io.zeebe.util.state.ComposedState;
import io.zeebe.util.state.SimpleStateMachineContext;
import io.zeebe.util.state.State;
import io.zeebe.util.state.StateMachine;
import io.zeebe.util.state.StateMachineAgent;
import io.zeebe.util.time.ClockUtil;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;

public class Sender
implements Actor {
    /** Channels keyed by their stream id; modified by the commands queued in {@code registerChannel}/{@code removeChannel}. */
    private final Int2ObjectHashMap<TransportChannel> channelMap = new Int2ObjectHashMap<>();
    /** Subscription from which {@link PollState} peeks the next block to send. */
    private final Subscription senderSubscription;
    /** Size limit handed to {@code peekBlock}; set in the constructor to 16 times the maximum message length. */
    private final int maxPeekSize;
    /** Keep-alive interval, compared against millisecond clock readings; values {@code <= 0} disable keep-alives. */
    protected final long keepAlivePeriod;
    /** Receives every fragment of a block that could not be sent; may be {@code null}. */
    protected final SendFailureHandler sendFailureHandler;
    /** Clock reading (ms) of the last keep-alive round, or of the first registration into an empty channel map. */
    protected long lastKeepAlive = 0L;

    /** Transition: continue with the regular poll / process cycle. */
    private static final int DEFAULT = 0;
    /** Transition: the peeked block cannot be sent and must be discarded. */
    private static final int DISCARD = 1;
    /** Transition: send a keep-alive on the next channel. */
    private static final int SEND_NEXT_KEEP_ALIVE = 2;

    // The state instances must be created before the state machine below references them.
    private final PollState pollState = new PollState();
    private final ProcessState processState = new ProcessState();
    private final DiscardState discardState = new DiscardState();
    private final SendKeepAliveState keepAliveState = new SendKeepAliveState();

    /*
     * Transition table:
     *   poll      --DEFAULT-->              process
     *   poll      --SEND_NEXT_KEEP_ALIVE--> keepAlive
     *   keepAlive --SEND_NEXT_KEEP_ALIVE--> keepAlive
     *   keepAlive --DEFAULT-->              poll
     *   process   --DISCARD-->              discard
     *   process   --DEFAULT-->              poll
     *   discard   --DEFAULT-->              poll
     */
    private final StateMachine<SenderContext> stateMachine = StateMachine.builder(s -> new SenderContext((StateMachine<?>)s))
        .initialState((State)this.pollState)
        .from((State)this.pollState).take(DEFAULT).to((State)this.processState)
        .from((State)this.pollState).take(SEND_NEXT_KEEP_ALIVE).to((State)this.keepAliveState)
        .from((State)this.keepAliveState).take(SEND_NEXT_KEEP_ALIVE).to((State)this.keepAliveState)
        .from((State)this.keepAliveState).take(DEFAULT).to((State)this.pollState)
        .from((State)this.processState).take(DISCARD).to((State)this.discardState)
        .from((State)this.processState).take(DEFAULT).to((State)this.pollState)
        .from((State)this.discardState).take(DEFAULT).to((State)this.pollState)
        .build();
    /** Drives {@link #stateMachine} and accepts the commands queued by the public methods. */
    private final StateMachineAgent<SenderContext> stateMachineAgent = new StateMachineAgent<>(this.stateMachine);

    public Sender(ActorContext actorContext, TransportContext context) {
        this.senderSubscription = context.getSenderSubscription();
        this.maxPeekSize = context.getMessageMaxLength() * 16;
        this.sendFailureHandler = context.getSendFailureHandler();
        this.keepAlivePeriod = context.getChannelKeepAlivePeriod();
        actorContext.setSender(this);
    }

    /**
     * Runs one work cycle by delegating to the state machine agent.
     *
     * @return the value reported by the agent's {@code doWork()}
     * @throws Exception whatever the agent's {@code doWork()} throws
     */
    public int doWork() throws Exception {
        final int workCount = this.stateMachineAgent.doWork();
        return workCount;
    }

    /**
     * Queues a command on the state machine agent that removes the channel's
     * entry from the channel map.
     *
     * @param c channel whose stream id identifies the entry to remove
     */
    public void removeChannel(TransportChannel c) {
        this.stateMachineAgent.addCommand(ctx -> {
            // The stream id is read when the command runs, not when it is queued.
            this.channelMap.remove(c.getStreamId());
        });
    }

    /**
     * Queues a command on the state machine agent that adds the channel to the
     * channel map under its stream id and logs the remote address.
     *
     * <p>If the map is empty when the command runs, {@code lastKeepAlive} is set
     * to the clock reading taken when this method was called, so the keep-alive
     * period is measured from the first registration.
     *
     * @param c channel to register
     */
    public void registerChannel(TransportChannel c) {
        // Captured at call time, not when the queued command eventually runs.
        final long now = ClockUtil.getCurrentTimeInMillis();
        this.stateMachineAgent.addCommand(ctx -> {
            if (this.channelMap.isEmpty()) {
                this.lastKeepAlive = now;
            }
            // No (Object) cast here: the map is typed Int2ObjectHashMap<TransportChannel>.
            this.channelMap.put(c.getStreamId(), c);
            Loggers.TRANSPORT_LOGGER.debug("Channel opened to remote {}", (Object)c.getRemoteAddress());
        });
    }

    /**
     * Mutable per-state-machine scratch data shared by the sender states: the
     * block currently being sent, the channel being written to, write progress,
     * failure details and the keep-alive iteration state.
     */
    static class SenderContext
    extends SimpleStateMachineContext {
        /** Block peeked from the sender subscription. */
        final BlockPeek blockPeek = new BlockPeek();
        /** Channel the current block or keep-alive is written to; {@code null} when none is selected. */
        TransportChannel writeChannel;
        /** Number of bytes of the current block written so far. */
        int bytesWritten;
        /** Failure description handed to the send-failure handler. */
        String failure;
        /** Failure cause handed to the send-failure handler; may be {@code null}. */
        Exception failureCause;
        /** Iterator over the channels during a keep-alive round; {@code null} outside of one. */
        Iterator<TransportChannel> channelIt;
        /** Holds a copy of the keep-alive control message, sized to exactly fit it. */
        final ByteBuffer keepAliveBuffer = ByteBuffer.allocate(ControlMessages.KEEP_ALIVE.capacity());

        /**
         * Creates the context and copies the keep-alive control message into
         * {@link #keepAliveBuffer}.
         *
         * @param stateMachine state machine passed to the superclass constructor
         */
        SenderContext(StateMachine<?> stateMachine) {
            super(stateMachine);
            final int keepAliveLength = ControlMessages.KEEP_ALIVE.capacity();
            ControlMessages.KEEP_ALIVE.getBytes(0, this.keepAliveBuffer, keepAliveLength);
            // Make the copied bytes readable from position 0.
            this.keepAliveBuffer.flip();
        }

        /**
         * Clears all per-cycle state. The keep-alive buffer is only rewound
         * ({@code clear()} resets position and limit, not the content), so the
         * message can be written out again.
         */
        public void reset() {
            this.failure = null;
            this.failureCause = null;
            this.bytesWritten = 0;
            this.writeChannel = null;
            this.channelIt = null;
            this.keepAliveBuffer.clear();
        }
    }

    /**
     * Sends the keep-alive message to the registered channels, one channel per
     * pass: the first step selects the next channel, the second step writes the
     * keep-alive buffer to it.
     *
     * <p>NOTE(review): presumably {@code ComposedState} runs the steps in list
     * order and retries a step that returns {@code false} - confirm against
     * {@code ComposedState}.
     */
    class SendKeepAliveState
    extends ComposedState<SenderContext> {
        protected final SelectChannelStep selectStep = new SelectChannelStep();
        protected final SendKeepAliveOnChannelStep sendStep = new SendKeepAliveOnChannelStep();

        SendKeepAliveState() {
        }

        /** @return the select step followed by the send step */
        protected List<ComposedState.Step<SenderContext>> steps() {
            return Arrays.asList(this.selectStep, this.sendStep);
        }

        /** @return always {@code false} */
        public boolean isInterruptable() {
            return false;
        }

        /**
         * Picks the next channel of the current keep-alive round and rewinds
         * the keep-alive buffer for it.
         */
        class SelectChannelStep
        implements ComposedState.Step<SenderContext> {
            SelectChannelStep() {
            }

            /**
             * Lazily starts the iteration over the channel map, then stores the
             * next channel (or {@code null} when exhausted) in the context.
             *
             * @return always {@code true}
             */
            public boolean doWork(SenderContext context) {
                Iterator<TransportChannel> channels = context.channelIt;
                if (channels == null) {
                    // First pass of this round: start iterating the registered channels.
                    channels = Sender.this.channelMap.values().iterator();
                    context.channelIt = channels;
                }

                if (channels.hasNext()) {
                    context.writeChannel = channels.next();
                } else {
                    context.writeChannel = null;
                }

                // Rewind so the full keep-alive message is written to the selected channel.
                context.keepAliveBuffer.clear();
                return true;
            }
        }

        /**
         * Writes the keep-alive buffer to the selected channel, or ends the
         * round when no channel is left.
         */
        class SendKeepAliveOnChannelStep
        implements ComposedState.Step<SenderContext> {
            SendKeepAliveOnChannelStep() {
            }

            /**
             * @return {@code true} once this channel is finished (buffer fully
             *         written, or the write reported a negative result) or when
             *         no channel is selected; {@code false} while bytes remain
             */
            public boolean doWork(SenderContext context) {
                final TransportChannel channel = context.writeChannel;
                if (channel == null) {
                    // No channel left: take transition 0 (DEFAULT).
                    context.take(0);
                    return true;
                }

                final ByteBuffer keepAlive = context.keepAliveBuffer;
                boolean writeFailed = false;
                if (keepAlive.hasRemaining()) {
                    // A negative result means the write failed; skip this channel.
                    writeFailed = channel.write(keepAlive) < 0;
                }

                final boolean channelDone = writeFailed || !keepAlive.hasRemaining();
                if (channelDone) {
                    // Transition 2 (SEND_NEXT_KEEP_ALIVE): move on to the next channel.
                    context.take(2);
                }
                return channelDone;
            }
        }
    }

    /**
     * Handles a block that could not be sent: reports each of its messages to
     * the send-failure handler (if one is configured) and marks the block as
     * failed.
     */
    class DiscardState
    implements State<SenderContext> {
        DiscardState() {
        }

        /**
         * @param context carries the failed block plus the failure text and cause
         * @return always {@code 1}
         * @throws Exception declared by the signature; presumably propagated from the handler
         */
        public int doWork(SenderContext context) throws Exception {
            final BlockPeek failedBlock = context.blockPeek;
            final SendFailureHandler handler = Sender.this.sendFailureHandler;

            if (handler != null) {
                for (DirectBuffer message : failedBlock) {
                    handler.onFragment(message, 0, message.capacity(), failedBlock.getStreamId(), context.failure, context.failureCause);
                }
            }

            // The block is marked failed whether or not a handler was notified.
            failedBlock.markFailed();
            // Transition 0 (DEFAULT).
            context.take(0);
            return 1;
        }
    }

    /**
     * Sends the peeked block: the first step resolves the target channel from
     * the block's stream id, the second step writes the block's raw buffer to
     * that channel.
     *
     * <p>NOTE(review): presumably {@code ComposedState} runs the steps in list
     * order and retries a step that returns {@code false} - confirm against
     * {@code ComposedState}.
     */
    class ProcessState
    extends ComposedState<SenderContext> {
        private final AwaitChannelState awaitChannelState = new AwaitChannelState();
        private final WriteState writeState = new WriteState();

        ProcessState() {
        }

        /** @return the channel lookup step followed by the write step */
        protected List<ComposedState.Step<SenderContext>> steps() {
            return Arrays.asList(this.awaitChannelState, this.writeState);
        }

        /** Looks up an open channel for the block's stream id. */
        class AwaitChannelState
        implements ComposedState.Step<SenderContext> {
            AwaitChannelState() {
            }

            /**
             * Stores the channel in the context when one is registered and not
             * closed; otherwise records a {@link NotConnectedException} as the
             * failure and takes transition 1 (DISCARD).
             *
             * @return {@code true} if a usable channel was found, else {@code false}
             */
            public boolean doWork(SenderContext context) {
                final int streamId = context.blockPeek.getStreamId();
                final TransportChannel channel = (TransportChannel)Sender.this.channelMap.get(streamId);

                if (channel == null || channel.isClosed()) {
                    context.failure = "No available channel for remote";
                    context.failureCause = new NotConnectedException(context.failure);
                    context.take(1);
                    return false;
                }

                context.writeChannel = channel;
                return true;
            }
        }

        /** Writes the block's raw buffer to the selected channel. */
        class WriteState
        implements ComposedState.Step<SenderContext> {
            WriteState() {
            }

            /**
             * Performs one write attempt and adds the result to
             * {@code context.bytesWritten}. When the total equals the block
             * length the block is marked completed and transition 0 (DEFAULT)
             * is taken. Any negative write result is treated as a failure:
             * the failure text is set (no failure cause on this path) and
             * transition 1 (DISCARD) is taken.
             *
             * @return {@code true} once the whole block has been written,
             *         {@code false} on failure or while bytes remain
             */
            public boolean doWork(SenderContext context) {
                final BlockPeek blockPeek = context.blockPeek;
                final int bytesWritten = context.writeChannel.write(blockPeek.getRawBuffer());

                // Any negative result is a failure (same check as the keep-alive
                // write); it must never be added to the progress counter.
                if (bytesWritten < 0) {
                    context.failure = "Could not write to channel";
                    context.take(1);
                    return false;
                }

                context.bytesWritten += bytesWritten;
                if (context.bytesWritten != blockPeek.getBlockLength()) {
                    // Partial write: stay in this step.
                    return false;
                }

                blockPeek.markCompleted();
                context.take(0);
                return true;
            }
        }
    }

    /**
     * Entry state of each cycle: resets the context, then either starts a
     * keep-alive round (when one is due) or peeks the next block to send.
     */
    class PollState
    implements State<SenderContext> {
        PollState() {
        }

        /**
         * @param context reset at the start of every call
         * @return {@code 1} when a keep-alive round is started, otherwise the
         *         value returned by {@code peekBlock}
         */
        public int doWork(SenderContext context) {
            context.reset();

            final long now = ClockUtil.getCurrentTimeInMillis();
            if (this.isKeepAliveDue(now)) {
                // Transition 2 (SEND_NEXT_KEEP_ALIVE).
                context.take(2);
                Sender.this.lastKeepAlive = now;
                return 1;
            }

            final int blockSize = Sender.this.senderSubscription.peekBlock(context.blockPeek, Sender.this.maxPeekSize, true);
            if (blockSize > 0) {
                // Something to send: transition 0 (DEFAULT).
                context.take(0);
            }
            return blockSize;
        }

        /**
         * Keep-alives are due when they are enabled (period {@code > 0}), more
         * than one period has elapsed since the last round, and at least one
         * channel is registered.
         */
        private boolean isKeepAliveDue(long now) {
            if (Sender.this.keepAlivePeriod <= 0L) {
                return false;
            }
            if (now - Sender.this.lastKeepAlive <= Sender.this.keepAlivePeriod) {
                return false;
            }
            return !Sender.this.channelMap.isEmpty();
        }
    }
}

