/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.spinnaker.orca.pipeline.persistence.jedis;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate;
import com.netflix.spinnaker.kork.jedis.RedisClientSelector;
import com.netflix.spinnaker.orca.api.pipeline.SyntheticStageOwner;
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus;
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType;
import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution;
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution;
import com.netflix.spinnaker.orca.api.pipeline.models.TaskExecution;
import com.netflix.spinnaker.orca.api.pipeline.models.Trigger;
import com.netflix.spinnaker.orca.jackson.OrcaObjectMapper;
import com.netflix.spinnaker.orca.pipeline.model.PipelineExecutionImpl;
import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl;
import com.netflix.spinnaker.orca.pipeline.model.SystemNotification;
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionNotFoundException;
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository;
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepositoryUtil;
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionSerializationException;
import com.netflix.spinnaker.orca.pipeline.persistence.StageSerializationException;
import com.netflix.spinnaker.orca.pipeline.persistence.UnpausablePipelineException;
import com.netflix.spinnaker.orca.pipeline.persistence.UnresumablePipelineException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import net.logstash.logback.argument.StructuredArguments;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.ListPosition;
import redis.clients.jedis.Response;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

public class RedisExecutionRepository
implements ExecutionRepository {
    private static final TypeReference<List<TaskExecution>> LIST_OF_TASKS = new TypeReference<List<TaskExecution>>(){};
    private static final TypeReference<Map<String, Object>> MAP_STRING_TO_OBJECT = new TypeReference<Map<String, Object>>(){};
    private static final TypeReference<List<SystemNotification>> LIST_OF_SYSTEM_NOTIFICATIONS = new TypeReference<List<SystemNotification>>(){};
    private final RedisClientDelegate redisClientDelegate;
    private final Optional<RedisClientDelegate> previousRedisClientDelegate;
    private final ObjectMapper mapper = OrcaObjectMapper.getInstance();
    private final int chunkSize;
    private final Scheduler queryAllScheduler;
    private final Scheduler queryByAppScheduler;
    private final Registry registry;
    private static String bufferedPrefix;
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    public RedisExecutionRepository(Registry registry, RedisClientSelector redisClientSelector, Scheduler queryAllScheduler, Scheduler queryByAppScheduler, Integer threadPoolChunkSize, String bufferedPrefix) {
        this.registry = registry;
        this.redisClientDelegate = redisClientSelector.primary("executionRepository");
        this.previousRedisClientDelegate = redisClientSelector.previous("executionRepository");
        this.queryAllScheduler = queryAllScheduler;
        this.queryByAppScheduler = queryByAppScheduler;
        this.chunkSize = threadPoolChunkSize;
        RedisExecutionRepository.bufferedPrefix = bufferedPrefix;
    }

    public RedisExecutionRepository(Registry registry, RedisClientSelector redisClientSelector, Integer threadPoolSize, Integer threadPoolChunkSize) {
        this.registry = registry;
        this.redisClientDelegate = redisClientSelector.primary("executionRepository");
        this.previousRedisClientDelegate = redisClientSelector.previous("executionRepository");
        this.queryAllScheduler = Schedulers.from((Executor)Executors.newFixedThreadPool(10));
        this.queryByAppScheduler = Schedulers.from((Executor)Executors.newFixedThreadPool(threadPoolSize));
        this.chunkSize = threadPoolChunkSize;
    }

    public void store(@Nonnull PipelineExecution execution) {
        RedisClientDelegate delegate = this.getRedisDelegate(execution);
        this.storeExecutionInternal(delegate, execution);
        if (execution.getType() == ExecutionType.PIPELINE) {
            delegate.withCommandsClient(c -> c.zadd(RedisExecutionRepository.executionsByPipelineKey(execution.getPipelineConfigId()), execution.getBuildTime() != null ? (double)execution.getBuildTime().longValue() : (double)System.currentTimeMillis(), execution.getId()));
        }
    }

    public void storeStage(@Nonnull StageExecution stage) {
        this.storeStageInternal(this.getRedisDelegate(stage), stage, false);
    }

    public void updateStageContext(@Nonnull StageExecution stage) {
        RedisClientDelegate delegate = this.getRedisDelegate(stage);
        String key = this.executionKey(stage);
        String contextKey = String.format("stage.%s.context", stage.getId());
        delegate.withCommandsClient(c -> {
            try {
                c.hset(key, contextKey, this.mapper.writeValueAsString((Object)stage.getContext()));
            }
            catch (JsonProcessingException e) {
                throw new StageSerializationException(String.format("Failed serializing stage, executionId: %s, stageId: %s", stage.getExecution().getId(), stage.getId()), (Throwable)e);
            }
        });
    }

    public void removeStage(@Nonnull PipelineExecution execution, @Nonnull String stageId) {
        RedisClientDelegate delegate = this.getRedisDelegate(execution);
        String key = this.executionKey(execution);
        String indexKey = String.format("%s:stageIndex", key);
        List stageKeys = (List)delegate.withCommandsClient(c -> c.hkeys(key).stream().filter(k -> k.startsWith("stage." + stageId)).collect(Collectors.toList()));
        delegate.withTransaction(tx -> {
            tx.lrem(indexKey, 0L, stageId);
            tx.hdel(key, stageKeys.toArray(new String[0]));
            tx.exec();
        });
    }

    public void addStage(@Nonnull StageExecution stage) {
        if (stage.getSyntheticStageOwner() == null || stage.getParentStageId() == null) {
            throw new IllegalArgumentException("Only synthetic stages can be inserted ad-hoc");
        }
        this.storeStageInternal(this.getRedisDelegate(stage), stage, true);
    }

    public void cancel(ExecutionType type, @Nonnull String id) {
        this.cancel(type, id, null, null);
    }

    public void cancel(ExecutionType type, @Nonnull String id, String user, String reason) {
        ImmutablePair<String, RedisClientDelegate> pair = this.fetchKey(id);
        RedisClientDelegate delegate = pair.getRight();
        delegate.withCommandsClient(c -> {
            ExecutionStatus currentStatus;
            HashMap<String, String> data = new HashMap<String, String>();
            data.put("canceled", "true");
            if (StringUtils.isNotEmpty((CharSequence)user)) {
                data.put("canceledBy", user);
            }
            if (StringUtils.isNotEmpty((CharSequence)reason)) {
                data.put("cancellationReason", reason);
            }
            if ((currentStatus = ExecutionStatus.valueOf((String)c.hget((String)pair.getLeft(), "status"))) == ExecutionStatus.NOT_STARTED) {
                data.put("status", ExecutionStatus.CANCELED.name());
            }
            c.hmset((String)pair.getLeft(), data);
            c.srem(RedisExecutionRepository.allBufferedExecutionsKey(type), new String[]{id});
        });
    }

    public void pause(ExecutionType type, @Nonnull String id, String user) {
        ImmutablePair<String, RedisClientDelegate> pair = this.fetchKey(id);
        RedisClientDelegate delegate = pair.getRight();
        delegate.withCommandsClient(c -> {
            ExecutionStatus currentStatus = ExecutionStatus.valueOf((String)c.hget((String)pair.getLeft(), "status"));
            if (currentStatus != ExecutionStatus.RUNNING) {
                throw new UnpausablePipelineException(String.format("Unable to pause pipeline that is not RUNNING (executionId: %s, currentStatus: %s)", id, currentStatus));
            }
            PipelineExecution.PausedDetails pausedDetails = new PipelineExecution.PausedDetails();
            pausedDetails.setPausedBy(user);
            pausedDetails.setPauseTime(Long.valueOf(System.currentTimeMillis()));
            HashMap<String, String> data = new HashMap<String, String>();
            try {
                data.put("paused", this.mapper.writeValueAsString((Object)pausedDetails));
            }
            catch (JsonProcessingException e) {
                throw new ExecutionSerializationException("Failed converting pausedDetails to json", (Throwable)e);
            }
            data.put("status", ExecutionStatus.PAUSED.toString());
            c.hmset((String)pair.getLeft(), data);
            c.srem(RedisExecutionRepository.allBufferedExecutionsKey(type), new String[]{id});
        });
    }

    public void resume(ExecutionType type, @Nonnull String id, String user) {
        this.resume(type, id, user, false);
    }

    public void resume(ExecutionType type, @Nonnull String id, String user, boolean ignoreCurrentStatus) {
        ImmutablePair<String, RedisClientDelegate> pair = this.fetchKey(id);
        RedisClientDelegate delegate = pair.getRight();
        delegate.withCommandsClient(c -> {
            ExecutionStatus currentStatus = ExecutionStatus.valueOf((String)c.hget((String)pair.getLeft(), "status"));
            if (!ignoreCurrentStatus && currentStatus != ExecutionStatus.PAUSED) {
                throw new UnresumablePipelineException(String.format("Unable to resume pipeline that is not PAUSED (executionId: %s, currentStatus: %s)", id, currentStatus));
            }
            try {
                PipelineExecution.PausedDetails pausedDetails = (PipelineExecution.PausedDetails)this.mapper.readValue(c.hget((String)pair.getLeft(), "paused"), PipelineExecution.PausedDetails.class);
                pausedDetails.setResumedBy(user);
                pausedDetails.setResumeTime(Long.valueOf(System.currentTimeMillis()));
                HashMap<String, String> data = new HashMap<String, String>();
                data.put("paused", this.mapper.writeValueAsString((Object)pausedDetails));
                data.put("status", ExecutionStatus.RUNNING.toString());
                c.hmset((String)pair.getLeft(), data);
                c.srem(RedisExecutionRepository.allBufferedExecutionsKey(type), new String[]{id});
            }
            catch (IOException e) {
                throw new ExecutionSerializationException("Failed converting pausedDetails to json", (Throwable)e);
            }
        });
    }

    public boolean isCanceled(ExecutionType type, @Nonnull String id) {
        ImmutablePair<String, RedisClientDelegate> pair = this.fetchKey(id);
        RedisClientDelegate delegate = pair.getRight();
        return (Boolean)delegate.withCommandsClient(c -> Boolean.valueOf(c.hget((String)pair.getLeft(), "canceled")));
    }

    public void updateStatus(ExecutionType type, @Nonnull String id, @Nonnull ExecutionStatus status) {
        ImmutablePair<String, RedisClientDelegate> pair = this.fetchKey(id);
        RedisClientDelegate delegate = pair.getRight();
        String key = pair.getLeft();
        delegate.withCommandsClient(c -> {
            HashMap<String, String> data = new HashMap<String, String>();
            data.put("status", status.name());
            if (status == ExecutionStatus.RUNNING) {
                data.put("canceled", "false");
                data.put("startTime", String.valueOf(System.currentTimeMillis()));
            } else if (status.isComplete() && c.hget(key, "startTime") != null) {
                data.put("endTime", String.valueOf(System.currentTimeMillis()));
            }
            if (status == ExecutionStatus.BUFFERED) {
                c.sadd(RedisExecutionRepository.allBufferedExecutionsKey(type), new String[]{id});
            } else {
                c.srem(RedisExecutionRepository.allBufferedExecutionsKey(type), new String[]{id});
            }
            c.hmset(key, data);
        });
    }

    @Nonnull
    public PipelineExecution retrieve(@Nonnull ExecutionType type, @Nonnull String id) {
        RedisClientDelegate delegate = this.getRedisDelegate(type, id);
        return this.retrieveInternal(delegate, type, id);
    }

    @Nonnull
    public Observable<PipelineExecution> retrieve(@Nonnull ExecutionType type) {
        List observables = this.allRedisDelegates().stream().map(d -> this.all(type, (RedisClientDelegate)d)).collect(Collectors.toList());
        return Observable.merge(observables);
    }

    @Nonnull
    public Observable<PipelineExecution> retrieve(@Nonnull ExecutionType type, @Nonnull ExecutionRepository.ExecutionCriteria criteria) {
        List observables = this.allRedisDelegates().stream().map(d -> {
            Observable observable = this.all(type, (RedisClientDelegate)d);
            if (!criteria.getStatuses().isEmpty()) {
                observable = observable.filter(execution -> criteria.getStatuses().contains(execution.getStatus()));
            }
            if (criteria.getPageSize() > 0) {
                observable = observable.limit(criteria.getPageSize());
            }
            return observable;
        }).collect(Collectors.toList());
        return Observable.merge(observables);
    }

    public void delete(@Nonnull ExecutionType type, @Nonnull String id) {
        RedisClientDelegate delegate = this.getRedisDelegate(type, id);
        this.deleteInternal(delegate, type, id);
    }

    public void delete(@Nonnull ExecutionType type, @Nonnull List<String> idsToDelete) {
        idsToDelete.forEach(id -> this.delete(type, (String)id));
    }

    @Nonnull
    public Observable<PipelineExecution> retrievePipelinesForApplication(@Nonnull String application) {
        List observables = this.allRedisDelegates().stream().map(d -> this.allForApplication(ExecutionType.PIPELINE, application, (RedisClientDelegate)d)).collect(Collectors.toList());
        return Observable.merge(observables);
    }

    @Nonnull
    public Observable<PipelineExecution> retrievePipelinesForPipelineConfigId(@Nonnull String pipelineConfigId, @Nonnull ExecutionRepository.ExecutionCriteria criteria) {
        HashMap filteredPipelineIdsByDelegate = new HashMap();
        if (!criteria.getStatuses().isEmpty()) {
            this.allRedisDelegates().forEach(d -> d.withCommandsClient(c -> {
                ArrayList pipelineKeys = new ArrayList(c.zrevrange(RedisExecutionRepository.executionsByPipelineKey(pipelineConfigId), 0L, -1L));
                if (pipelineKeys.isEmpty()) {
                    return;
                }
                HashSet allowedExecutionStatuses = new HashSet(criteria.getStatuses());
                List<ExecutionStatus> statuses = this.fetchMultiExecutionStatus((RedisClientDelegate)d, pipelineKeys.stream().map(key -> this.pipelineKey((String)key)).collect(Collectors.toList()));
                AtomicInteger index = new AtomicInteger();
                statuses.forEach(s -> {
                    if (allowedExecutionStatuses.contains(s)) {
                        filteredPipelineIdsByDelegate.computeIfAbsent(d, p -> new ArrayList()).add((String)pipelineKeys.get(index.get()));
                    }
                    index.incrementAndGet();
                });
            }));
        }
        Func2 fnBuilder = (redisClientDelegate, pipelineIds) -> key -> !criteria.getStatuses().isEmpty() ? pipelineIds : (Iterable)redisClientDelegate.withCommandsClient(p -> p.zrevrange(key, 0L, (long)(criteria.getPageSize() - 1)));
        List currentPipelineIds = filteredPipelineIdsByDelegate.getOrDefault(this.redisClientDelegate, new ArrayList());
        currentPipelineIds = currentPipelineIds.subList(0, Math.min(criteria.getPageSize(), currentPipelineIds.size()));
        Observable<PipelineExecution> currentObservable = this.retrieveObservable(ExecutionType.PIPELINE, RedisExecutionRepository.executionsByPipelineKey(pipelineConfigId), (Func1<String, Iterable<String>>)((Func1)fnBuilder.call((Object)this.redisClientDelegate, currentPipelineIds)), this.queryByAppScheduler, this.redisClientDelegate);
        if (this.previousRedisClientDelegate.isPresent()) {
            List previousPipelineIds = filteredPipelineIdsByDelegate.getOrDefault(this.previousRedisClientDelegate.get(), new ArrayList());
            previousPipelineIds.removeAll(currentPipelineIds);
            previousPipelineIds = previousPipelineIds.subList(0, Math.min(criteria.getPageSize(), previousPipelineIds.size()));
            Observable<PipelineExecution> previousObservable = this.retrieveObservable(ExecutionType.PIPELINE, RedisExecutionRepository.executionsByPipelineKey(pipelineConfigId), (Func1<String, Iterable<String>>)((Func1)fnBuilder.call((Object)this.previousRedisClientDelegate.get(), previousPipelineIds)), this.queryByAppScheduler, this.previousRedisClientDelegate.get());
            return Observable.merge(currentObservable, previousObservable);
        }
        return currentObservable;
    }

    @Nonnull
    public List<PipelineExecution> retrievePipelinesForPipelineConfigIdsBetweenBuildTimeBoundary(@Nonnull List<String> pipelineConfigIds, long buildTimeStartBoundary, long buildTimeEndBoundary, ExecutionRepository.ExecutionCriteria executionCriteria) {
        ArrayList executions = new ArrayList();
        this.allRedisDelegates().forEach(d -> {
            List<PipelineExecution> pipelines = this.getPipelinesForPipelineConfigIdsBetweenBuildTimeBoundaryFromRedis((RedisClientDelegate)d, pipelineConfigIds, buildTimeStartBoundary, buildTimeEndBoundary);
            executions.addAll(pipelines);
        });
        return executions.stream().filter(it -> {
            if (executionCriteria.getStatuses().isEmpty()) {
                return true;
            }
            return executionCriteria.getStatuses().contains(it.getStatus());
        }).collect(Collectors.toList());
    }

    public List<PipelineExecution> retrieveAllPipelinesForPipelineConfigIdsBetweenBuildTimeBoundary(@Nonnull List<String> pipelineConfigIds, long buildTimeStartBoundary, long buildTimeEndBoundary, ExecutionRepository.ExecutionCriteria executionCriteria) {
        List<PipelineExecution> executions = this.retrievePipelinesForPipelineConfigIdsBetweenBuildTimeBoundary(pipelineConfigIds, buildTimeStartBoundary, buildTimeEndBoundary, executionCriteria);
        executions.sort((Comparator<PipelineExecution>)executionCriteria.getSortType());
        return executions;
    }

    @Nonnull
    public Observable<PipelineExecution> retrieveOrchestrationsForApplication(@Nonnull String application, @Nonnull ExecutionRepository.ExecutionCriteria criteria) {
        String allOrchestrationsKey = RedisExecutionRepository.appKey(ExecutionType.ORCHESTRATION, application);
        HashMap filteredOrchestrationIdsByDelegate = new HashMap();
        if (!criteria.getStatuses().isEmpty()) {
            this.allRedisDelegates().forEach(d -> d.withCommandsClient(c -> {
                ArrayList orchestrationKeys = new ArrayList(c.smembers(allOrchestrationsKey));
                if (orchestrationKeys.isEmpty()) {
                    return;
                }
                HashSet allowedExecutionStatuses = new HashSet(criteria.getStatuses());
                List<ExecutionStatus> statuses = this.fetchMultiExecutionStatus((RedisClientDelegate)d, orchestrationKeys.stream().map(key -> this.orchestrationKey((String)key)).collect(Collectors.toList()));
                AtomicInteger index = new AtomicInteger();
                statuses.forEach(e -> {
                    if (allowedExecutionStatuses.contains(e)) {
                        filteredOrchestrationIdsByDelegate.computeIfAbsent(d, o -> new ArrayList()).add((String)orchestrationKeys.get(index.get()));
                    }
                    index.incrementAndGet();
                });
            }));
        }
        Func2 fnBuilder = (redisClientDelegate, orchestrationIds) -> key -> (Iterable)redisClientDelegate.withCommandsClient(c -> {
            if (!criteria.getStatuses().isEmpty()) {
                return orchestrationIds;
            }
            ArrayList unfiltered = new ArrayList(c.smembers(key));
            return unfiltered.subList(0, Math.min(criteria.getPageSize(), unfiltered.size()));
        });
        List currentOrchestrationIds = filteredOrchestrationIdsByDelegate.getOrDefault(this.redisClientDelegate, new ArrayList());
        currentOrchestrationIds = currentOrchestrationIds.subList(0, Math.min(criteria.getPageSize(), currentOrchestrationIds.size()));
        Observable<PipelineExecution> currentObservable = this.retrieveObservable(ExecutionType.ORCHESTRATION, allOrchestrationsKey, (Func1<String, Iterable<String>>)((Func1)fnBuilder.call((Object)this.redisClientDelegate, currentOrchestrationIds)), this.queryByAppScheduler, this.redisClientDelegate);
        if (this.previousRedisClientDelegate.isPresent()) {
            List previousOrchestrationIds = filteredOrchestrationIdsByDelegate.getOrDefault(this.previousRedisClientDelegate.get(), new ArrayList());
            previousOrchestrationIds.removeAll(currentOrchestrationIds);
            previousOrchestrationIds = previousOrchestrationIds.subList(0, Math.min(criteria.getPageSize(), previousOrchestrationIds.size()));
            Observable<PipelineExecution> previousObservable = this.retrieveObservable(ExecutionType.ORCHESTRATION, allOrchestrationsKey, (Func1<String, Iterable<String>>)((Func1)fnBuilder.call((Object)this.previousRedisClientDelegate.get(), previousOrchestrationIds)), this.queryByAppScheduler, this.previousRedisClientDelegate.get());
            return Observable.merge(currentObservable, previousObservable);
        }
        return currentObservable;
    }

    @Nonnull
    public List<PipelineExecution> retrieveOrchestrationsForApplication(@Nonnull String application, @Nonnull ExecutionRepository.ExecutionCriteria criteria, ExecutionRepository.ExecutionComparator sorter) {
        List executions = (List)this.retrieveOrchestrationsForApplication(application, criteria).filter(orchestration -> {
            if (criteria.getStartTimeCutoff() != null) {
                long startTime = Optional.ofNullable(orchestration.getStartTime()).orElse(0L);
                return startTime == 0L || startTime > criteria.getStartTimeCutoff().toEpochMilli();
            }
            return true;
        }).subscribeOn(Schedulers.io()).toList().toBlocking().single();
        if (sorter != null) {
            executions.sort(sorter);
        }
        return executions.subList(0, Math.min(executions.size(), criteria.getPageSize()));
    }

    @Nonnull
    public PipelineExecution retrieveByCorrelationId(@Nonnull ExecutionType executionType, @Nonnull String correlationId) throws ExecutionNotFoundException {
        if (executionType == ExecutionType.PIPELINE) {
            return this.retrievePipelineForCorrelationId(correlationId);
        }
        return this.retrieveOrchestrationForCorrelationId(correlationId);
    }

    @Nonnull
    public PipelineExecution retrieveOrchestrationForCorrelationId(@Nonnull String correlationId) throws ExecutionNotFoundException {
        String key = String.format("correlation:%s", correlationId);
        return (PipelineExecution)this.getRedisDelegate(key).withCommandsClient(correlationRedis -> {
            String orchestrationId = correlationRedis.get(key);
            if (orchestrationId != null) {
                PipelineExecution orchestration = this.retrieveInternal(this.getRedisDelegate(this.orchestrationKey(orchestrationId)), ExecutionType.ORCHESTRATION, orchestrationId);
                if (!orchestration.getStatus().isComplete()) {
                    return orchestration;
                }
                correlationRedis.del(key);
            }
            throw new ExecutionNotFoundException(String.format("No Orchestration found for correlation ID %s", correlationId));
        });
    }

    @Nonnull
    public PipelineExecution retrievePipelineForCorrelationId(@Nonnull String correlationId) throws ExecutionNotFoundException {
        String key = String.format("pipelineCorrelation:%s", correlationId);
        return (PipelineExecution)this.getRedisDelegate(key).withCommandsClient(correlationRedis -> {
            String pipelineId = correlationRedis.get(key);
            if (pipelineId != null) {
                PipelineExecution pipeline = this.retrieveInternal(this.getRedisDelegate(this.pipelineKey(pipelineId)), ExecutionType.PIPELINE, pipelineId);
                if (!pipeline.getStatus().isComplete()) {
                    return pipeline;
                }
                correlationRedis.del(key);
            }
            throw new ExecutionNotFoundException(String.format("No Pipeline found for correlation ID %s", correlationId));
        });
    }

    @Nonnull
    public List<PipelineExecution> retrieveBufferedExecutions() {
        List observables = this.allRedisDelegates().stream().map(d -> Arrays.asList(this.retrieveObservable(ExecutionType.PIPELINE, RedisExecutionRepository.allBufferedExecutionsKey(ExecutionType.PIPELINE), this.queryAllScheduler, (RedisClientDelegate)d), this.retrieveObservable(ExecutionType.ORCHESTRATION, RedisExecutionRepository.allBufferedExecutionsKey(ExecutionType.ORCHESTRATION), this.queryAllScheduler, (RedisClientDelegate)d))).flatMap(Collection::stream).collect(Collectors.toList());
        return (List)Observable.merge(observables).filter(e -> e.getStatus() == ExecutionStatus.BUFFERED).toList().toBlocking().single();
    }

    @Nonnull
    public List<String> retrieveAllApplicationNames(@Nullable ExecutionType type) {
        return this.retrieveAllApplicationNames(type, 0);
    }

    @Nonnull
    public List<String> retrieveAllApplicationNames(@Nullable ExecutionType type, int minExecutions) {
        return (List)this.redisClientDelegate.withMultiClient(mc -> {
            ScanResult chunk;
            ScanParams scanParams = new ScanParams().match(RedisExecutionRepository.executionKeyPattern(type)).count(Integer.valueOf(2000));
            String cursor = "0";
            HashMap apps = new HashMap();
            do {
                String finalCursor = cursor;
                chunk = mc.scan(finalCursor, scanParams);
                if (this.redisClientDelegate.supportsMultiKeyPipelines()) {
                    ArrayList pipelineResults = new ArrayList();
                    this.redisClientDelegate.withMultiKeyPipeline(p -> {
                        chunk.getResult().forEach(id -> {
                            String app = id.split(":")[2];
                            pipelineResults.add(new ImmutablePair<String, Response>(app, p.scard(id)));
                        });
                        p.sync();
                    });
                    pipelineResults.forEach(p -> apps.compute((String)p.left, (app, numExecutions) -> Optional.ofNullable(numExecutions).orElse(0L) + (Long)((Response)p.right).get()));
                    continue;
                }
                this.redisClientDelegate.withCommandsClient(cc -> chunk.getResult().forEach(id -> {
                    String[] parts = id.split(":");
                    String app = parts[2];
                    long cardinality = cc.scard(id);
                    apps.compute(app, (appKey, numExecutions) -> Optional.ofNullable(numExecutions).orElse(0L) + cardinality);
                }));
            } while (!(cursor = chunk.getCursor()).equals("0"));
            return apps.entrySet().stream().filter(e -> ((Long)e.getValue()).intValue() >= minExecutions).map(Map.Entry::getKey).collect(Collectors.toList());
        });
    }

    public boolean hasExecution(@Nonnull ExecutionType type, @Nonnull String id) {
        return (Boolean)this.redisClientDelegate.withCommandsClient(c -> c.exists(this.executionKey(type, id)));
    }

    public List<String> retrieveAllExecutionIds(@Nonnull ExecutionType type) {
        return new ArrayList<String>((Collection)this.redisClientDelegate.withCommandsClient(c -> c.smembers(RedisExecutionRepository.alljobsKey(type))));
    }

    private Map<String, String> buildExecutionMapFromRedisResponse(List<String> entries) {
        if (entries.size() % 2 != 0) {
            throw new RuntimeException("Failed to convert Redis response to map because the number of entries is not even");
        }
        HashMap<String, String> map = new HashMap<String, String>();
        String nextKey = null;
        for (int i = 0; i < entries.size(); ++i) {
            if (i % 2 == 0) {
                nextKey = entries.get(i);
                continue;
            }
            map.put(nextKey, entries.get(i));
        }
        return map;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    protected PipelineExecution buildExecution(@Nonnull PipelineExecution execution, @Nonnull Map<String, String> map, List<String> stageIds) {
        Id serializationErrorId = this.registry.createId("executions.deserialization.error").withTag("executionType", execution.getType().toString()).withTag("application", execution.getApplication());
        try {
            execution.setCanceled(Boolean.parseBoolean(map.get("canceled")));
            execution.setCanceledBy(map.get("canceledBy"));
            execution.setCancellationReason(map.get("cancellationReason"));
            execution.setLimitConcurrent(Boolean.parseBoolean(map.get("limitConcurrent")));
            execution.setMaxConcurrentExecutions(map.containsKey("maxConcurrentExecutions") ? Integer.parseInt(map.get("maxConcurrentExecutions")) : 0);
            execution.setBuildTime(NumberUtils.createLong((String)map.get("buildTime")));
            execution.setStartTime(NumberUtils.createLong((String)map.get("startTime")));
            if (map.get("startTimeExpiry") != null) {
                execution.setStartTimeExpiry(Long.valueOf(map.get("startTimeExpiry")));
            }
            execution.setEndTime(NumberUtils.createLong((String)map.get("endTime")));
            if (map.get("status") != null) {
                execution.setStatus(ExecutionStatus.valueOf((String)map.get("status")));
            }
            execution.setAuthentication((PipelineExecution.AuthenticationDetails)this.mapper.readValue(map.get("authentication"), PipelineExecution.AuthenticationDetails.class));
            if (map.get("paused") != null) {
                execution.setPaused((PipelineExecution.PausedDetails)this.mapper.readValue(map.get("paused"), PipelineExecution.PausedDetails.class));
            }
            execution.setKeepWaitingPipelines(Boolean.parseBoolean(map.get("keepWaitingPipelines")));
            if (map.get("spelEvaluator") != null) {
                execution.setSpelEvaluator(map.get("spelEvaluator"));
            }
            execution.setOrigin(map.get("origin"));
            if (map.get("source") != null) {
                execution.setSource((PipelineExecution.PipelineSource)this.mapper.readValue(map.get("source"), PipelineExecution.PipelineSource.class));
            }
            execution.setTrigger((Trigger)(map.get("trigger") != null ? (Trigger)this.mapper.readValue(map.get("trigger"), Trigger.class) : PipelineExecutionImpl.NO_TRIGGER));
            if (map.get("systemNotifications") != null) {
                ((PipelineExecutionImpl)execution).getSystemNotifications().addAll((Collection)this.mapper.readValue(map.get("systemNotifications"), LIST_OF_SYSTEM_NOTIFICATIONS));
            }
        }
        catch (Exception e) {
            this.registry.counter(serializationErrorId).increment();
            throw new ExecutionSerializationException(String.format("Failed serializing execution json, id: %s", execution.getId()), (Throwable)e);
        }
        ArrayList stages = new ArrayList();
        stageIds.forEach(stageId -> {
            String prefix = String.format("stage.%s.", stageId);
            StageExecutionImpl stage = new StageExecutionImpl();
            try {
                stage.setId(stageId);
                stage.setRefId((String)map.get(prefix + "refId"));
                stage.setType((String)map.get(prefix + "type"));
                stage.setName((String)map.get(prefix + "name"));
                stage.setStartTime(NumberUtils.createLong((String)((String)map.get(prefix + "startTime"))));
                stage.setEndTime(NumberUtils.createLong((String)((String)map.get(prefix + "endTime"))));
                stage.setStatus(ExecutionStatus.valueOf((String)((String)map.get(prefix + "status"))));
                if (map.get(prefix + "startTimeExpiry") != null) {
                    stage.setStartTimeExpiry(Long.valueOf((String)map.get(prefix + "startTimeExpiry")));
                }
                if (map.get(prefix + "syntheticStageOwner") != null) {
                    stage.setSyntheticStageOwner(SyntheticStageOwner.valueOf((String)((String)map.get(prefix + "syntheticStageOwner"))));
                }
                stage.setParentStageId((String)map.get(prefix + "parentStageId"));
                String requisiteStageRefIds = (String)map.get(prefix + "requisiteStageRefIds");
                if (StringUtils.isNotEmpty((CharSequence)requisiteStageRefIds)) {
                    stage.setRequisiteStageRefIds(Arrays.asList(requisiteStageRefIds.split(",")));
                } else {
                    stage.setRequisiteStageRefIds(Collections.emptySet());
                }
                stage.setScheduledTime(NumberUtils.createLong((String)((String)map.get(prefix + "scheduledTime"))));
                if (map.get(prefix + "context") != null) {
                    stage.setContext((Map)this.mapper.readValue((String)map.get(prefix + "context"), MAP_STRING_TO_OBJECT));
                } else {
                    stage.setContext(Collections.emptyMap());
                }
                if (map.get(prefix + "outputs") != null) {
                    stage.setOutputs((Map)this.mapper.readValue((String)map.get(prefix + "outputs"), MAP_STRING_TO_OBJECT));
                } else {
                    stage.setOutputs(Collections.emptyMap());
                }
                if (map.get(prefix + "tasks") != null) {
                    stage.setTasks((List)this.mapper.readValue((String)map.get(prefix + "tasks"), LIST_OF_TASKS));
                } else {
                    stage.setTasks(Collections.emptyList());
                }
                if (map.get(prefix + "lastModified") != null) {
                    stage.setLastModified((StageExecution.LastModifiedDetails)this.mapper.readValue((String)map.get(prefix + "lastModified"), StageExecution.LastModifiedDetails.class));
                }
                stage.setExecution(execution);
                stages.add(stage);
            }
            catch (IOException e) {
                this.registry.counter(serializationErrorId).increment();
                throw new StageSerializationException(String.format("Failed serializing stage json, executionId: %s, stageId: %s", execution.getId(), stageId), (Throwable)e);
            }
        });
        ExecutionRepositoryUtil.sortStagesByReference((PipelineExecution)execution, stages);
        if (execution.getType() == ExecutionType.PIPELINE) {
            execution.setName(map.get("name"));
            execution.setPipelineConfigId(map.get("pipelineConfigId"));
            try {
                if (map.get("notifications") != null) {
                    execution.getNotifications().addAll((Collection)this.mapper.readValue(map.get("notifications"), List.class));
                }
                if (map.get("initialConfig") == null) return execution;
                ((PipelineExecutionImpl)execution).getInitialConfig().putAll((Map)this.mapper.readValue(map.get("initialConfig"), Map.class));
                return execution;
            }
            catch (IOException e) {
                this.registry.counter(serializationErrorId).increment();
                throw new ExecutionSerializationException("Failed serializing execution json", (Throwable)e);
            }
        } else {
            if (execution.getType() != ExecutionType.ORCHESTRATION) return execution;
            execution.setDescription(map.get("description"));
        }
        return execution;
    }

    protected Map<String, String> serializeExecution(@Nonnull PipelineExecution execution) {
        HashMap<String, String> map = new HashMap<String, String>();
        try {
            map.put("application", execution.getApplication());
            map.put("canceled", String.valueOf(execution.isCanceled()));
            map.put("limitConcurrent", String.valueOf(execution.isLimitConcurrent()));
            map.put("maxConcurrentExecutions", String.valueOf(execution.getMaxConcurrentExecutions()));
            map.put("buildTime", String.valueOf(execution.getBuildTime() != null ? execution.getBuildTime() : 0L));
            map.put("startTime", execution.getStartTime() != null ? execution.getStartTime().toString() : null);
            map.put("endTime", execution.getEndTime() != null ? execution.getEndTime().toString() : null);
            map.put("startTimeExpiry", execution.getStartTimeExpiry() != null ? String.valueOf(execution.getStartTimeExpiry()) : null);
            map.put("status", execution.getStatus().name());
            map.put("authentication", this.mapper.writeValueAsString((Object)execution.getAuthentication()));
            map.put("paused", this.mapper.writeValueAsString((Object)execution.getPaused()));
            map.put("keepWaitingPipelines", String.valueOf(execution.isKeepWaitingPipelines()));
            map.put("spelEvaluator", execution.getSpelEvaluator());
            map.put("origin", execution.getOrigin());
            map.put("source", this.mapper.writeValueAsString((Object)execution.getSource()));
            map.put("trigger", this.mapper.writeValueAsString((Object)execution.getTrigger()));
            map.put("systemNotifications", this.mapper.writeValueAsString((Object)((PipelineExecutionImpl)execution).getSystemNotifications()));
        }
        catch (JsonProcessingException e) {
            throw new ExecutionSerializationException("Failed serializing execution", (Throwable)e);
        }
        execution.getStages().forEach(s -> map.putAll(this.serializeStage((StageExecution)s)));
        if (execution.getType() == ExecutionType.PIPELINE) {
            try {
                map.put("name", execution.getName());
                map.put("pipelineConfigId", execution.getPipelineConfigId());
                map.put("notifications", this.mapper.writeValueAsString((Object)execution.getNotifications()));
                map.put("initialConfig", this.mapper.writeValueAsString((Object)((PipelineExecutionImpl)execution).getInitialConfig()));
            }
            catch (JsonProcessingException e) {
                throw new ExecutionSerializationException("Failed serializing execution", (Throwable)e);
            }
        } else if (execution.getType() == ExecutionType.ORCHESTRATION) {
            map.put("description", execution.getDescription());
        }
        return map;
    }

    protected Map<String, String> serializeStage(StageExecution stage) {
        String prefix = String.format("stage.%s.", stage.getId());
        HashMap<String, String> map = new HashMap<String, String>();
        map.put(prefix + "refId", stage.getRefId());
        map.put(prefix + "type", stage.getType());
        map.put(prefix + "name", stage.getName());
        map.put(prefix + "startTime", stage.getStartTime() != null ? stage.getStartTime().toString() : null);
        map.put(prefix + "endTime", stage.getEndTime() != null ? stage.getEndTime().toString() : null);
        map.put(prefix + "startTimeExpiry", stage.getStartTimeExpiry() != null ? String.valueOf(stage.getStartTimeExpiry()) : null);
        map.put(prefix + "status", stage.getStatus().name());
        map.put(prefix + "syntheticStageOwner", stage.getSyntheticStageOwner() != null ? stage.getSyntheticStageOwner().name() : null);
        map.put(prefix + "parentStageId", stage.getParentStageId());
        if (!stage.getRequisiteStageRefIds().isEmpty()) {
            map.put(prefix + "requisiteStageRefIds", stage.getRequisiteStageRefIds().stream().collect(Collectors.joining(",")));
        }
        map.put(prefix + "scheduledTime", stage.getScheduledTime() != null ? stage.getScheduledTime().toString() : null);
        try {
            map.put(prefix + "context", this.mapper.writeValueAsString((Object)stage.getContext()));
            map.put(prefix + "outputs", this.mapper.writeValueAsString((Object)stage.getOutputs()));
            map.put(prefix + "tasks", this.mapper.writeValueAsString((Object)stage.getTasks()));
            map.put(prefix + "lastModified", stage.getLastModified() != null ? this.mapper.writeValueAsString((Object)stage.getLastModified()) : null);
        }
        catch (JsonProcessingException e) {
            throw new StageSerializationException(String.format("Failed converting stage to json, executionId: %s, stageId: %s", stage.getExecution().getId(), stage.getId()), (Throwable)e);
        }
        return map;
    }

    protected List<String> extractStages(Map<String, String> map) {
        HashSet stageIds = new HashSet();
        Pattern pattern = Pattern.compile("^stage\\.([-\\w]+)\\.");
        map.keySet().forEach(k -> {
            Matcher matcher = pattern.matcher((CharSequence)k);
            if (matcher.find()) {
                stageIds.add(matcher.group(1));
            }
        });
        return new ArrayList<String>(stageIds);
    }

    private void deleteInternal(RedisClientDelegate delegate, ExecutionType type, String id) {
        delegate.withCommandsClient(c -> {
            String key;
            block4: {
                key = this.executionKey(type, id);
                try {
                    String application = c.hget(key, "application");
                    String appKey = RedisExecutionRepository.appKey(type, application);
                    c.srem(appKey, new String[]{id});
                    c.srem(RedisExecutionRepository.allBufferedExecutionsKey(type), new String[]{id});
                    if (type != ExecutionType.PIPELINE) break block4;
                    String pipelineConfigId = c.hget(key, "pipelineConfigId");
                    c.zrem(RedisExecutionRepository.executionsByPipelineKey(pipelineConfigId), new String[]{id});
                }
                catch (ExecutionNotFoundException executionNotFoundException) {
                    c.del(key);
                    c.del(key + ":stageIndex");
                    c.srem(RedisExecutionRepository.alljobsKey(type), new String[]{id});
                    catch (Throwable throwable) {
                        c.del(key);
                        c.del(key + ":stageIndex");
                        c.srem(RedisExecutionRepository.alljobsKey(type), new String[]{id});
                        throw throwable;
                    }
                }
            }
            c.del(key);
            c.del(key + ":stageIndex");
            c.srem(RedisExecutionRepository.alljobsKey(type), new String[]{id});
        });
    }

    private Stream<PipelineExecution> getExecutionForPipelineConfigId(RedisClientDelegate delegate, String pipelineConfigId, Long buildTimeStartBoundary, Long buildTimeEndBoundary) {
        String executionsKey = RedisExecutionRepository.executionsByPipelineKey(pipelineConfigId);
        Set executionIds = (Set)delegate.withCommandsClient(c -> c.zrangeByScore(executionsKey, (double)buildTimeStartBoundary.longValue(), (double)buildTimeEndBoundary.longValue()));
        return executionIds.stream().map(executionId -> {
            String executionKey = this.pipelineKey((String)executionId);
            Map executionMap = (Map)delegate.withCommandsClient(c -> c.hgetAll(executionKey));
            String stageIdsKey = String.format("%s:stageIndex", executionKey);
            List stageIds = (List)delegate.withCommandsClient(c -> c.lrange(stageIdsKey, 0L, -1L));
            PipelineExecutionImpl execution = new PipelineExecutionImpl(ExecutionType.PIPELINE, executionId, (String)executionMap.get("application"));
            return this.buildExecution((PipelineExecution)execution, executionMap, stageIds);
        });
    }

    private List<PipelineExecution> getPipelinesForPipelineConfigIdsBetweenBuildTimeBoundaryFromRedis(RedisClientDelegate redisClientDelegate, List<String> pipelineConfigIds, long buildTimeStartBoundary, long buildTimeEndBoundary) {
        return pipelineConfigIds.stream().flatMap(pipelineConfigId -> this.getExecutionForPipelineConfigId(redisClientDelegate, (String)pipelineConfigId, buildTimeStartBoundary, buildTimeEndBoundary)).collect(Collectors.toList());
    }

    protected Observable<PipelineExecution> all(ExecutionType type, RedisClientDelegate redisClientDelegate) {
        return this.retrieveObservable(type, RedisExecutionRepository.alljobsKey(type), this.queryAllScheduler, redisClientDelegate);
    }

    protected Observable<PipelineExecution> allForApplication(ExecutionType type, String application, RedisClientDelegate redisClientDelegate) {
        return this.retrieveObservable(type, RedisExecutionRepository.appKey(type, application), this.queryByAppScheduler, redisClientDelegate);
    }

    protected Observable<PipelineExecution> retrieveObservable(ExecutionType type, String lookupKey, Scheduler scheduler, RedisClientDelegate redisClientDelegate) {
        Func0 fnBuilder = () -> key -> (Iterable)redisClientDelegate.withCommandsClient(c -> c.smembers(key));
        return this.retrieveObservable(type, lookupKey, (Func1<String, Iterable<String>>)((Func1)fnBuilder.call()), scheduler, redisClientDelegate);
    }

    protected Observable<PipelineExecution> retrieveObservable(ExecutionType type, String lookupKey, Func1<String, Iterable<String>> lookupKeyFetcher, Scheduler scheduler, RedisClientDelegate redisClientDelegate) {
        return Observable.just((Object)lookupKey).flatMapIterable(lookupKeyFetcher).buffer(this.chunkSize).flatMap(ids -> Observable.from((Iterable)ids).flatMap(executionId -> {
            try {
                return Observable.just((Object)this.retrieveInternal(redisClientDelegate, type, (String)executionId));
            }
            catch (ExecutionNotFoundException ignored) {
                this.log.info("Execution ({}) does not exist", (Object)StructuredArguments.value((String)"executionId", (Object)executionId));
                redisClientDelegate.withCommandsClient(c -> {
                    if (c.type(lookupKey).equals("zset")) {
                        c.zrem(lookupKey, new String[]{executionId});
                    } else {
                        c.srem(lookupKey, new String[]{executionId});
                    }
                });
            }
            catch (Exception e) {
                this.log.error("Failed to retrieve execution '{}'", (Object)StructuredArguments.value((String)"executionId", (Object)executionId), (Object)e);
            }
            return Observable.empty();
        })).subscribeOn(scheduler);
    }

    protected ImmutablePair<String, RedisClientDelegate> fetchKey(String id) {
        ImmutablePair pair = (ImmutablePair)this.redisClientDelegate.withCommandsClient(c -> {
            if (c.exists(this.pipelineKey(id)).booleanValue()) {
                return ImmutablePair.of(this.pipelineKey(id), this.redisClientDelegate);
            }
            if (c.exists(this.orchestrationKey(id)).booleanValue()) {
                return ImmutablePair.of(this.orchestrationKey(id), this.redisClientDelegate);
            }
            return ImmutablePair.nullPair();
        });
        if (pair.getLeft() == null && this.previousRedisClientDelegate.isPresent()) {
            RedisClientDelegate delegate = this.previousRedisClientDelegate.get();
            pair = (ImmutablePair)delegate.withCommandsClient(c -> {
                if (c.exists(this.pipelineKey(id)).booleanValue()) {
                    return ImmutablePair.of(this.pipelineKey(id), delegate);
                }
                if (c.exists(this.orchestrationKey(id)).booleanValue()) {
                    return ImmutablePair.of(this.orchestrationKey(id), delegate);
                }
                return null;
            });
        }
        if (pair.getLeft() == null) {
            throw new ExecutionNotFoundException("No execution found with id " + id);
        }
        return pair;
    }

    protected static String alljobsKey(ExecutionType type) {
        return String.format("allJobs:%s", type);
    }

    protected static String appKey(ExecutionType type, String app) {
        return String.format("%s:app:%s", type, app);
    }

    protected static String executionsByPipelineKey(String pipelineConfigId) {
        String id = pipelineConfigId != null ? pipelineConfigId : "---";
        return String.format("pipeline:executions:%s", id);
    }

    protected static String allBufferedExecutionsKey(ExecutionType type) {
        if (bufferedPrefix == null || bufferedPrefix.isEmpty()) {
            return String.format("buffered:%s", type);
        }
        return String.format("%s:buffered:%s", bufferedPrefix, type);
    }

    protected static String executionKeyPattern(@Nullable ExecutionType type) {
        String all = "*:app:*";
        if (type == null) {
            return "*:app:*";
        }
        switch (type) {
            case PIPELINE: {
                return "pipeline:app:*";
            }
            case ORCHESTRATION: {
                return "orchestration:app:*";
            }
        }
        return "*:app:*";
    }

    private void storeExecutionInternal(RedisClientDelegate delegate, PipelineExecution execution) {
        String key = this.executionKey(execution);
        String indexKey = String.format("%s:stageIndex", key);
        Map<String, String> map = this.serializeExecution(execution);
        delegate.withCommandsClient(c -> {
            c.sadd(RedisExecutionRepository.alljobsKey(execution.getType()), new String[]{execution.getId()});
            c.sadd(RedisExecutionRepository.appKey(execution.getType(), execution.getApplication()), new String[]{execution.getId()});
            if (execution.getStatus() == ExecutionStatus.BUFFERED) {
                c.sadd(RedisExecutionRepository.allBufferedExecutionsKey(execution.getType()), new String[]{execution.getId()});
            } else {
                c.srem(RedisExecutionRepository.allBufferedExecutionsKey(execution.getType()), new String[]{execution.getId()});
            }
            delegate.withTransaction(tx -> {
                tx.hdel(key, new String[]{"config"});
                tx.hmset(key, Maps.filterValues((Map)map, Objects::nonNull));
                if (!execution.getStages().isEmpty()) {
                    tx.del(indexKey);
                    tx.rpush(indexKey, (String[])execution.getStages().stream().map(StageExecution::getId).toArray(String[]::new));
                }
                tx.exec();
            });
            if (execution.getTrigger().getCorrelationId() != null) {
                c.set(String.format("correlation:%s", execution.getTrigger().getCorrelationId()), execution.getId());
            }
        });
    }

    private void storeStageInternal(RedisClientDelegate delegate, StageExecution stage, Boolean updateIndex) {
        String key = this.executionKey(stage);
        String indexKey = String.format("%s:stageIndex", key);
        Map<String, String> serializedStage = this.serializeStage(stage);
        List keysToRemove = serializedStage.entrySet().stream().filter(e -> e.getValue() == null).map(Map.Entry::getKey).collect(Collectors.toList());
        serializedStage.values().removeIf(Objects::isNull);
        delegate.withTransaction(tx -> {
            tx.hmset(key, serializedStage);
            if (!keysToRemove.isEmpty()) {
                tx.hdel(key, keysToRemove.toArray(new String[0]));
            }
            if (updateIndex.booleanValue()) {
                ListPosition pos = stage.getSyntheticStageOwner() == SyntheticStageOwner.STAGE_BEFORE ? ListPosition.BEFORE : ListPosition.AFTER;
                tx.linsert(indexKey, pos, stage.getParentStageId(), stage.getId());
            }
            tx.exec();
        });
    }

    protected PipelineExecution retrieveInternal(RedisClientDelegate delegate, ExecutionType type, String id) throws ExecutionNotFoundException {
        String key = this.executionKey(type, id);
        String indexKey = String.format("%s:stageIndex", key);
        boolean exists = (Boolean)delegate.withCommandsClient(c -> c.exists(key));
        if (!exists) {
            throw new ExecutionNotFoundException("No " + type + " found for " + id);
        }
        HashMap<String, String> map = new HashMap<String, String>();
        ArrayList<String> stageIds = new ArrayList<String>();
        delegate.withTransaction(tx -> {
            Response execResponse = tx.hgetAll(key);
            Response indexResponse = tx.lrange(indexKey, 0L, -1L);
            tx.exec();
            map.putAll((Map)execResponse.get());
            if (!((List)indexResponse.get()).isEmpty()) {
                stageIds.addAll((Collection)indexResponse.get());
            } else {
                stageIds.addAll(this.extractStages(map));
            }
        });
        PipelineExecutionImpl execution = new PipelineExecutionImpl(type, id, (String)map.get("application"));
        return this.buildExecution((PipelineExecution)execution, map, stageIds);
    }

    protected List<ExecutionStatus> fetchMultiExecutionStatus(RedisClientDelegate redisClientDelegate, List<String> keys) {
        return (List)redisClientDelegate.withMultiKeyPipeline(p -> {
            List responses = keys.stream().map(k -> p.hget(k, "status")).collect(Collectors.toList());
            p.sync();
            return responses.stream().map(Response::get).filter(Objects::nonNull).map(ExecutionStatus::valueOf).collect(Collectors.toList());
        });
    }

    protected Collection<RedisClientDelegate> allRedisDelegates() {
        ArrayList<RedisClientDelegate> delegates = new ArrayList<RedisClientDelegate>();
        delegates.add(this.redisClientDelegate);
        this.previousRedisClientDelegate.ifPresent(delegates::add);
        return delegates;
    }

    protected String executionKey(PipelineExecution execution) {
        return String.format("%s:%s", execution.getType(), execution.getId());
    }

    private String executionKey(StageExecution stage) {
        return String.format("%s:%s", stage.getExecution().getType(), stage.getExecution().getId());
    }

    private String executionKey(ExecutionType type, String id) {
        return String.format("%s:%s", type, id);
    }

    private String pipelineKey(String id) {
        return String.format("%s:%s", ExecutionType.PIPELINE, id);
    }

    private String orchestrationKey(String id) {
        return String.format("%s:%s", ExecutionType.ORCHESTRATION, id);
    }

    private RedisClientDelegate getRedisDelegate(PipelineExecution execution) {
        return this.getRedisDelegate(execution.getType(), execution.getId());
    }

    private RedisClientDelegate getRedisDelegate(StageExecution stage) {
        return this.getRedisDelegate(stage.getExecution().getType(), stage.getExecution().getId());
    }

    private RedisClientDelegate getRedisDelegate(ExecutionType type, String id) {
        if (StringUtils.isBlank((CharSequence)id) || !this.previousRedisClientDelegate.isPresent()) {
            return this.redisClientDelegate;
        }
        RedisClientDelegate delegate = (RedisClientDelegate)this.redisClientDelegate.withCommandsClient(c -> {
            if (c.exists(this.executionKey(type, id)).booleanValue()) {
                return this.redisClientDelegate;
            }
            return null;
        });
        if (delegate == null) {
            delegate = (RedisClientDelegate)this.previousRedisClientDelegate.get().withCommandsClient(c -> {
                if (c.exists(this.executionKey(type, id)).booleanValue()) {
                    return this.previousRedisClientDelegate.get();
                }
                return null;
            });
        }
        return delegate == null ? this.redisClientDelegate : delegate;
    }

    private RedisClientDelegate getRedisDelegate(String key) {
        if (Strings.isNullOrEmpty((String)key) || !this.previousRedisClientDelegate.isPresent()) {
            return this.redisClientDelegate;
        }
        RedisClientDelegate delegate = (RedisClientDelegate)this.redisClientDelegate.withCommandsClient(c -> {
            if (c.exists(key).booleanValue()) {
                return this.redisClientDelegate;
            }
            return null;
        });
        if (delegate == null) {
            delegate = (RedisClientDelegate)this.previousRedisClientDelegate.get().withCommandsClient(c -> {
                if (c.exists(key).booleanValue()) {
                    return this.previousRedisClientDelegate.get();
                }
                return null;
            });
        }
        return delegate == null ? this.redisClientDelegate : delegate;
    }

    private static class ImmutablePair<L, R> {
        private final L left;
        private final R right;

        public ImmutablePair(L left, R right) {
            this.left = left;
            this.right = right;
        }

        public static <T, U> ImmutablePair<T, U> of(T left, U right) {
            return new ImmutablePair<T, U>(left, right);
        }

        public static <T, U> ImmutablePair<T, U> nullPair() {
            return new ImmutablePair<Object, Object>(null, null);
        }

        public L getLeft() {
            return this.left;
        }

        public R getRight() {
            return this.right;
        }
    }
}

