/*
 * Decompiled with CFR 0.152.
 */
package org.openucx.jucx.examples;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import org.openucx.jucx.UcxCallback;
import org.openucx.jucx.UcxUtils;
import org.openucx.jucx.examples.UcxBenchmark;
import org.openucx.jucx.ucp.UcpEndpoint;
import org.openucx.jucx.ucp.UcpEndpointParams;
import org.openucx.jucx.ucp.UcpListener;
import org.openucx.jucx.ucp.UcpListenerParams;
import org.openucx.jucx.ucp.UcpMemory;
import org.openucx.jucx.ucp.UcpRemoteKey;
import org.openucx.jucx.ucp.UcpRequest;

/**
 * Receiving side of the read-bandwidth benchmark.
 *
 * <p>The program listens on the configured socket address, accepts a single
 * connection request, receives a descriptor message from the peer and then
 * repeatedly issues a non-blocking {@code get} from the advertised remote
 * region into locally mapped memory, printing the measured bandwidth of each
 * iteration.
 *
 * <p>Descriptor layout, in the order the fields are read from the receive buffer:
 * <ol>
 *   <li>{@code long} - remote address</li>
 *   <li>{@code long} - number of bytes to read</li>
 *   <li>{@code int}  - length of the packed remote key</li>
 *   <li>packed remote key ({@code length} bytes)</li>
 *   <li>{@code int}  - hash code the peer computed for its data</li>
 * </ol>
 */
public class UcxReadBWBenchmarkReceiver
extends UcxBenchmark {
    /** Capacity in bytes of the direct buffer that receives the peer's descriptor message. */
    private static final int DESCRIPTOR_BUFFER_SIZE = 4096;

    /**
     * Runs the receiver: accept one connection, read the descriptor, perform
     * {@code numIterations} remote reads and finally close the endpoint.
     *
     * <p>Once the context and worker exist, {@code closeResources()} is invoked on
     * every exit path (normal completion or exception) so that objects already
     * pushed onto {@code resources} are not left open when a step fails.
     *
     * @param args command line arguments, handed to {@code initializeArguments}
     * @throws Exception if any step of the benchmark fails
     */
    public static void main(String[] args) throws Exception {
        if (!UcxReadBWBenchmarkReceiver.initializeArguments(args)) {
            return;
        }
        UcxReadBWBenchmarkReceiver.createContextAndWorker();
        try {
            String serverHost = (String)argsMap.get("s");
            InetSocketAddress sockaddr = new InetSocketAddress(serverHost, serverPort);

            // The connection handler stores the incoming request; the loop below polls for it.
            // NOTE(review): element type is Object in this (decompiled) source - presumably the
            // listener's connection-request type; confirm against the JUCX API.
            AtomicReference<Object> connRequest = new AtomicReference<Object>(null);
            UcpListenerParams listenerParams = new UcpListenerParams()
                .setConnectionHandler(connRequest::set)
                .setSockAddr(sockaddr);
            UcpListener listener = worker.newListener(listenerParams);
            resources.push(listener);
            System.out.println("Waiting for connections on " + sockaddr + " ...");

            // Busy-poll the worker until the handler has recorded a connection request.
            while (connRequest.get() == null) {
                worker.progress();
            }

            UcpEndpointParams endpointParams = new UcpEndpointParams()
                .setConnectionRequest(connRequest.get())
                .setPeerErrorHandlingMode();
            UcpEndpoint endpoint = worker.newEndpoint(endpointParams);

            // Receive the descriptor message and wait for the receive to complete.
            ByteBuffer recvBuffer = ByteBuffer.allocateDirect(DESCRIPTOR_BUFFER_SIZE);
            UcpRequest recvRequest = worker.recvTaggedNonBlocking(recvBuffer, null);
            worker.progressRequest(recvRequest);

            // Decode the fixed-size header fields.
            final long remoteAddress = recvBuffer.getLong();
            // NOTE(review): remoteSize comes from the peer and is used unchecked as the length
            // of the remote read into recvMemory - presumably it must not exceed the locally
            // mapped region; confirm and consider validating.
            final long remoteSize = recvBuffer.getLong();
            int remoteKeySize = recvBuffer.getInt();
            int rkeyBufferOffset = recvBuffer.position();

            // The hash code sits right behind the packed key: skip the key, read the hash,
            // then rewind to the key start so that the key can be unpacked from there.
            recvBuffer.position(rkeyBufferOffset + remoteKeySize);
            final int remoteHashCode = recvBuffer.getInt();
            System.out.printf("Received connection. Will read %d bytes from remote address %d%n", remoteSize, remoteAddress);
            recvBuffer.position(rkeyBufferOffset);
            UcpRemoteKey remoteKey = endpoint.unpackRemoteKey(recvBuffer);
            resources.push(remoteKey);

            // Local destination memory plus a ByteBuffer view over it for the hash check.
            // A ByteBuffer cannot exceed Integer.MAX_VALUE bytes, hence the clamp.
            UcpMemory recvMemory = context.memoryMap(allocationParams);
            resources.push(recvMemory);
            final ByteBuffer data = UcxUtils.getByteBufferView(recvMemory.getAddress(), Math.min(Integer.MAX_VALUE, totalSize));

            for (int i = 0; i < numIterations; i++) {
                final int iterNum = i;
                UcpRequest getRequest = endpoint.getNonBlocking(remoteAddress, remoteKey, recvMemory.getAddress(), remoteSize, new UcxCallback(){
                    // Taken when the callback object is created, i.e. just before the get is issued.
                    final long startTime = System.nanoTime();

                    @Override
                    public void onSuccess(UcpRequest request) {
                        long finishTime = System.nanoTime();
                        // Reset position/limit so that hashCode() covers the whole view.
                        data.clear();
                        assert (data.hashCode() == remoteHashCode);
                        double bw = UcxBenchmark.getBandwithGbits(finishTime - this.startTime, remoteSize);
                        System.out.printf("Iteration %d, bandwidth: %.4f GB/s%n", iterNum, bw);
                    }
                });
                worker.progressRequest(getRequest);
                // Overwrite the first local byte after each iteration - presumably so that data
                // left over from this read cannot satisfy the hash check of the next one.
                data.put(0, (byte)1);
            }

            // Orderly shutdown of the endpoint on the success path.
            UcpRequest closeRequest = endpoint.closeNonBlockingFlush();
            worker.progressRequest(closeRequest);
        } finally {
            // Runs on success and on failure alike, so nothing pushed onto `resources` is leaked.
            UcxReadBWBenchmarkReceiver.closeResources();
        }
    }
}

