/*
 * Decompiled with CFR 0.152.
 */
package com.spotify.scratch.persist;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.spotify.apollo.Client;
import com.spotify.apollo.Request;
import com.spotify.apollo.Response;
import com.spotify.apollo.Status;
import com.spotify.flo.Fn;
import com.spotify.flo.Task;
import com.spotify.flo.TaskContext;
import com.spotify.flo.TaskId;
import com.spotify.flo.context.AsyncContext;
import com.spotify.scratch.persist.Encoder;
import com.spotify.scratch.persist.LockHolder;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import okio.ByteString;

/**
 * An {@link AsyncContext} that wraps task process functions with a lock/fetch protocol.
 *
 * <p>For every {@link #invokeProcessFn} call the context looks up the {@link EvalBundle}
 * registered for the task id and runs a {@link ProcessBundle}:
 * <ul>
 *   <li>If the bundle has no decoder, the process function is invoked directly.</li>
 *   <li>Otherwise a {@link LockHolder} lock is requested first. When the lock is obtained the
 *       process function runs and the lock is released afterwards, passing along the encoded
 *       result when the task type declares an {@link Encoder}-annotated method. When the lock
 *       is already held, an existing value is fetched through {@link #client}; if that fails
 *       the whole attempt is re-scheduled one second later.</li>
 * </ul>
 *
 * <p>NOTE(review): nothing in this source adds entries to {@link #ongoing}, and {@link #client}
 * is a stub that always completes with a {@code null} response - presumably placeholders in
 * scratch code; confirm before relying on either.
 */
public class Dump extends AsyncContext {
    private static final String LOCK_PATH = "/lock";
    private final ScheduledExecutorService scheduler;
    private final ExecutorService executor;
    private final ConcurrentMap<TaskId, EvalBundle<?>> ongoing = Maps.newConcurrentMap();
    private final Client client = r -> CompletableFuture.completedFuture(null);

    /**
     * @param scheduledExecutorService scheduler used for lock handling and delayed retries
     * @param executor executor that runs all callbacks of this context
     * @throws NullPointerException if either argument is {@code null}
     */
    protected Dump(ScheduledExecutorService scheduledExecutorService, ExecutorService executor) {
        super(executor);
        this.scheduler = Objects.requireNonNull(scheduledExecutorService);
        this.executor = Objects.requireNonNull(executor);
    }

    /**
     * Runs {@code processFn} for the given task under the lock/fetch protocol described on the
     * class.
     *
     * @param taskId id of the task whose bundle is looked up in {@link #ongoing}
     * @param processFn function producing the task's value
     * @return the value of a promise that is completed with the produced (or fetched) result,
     *         or failed with the error that prevented it
     */
    public <T> TaskContext.Value<T> invokeProcessFn(TaskId taskId, Fn<TaskContext.Value<T>> processFn) {
        EvalBundle<T> evalBundle = lookupBundle(taskId);
        TaskContext.Promise<T> promise = promise();
        LockHolder lockHolder = new LockHolder(taskId, "/", scheduler);
        new ProcessBundle<T>(evalBundle, lockHolder, processFn, promise).process();
        return promise.value();
    }

    /** Wraps {@code runnable} so that running the result submits it to {@link #executor}. */
    private Runnable onExecutor(Runnable runnable) {
        return () -> executor.submit(runnable);
    }

    /** Wraps {@code consumer} so that each accepted value is handed over on {@link #executor}. */
    private <T> Consumer<T> onExecutor(Consumer<T> consumer) {
        return t -> executor.submit(() -> consumer.accept(t));
    }

    /** Forwards both the success and the failure outcome of {@code value} to {@code promise}. */
    private <T> void chain(TaskContext.Value<T> value, TaskContext.Promise<T> promise) {
        value.consume(promise::set);
        value.onFail(promise::fail);
    }

    /**
     * Finds the first method declared on {@code type} that carries the {@link Encoder}
     * annotation and exposes it as a function. The method is invoked without a receiver, i.e.
     * it is expected to be static.
     *
     * @return the encoder function, or empty when no declared method is annotated
     */
    private static <T> Optional<Function<T, ByteString>> findEncoder(Class<T> type) {
        for (Method method : type.getDeclaredMethods()) {
            if (method.getDeclaredAnnotation(Encoder.class) == null) {
                continue;
            }
            Function<T, ByteString> encoder = value -> (ByteString) Dump.invokeAndPropagateException(method, value);
            return Optional.of(encoder);
        }
        return Optional.empty();
    }

