/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.samples.stress;

import io.aeron.Aeron;
import io.aeron.FragmentAssembler;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.logbuffer.Header;
import io.aeron.samples.stress.CRC64;
import io.aeron.samples.stress.StressUtil;
import java.nio.ByteOrder;
import java.util.Random;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.SystemEpochClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.YieldingIdleStrategy;

/**
 * Stress-test client that repeatedly offers a message on a unicast request publication and
 * validates the responses polled from a unicast response subscription.
 *
 * <p>Message layout, as written by {@link #reset(int)}: the first {@value #CRC_LENGTH} bytes hold
 * the little-endian CRC of the remaining {@code messageLength - CRC_LENGTH} bytes, which are
 * pseudo-random bytes generated once from a fixed seed.
 *
 * <p>Every offered message carries its correlation id in the frame's reserved value. The id stays
 * in {@code inflightMessages} until a response with the same reserved value has been validated.
 */
public class StressUnicastClient
implements Agent {
    /** Maximum time to wait for a response after the last successful send before failing. */
    private static final long TIMEOUT_MS = 5000L;
    /** Size of the backing buffer and therefore the largest message that can be sent. */
    private static final int MAX_MESSAGE_LENGTH = 131008;
    /** Smallest message length exercised by {@link #main(String[])}. */
    private static final int MIN_MESSAGE_LENGTH = 32;
    /** Increment between the message lengths exercised by {@link #main(String[])}. */
    private static final int MESSAGE_LENGTH_STEP = 1024;
    /** Number of leading bytes of each message that hold the CRC. */
    private static final int CRC_LENGTH = Long.BYTES;
    /** Stream id used for both the request publication and the response subscription. */
    private static final int STREAM_ID = 10001;
    /** In-flight limit used by {@link #main(String[])}. */
    private static final int DEFAULT_MAX_INFLIGHT = 20;
    /** Messages sent per message length by {@link #main(String[])}. */
    private static final int DEFAULT_TOTAL_TO_SEND = 100;

    private final String serverAddress;
    private final String clientAddress;
    private final EpochClock clock;
    private final MutableDirectBuffer msg;
    /** Correlation ids that have been sent but not yet answered. */
    private final Long2ObjectHashMap<Boolean> inflightMessages = new Long2ObjectHashMap<>();
    private final FragmentAssembler unicastRspAssembler = new FragmentAssembler(this::unicastRspHandler);
    private final int maxInflight;
    private final int totalToSend;
    private final int mtu;
    private final byte[] buffer = new byte[MAX_MESSAGE_LENGTH];
    /** Length of the message currently being sent; zero until {@link #reset(int)} is called. */
    private int messageLength;
    private final CRC64 crc = new CRC64();
    private Aeron aeron;
    private Publication unicastPublication;
    private Subscription unicastSubscription;
    /** Correlation id of the next message to send; also the count of messages sent so far. */
    private long correlationId = 0L;
    /** Clock time, in milliseconds, of the most recent successful offer. */
    private long lastMessageSent = 0L;

    /**
     * Creates a client. No Aeron resources are acquired until {@link #onStart()}.
     *
     * @param serverAddress address used to build the request channel.
     * @param clientAddress address used to build the response channel.
     * @param clock         clock used for response timeout detection.
     * @param maxInflight   maximum number of unanswered messages allowed at any time.
     * @param totalToSend   number of messages to send for each message length.
     * @param mtu           MTU to configure on the request publication.
     */
    public StressUnicastClient(String serverAddress, String clientAddress, EpochClock clock, int maxInflight, int totalToSend, int mtu) {
        this.serverAddress = serverAddress;
        this.clientAddress = clientAddress;
        this.clock = clock;
        this.maxInflight = maxInflight;
        this.totalToSend = totalToSend;
        this.mtu = mtu;
        // Fixed seed so that the payload bytes are identical on every run.
        Random r = new Random(42L);
        r.nextBytes(this.buffer);
        this.msg = new UnsafeBuffer(this.buffer);
    }

    /**
     * Connects to Aeron and creates the request publication and response subscription.
     */
    @Override
    public void onStart() {
        StressUtil.info("server=" + this.serverAddress + ", client=" + this.clientAddress);
        this.aeron = Aeron.connect(new Aeron.Context());
        StressUtil.info("Connected to Aeron dir=" + this.aeron.context().aeronDirectoryName());
        this.unicastPublication = this.aeron.addExclusivePublication(StressUtil.unicastReqChannel(this.serverAddress).mtu(this.mtu).linger(0L).build(), STREAM_ID);
        this.unicastSubscription = this.aeron.addSubscription(StressUtil.unicastRspChannel(this.clientAddress).build(), STREAM_ID);
        StressUtil.info("publications and subscriptions created");
    }

    /**
     * Sends as many messages as the in-flight limit allows, drains pending responses and fails
     * if responses have stopped arriving.
     *
     * @return the number of messages sent plus the number of fragments polled in this cycle, so
     *         that a cycle which only received responses is not reported as idle.
     * @throws IllegalStateException if {@link #reset(int)} has not been called yet.
     * @throws RuntimeException      if messages are outstanding, nothing was received in this
     *                               cycle and more than {@value #TIMEOUT_MS}ms have passed since
     *                               the last successful send.
     */
    @Override
    public int doWork() {
        if (0 == this.messageLength) {
            throw new IllegalStateException("messageLength has not been set");
        }
        final int sendCount = this.sendMessages();
        final int recvCount = this.pollResponses();
        this.checkForTimeout(recvCount);
        return sendCount + recvCount;
    }

    /** Offers messages until a limit is reached or the publication rejects an offer. */
    private int sendMessages() {
        int sendCount = 0;
        while (this.inflightMessages.size() < this.maxInflight && this.correlationId < (long)this.totalToSend && 0L < this.unicastPublication.offer(this.msg, 0, this.messageLength, this::currentCorrelationId)) {
            this.inflightMessages.put(this.correlationId, Boolean.TRUE);
            ++this.correlationId;
            ++sendCount;
            this.lastMessageSent = this.clock.time();
        }
        return sendCount;
    }

    /** Polls the response subscription until a poll returns nothing; returns the total polled. */
    private int pollResponses() {
        int recvCount = 0;
        int count;
        while (0 != (count = this.unicastSubscription.poll(this.unicastRspAssembler, this.maxInflight))) {
            recvCount += count;
        }
        return recvCount;
    }

    /** Throws when responses are outstanding, none arrived this cycle and the timeout elapsed. */
    private void checkForTimeout(int recvCount) {
        if (0L < this.correlationId && 0 == recvCount && !this.inflightMessages.isEmpty()) {
            final long timeSinceLastMessageMs = this.clock.time() - this.lastMessageSent;
            if (TIMEOUT_MS < timeSinceLastMessageMs) {
                throw new RuntimeException("No response received for " + timeSinceLastMessageMs + "ms, client=" + this);
            }
        }
    }

    /**
     * @return true once every message for the current length has been sent and answered.
     */
    private boolean isComplete() {
        return (long)this.totalToSend <= this.correlationId && this.inflightMessages.isEmpty();
    }

    /**
     * Validates a reassembled response and marks its correlation id as answered.
     *
     * @param response buffer containing the response.
     * @param offset   offset of the response within the buffer.
     * @param length   length of the response in bytes.
     * @param header   frame header; its reserved value carries the correlation id.
     */
    private void unicastRspHandler(DirectBuffer response, int offset, int length, Header header) {
        final long responseCorrelationId = header.reservedValue();
        StressUtil.validateMessage(this.crc, response, offset, length, responseCorrelationId);
        this.inflightMessages.remove(responseCorrelationId);
    }

    /**
     * Reserved value supplier for offers: stamps the frame with the correlation id about to be
     * sent. The arguments are ignored.
     */
    private long currentCorrelationId(DirectBuffer message, int offset, int length) {
        return this.correlationId;
    }

    /**
     * Prepares the client to send a new batch of messages of the given length: restarts the
     * correlation id sequence and rewrites the CRC prefix for the new payload length.
     *
     * @param messageLength total message length in bytes, including the CRC prefix.
     * @throws IllegalArgumentException if the length cannot hold the CRC prefix or exceeds the
     *                                  backing buffer.
     */
    void reset(int messageLength) {
        if (messageLength < CRC_LENGTH || this.buffer.length < messageLength) {
            throw new IllegalArgumentException("messageLength=" + messageLength + " must be in the range [" + CRC_LENGTH + ", " + this.buffer.length + "]");
        }
        this.messageLength = messageLength;
        this.correlationId = 0L;
        final int payloadLength = messageLength - CRC_LENGTH;
        final long crcValue = this.crc.recalculate(this.buffer, CRC_LENGTH, payloadLength);
        this.msg.putLong(0, crcValue, ByteOrder.LITTLE_ENDIAN);
    }

    /**
     * Closes the Aeron client, ignoring errors. Safe to call when {@link #onStart()} failed
     * part-way or was never called.
     */
    @Override
    public void onClose() {
        CloseHelper.quietCloseAll(this.aeron);
    }

    /**
     * @return a fixed, non-null role name so it can safely be used for thread naming and logging.
     */
    @Override
    public String roleName() {
        return "stress-unicast-client";
    }

    @Override
    public String toString() {
        return "StressClient{inflightMessages=" + this.inflightMessages + ", mtu=" + this.mtu + ", messageLength=" + this.messageLength + ", correlationId=" + this.correlationId + ", lastMessageSent=" + this.lastMessageSent + '}';
    }

    /**
     * Runs the stress test: for each MTU in {@code StressUtil.MTU_LENGTHS} a fresh client sends
     * {@value #DEFAULT_TOTAL_TO_SEND} messages at every message length from
     * {@value #MIN_MESSAGE_LENGTH} up to {@value #MAX_MESSAGE_LENGTH} in steps of
     * {@value #MESSAGE_LENGTH_STEP}.
     *
     * @param args ignored.
     */
    public static void main(String[] args) {
        final YieldingIdleStrategy idleStrategy = new YieldingIdleStrategy();
        for (int mtu : StressUtil.MTU_LENGTHS) {
            final StressUnicastClient stressClient = new StressUnicastClient(StressUtil.serverAddress(), StressUtil.clientAddress(), new SystemEpochClock(), DEFAULT_MAX_INFLIGHT, DEFAULT_TOTAL_TO_SEND, mtu);
            try {
                // Started inside the try so a failure part-way through onStart() still closes Aeron.
                stressClient.onStart();
                for (int messageLength = MIN_MESSAGE_LENGTH; messageLength <= MAX_MESSAGE_LENGTH; messageLength += MESSAGE_LENGTH_STEP) {
                    stressClient.reset(messageLength);
                    while (!stressClient.isComplete()) {
                        idleStrategy.idle(stressClient.doWork());
                    }
                }
                StressUtil.info("Complete mtu=" + mtu);
            }
            finally {
                stressClient.onClose();
            }
        }
    }
}

