/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.samples.stress;

import io.aeron.Aeron;
import io.aeron.Counter;
import io.aeron.FragmentAssembler;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.logbuffer.Header;
import io.aeron.samples.stress.CRC64;
import io.aeron.samples.stress.StressUtil;
import java.nio.ByteOrder;
import java.util.Random;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Long2LongCounterMap;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.SystemEpochClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.YieldingIdleStrategy;

/**
 * Stress-test client {@link Agent} that offers CRC-stamped request messages on an exclusive publication and
 * counts the responses that arrive on two response subscriptions.
 * <p>
 * Every request is tagged with a correlation id carried in the frame's reserved value. A request stays in the
 * in-flight map until {@link #EXPECTED_RESPONSE_COUNT} responses with the same correlation id have been received.
 * <p>
 * Typical usage (see {@link #main(String[])}): {@link #onStart()}, then for each message length call
 * {@code reset(int)} and invoke {@link #doWork()} until the run is complete, and finally {@link #onClose()}.
 */
public class StressMdcClient implements Agent {
    /** Maximum time without any response, measured from the last successful offer, before a run is failed. */
    private static final long TIMEOUT_MS = 5000L;
    /** Number of responses that must be seen for a correlation id before it is no longer in flight. */
    private static final int EXPECTED_RESPONSE_COUNT = 4;
    /** Size of the pre-generated message buffer and therefore the largest message that can be sent. */
    private static final int MAX_MESSAGE_LENGTH = 131008;
    /** Length of the CRC value written at the start of every message. */
    private static final int CRC_LENGTH = Long.BYTES;
    /** Stream id used for the request publication and both response subscriptions. */
    private static final int STREAM_ID = 10002;
    private static final int RECEIVE_COUNTER_TYPE_ID = 1002;
    private static final int SEND_COUNTER_TYPE_ID = 1003;

    private final String serverAddress;
    private final String clientAddress;
    private final EpochClock clock;
    private final MutableDirectBuffer msg;
    // Missing keys read as -1, so a tracked request starts at 0 and counts responses upwards.
    private final Long2LongCounterMap inflightMessages = new Long2LongCounterMap(-1L);
    private final FragmentAssembler mdcRspAssembler1 = new FragmentAssembler(this::mdcRspHandler);
    private final FragmentAssembler mdcRspAssembler2 = new FragmentAssembler(this::mdcRspHandler);
    private final int maxInflight;
    private final int totalToSend;
    private final int mtu;
    private final byte[] buffer = new byte[MAX_MESSAGE_LENGTH];
    private final CRC64 crc = new CRC64();
    private Aeron aeron;
    private Publication mdcPublication;
    private Subscription mdcSubscription1;
    private Subscription mdcSubscription2;
    private long correlationId = 0L;
    private long lastMessageSent = 0L;
    private int messageLength;
    private Counter clientReceiveCount;
    private Counter clientSendCount;

    /**
     * Create a client whose message buffer is filled with pseudo-random bytes from a fixed seed.
     *
     * @param serverAddress address passed to {@link StressUtil} when building the response subscription channels.
     * @param clientAddress address passed to {@link StressUtil} when building the request and response channels.
     * @param clock         clock used to timestamp sends and detect the response timeout.
     * @param maxInflight   maximum number of requests awaiting responses; also used as the per-poll fragment limit.
     * @param totalToSend   number of messages to send after each {@code reset(int)}.
     * @param mtu           MTU applied to the request publication channel.
     */
    public StressMdcClient(
        String serverAddress, String clientAddress, EpochClock clock, int maxInflight, int totalToSend, int mtu) {
        this.serverAddress = serverAddress;
        this.clientAddress = clientAddress;
        this.clock = clock;
        this.maxInflight = maxInflight;
        this.totalToSend = totalToSend;
        this.mtu = mtu;
        // Fixed seed so the buffer content is identical on every run.
        new Random(42L).nextBytes(buffer);
        this.msg = new UnsafeBuffer(buffer);
    }

    /**
     * Connect to Aeron with a default context and create the request publication, the two response
     * subscriptions and the send/receive counters.
     */
    @Override
    public void onStart() {
        aeron = Aeron.connect(new Aeron.Context());
        mdcPublication = aeron.addExclusivePublication(
            StressUtil.mdcReqPubChannel(clientAddress).mtu(mtu).linger(0L).build(), STREAM_ID);
        mdcSubscription1 = aeron.addSubscription(
            StressUtil.mdcRspSubChannel1(serverAddress, clientAddress).build(),
            STREAM_ID,
            StressUtil::imageAvailable,
            StressUtil::imageUnavailable);
        mdcSubscription2 = aeron.addSubscription(
            StressUtil.mdcRspSubChannel2(serverAddress, clientAddress).build(),
            STREAM_ID,
            StressUtil::imageAvailable,
            StressUtil::imageUnavailable);
        clientReceiveCount = aeron.addCounter(RECEIVE_COUNTER_TYPE_ID, "Client Receive Count");
        clientSendCount = aeron.addCounter(SEND_COUNTER_TYPE_ID, "Client Send Count");
    }

