/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.transport.stream.api.RemoteStream;
import io.camunda.zeebe.transport.stream.api.RemoteStreamErrorHandler;
import io.camunda.zeebe.transport.stream.api.RemoteStreamMetrics;
import io.camunda.zeebe.transport.stream.api.RemoteStreamer;
import io.camunda.zeebe.transport.stream.impl.AggregatedRemoteStream;
import io.camunda.zeebe.transport.stream.impl.ImmutableStreamRegistry;
import io.camunda.zeebe.transport.stream.impl.RemoteStreamImpl;
import io.camunda.zeebe.transport.stream.impl.RemoteStreamPusher;
import io.camunda.zeebe.transport.stream.impl.messages.PushStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.StreamTopics;
import io.camunda.zeebe.util.buffer.BufferReader;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

/**
 * {@link RemoteStreamer} implementation which resolves a stream type and metadata filter to one
 * of the streams known to an {@link ImmutableStreamRegistry}, and hands out {@link RemoteStream}
 * instances that push their payloads through a shared {@link RemoteStreamPusher}.
 *
 * <p>Push requests are sent via the {@link ClusterCommunicationService} on the {@link
 * StreamTopics#PUSH} topic; work handed to the pusher's executor is run through this actor.
 *
 * @param <M> type of the stream metadata, used to filter candidate streams
 * @param <P> type of the payload pushed to a stream
 */
public final class RemoteStreamerImpl<M extends BufferReader, P extends BufferWriter>
    extends Actor
    implements RemoteStreamer<M, P> {
    /** Timeout handed to the transport for every push request. */
    private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(5L);

    private final ClusterCommunicationService transport;
    private final ImmutableStreamRegistry<M> registry;
    private final RemoteStreamPusher<P> remoteStreamPusher;
    private final RemoteStreamErrorHandler<P> errorHandler;

    /**
     * Creates a new streamer.
     *
     * @param transport cluster transport used to send push requests; must not be null
     * @param registry registry which is queried for streams of a given type; must not be null
     * @param errorHandler handler passed on to every {@link RemoteStream} created by this
     *     streamer; must not be null
     * @param metrics metrics passed on to the underlying {@link RemoteStreamPusher}
     * @throws NullPointerException if {@code transport}, {@code registry} or {@code errorHandler}
     *     is null
     */
    public RemoteStreamerImpl(ClusterCommunicationService transport, ImmutableStreamRegistry<M> registry, RemoteStreamErrorHandler<P> errorHandler, RemoteStreamMetrics metrics) {
        this.transport = Objects.requireNonNull(transport, "must specify a network transport");
        this.registry = Objects.requireNonNull(registry, "must specify a job stream registry");
        this.errorHandler = Objects.requireNonNull(errorHandler, "must specify an error handler");
        // Diamond instead of a raw type, so the pusher stays typed to the payload type P.
        // Tasks submitted by the pusher are executed through this actor's control.
        this.remoteStreamPusher = new RemoteStreamPusher<>(this::send, task -> this.actor.run(task), metrics);
    }

    /**
     * Looks up a stream of the given type whose metadata is accepted by {@code filter}.
     *
     * @param streamType type of the stream to look up in the registry
     * @param filter predicate applied to the metadata of every registered stream of that type
     * @return a {@link RemoteStream} wrapping one randomly picked matching stream which has at
     *     least one stream consumer, or an empty optional if there is none
     */
    @Override
    public Optional<RemoteStream<M, P>> streamFor(DirectBuffer streamType, Predicate<M> filter) {
        // The registry is queried with an UnsafeBuffer view of the requested type.
        UnsafeBuffer streamTypeBuffer = new UnsafeBuffer(streamType);
        Set<AggregatedRemoteStream<M>> consumers = this.registry.get(streamTypeBuffer).stream()
            .filter(stream -> filter.test(stream.metadata()))
            .collect(Collectors.toSet());
        if (consumers.isEmpty()) {
            return Optional.empty();
        }
        return this.pickStream(consumers)
            .map(target -> new RemoteStreamImpl<>(target, this.remoteStreamPusher, this.errorHandler));
    }

    /**
     * Picks one of the given streams in random order, skipping any stream which currently has no
     * stream consumers.
     *
     * @param consumers candidate streams; not modified by this method
     * @return the first stream, in shuffled order, with a non-empty set of stream consumers, or
     *     an empty optional if every candidate has none
     */
    private Optional<AggregatedRemoteStream<M>> pickStream(Set<AggregatedRemoteStream<M>> consumers) {
        // Shuffle a copy so the caller's set is left untouched.
        ArrayList<AggregatedRemoteStream<M>> targets = new ArrayList<>(consumers);
        Collections.shuffle(targets);
        return targets.stream()
            .filter(target -> !target.streamConsumers().isEmpty())
            .findFirst();
    }

    /**
     * Sends a push request to the given member over the cluster transport.
     *
     * @param request request to send; serialized via {@link BufferUtil#bufferAsArray}
     * @param receiver member to send the request to
     * @return a future which completes with {@code null} once the transport's future completes;
     *     the response bytes are discarded
     */
    private CompletableFuture<Void> send(PushStreamRequest request, MemberId receiver) {
        // No cast to Object here: the message type must stay PushStreamRequest so that the
        // encoder method reference can be type-checked against it.
        return this.transport
            .send(StreamTopics.PUSH.topic(), request, BufferUtil::bufferAsArray, Function.identity(), receiver, REQUEST_TIMEOUT)
            .thenApply(ok -> null);
    }
}

