/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.publishing;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.javaclient.common.ClientUtils;
import io.fluxcapacitor.javaclient.configuration.client.Client;
import io.fluxcapacitor.javaclient.publishing.RequestHandler;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.client.DefaultTracker;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultRequestHandler
implements RequestHandler {
    private static final Logger log = LoggerFactory.getLogger(DefaultRequestHandler.class);
    private final Client client;
    private final MessageType resultType;
    private final Map<Integer, CompletableFuture<SerializedMessage>> callbacks = new ConcurrentHashMap<Integer, CompletableFuture<SerializedMessage>>();
    private final AtomicInteger nextId = new AtomicInteger();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Registration registration;

    @Override
    public CompletableFuture<SerializedMessage> sendRequest(SerializedMessage request, Consumer<SerializedMessage> requestSender) {
        this.ensureStarted();
        CompletableFuture<SerializedMessage> result2 = new CompletableFuture<SerializedMessage>();
        int requestId = this.nextId.getAndIncrement();
        this.callbacks.put(requestId, result2);
        request.setRequestId(requestId);
        request.setSource(this.client.id());
        requestSender.accept(request);
        return result2;
    }

    @Override
    public List<CompletableFuture<SerializedMessage>> sendRequests(List<SerializedMessage> requests, Consumer<List<SerializedMessage>> requestSender) {
        this.ensureStarted();
        ArrayList<CompletableFuture<SerializedMessage>> futures = new ArrayList<CompletableFuture<SerializedMessage>>();
        requestSender.accept(requests.stream().peek(request -> {
            CompletableFuture result2 = new CompletableFuture();
            int requestId = this.nextId.getAndIncrement();
            this.callbacks.put(requestId, result2);
            request.setRequestId(requestId);
            request.setSource(this.client.id());
            futures.add(result2);
        }).collect(Collectors.toList()));
        return futures;
    }

    protected void handleMessages(List<SerializedMessage> messages) {
        messages.forEach(m -> {
            CompletableFuture<SerializedMessage> future = this.callbacks.remove(m.getRequestId());
            if (future == null) {
                log.warn("Received response with index {} for unknown request {}", (Object)m.getIndex(), (Object)m.getRequestId());
                return;
            }
            future.complete((SerializedMessage)m);
        });
    }

    protected void ensureStarted() {
        if (this.started.compareAndSet(false, true)) {
            this.registration = DefaultTracker.start(this::handleMessages, ConsumerConfiguration.getDefault(this.resultType).toBuilder().name("$request-handler").ignoreSegment(true).filterMessageTarget(true).build(), this.client);
        }
    }

    @Override
    public void close() {
        ClientUtils.waitForResults(Duration.ofSeconds(2L), this.callbacks.values());
        if (this.registration != null) {
            this.registration.cancel();
        }
    }

    @ConstructorProperties(value={"client", "resultType"})
    public DefaultRequestHandler(Client client, MessageType resultType) {
        this.client = client;
        this.resultType = resultType;
    }
}