    /**
     * Send as many requests as the in-flight window and the publication allow, then drain both response
     * subscriptions.
     *
     * @return the number of messages sent plus the number of fragments polled, or 0 while either response
     *         subscription is not yet connected.
     * @throws IllegalStateException if {@code reset(int)} has not set a message length.
     * @throws RuntimeException      if requests are in flight, nothing was received during this call and more
     *                               than {@link #TIMEOUT_MS} has elapsed since the last successful offer.
     */
    @Override
    public int doWork() {
        if (!mdcSubscription1.isConnected() || !mdcSubscription2.isConnected()) {
            return 0;
        }
        if (0 == messageLength) {
            throw new IllegalStateException("messageLength has not been set");
        }

        int sendCount = 0;
        // The correlation id is attached through the reserved-value supplier at offer time, so it is only
        // advanced after the offer has succeeded.
        while (inflightMessages.size() < maxInflight
            && correlationId < (long)totalToSend
            && 0L < mdcPublication.offer(msg, 0, messageLength, this::currentCorrelationId)) {
            inflightMessages.put(correlationId, 0L);
            ++correlationId;
            ++sendCount;
            lastMessageSent = clock.time();
            clientSendCount.increment();
        }

        int recvCount = 0;
        int polled;
        while (0 != (polled = poll(maxInflight))) {
            recvCount += polled;
        }

        if (0L < correlationId && 0 == recvCount && !inflightMessages.isEmpty()) {
            final long timeSinceLastMessageMs = clock.time() - lastMessageSent;
            if (TIMEOUT_MS < timeSinceLastMessageMs) {
                throw new RuntimeException(
                    "No response received for " + timeSinceLastMessageMs + "ms, client=" + this);
            }
        }

        // Receiving is work too: report it so the caller's idle strategy does not back off while responses
        // are still flowing.
        return sendCount + recvCount;
    }

    /**
     * Poll both response subscriptions once.
     *
     * @param pollLimit fragment limit applied to each subscription individually.
     * @return total number of fragments reported by the two polls.
     */
    private int poll(int pollLimit) {
        final int polled1 = mdcSubscription1.poll(mdcRspAssembler1, pollLimit);
        final int polled2 = mdcSubscription2.poll(mdcRspAssembler2, pollLimit);
        return polled1 + polled2;
    }

    /**
     * @return {@code true} once all messages of the current run have been sent and none remain in flight.
     */
    private boolean isComplete() {
        return (long)totalToSend <= correlationId && inflightMessages.isEmpty();
    }

    /**
     * Handle one assembled response: validate it, count it and retire the request once all expected
     * responses have arrived.
     */
    private void mdcRspHandler(DirectBuffer responseBuffer, int offset, int length, Header header) {
        final long responseCorrelationId = header.reservedValue();
        StressUtil.validateMessage(crc, responseBuffer, offset, length, responseCorrelationId);
        clientReceiveCount.increment();
        final long responseCount = inflightMessages.incrementAndGet(responseCorrelationId);
        if (EXPECTED_RESPONSE_COUNT <= responseCount) {
            inflightMessages.remove(responseCorrelationId);
        }
    }

    /** Reserved-value supplier: stamps the frame being offered with the current correlation id. */
    private long currentCorrelationId(DirectBuffer message, int offset, int length) {
        return correlationId;
    }

    /**
     * Start a new run with the given message length: restart correlation ids from zero and write the CRC of
     * the payload bytes {@code [8, messageLength)} into the first eight bytes of the message (little endian).
     * <p>
     * In-flight tracking is not cleared here; {@link #main(String[])} only resets after the previous run is
     * complete.
     *
     * @param messageLength total message length in bytes, including the leading 8-byte CRC.
     * @throws IllegalArgumentException if {@code messageLength} cannot hold the CRC or exceeds the message
     *                                  buffer.
     */
    void reset(int messageLength) {
        if (messageLength < CRC_LENGTH || MAX_MESSAGE_LENGTH < messageLength) {
            throw new IllegalArgumentException(
                "messageLength must be in [" + CRC_LENGTH + ", " + MAX_MESSAGE_LENGTH + "]: " + messageLength);
        }
        this.messageLength = messageLength;
        this.correlationId = 0L;
        final int payloadLength = messageLength - CRC_LENGTH;
        final long crcValue = crc.recalculate(buffer, CRC_LENGTH, payloadLength);
        msg.putLong(0, crcValue, ByteOrder.LITTLE_ENDIAN);
    }

    /** Close the Aeron client, suppressing any error raised while closing. */
    @Override
    public void onClose() {
        CloseHelper.quietCloseAll(aeron);
    }

    /**
     * @return a fixed, non-null role name so that runners which label threads or errors with it do not fail.
     */
    @Override
    public String roleName() {
        return "stress-mdc-client";
    }

    @Override
    public String toString() {
        return "StressClient{inflightMessages=" + inflightMessages +
            ", mtu=" + mtu +
            ", messageLength=" + messageLength +
            ", correlationId=" + correlationId +
            ", lastMessageSent=" + lastMessageSent + "}";
    }

    /**
     * Run the client once per MTU in {@link StressUtil#MTU_LENGTHS}. For each MTU the message length is swept
     * from 32 bytes up to the buffer size in steps of 1024, sending 100 messages per length with at most 20
     * in flight. The client is always closed, even if a run fails.
     *
     * @param args ignored.
     */
    public static void main(String[] args) {
        final YieldingIdleStrategy idleStrategy = new YieldingIdleStrategy();
        final int startMessageLength = 32;
        final int messageLengthStep = 1024;
        final int maxMessageLength = MAX_MESSAGE_LENGTH;
        final int maxInflight = 20;
        final int totalToSend = 100;

        for (int mtu : StressUtil.MTU_LENGTHS) {
            final StressMdcClient client = new StressMdcClient(
                StressUtil.serverAddress(),
                StressUtil.clientAddress(),
                new SystemEpochClock(),
                maxInflight,
                totalToSend,
                mtu);
            try {
                client.onStart();
                for (int messageLength = startMessageLength;
                    messageLength <= maxMessageLength;
                    messageLength += messageLengthStep) {
                    client.reset(messageLength);
                    while (!client.isComplete()) {
                        idleStrategy.idle(client.doWork());
                    }
                }
                StressUtil.info("Complete mtu=" + mtu);
            }
            finally {
                client.onClose();
            }
        }
    }
}

