/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.publishing;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.common.ClientUtils;
import io.fluxcapacitor.javaclient.configuration.client.Client;
import io.fluxcapacitor.javaclient.publishing.RequestHandler;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.IndexUtils;
import io.fluxcapacitor.javaclient.tracking.client.DefaultTracker;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultRequestHandler
implements RequestHandler {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultRequestHandler.class);
    private final Client client;
    private final MessageType resultType;
    private final Duration timeout;
    private final String responseConsumerName;
    private final Map<Integer, CompletableFuture<SerializedMessage>> callbacks = new ConcurrentHashMap<Integer, CompletableFuture<SerializedMessage>>();
    private final AtomicInteger nextId = new AtomicInteger();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Registration registration;

    public DefaultRequestHandler(Client client, MessageType resultType) {
        this(client, resultType, Duration.ofSeconds(200L), String.format("%s_%s", client.name(), "$request-handler"));
    }

    @Override
    public CompletableFuture<SerializedMessage> sendRequest(SerializedMessage request, Consumer<SerializedMessage> requestSender) {
        this.ensureStarted();
        int requestId = this.nextId.getAndIncrement();
        CompletionStage result = new CompletableFuture().orTimeout(this.timeout.getSeconds(), TimeUnit.SECONDS).whenComplete((m, e) -> this.callbacks.remove(requestId));
        this.callbacks.put(requestId, (CompletableFuture<SerializedMessage>)result);
        request.setRequestId(Integer.valueOf(requestId));
        request.setSource(this.client.id());
        requestSender.accept(request);
        return result;
    }

    @Override
    public List<CompletableFuture<SerializedMessage>> sendRequests(List<SerializedMessage> requests, Consumer<List<SerializedMessage>> requestSender) {
        this.ensureStarted();
        ArrayList<CompletableFuture<SerializedMessage>> futures = new ArrayList<CompletableFuture<SerializedMessage>>();
        requestSender.accept(requests.stream().peek(request -> {
            int requestId = this.nextId.getAndIncrement();
            CompletionStage result = new CompletableFuture().orTimeout(this.timeout.getSeconds(), TimeUnit.SECONDS).whenComplete((m, e) -> this.callbacks.remove(requestId));
            this.callbacks.put(requestId, (CompletableFuture<SerializedMessage>)result);
            request.setRequestId(Integer.valueOf(requestId));
            request.setSource(this.client.id());
            futures.add((CompletableFuture<SerializedMessage>)result);
        }).collect(Collectors.toList()));
        return futures;
    }

    protected void handleMessages(List<SerializedMessage> messages) {
        messages.stream().filter(m -> m.getRequestId() != null).forEach(m -> {
            CompletableFuture<SerializedMessage> future = this.callbacks.remove(m.getRequestId());
            if (future == null) {
                log.warn("Received response with index {} for unknown request {}", (Object)m.getIndex(), (Object)m.getRequestId());
                return;
            }
            future.complete((SerializedMessage)m);
        });
    }

    protected void ensureStarted() {
        if (this.started.compareAndSet(false, true)) {
            this.registration = DefaultTracker.start(this::handleMessages, this.resultType, ConsumerConfiguration.builder().name(this.responseConsumerName).ignoreSegment(true).clientControlledIndex(true).filterMessageTarget(true).minIndex(IndexUtils.indexFromTimestamp(FluxCapacitor.currentTime().minusSeconds(2L))).build(), this.client);
        }
    }

    @Override
    public void close() {
        ClientUtils.waitForResults(Duration.ofSeconds(2L), this.callbacks.values());
        if (this.registration != null) {
            this.registration.cancel();
        }
    }

    @ConstructorProperties(value={"client", "resultType", "timeout", "responseConsumerName"})
    @Generated
    public DefaultRequestHandler(Client client, MessageType resultType, Duration timeout, String responseConsumerName) {
        this.client = client;
        this.resultType = resultType;
        this.timeout = timeout;
        this.responseConsumerName = responseConsumerName;
    }
}