    /**
     * Invokes {@code method} statically with {@code args}, rethrowing reflective failures via
     * {@link Throwables#propagate(Throwable)}.
     */
    private static Object invokeAndPropagateException(Method method, Object... args) {
        try {
            return method.invoke(null, args);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Waits until a bundle for {@code taskId} is present in {@link #ongoing} and returns it.
     * This blocks the calling thread for as long as no bundle is registered.
     */
    @SuppressWarnings("unchecked") // bundles are stored as EvalBundle<?>; the caller chooses T
    private <T> EvalBundle<T> lookupBundle(TaskId taskId) {
        EvalBundle<T> bundle;
        while ((bundle = (EvalBundle<T>) ongoing.get(taskId)) == null) {
            // Give other threads a chance to run instead of spinning flat out.
            Thread.yield();
        }
        return bundle;
    }

    /** Creates a request with the given method against the lock URI (evaluates to "//lock"). */
    private Request request(String method) {
        return Request.forUri("/" + LOCK_PATH, method);
    }

    /** Builds the JSON payload {@code {"task_id":"<id>"}} as UTF-8 bytes. */
    static ByteString json(TaskId taskId) {
        return ByteString.encodeUtf8("{\"task_id\":\"" + escapeJson(taskId.toString()) + "\"}");
    }

    /**
     * Builds the JSON payload {@code {"task_id":"<id>", "data":"<base64 of data>"}} as UTF-8
     * bytes.
     */
    static ByteString json(TaskId taskId, ByteString data) {
        return ByteString.encodeUtf8("{\"task_id\":\"" + escapeJson(taskId.toString()) + "\", \"data\":\"" + data.base64() + "\"}");
    }

    /**
     * Escapes {@code raw} for use inside a double-quoted JSON string: quotes, backslashes and
     * control characters are replaced by their JSON escape sequences; everything else is kept.
     */
    private static String escapeJson(String raw) {
        StringBuilder escaped = new StringBuilder(raw.length() + 8);
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            switch (c) {
                case '"':
                    escaped.append("\\\"");
                    break;
                case '\\':
                    escaped.append("\\\\");
                    break;
                case '\n':
                    escaped.append("\\n");
                    break;
                case '\r':
                    escaped.append("\\r");
                    break;
                case '\t':
                    escaped.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters need the numeric escape form.
                        escaped.append(String.format("\\u%04x", (int) c));
                    } else {
                        escaped.append(c);
                    }
            }
        }
        return escaped.toString();
    }

    /**
     * One attempt (including its retries) at producing the value of a task, bound to the
     * promise handed back from {@link Dump#invokeProcessFn}.
     */
    private final class ProcessBundle<T> {
        private final EvalBundle<T> evalBundle;
        private final LockHolder lockHolder;
        private final Fn<TaskContext.Value<T>> processFn;
        private final TaskContext.Promise<T> promise;
        // Single permit, taken the first time the lock turns out to be already held.
        private final Semaphore notified = new Semaphore(1);

        public ProcessBundle(EvalBundle<T> evalBundle, LockHolder lockHolder, Fn<TaskContext.Value<T>> processFn, TaskContext.Promise<T> promise) {
            this.evalBundle = evalBundle;
            this.lockHolder = lockHolder;
            this.processFn = processFn;
            this.promise = promise;
        }

        /**
         * Starts the attempt: invokes the process function directly when no locking is needed,
         * otherwise requests the lock and continues in the lock callback (on the executor).
         */
        private void process() {
            if (!shouldLock()) {
                invoke();
                return;
            }
            Consumer<Throwable> whenLocked = Dump.this.onExecutor((Throwable lockError) -> {
                if (lockError == null) {
                    // Lock obtained.
                    invoke();
                } else if (lockError instanceof LockHolder.AlreadyLocked) {
                    // NOTE(review): the original conditional on tryAcquire() had an empty body
                    // (presumably a one-time notification); the acquire itself is preserved.
                    notified.tryAcquire();
                    evalBundle.fetchOrElse(promise, this::retryLater);
                } else {
                    promise.fail(lockError);
                }
            });
            lockHolder.lock(whenLocked);
        }

        /** Schedules another {@link #process()} run on the executor one second from now. */
        private void retryLater() {
            Runnable retry = this::process;
            Dump.this.scheduler.schedule(Dump.this.onExecutor(retry), 1L, TimeUnit.SECONDS);
        }

        /**
         * Calls the process function and routes its outcome through {@link #complete} or
         * {@link #fail}.
         *
         * <p>If the function itself throws while the lock is held, the lock is released and the
         * promise failed; otherwise the exception would vanish inside the executor's future,
         * leaving the lock held and the promise pending. On the non-locking path the exception
         * is rethrown to the caller.
         */
        private void invoke() {
            TaskContext.Value<T> value;
            try {
                value = processFn.get();
            } catch (RuntimeException | Error e) {
                if (!shouldLock()) {
                    throw e;
                }
                fail(() -> promise.fail(e));
                return;
            }
            value.consume(v -> complete(v, () -> promise.set(v), promise::fail));
            value.onFail(valueError -> fail(() -> promise.fail(valueError)));
        }

        /**
         * Finishes a successful invocation. Without locking, {@code callback} runs right away.
         * With locking, the lock is released first - together with the encoded value when an
         * encoder exists - and {@code callback} runs afterwards on the executor. If encoding
         * throws, the lock is released and {@code failure} receives the error instead.
         */
        private void complete(T value, Runnable callback, Consumer<Throwable> failure) {
            if (!shouldLock()) {
                callback.run();
                return;
            }
            Optional<ByteString> encoded;
            try {
                encoded = Dump.findEncoder(evalBundle.task.type()).map(encoder -> encoder.apply(value));
            } catch (Throwable t) {
                lockHolder.unlock(() -> failure.accept(t));
                return;
            }
            if (encoded.isPresent()) {
                lockHolder.unlock(encoded.get(), Dump.this.onExecutor(callback));
            } else {
                lockHolder.unlock(Dump.this.onExecutor(callback));
            }
        }

        /**
         * Finishes a failed invocation: releases the lock first when one is used, then runs
         * {@code callback} (on the executor in the locking case).
         */
        private void fail(Runnable callback) {
            if (shouldLock()) {
                lockHolder.unlock(Dump.this.onExecutor(callback));
            } else {
                callback.run();
            }
        }

        /** Locking is used exactly when the bundle has a decoder. */
        private boolean shouldLock() {
            return evalBundle.decoder.isPresent();
        }
    }

    /** A task under evaluation together with its promise and optional value decoder. */
    private final class EvalBundle<T> {
        private final Task<T> task;
        private final TaskContext.Promise<T> promise;
        private final Optional<Function<ByteString, T>> decoder;

        private EvalBundle(Task<T> task, TaskContext.Promise<T> promise, Optional<Function<ByteString, T>> decoder) {
            this.task = task;
            this.promise = promise;
            this.decoder = decoder;
        }

        /**
         * Sends a GET request for this task's existing value. On success the decoded value is
         * set on {@code promise}; on any failure {@code orElse} is run instead.
         *
         * @throws IllegalStateException if this bundle has no decoder
         */
        private void fetchOrElse(TaskContext.Promise<T> promise, Runnable orElse) {
            Preconditions.checkState(decoder.isPresent(), "Must have decoder when fetching existing");
            Request getRequest = Dump.this.request("GET").withPayload(Dump.json(task.id()));
            Dump.this.client.send(getRequest)
                .handleAsync(parseExisting(decoder.get()), Dump.this.executor)
                .thenAcceptAsync(consumer(promise, orElse), Dump.this.executor);
        }

        /** Routes a looked-up value to {@code promise}, or runs {@code orElse} when it failed. */
        private Consumer<TaskContext.Value<T>> consumer(TaskContext.Promise<T> promise, Runnable orElse) {
            return lookupValue -> {
                lookupValue.consume(promise::set);
                lookupValue.onFail(ignored -> orElse.run());
            };
        }

        /**
         * Turns the outcome of the GET request into a value: the decoded payload for an OK
         * response with a body, a failure in every other case.
         *
         * <p>A missing response and exceptions thrown by {@code decoder} are reported as
         * failures too. If they escaped this function the surrounding future would complete
         * exceptionally, the downstream consumer would never run, and neither the promise nor
         * the fallback would ever be triggered.
         */
        private BiFunction<Response<ByteString>, Throwable, TaskContext.Value<T>> parseExisting(Function<ByteString, T> decoder) {
            return (response, throwable) -> {
                TaskContext.Promise<T> lookup = Dump.this.promise();
                if (throwable != null) {
                    lookup.fail(throwable);
                } else if (response == null) {
                    lookup.fail(new RuntimeException("No response returned for persisted value from flock"));
                } else if (response.status().code() != Status.OK.code()) {
                    lookup.fail(new RuntimeException("Not OK"));
                } else if (!response.payload().isPresent()) {
                    lookup.fail(new RuntimeException("Response with no body returned for persisted value from flock"));
                } else {
                    ByteString data = response.payload().get();
                    T decoded;
                    try {
                        decoded = decoder.apply(data);
                    } catch (Throwable decodeError) {
                        lookup.fail(decodeError);
                        return lookup.value();
                    }
                    lookup.set(decoded);
                }
                return lookup.value();
            };
        }
    }
}

