/*
 * Decompiled with CFR 0.152.
 */
package com.avanza.astrix.gs.remoting;

import com.avanza.astrix.core.remoting.RoutingKey;
import com.avanza.astrix.gs.SpaceTaskDispatcher;
import com.avanza.astrix.gs.remoting.AstrixDistributedServiceInvocationTask;
import com.avanza.astrix.gs.remoting.AstrixServiceInvocationTask;
import com.avanza.astrix.remoting.client.AstrixServiceInvocationRequest;
import com.avanza.astrix.remoting.client.AstrixServiceInvocationResponse;
import com.avanza.astrix.remoting.client.RemotingTransportSpi;
import com.avanza.astrix.remoting.client.RoutedServiceInvocationRequest;
import com.avanza.astrix.remoting.util.GsUtil;
import com.gigaspaces.async.AsyncResult;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import rx.Observable;
import rx.functions.Func1;

/**
 * {@link RemotingTransportSpi} implementation that hands service invocation
 * requests to a {@link SpaceTaskDispatcher} as tasks and exposes the outcome
 * as RxJava {@link Observable}s.
 *
 * <p>NOTE(review): the public methods presumably implement the
 * {@code RemotingTransportSpi} contract; add {@code @Override} once that is
 * confirmed against the interface declaration.
 */
public class GsRemotingTransport implements RemotingTransportSpi {
    private final SpaceTaskDispatcher spaceTaskDispatcher;

    /**
     * @param spaceTaskDispatcher dispatcher that every request is submitted through
     */
    public GsRemotingTransport(SpaceTaskDispatcher spaceTaskDispatcher) {
        this.spaceTaskDispatcher = spaceTaskDispatcher;
    }

    /**
     * Submits a single request using the given routing key.
     *
     * @param request    the invocation request to submit
     * @param routingKey the routing key passed to the dispatcher
     * @return the observable the dispatcher returns for the submitted task
     */
    public Observable<AstrixServiceInvocationResponse> submitRoutedRequest(AstrixServiceInvocationRequest request, RoutingKey routingKey) {
        return this.observeRoutedRequest(request, routingKey);
    }

    /**
     * Submits every request in {@code requests}, each with its own routing key,
     * and gathers all responses into one list.
     *
     * @param requests the routed requests to submit; an empty collection yields
     *                 an observable emitting a single empty list without
     *                 touching the dispatcher
     * @return an observable emitting one list with the responses of all requests
     */
    public Observable<List<AstrixServiceInvocationResponse>> submitRoutedRequests(Collection<RoutedServiceInvocationRequest> requests) {
        if (requests.isEmpty()) {
            // Explicit type witness so the element type does not depend on target typing.
            return Observable.just(Collections.<AstrixServiceInvocationResponse>emptyList());
        }
        return this.observeRoutedRequests(requests);
    }

    /**
     * Submits the request as an {@link AstrixDistributedServiceInvocationTask}
     * and gathers all responses into one list.
     *
     * @param request the invocation request to submit
     * @return an observable emitting one list with all responses
     */
    public Observable<List<AstrixServiceInvocationResponse>> submitBroadcastRequest(AstrixServiceInvocationRequest request) {
        return this.observeBroadcastRequest(request);
    }

    /** Wraps the request in an {@link AstrixServiceInvocationTask} and dispatches it with the routing key. */
    private Observable<AstrixServiceInvocationResponse> observeRoutedRequest(AstrixServiceInvocationRequest request, RoutingKey routingKey) {
        return this.spaceTaskDispatcher.observe(new AstrixServiceInvocationTask(request), routingKey);
    }

    /** Dispatches every routed request and merges the resulting response streams into one list. */
    private Observable<List<AstrixServiceInvocationResponse>> observeRoutedRequests(Collection<RoutedServiceInvocationRequest> requests) {
        // observe(...) is invoked for every request here, before anything is merged,
        // exactly as the previous mergeWith loop did.
        List<Observable<AstrixServiceInvocationResponse>> responseStreams =
                new ArrayList<Observable<AstrixServiceInvocationResponse>>(requests.size());
        for (RoutedServiceInvocationRequest request : requests) {
            Observable<AstrixServiceInvocationResponse> responseStream = this.spaceTaskDispatcher.observe(
                    new AstrixServiceInvocationTask(request.getRequest()), request.getRoutingkey());
            responseStreams.add(responseStream);
        }
        // One flat merge instead of a chain of nested mergeWith operators, one per request.
        return Observable.merge(responseStreams).toList();
    }

    /** Dispatches the request as a distributed task and flattens the async results into one list of responses. */
    private Observable<List<AstrixServiceInvocationResponse>> observeBroadcastRequest(AstrixServiceInvocationRequest request) {
        Observable<List<AsyncResult<AstrixServiceInvocationResponse>>> asyncResults =
                this.spaceTaskDispatcher.observe(new AstrixDistributedServiceInvocationTask(request));
        // Each emitted list of async results is flat-mapped through the function supplied by GsUtil.
        Func1<List<AsyncResult<AstrixServiceInvocationResponse>>, Observable<AstrixServiceInvocationResponse>> listToObservable =
                GsUtil.asyncResultListToObservable();
        return asyncResults.flatMap(listToObservable).toList();
    }

    /**
     * @return the partition count reported by the underlying dispatcher
     */
    public int partitionCount() {
        return this.spaceTaskDispatcher.partitionCount();
    }
}

