/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.transport.stream.impl.RemoteStreamApiHandler;
import io.camunda.zeebe.transport.stream.impl.messages.AddStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.MessageUtil;
import io.camunda.zeebe.transport.stream.impl.messages.RemoveStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.StreamTopics;
import io.camunda.zeebe.util.buffer.BufferReader;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RemoteStreamTransport<M extends BufferReader>
extends Actor {
    private static final byte[] EMPTY_PAYLOAD = new byte[0];
    private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(5L);
    private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamTransport.class);
    private final ClusterCommunicationService transport;
    private final RemoteStreamApiHandler<M> requestHandler;

    public RemoteStreamTransport(ClusterCommunicationService transport, RemoteStreamApiHandler<M> requestHandler) {
        this.transport = transport;
        this.requestHandler = requestHandler;
    }

    protected void onActorStarting() {
        this.transport.replyTo(StreamTopics.ADD.topic(), MessageUtil::parseAddRequest, this::onAdd, Function.identity(), arg_0 -> ((ActorControl)this.actor).run(arg_0));
        this.transport.replyTo(StreamTopics.REMOVE.topic(), MessageUtil::parseRemoveRequest, this::onRemove, Function.identity(), arg_0 -> ((ActorControl)this.actor).run(arg_0));
        this.transport.replyTo(StreamTopics.REMOVE_ALL.topic(), Function.identity(), this::onRemoveAll, Function.identity(), arg_0 -> ((ActorControl)this.actor).run(arg_0));
    }

    protected void onActorClosing() {
        this.transport.unsubscribe(StreamTopics.ADD.topic());
        this.transport.unsubscribe(StreamTopics.REMOVE.topic());
        this.transport.unsubscribe(StreamTopics.REMOVE_ALL.topic());
        this.requestHandler.close();
    }

    public void removeAll(MemberId member) {
        this.actor.run(() -> this.requestHandler.removeAll(member));
    }

    private byte[] onAdd(MemberId sender, AddStreamRequest request) {
        this.requestHandler.add(sender, request);
        return EMPTY_PAYLOAD;
    }

    private byte[] onRemove(MemberId sender, RemoveStreamRequest request) {
        this.requestHandler.remove(sender, request);
        return EMPTY_PAYLOAD;
    }

    private byte[] onRemoveAll(MemberId sender, byte[] ignored) {
        this.requestHandler.removeAll(sender);
        return EMPTY_PAYLOAD;
    }

    public void recreateStreams(MemberId receiver) {
        try {
            this.sendRestartStreamsCommand(receiver);
            LOG.debug("Tried to restart streams with member: {}", (Object)receiver);
        }
        catch (Exception e) {
            LOG.warn("Failed to restart streams with member: {}", (Object)receiver, (Object)e);
        }
    }

    private CompletableFuture<Void> sendRestartStreamsCommand(MemberId receiver) {
        return this.transport.send(StreamTopics.RESTART_STREAMS.topic(), (Object)EMPTY_PAYLOAD, Function.identity(), Function.identity(), receiver, REQUEST_TIMEOUT).thenApply(ok -> null);
    }
}

