/*
 * Decompiled with CFR 0.152.
 */
package com.spotify.styx.api;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import com.google.common.io.Closer;
import com.spotify.apollo.Client;
import com.spotify.apollo.Request;
import com.spotify.apollo.RequestContext;
import com.spotify.apollo.Response;
import com.spotify.apollo.Status;
import com.spotify.apollo.StatusType;
import com.spotify.apollo.entity.EntityCodec;
import com.spotify.apollo.entity.EntityMiddleware;
import com.spotify.apollo.entity.JacksonEntityCodec;
import com.spotify.apollo.route.AsyncHandler;
import com.spotify.apollo.route.Middleware;
import com.spotify.apollo.route.Route;
import com.spotify.futures.CompletableFutures;
import com.spotify.styx.api.Api;
import com.spotify.styx.api.BackfillPayload;
import com.spotify.styx.api.BackfillsPayload;
import com.spotify.styx.api.Middlewares;
import com.spotify.styx.api.RequestAuthenticator;
import com.spotify.styx.api.ResponseException;
import com.spotify.styx.api.RunStateDataPayload;
import com.spotify.styx.api.WorkflowActionAuthorizer;
import com.spotify.styx.api.WorkflowResource;
import com.spotify.styx.model.Backfill;
import com.spotify.styx.model.BackfillBuilder;
import com.spotify.styx.model.BackfillInput;
import com.spotify.styx.model.EditableBackfillInput;
import com.spotify.styx.model.Event;
import com.spotify.styx.model.Schedule;
import com.spotify.styx.model.Workflow;
import com.spotify.styx.model.WorkflowId;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.serialization.Json;
import com.spotify.styx.state.RunState;
import com.spotify.styx.state.StateData;
import com.spotify.styx.storage.Storage;
import com.spotify.styx.storage.StorageTransaction;
import com.spotify.styx.util.CloserUtil;
import com.spotify.styx.util.ParameterUtil;
import com.spotify.styx.util.RandomGenerator;
import com.spotify.styx.util.ReplayEvents;
import com.spotify.styx.util.ResourceNotFoundException;
import com.spotify.styx.util.Time;
import com.spotify.styx.util.TimeUtil;
import com.spotify.styx.util.WorkflowValidator;
import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import okio.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class BackfillResource
implements Closeable {
    /** Class logger, named after this resource so its log lines are attributed to the right class. */
    private static final Logger log = LoggerFactory.getLogger(BackfillResource.class);

    /** Base path of the backfill routes, before any API version prefix is applied. */
    static final String BASE = "/backfills";

    /** Path placed between the scheduler service base URL and the scheduler endpoint name. */
    private static final String SCHEDULER_BASE_PATH = "/api/v0";

    /** State name reported for a processed instant when no run state data could be found for it. */
    private static final String UNKNOWN = "UNKNOWN";

    /** State name reported for instants of a backfill that are listed as still waiting. */
    private static final String WAITING = "WAITING";

    /** Parallelism of the fork-join pool used by this resource (64). */
    private static final int CONCURRENCY = 64;

    /** Collects the closeable resources owned by this instance; closed by {@link #close()}. */
    private final Closer closer = Closer.create();

    /** Storage used to read and write backfills, workflows and active states. */
    private final Storage storage;

    /** Base URL of the scheduler service; used to build scheduler API URLs. */
    private final String schedulerServiceBaseUrl;

    /** Validator applied to a workflow's configuration before a backfill is created for it. */
    private final WorkflowValidator workflowValidator;

    /** Source of the current instant. */
    private final Time time;

    /** Pool on which backfill statuses are gathered in parallel; registered with {@link #closer}. */
    private final ForkJoinPool forkJoinPool;

    /** Authorizes the caller for actions on the workflow a backfill belongs to. */
    private final WorkflowActionAuthorizer workflowActionAuthorizer;

    public BackfillResource(String schedulerServiceBaseUrl, Storage storage, WorkflowValidator workflowValidator, Time time, WorkflowActionAuthorizer workflowActionAuthorizer) {
        this.schedulerServiceBaseUrl = Objects.requireNonNull(schedulerServiceBaseUrl, "schedulerServiceBaseUrl");
        this.storage = Objects.requireNonNull(storage, "storage");
        this.workflowValidator = Objects.requireNonNull(workflowValidator, "workflowValidator");
        this.time = Objects.requireNonNull(time, "time");
        this.workflowActionAuthorizer = Objects.requireNonNull(workflowActionAuthorizer, "workflowActionAuthorizer");
        this.forkJoinPool = (ForkJoinPool)CloserUtil.register((Closer)this.closer, (ExecutorService)new ForkJoinPool(64), (String)"backfill-resource");
    }

    /**
     * Builds the HTTP routes served by this resource.
     *
     * <ul>
     *   <li>{@code GET /backfills} - handled by {@code getBackfills}</li>
     *   <li>{@code POST /backfills} - authenticated, handled by {@code postBackfill}</li>
     *   <li>{@code GET /backfills/<bid>} - handled by {@code getBackfill}</li>
     *   <li>{@code PUT /backfills/<bid>} - authenticated, handled by {@code updateBackfill}</li>
     *   <li>{@code DELETE /backfills/<bid>} - asynchronous, handled by {@code haltBackfill}</li>
     * </ul>
     *
     * All routes are passed through {@code Api.prefixRoutes} with {@code Api.Version.V3}.
     *
     * @param authenticator authenticator used by the POST, PUT and DELETE routes
     * @return the prefixed entity routes followed by the prefixed DELETE route
     */
    public Stream<Route<AsyncHandler<Response<ByteString>>>> routes(RequestAuthenticator authenticator) {
        // Entity middleware that (de)serializes payloads with the shared Jackson object mapper.
        EntityMiddleware em = EntityMiddleware.forCodec((EntityCodec)JacksonEntityCodec.forMapper((ObjectMapper)Json.OBJECT_MAPPER));
        // Synchronous entity routes; each one is adapted with Middleware::syncToAsync.
        List entityRoutes = Stream.of(Route.with((Middleware)em.serializerDirect(BackfillsPayload.class), (String)"GET", (String)BASE, this::getBackfills), Route.with((Middleware)Middlewares.authedEntity((RequestAuthenticator)authenticator, (Middleware)em.response(BackfillInput.class, Backfill.class)), (String)"POST", (String)BASE, ac -> rc -> payload -> this.postBackfill((Middlewares.AuthContext)ac, (RequestContext)rc, (BackfillInput)payload)), Route.with((Middleware)em.serializerResponse(BackfillPayload.class), (String)"GET", (String)"/backfills/<bid>", rc -> this.getBackfill(rc, (String)rc.pathArgs().get("bid"))), Route.with((Middleware)Middlewares.authedEntity((RequestAuthenticator)authenticator, (Middleware)em.response(EditableBackfillInput.class, Backfill.class)), (String)"PUT", (String)"/backfills/<bid>", ac -> rc -> payload -> this.updateBackfill((Middlewares.AuthContext)ac, (String)rc.pathArgs().get("bid"), (EditableBackfillInput)payload))).map(r -> r.withMiddleware(Middleware::syncToAsync)).collect(Collectors.toList());
        // The halt route is asynchronous already and authenticates inside haltBackfill.
        List<Route> routes = Collections.singletonList(Route.async((String)"DELETE", (String)"/backfills/<bid>", rc -> this.haltBackfill((String)rc.pathArgs().get("bid"), rc, authenticator)));
        return Streams.concat((Stream[])new Stream[]{Api.prefixRoutes(entityRoutes, (Api.Version[])new Api.Version[]{Api.Version.V3}), Api.prefixRoutes(routes, (Api.Version[])new Api.Version[]{Api.Version.V3})});
    }

    /**
     * Closes everything registered with the internal closer, which includes the fork-join pool
     * registered by the constructor.
     *
     * @throws IOException if the closer fails to close a registered resource
     */
    @Override
    public void close() throws IOException {
        closer.close();
    }

    /**
     * Lists backfills, optionally filtered by the {@code component} and/or {@code workflow} query
     * parameters.
     *
     * <p>The {@code showAll} parameter (default {@code "false"}) is passed on to storage. The
     * {@code status} parameter (default {@code "false"}) controls whether each backfill's
     * statuses are retrieved and attached. Payloads are built in parallel on the fork-join pool.
     *
     * @param rc request context carrying the query parameters
     * @return payload wrapping one entry per matching backfill
     * @throws RuntimeException wrapping the {@link IOException} if storage cannot be read
     */
    private BackfillsPayload getBackfills(RequestContext rc) {
        final Request request = rc.request();
        final Optional<String> component = request.parameter("component");
        final Optional<String> workflow = request.parameter("workflow");
        final boolean includeStatuses = "true".equals(request.parameter("status").orElse("false"));
        final boolean showAll = "true".equals(request.parameter("showAll").orElse("false"));

        // Pick the narrowest storage query that the supplied filters allow.
        final Stream<Backfill> backfills;
        try {
            if (component.isPresent() && workflow.isPresent()) {
                final WorkflowId workflowId = WorkflowId.create(component.get(), workflow.get());
                backfills = storage.backfillsForWorkflowId(showAll, workflowId).stream();
            } else if (component.isPresent()) {
                backfills = storage.backfillsForComponent(showAll, component.get()).stream();
            } else if (workflow.isPresent()) {
                backfills = storage.backfillsForWorkflow(showAll, workflow.get()).stream();
            } else {
                backfills = storage.backfills(showAll).stream();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        // Run the parallel stream inside this resource's own pool and wait for the result.
        final List<BackfillPayload> payloads = forkJoinPool
                .submit(() -> backfills.parallel()
                        .map(backfill -> {
                            if (!includeStatuses) {
                                return BackfillPayload.create(backfill, Optional.empty());
                            }
                            return BackfillPayload.create(
                                    backfill,
                                    Optional.of(RunStateDataPayload.create(retrieveBackfillStatuses(backfill))));
                        })
                        .collect(Collectors.toList()))
                .join();
        return BackfillsPayload.create(payloads);
    }

    /**
     * Fetches a single backfill by id.
     *
     * <p>Statuses are attached unless the {@code status} query parameter is present with a value
     * other than {@code "true"} (the default is {@code "true"}).
     *
     * @param rc request context carrying the query parameters
     * @param id id of the backfill to read
     * @return the backfill payload; {@code NOT_FOUND} if storage has no such backfill;
     *     {@code INTERNAL_SERVER_ERROR} if reading the backfill from storage fails
     */
    private Response<BackfillPayload> getBackfill(RequestContext rc, String id) {
        final boolean includeStatuses = "true".equals(rc.request().parameter("status").orElse("true"));

        final Optional<Backfill> stored;
        try {
            stored = storage.backfill(id);
        } catch (IOException e) {
            log.warn(String.format("Couldn't read backfill %s. ", id), (Throwable) e);
            return Response.forStatus(
                    Status.INTERNAL_SERVER_ERROR.withReasonPhrase("Error in internal storage"));
        }

        if (!stored.isPresent()) {
            return Response.forStatus(Status.NOT_FOUND);
        }

        final Backfill backfill = stored.get();
        final Optional<RunStateDataPayload> statuses = includeStatuses
                ? Optional.of(RunStateDataPayload.create(retrieveBackfillStatuses(backfill)))
                : Optional.empty();
        return Response.forPayload(BackfillPayload.create(backfill, statuses));
    }

    /**
     * Builds a scheduler API URL: the scheduler base URL, then {@code /api/v0/}, then the given
     * parts joined with {@code /}.
     *
     * @param parts path segments to append
     * @return the assembled URL
     */
    private String schedulerApiUrl(CharSequence ... parts) {
        final String path = String.join("/", parts);
        return schedulerServiceBaseUrl + SCHEDULER_BASE_PATH + "/" + path;
    }

    /**
     * Halts a backfill: authenticates the request, authorizes the caller for the backfill's
     * workflow, stores the backfill as halted, then asks the scheduler to halt its active
     * instances.
     *
     * @param id id of the backfill to halt
     * @param rc request context; supplies the request and the request-scoped client
     * @param authenticator authenticator applied to the incoming request
     * @return the outcome of halting the active instances; {@code NOT_FOUND} if the backfill does
     *     not exist; {@code INTERNAL_SERVER_ERROR} if storage throws an {@link IOException}
     */
    private CompletionStage<Response<ByteString>> haltBackfill(String id, RequestContext rc, RequestAuthenticator authenticator) {
        final Middlewares.AuthContext authContext = authenticator.authenticate(rc.request());
        try {
            final Optional<Backfill> found = storage.backfill(id);
            if (!found.isPresent()) {
                return CompletableFuture.completedFuture(
                        Response.forStatus(Status.NOT_FOUND.withReasonPhrase("backfill not found")));
            }

            final Backfill backfill = found.get();
            workflowActionAuthorizer.authorizeWorkflowAction(authContext, backfill.workflowId());

            // Persist the halted flag first, then halt what is already running.
            final Backfill halted = backfill.builder()
                    .halted(true)
                    .lastModified(time.get())
                    .build();
            storage.storeBackfill(halted);
            return haltActiveBackfillInstances(backfill, rc.requestScopedClient());
        } catch (IOException e) {
            return CompletableFuture.completedFuture(
                    Response.forStatus(Status.INTERNAL_SERVER_ERROR
                            .withReasonPhrase("could not halt backfill: " + e.getMessage())));
        }
    }

    /**
     * Sends a halt request for every instance of the backfill that is currently in an active
     * state and combines the outcomes into one response.
     *
     * @param backfill backfill whose active instances should be halted
     * @param client client used to call the scheduler
     * @return {@code OK} when every halt request succeeded; {@code INTERNAL_SERVER_ERROR} when any
     *     request failed or completed exceptionally
     */
    private CompletionStage<Response<ByteString>> haltActiveBackfillInstances(Backfill backfill, Client client) {
        final List<CompletionStage<Boolean>> haltRequests = retrieveBackfillStatuses(backfill).stream()
                .filter(BackfillResource::isActiveState)
                .map(RunStateDataPayload.RunStateData::workflowInstance)
                .map(instance -> haltActiveBackfillInstance(instance, client))
                .collect(Collectors.toList());

        return CompletableFutures.allAsList(haltRequests).handle((outcomes, failure) -> {
            final boolean allHalted = failure == null && !outcomes.contains(Boolean.FALSE);
            if (allHalted) {
                return Response.ok();
            }
            return Response.forStatus(Status.INTERNAL_SERVER_ERROR.withReasonPhrase(
                    "some active instances cannot be halted, however no new ones will be triggered"));
        });
    }

    /**
     * Posts a halt event for one workflow instance to the scheduler's {@code events} endpoint.
     *
     * @param workflowInstance instance to halt
     * @param client client used to send the request
     * @return a stage completing with {@code true} when the scheduler answers with a status in
     *     the {@code SUCCESSFUL} family, {@code false} otherwise; an already-completed
     *     {@code false} when the halt event cannot be serialized
     */
    private CompletionStage<Boolean> haltActiveBackfillInstance(WorkflowInstance workflowInstance, Client client) {
        final ByteString payload;
        try {
            payload = Json.serialize(Event.halt(workflowInstance));
        } catch (JsonProcessingException e) {
            // Keep the best-effort contract (report failure via false), but leave a trace of
            // the cause instead of discarding it.
            log.warn("Failed to serialize halt event for workflow instance {}", workflowInstance, e);
            return CompletableFuture.completedFuture(false);
        }

        final Request request = Request.forUri(schedulerApiUrl("events"), "POST").withPayload(payload);
        return client.send(request)
                .thenApply(response -> response.status().family().equals(StatusType.Family.SUCCESSFUL));
    }

    /**
     * Tells whether a run state entry describes an instance that is currently active.
     *
     * <p>The placeholder states {@code "UNKNOWN"} and {@code "WAITING"} are never active. Any
     * other state name is resolved through {@code RunState.State.valueOf} and counts as active
     * unless that state is terminal.
     *
     * @param runStateData entry to inspect
     * @return {@code true} if the entry is in a non-terminal, non-placeholder state
     */
    private static boolean isActiveState(RunStateDataPayload.RunStateData runStateData) {
        final String state = runStateData.state();
        if (state.equals(UNKNOWN) || state.equals(WAITING)) {
            return false;
        }
        final RunState.State resolved = RunState.State.valueOf(state);
        return !resolved.isTerminal();
    }

    /**
     * Validates a backfill request against its workflow and reports the first problem found.
     *
     * <p>Checks run in this order: docker image present, workflow configuration valid,
     * {@code start} before {@code end}, {@code start} aligned with the schedule, {@code end}
     * aligned with the schedule, and, unless the {@code allowFuture} query parameter parses to
     * {@code true}, that the range does not reach into the future.
     *
     * @param rc request context carrying the {@code allowFuture} query parameter
     * @param input requested backfill
     * @param workflow workflow the backfill targets
     * @return the error message of the first failed check, or empty when the request is valid
     */
    private Optional<String> validate(RequestContext rc, BackfillInput input, Workflow workflow) {
        if (!workflow.configuration().dockerImage().isPresent()) {
            return Optional.of("Workflow is missing docker image");
        }

        final List<String> configurationErrors = workflowValidator.validateWorkflow(workflow);
        if (!configurationErrors.isEmpty()) {
            return Optional.of(
                    "Invalid workflow configuration: " + String.join(", ", configurationErrors));
        }

        final Schedule schedule = workflow.configuration().schedule();
        final Instant start = input.start();
        final Instant end = input.end();

        if (!start.isBefore(end)) {
            return Optional.of("start must be before end");
        }
        if (!TimeUtil.isAligned(start, schedule)) {
            return Optional.of("start parameter not aligned with schedule");
        }
        if (!TimeUtil.isAligned(end, schedule)) {
            return Optional.of("end parameter not aligned with schedule");
        }

        final boolean allowFuture =
                Boolean.parseBoolean(rc.request().parameter("allowFuture").orElse("false"));
        if (allowFuture) {
            return Optional.empty();
        }

        // The second operand is only evaluated when the start is not in the future.
        if (start.isAfter(time.get())
                || TimeUtil.previousInstant(end, schedule).isAfter(time.get())) {
            return Optional.of("Cannot backfill future partitions");
        }
        return Optional.empty();
    }

    /**
     * Creates a backfill by running {@code postBackfill0} through
     * {@code storage.runInTransactionWithRetries}.
     *
     * @param ac authentication context of the caller
     * @param rc request context
     * @param input requested backfill
     * @return the response produced by {@code postBackfill0}
     * @throws RuntimeException wrapping the {@link IOException} if the transaction fails with one
     */
    private Response<Backfill> postBackfill(Middlewares.AuthContext ac, RequestContext rc, BackfillInput input) {
        final Response<Backfill> response;
        try {
            response = storage.runInTransactionWithRetries(tx -> postBackfill0(tx, ac, rc, input));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return response;
    }

    /**
     * Transactional body of backfill creation.
     *
     * <p>Steps, in order: look up the workflow, authorize the caller, validate the request,
     * reject the request if any requested partition is already active, then build and store the
     * backfill.
     *
     * @param tx transaction used to read the workflow and store the backfill
     * @param ac authentication context of the caller
     * @param rc request context (query parameters used by validation)
     * @param input requested backfill
     * @return the stored backfill; {@code BAD_REQUEST} when validation fails; {@code CONFLICT}
     *     when some requested partitions are already active
     * @throws IOException if storage access fails
     * @throws ResponseException with {@code NOT_FOUND} when the workflow does not exist
     */
    private Response<Backfill> postBackfill0(StorageTransaction tx, Middlewares.AuthContext ac, RequestContext rc, BackfillInput input) throws IOException {
        final WorkflowId workflowId = WorkflowId.create(input.component(), input.workflow());
        final Workflow workflow = tx.workflow(workflowId).orElseThrow(() -> new ResponseException(
                Response.forStatus(Status.NOT_FOUND.withReasonPhrase("workflow not found"))));
        workflowActionAuthorizer.authorizeWorkflowAction(ac, workflow);

        // Validate before touching storage again: an invalid request should be answered with
        // BAD_REQUEST without paying for (or failing on) the active-states read below.
        final Optional<String> validationError = validate(rc, input, workflow);
        if (validationError.isPresent()) {
            return Response.forStatus(Status.BAD_REQUEST.withReasonPhrase(validationError.get()));
        }

        final Schedule schedule = workflow.configuration().schedule();
        final List<Instant> instants = TimeUtil.instantsInRange(input.start(), input.end(), schedule);

        // Refuse to backfill partitions that already have an active instance.
        final Set<WorkflowInstance> activeWorkflowInstances =
                storage.readActiveStates(input.component()).keySet();
        final List<WorkflowInstance> alreadyActive = instants.stream()
                .map(instant -> WorkflowInstance.create(workflowId, ParameterUtil.toParameter(schedule, instant)))
                .filter(activeWorkflowInstances::contains)
                .collect(Collectors.toList());
        if (!alreadyActive.isEmpty()) {
            final String alreadyActiveMessage = alreadyActive.stream()
                    .map(WorkflowInstance::parameter)
                    .collect(Collectors.joining(", "));
            return Response.forStatus(Status.CONFLICT.withReasonPhrase(
                    "these partitions are already active: " + alreadyActiveMessage));
        }

        // Only now is an id needed; a reversed backfill starts triggering from the last instant.
        final String id = RandomGenerator.DEFAULT.generateUniqueId("backfill");
        final Instant timestamp = time.get();
        final Backfill backfill = Backfill.newBuilder()
                .id(id)
                .allTriggered(false)
                .workflowId(workflowId)
                .concurrency(input.concurrency())
                .start(input.start())
                .end(input.end())
                .schedule(schedule)
                .nextTrigger(input.reverse() ? Iterables.getLast(instants) : input.start())
                .description(input.description())
                .reverse(input.reverse())
                .triggerParameters(input.triggerParameters())
                .halted(false)
                .created(timestamp)
                .lastModified(timestamp)
                .build();
        tx.store(backfill);
        return Response.forPayload(backfill);
    }

    /**
     * Updates the editable fields (concurrency and description) of an existing backfill inside a
     * storage transaction and refreshes its last-modified timestamp.
     *
     * @param ac authentication context of the caller
     * @param id backfill id taken from the request URI
     * @param backfillInput requested changes; its id must equal {@code id}
     * @return {@code OK} with the stored backfill; {@code BAD_REQUEST} when the ids differ;
     *     {@code NOT_FOUND} when the backfill does not exist; {@code INTERNAL_SERVER_ERROR} when
     *     the transaction fails with an {@link IOException}
     */
    private Response<Backfill> updateBackfill(Middlewares.AuthContext ac, String id, EditableBackfillInput backfillInput) {
        if (!backfillInput.id().equals(id)) {
            return Response.forStatus(
                    Status.BAD_REQUEST.withReasonPhrase("ID of payload does not match ID in uri."));
        }

        final Backfill backfill;
        try {
            backfill = storage.runInTransactionWithRetries(tx -> {
                final Backfill oldBackfill = tx.backfill(id).orElseThrow(
                        () -> new ResourceNotFoundException(String.format("Backfill %s not found.", id)));
                workflowActionAuthorizer.authorizeWorkflowAction(ac, oldBackfill.workflowId());

                // Start from the stored backfill and overwrite only the fields that were supplied.
                final BackfillBuilder backfillBuilder = oldBackfill.builder();
                backfillInput.concurrency().ifPresent(backfillBuilder::concurrency);
                backfillInput.description().ifPresent(backfillBuilder::description);
                backfillBuilder.lastModified(time.get());
                return tx.store(backfillBuilder.build());
            });
        } catch (ResourceNotFoundException e) {
            return Response.forStatus(Status.NOT_FOUND.withReasonPhrase(e.getMessage()));
        } catch (IOException e) {
            // Log the cause, as getBackfill does, so the failure is not lost behind the 500.
            log.warn("Failed to store backfill {}", id, e);
            return Response.forStatus(
                    Status.INTERNAL_SERVER_ERROR.withReasonPhrase("Failed to store backfill."));
        }
        return Response.forStatus(Status.OK).withPayload(backfill);
    }

    /**
     * Builds the status list of a backfill: one entry per instant in its range.
     *
     * <p>The range is split at {@code nextTrigger}. For the "processed" side, run state data is
     * looked up in parallel on the fork-join pool; for the "waiting" side, entries with state
     * {@code "WAITING"} and zero state data are created. For a reversed backfill the waiting
     * entries come first in the result, otherwise the processed entries do.
     *
     * @param backfill backfill to describe
     * @return the combined status entries
     * @throws RuntimeException wrapping the {@link IOException} if the active states cannot be
     *     read
     */
    private List<RunStateDataPayload.RunStateData> retrieveBackfillStatuses(Backfill backfill) {
        final Map<WorkflowInstance, RunState> activeWorkflowInstances;
        try {
            activeWorkflowInstances = storage.readActiveStatesByTriggerId(backfill.id());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        final boolean reverse = backfill.reverse();
        final Schedule schedule = backfill.schedule();

        // Processed side of the range.
        final List<Instant> processedInstants = reverse
                ? TimeUtil.instantsInRange(
                        TimeUtil.nextInstant(backfill.nextTrigger(), schedule), backfill.end(), schedule)
                : TimeUtil.instantsInRange(backfill.start(), backfill.nextTrigger(), schedule);
        final List<RunStateDataPayload.RunStateData> processedStates = forkJoinPool
                .submit(() -> processedInstants.parallelStream()
                        .map(instant -> getRunStateData(backfill, activeWorkflowInstances, instant))
                        .collect(Collectors.toList()))
                .join();

        // Waiting side of the range.
        final List<Instant> waitingInstants = reverse
                ? TimeUtil.instantsInRange(
                        backfill.start(), TimeUtil.nextInstant(backfill.nextTrigger(), schedule), schedule)
                : TimeUtil.instantsInRange(backfill.nextTrigger(), backfill.end(), schedule);
        final List<RunStateDataPayload.RunStateData> waitingStates = waitingInstants.stream()
                .map(instant -> RunStateDataPayload.RunStateData.create(
                        WorkflowInstance.create(
                                backfill.workflowId(), ParameterUtil.toParameter(schedule, instant)),
                        WAITING,
                        StateData.zero()))
                .collect(Collectors.toList());

        final Stream<RunStateDataPayload.RunStateData> ordered = reverse
                ? Stream.concat(waitingStates.stream(), processedStates.stream())
                : Stream.concat(processedStates.stream(), waitingStates.stream());
        return ordered.collect(Collectors.toList());
    }

    /**
     * Resolves the run state data of one backfill instant.
     *
     * <p>If the instant's workflow instance is among the given active instances, the entry is
     * built from that run state. Otherwise {@code ReplayEvents.getBackfillRunStateData} is asked,
     * and if it yields nothing an entry with state {@code "UNKNOWN"} and zero state data is
     * returned.
     *
     * @param backfill backfill the instant belongs to
     * @param activeWorkflowInstances active run states keyed by workflow instance
     * @param instant instant to resolve
     * @return the run state data for the instant
     */
    private RunStateDataPayload.RunStateData getRunStateData(Backfill backfill, Map<WorkflowInstance, RunState> activeWorkflowInstances, Instant instant) {
        final String parameter = ParameterUtil.toParameter(backfill.schedule(), instant);
        final WorkflowInstance wfi = WorkflowInstance.create(backfill.workflowId(), parameter);

        if (!activeWorkflowInstances.containsKey(wfi)) {
            final Optional<RunStateDataPayload.RunStateData> replayed =
                    ReplayEvents.getBackfillRunStateData(wfi, storage, backfill.id());
            return replayed.orElse(
                    RunStateDataPayload.RunStateData.create(wfi, UNKNOWN, StateData.zero()));
        }

        final RunState active = activeWorkflowInstances.get(wfi);
        return RunStateDataPayload.RunStateData.newBuilder()
                .workflowInstance(active.workflowInstance())
                .state(active.state().name())
                .stateData(active.data())
                .latestTimestamp(active.timestamp())
                .build();
    }
}

