/*
 * Decompiled with CFR 0.152.
 */
package com.avanza.astrix.gs;

import com.avanza.astrix.beans.async.ContextPropagation;
import com.avanza.astrix.config.DynamicBooleanProperty;
import com.avanza.astrix.config.DynamicConfig;
import com.avanza.astrix.config.DynamicIntProperty;
import com.avanza.astrix.core.ServiceUnavailableException;
import com.avanza.astrix.core.util.NamedThreadFactory;
import com.avanza.astrix.remoting.util.GsUtil;
import com.gigaspaces.async.AsyncFuture;
import com.gigaspaces.internal.client.spaceproxy.SpaceProxyImpl;
import com.j_spaces.core.IJSpace;
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import org.openspaces.core.GigaSpace;
import org.openspaces.core.executor.DistributedTask;
import org.openspaces.core.executor.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;

public final class SpaceTaskDispatcher {
    private static final Logger log = LoggerFactory.getLogger(SpaceTaskDispatcher.class);
    private final GigaSpace gigaSpace;
    private final ContextPropagation contextPropagation;
    private final ThreadPoolExecutor executorService;
    private final DynamicBooleanProperty propagateAsyncContexts;

    @Deprecated
    public SpaceTaskDispatcher(GigaSpace gigaSpace, DynamicConfig config) {
        this(gigaSpace, config, ContextPropagation.NONE);
    }

    public SpaceTaskDispatcher(GigaSpace gigaSpace, DynamicConfig config, ContextPropagation contextPropagation) {
        this.gigaSpace = gigaSpace;
        this.contextPropagation = Objects.requireNonNull(contextPropagation);
        String spaceInstanceName = gigaSpace.getName();
        DynamicIntProperty poolSize = config.getIntProperty("astrix.beans.gigaspace." + spaceInstanceName + ".spaceTaskDispatcher.poolsize", 10);
        this.executorService = new ThreadPoolExecutor(poolSize.get(), poolSize.get(), 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), (ThreadFactory)new NamedThreadFactory(String.format("SpaceTaskDispatcher[%s]", spaceInstanceName)));
        poolSize.addListener(newValue -> {
            log.info(String.format("Changing pool-size for SpaceTaskDispatcher. space=%s newSize=%s, oldSize=%s", this.gigaSpace.getName(), newValue, this.executorService.getMaximumPoolSize()));
            this.executorService.setCorePoolSize((int)newValue);
            this.executorService.setMaximumPoolSize((int)newValue);
        });
        this.propagateAsyncContexts = config.getBooleanProperty("com.avanza.astrix.gs.SpaceTaskDispatcher.propagateAsyncContexts", true);
    }

    public IJSpace getSpace() {
        return this.gigaSpace.getSpace();
    }

    public int partitionCount() {
        IJSpace space = this.gigaSpace.getSpace();
        if (space instanceof SpaceProxyImpl) {
            return ((SpaceProxyImpl)SpaceProxyImpl.class.cast(space)).getSpaceClusterInfo().getNumberOfPartitions();
        }
        throw new IllegalStateException("Cant decide cluster topology on clustered proxy: " + this.gigaSpace.getName());
    }

    public <T extends Serializable> Observable<T> observe(Task<T> task, Object routingKey) {
        return Observable.unsafeCreate(subscriber -> this.usingErrorReporter((Subscriber<?>)subscriber, this.serviceUnavailable()).accept(() -> {
            Runnable command = this.contextPropagation.wrap(() -> this.submitRoutedTaskExecution((Subscriber)subscriber, task, routingKey));
            this.executorService.execute(command);
        }));
    }

    private <T extends Serializable> void submitRoutedTaskExecution(Subscriber<? super T> subscriber, Task<T> task, Object routingKey) {
        this.usingErrorReporter(subscriber, this.serviceUnavailable()).accept(() -> {
            AsyncFuture taskResult = this.gigaSpace.execute(task, routingKey);
            if (this.propagateAsyncContexts.get()) {
                GsUtil.subscribe(taskResult, subscriber, this.contextPropagation);
            } else {
                GsUtil.subscribe(taskResult, subscriber);
            }
        });
    }

    public <T extends Serializable, R> Observable<R> observe(DistributedTask<T, R> distributedTask) {
        return Observable.unsafeCreate(t1 -> {
            Runnable command = this.contextPropagation.wrap(() -> this.submitDistributedTaskExecution(distributedTask, (Subscriber)t1));
            this.usingErrorReporter((Subscriber<?>)t1, this.serviceUnavailable()).accept(() -> this.executorService.execute(command));
        });
    }

    private <R, T extends Serializable> void submitDistributedTaskExecution(DistributedTask<T, R> distributedTask, Subscriber<? super R> t1) {
        this.usingErrorReporter(t1, this.serviceUnavailable()).accept(() -> {
            AsyncFuture taskResult = this.gigaSpace.execute(distributedTask);
            if (this.propagateAsyncContexts.get()) {
                GsUtil.subscribe(taskResult, t1, this.contextPropagation);
            } else {
                GsUtil.subscribe(taskResult, t1);
            }
        });
    }

    private Consumer<Runnable> usingErrorReporter(Subscriber<?> subscriber, UnaryOperator<Exception> exceptionTranslator) {
        return command -> {
            try {
                command.run();
            }
            catch (Exception e) {
                subscriber.onError((Throwable)exceptionTranslator.apply(e));
            }
        };
    }

    private UnaryOperator<Exception> serviceUnavailable() {
        return e -> new ServiceUnavailableException("Failed to submit remote invocation task", (Throwable)e);
    }

    public void destroy() {
        this.executorService.shutdown();
    }

    public String getSpaceName() {
        return this.gigaSpace.getName();
    }
}

