/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.transport.stream.api.RemoteStreamErrorHandler;
import io.camunda.zeebe.transport.stream.api.RemoteStreamMetrics;
import io.camunda.zeebe.transport.stream.api.StreamResponseException;
import io.camunda.zeebe.transport.stream.impl.AggregatedRemoteStream;
import io.camunda.zeebe.transport.stream.impl.messages.ErrorResponse;
import io.camunda.zeebe.transport.stream.impl.messages.PushStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.PushStreamResponse;
import io.camunda.zeebe.transport.stream.impl.messages.StreamResponseDecoder;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.camunda.zeebe.util.logging.ThrottledLogger;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class RemoteStreamPusher<P extends BufferWriter> {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamPusher.class);
    private final StreamResponseDecoder responseDecoder = new StreamResponseDecoder();
    private final ThrottledLogger pushErrorLogger = new ThrottledLogger(LOG, Duration.ofSeconds(5L));
    private final ThrottledLogger pushWarnLogger = new ThrottledLogger(LOG, Duration.ofSeconds(5L));
    private final RemoteStreamMetrics metrics;
    private final Transport transport;
    private final Executor executor;

    RemoteStreamPusher(Transport transport, Executor executor, RemoteStreamMetrics metrics) {
        this.metrics = Objects.requireNonNull(metrics, "must specify remote stream metrics");
        this.transport = Objects.requireNonNull(transport, "must provide a network transport");
        this.executor = Objects.requireNonNull(executor, "must provide an asynchronous executor");
    }

    public void pushAsync(P payload, RemoteStreamErrorHandler<P> errorHandler, AggregatedRemoteStream.StreamId streamId) {
        Objects.requireNonNull(errorHandler, "must specify a error handler");
        try {
            Objects.requireNonNull(payload, "must specify a payload");
            this.executor.execute(() -> this.push(payload, this.instrumentingErrorHandler(errorHandler, streamId), streamId));
        }
        catch (Exception e) {
            errorHandler.handleError(e, payload);
        }
    }

    private RemoteStreamErrorHandler<P> instrumentingErrorHandler(RemoteStreamErrorHandler<P> errorHandler, AggregatedRemoteStream.StreamId streamId) {
        return (error, payload) -> {
            if (error == null) {
                return;
            }
            if (error instanceof StreamResponseException) {
                StreamResponseException e = (StreamResponseException)((Object)((Object)error));
                this.logResponseError(streamId, payload, e);
                e.details().forEach(d -> this.metrics.pushTryFailed(d.code()));
            } else {
                this.pushWarnLogger.warn("Failed to push (size = {}) to stream {}", new Object[]{payload.getLength(), streamId, error});
            }
            this.metrics.pushFailed();
            errorHandler.handleError(error, payload);
        };
    }

    private void logResponseError(AggregatedRemoteStream.StreamId streamId, P payload, StreamResponseException e) {
        switch (e.code()) {
            case INVALID: 
            case MALFORMED: {
                this.pushErrorLogger.error("Failed to push (size = {}) to stream {}, request could not be parsed", new Object[]{payload.getLength(), streamId, e});
                break;
            }
            case EXHAUSTED: {
                LOG.trace("Failed to push (size = {}) to stream {} after trying all clients", new Object[]{payload.getLength(), streamId, e});
                break;
            }
            default: {
                this.pushWarnLogger.warn("Failed to push (size = {}) to stream {}", new Object[]{payload.getLength(), streamId, e});
            }
        }
    }

    private void push(P payload, RemoteStreamErrorHandler<P> errorHandler, AggregatedRemoteStream.StreamId streamId) {
        PushStreamRequest request = new PushStreamRequest().streamId(streamId.streamId()).payload((BufferWriter)payload);
        try {
            this.transport.send(request, streamId.receiver()).whenCompleteAsync((response, error) -> this.onPush(payload, errorHandler, (byte[])response, (Throwable)error), this.executor);
            LOG.trace("Pushed {} to stream {}", payload, (Object)streamId);
        }
        catch (Exception e) {
            errorHandler.handleError(e, payload);
        }
    }

    private void onPush(P payload, RemoteStreamErrorHandler<P> errorHandler, byte[] responseBuffer, Throwable error) {
        if (error != null) {
            errorHandler.handleError(error, payload);
            return;
        }
        this.responseDecoder.decode(responseBuffer, new PushStreamResponse()).mapLeft(ErrorResponse::asException).ifRightOrLeft(ok -> this.metrics.pushSucceeded(), failure -> errorHandler.handleError((Throwable)((Object)failure), payload));
    }

    static interface Transport {
        public CompletableFuture<byte[]> send(PushStreamRequest var1, MemberId var2) throws Exception;
    }
}

