/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.org.apache.bookkeeper.clients.impl.internal.mr;

import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import org.apache.pulsar.shade.com.google.common.util.concurrent.ListenableFuture;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.Backoff;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.proto.storage.StatusCode;

public class MetaRangeRequestProcessor<RespT>
extends ListenableFutureRpcProcessor<GetActiveRangesRequest, GetActiveRangesResponse, RespT> {
    private final GetActiveRangesRequest request;
    private final Function<GetActiveRangesResponse, RespT> responseFunc;

    public static <T> MetaRangeRequestProcessor<T> of(GetActiveRangesRequest request, Function<GetActiveRangesResponse, T> responseFunc, StorageContainerChannel channel, ScheduledExecutorService executor, Backoff.Policy backoffPolicy) {
        return new MetaRangeRequestProcessor<T>(request, responseFunc, channel, executor, backoffPolicy);
    }

    private MetaRangeRequestProcessor(GetActiveRangesRequest request, Function<GetActiveRangesResponse, RespT> responseFunc, StorageContainerChannel channel, ScheduledExecutorService executor, Backoff.Policy backoffPolicy) {
        super(channel, executor, backoffPolicy);
        this.request = request;
        this.responseFunc = responseFunc;
    }

    @Override
    protected GetActiveRangesRequest createRequest() {
        return this.request;
    }

    @Override
    protected ListenableFuture<GetActiveRangesResponse> sendRPC(StorageServerChannel rsChannel, GetActiveRangesRequest request) {
        return rsChannel.getMetaRangeService().getActiveRanges(request);
    }

    private String getIdentifier(GetActiveRangesRequest request) {
        return "" + request.getStreamId();
    }

    @Override
    protected RespT processResponse(GetActiveRangesResponse response) throws Exception {
        if (StatusCode.SUCCESS == response.getCode()) {
            return this.responseFunc.apply(response);
        }
        throw ProtocolInternalUtils.createMetaRangeException(this.getIdentifier(this.request), response.getCode());
    }
}

