/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.spinnaker.orca.q;

import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService;
import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution;
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionNotFoundException;
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository;
import com.netflix.spinnaker.orca.q.ExecutionLevel;
import com.netflix.spinnaker.q.Activator;
import com.netflix.spinnaker.q.Message;
import com.netflix.spinnaker.q.Queue;
import java.time.Duration;
import java.time.Instant;
import javax.annotation.PostConstruct;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.Intrinsics;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;

@Metadata(mv={1, 4, 2}, bv={1, 0, 3}, k=1, d1={"\u0000P\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n\u0000\n\u0002\u0010\u000b\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0000\u0018\u00002\u00020\u0001B7\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0003\u0012\u0006\u0010\u0005\u001a\u00020\u0006\u0012\u0006\u0010\u0007\u001a\u00020\b\u0012\u0006\u0010\t\u001a\u00020\n\u0012\b\u0010\u000b\u001a\u0004\u0018\u00010\f\u00a2\u0006\u0002\u0010\rJ\b\u0010\u0015\u001a\u00020\u0016H\u0007J\b\u0010\u0017\u001a\u00020\u0018H\u0002J\b\u0010\u0019\u001a\u00020\u0016H\u0007J\u0006\u0010\u001a\u001a\u00020\u0016J\u0010\u0010\u001b\u001a\u00020\u00162\u0006\u0010\u001c\u001a\u00020\u001dH\u0002R\u000e\u0010\u0007\u001a\u00020\bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\t\u001a\u00020\nX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0010\u0010\u000b\u001a\u0004\u0018\u00010\fX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0016\u0010\u000e\u001a\n \u0010*\u0004\u0018\u00010\u000f0\u000fX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0016\u0010\u0011\u001a\n \u0010*\u0004\u0018\u00010\u00120\u0012X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0003X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0005\u001a\u00020\u0006X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0016\u0010\u0013\u001a\n \u0010*\u0004\u0018\u00010\u00120\u0012X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0016\u0010\u0014\u001a\n \u0010*\u0004\u0018\u00010\u00120\u0012X\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006\u001e"}, d2={"Lcom/netflix/spinnaker/orca/q/QueueShovel;", "", "queue", "Lcom/netflix/spinnaker/q/Queue;", "previousQueue", 
"registry", "Lcom/netflix/spectator/api/Registry;", "activator", "Lcom/netflix/spinnaker/q/Activator;", "config", "Lcom/netflix/spinnaker/kork/dynamicconfig/DynamicConfigService;", "executionRepository", "Lcom/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository;", "(Lcom/netflix/spinnaker/q/Queue;Lcom/netflix/spinnaker/q/Queue;Lcom/netflix/spectator/api/Registry;Lcom/netflix/spinnaker/q/Activator;Lcom/netflix/spinnaker/kork/dynamicconfig/DynamicConfigService;Lcom/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository;)V", "log", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "pollOpsRateId", "Lcom/netflix/spectator/api/Id;", "shovelErrorId", "shoveledMessageId", "confirmShovelUsage", "", "isActive", "", "migrateIfActive", "migrateOne", "transferOwnership", "message", "Lcom/netflix/spinnaker/q/Message;", "orca-queue"})
/**
 * Moves messages from a previous queue onto the currently active queue.
 *
 * <p>While the dynamic config flag {@code queue.shovel.active} is true and the
 * {@link Activator} reports enabled, a scheduled task polls {@code previousQueue},
 * optionally re-assigns the partition of the referenced execution to this
 * instance's {@link ExecutionRepository}, pushes the message onto {@code queue}
 * and then acknowledges it on the previous queue.
 *
 * <p>Three counters are published through the {@link Registry}: poll attempts,
 * successfully shoveled messages and shovel errors.
 */
public final class QueueShovel {
    /** Length of one shoveling pass in milliseconds; kept below the 5000 ms schedule rate. */
    private static final long WORK_WINDOW_MILLIS = 4750L;
    /** Pause between two consecutive polls of the previous queue, in milliseconds. */
    private static final long POLL_PAUSE_MILLIS = 50L;

    private final Logger log;
    private final Id pollOpsRateId;
    private final Id shoveledMessageId;
    private final Id shovelErrorId;
    private final Queue queue;
    private final Queue previousQueue;
    private final Registry registry;
    private final Activator activator;
    private final DynamicConfigService config;
    /** May be {@code null}; without it no ownership transfer is attempted. */
    private final ExecutionRepository executionRepository;

    /**
     * Scheduled entry point, invoked at a fixed rate of 5000 ms.
     *
     * <p>Returns immediately when the shovel is not active. Otherwise it repeatedly
     * calls {@link #migrateOne()} with a short pause in between until the work
     * window has elapsed. If the thread is interrupted while pausing, the interrupt
     * flag is restored and the current pass ends early.
     */
    @Scheduled(fixedRate=5000L)
    public final void migrateIfActive() {
        if (!this.isActive()) {
            return;
        }
        this.log.info("Actively shoveling from " + this.previousQueue + " to " + this.queue);
        Duration workDuration = Duration.ofMillis(WORK_WINDOW_MILLIS);
        Instant start = Instant.now();
        while (Duration.between(start, Instant.now()).compareTo(workDuration) < 0) {
            this.migrateOne();
            try {
                Thread.sleep(POLL_PAUSE_MILLIS);
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread and stop shoveling for this pass.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Tells whether shoveling should run right now.
     *
     * @return {@code true} only if the {@code queue.shovel.active} config value
     *         (default {@code false}) is true and the activator is enabled
     */
    private boolean isActive() {
        Boolean shovelActive = this.config.getConfig(Boolean.TYPE, "queue.shovel.active", false);
        Intrinsics.checkNotNullExpressionValue(shovelActive, "config.getConfig(Boolean\u2026ue.shovel.active\", false)");
        return shovelActive && this.activator.getEnabled();
    }

    /**
     * Performs a single poll of the previous queue and hands the polled message
     * (if the queue delivers one to the callback) over to the active queue.
     *
     * <p>The poll counter is incremented on every call. Inside the callback the
     * message is acknowledged only after ownership transfer and the push to the
     * active queue have both completed. Any failure is logged and counted as a
     * shovel error, and the message is then not acknowledged by this method.
     */
    public final void migrateOne() {
        this.registry.counter(this.pollOpsRateId).increment();
        this.previousQueue.poll((message, ack) -> {
            Intrinsics.checkNotNullParameter(message, "message");
            Intrinsics.checkNotNullParameter(ack, "ack");
            try {
                this.log.debug("Shoveling message {}", message);
                this.transferOwnership(message);
                this.queue.push(message);
                ack.invoke();
                this.registry.counter(this.shoveledMessageId).increment();
            }
            catch (ExecutionNotFoundException e) {
                // Missing execution: log without a stack trace, only the exception's string form.
                this.log.error("Failed shoveling message from previous queue to active (message: " + message + ") because of exception " + e);
                this.registry.counter(this.shovelErrorId).increment();
            }
            catch (Throwable e) {
                // Deliberately broad: one bad message must not abort the shoveling loop.
                this.log.error("Failed shoveling message from previous queue to active (message: " + message + ')', e);
                this.registry.counter(this.shovelErrorId).increment();
            }
            // The callback is a Kotlin function type returning Unit.
            return Unit.INSTANCE;
        });
    }

    /**
     * Re-assigns the execution referenced by {@code message} to this repository's
     * partition when the repository does not already handle the execution's partition.
     *
     * <p>Does nothing when no repository is configured. Messages that are not
     * {@link ExecutionLevel} are skipped with a warning because no execution can be
     * looked up for them.
     *
     * @param message the message being shoveled
     */
    private void transferOwnership(Message message) {
        if (this.executionRepository == null) {
            return;
        }
        if (!(message instanceof ExecutionLevel)) {
            this.log.warn("Message " + message + " does not implement ExecutionLevel, can not inspect partition");
            return;
        }
        ExecutionLevel executionMessage = (ExecutionLevel)message;
        PipelineExecution execution = this.executionRepository.retrieve(executionMessage.getExecutionType(), executionMessage.getExecutionId());
        Intrinsics.checkNotNullExpressionValue(execution, "executionRepository.retr\u2026ype, message.executionId)");
        boolean isForeign = !this.executionRepository.handlesPartition(execution.getPartition());
        if (isForeign) {
            this.log.info("Taking ownership of foreign execution " + execution.getId() + " with partition '" + execution.getPartition() + "'. Setting partition to '" + this.executionRepository.getPartition() + '\'');
            execution.setPartition(this.executionRepository.getPartition());
            this.executionRepository.store(execution);
        }
    }

    /**
     * Logs, once after construction, that the shovel is wired up, and warns when
     * no {@link ExecutionRepository} is available for ownership transfer.
     */
    @PostConstruct
    public final void confirmShovelUsage() {
        this.log.info(this.getClass().getSimpleName() + " migrator from " + this.previousQueue + " to " + this.queue + " is enabled");
        if (this.executionRepository == null) {
            this.log.warn(this.getClass().getSimpleName() + " configured without an ExecutionRepository, won't be able to transfer ownership");
        }
    }

    /**
     * Creates a shovel between two queues.
     *
     * @param queue               the active queue that receives shoveled messages
     * @param previousQueue       the queue that is drained
     * @param registry            metrics registry used for the shovel counters
     * @param activator           gate that must report enabled for shoveling to run
     * @param config              source of the {@code queue.shovel.active} flag
     * @param executionRepository repository used to take ownership of executions; may be {@code null}
     * @throws NullPointerException if any argument other than {@code executionRepository} is {@code null}
     */
    public QueueShovel(@NotNull Queue queue, @NotNull Queue previousQueue, @NotNull Registry registry, @NotNull Activator activator, @NotNull DynamicConfigService config, @Nullable ExecutionRepository executionRepository) {
        Intrinsics.checkNotNullParameter(queue, "queue");
        Intrinsics.checkNotNullParameter(previousQueue, "previousQueue");
        Intrinsics.checkNotNullParameter(registry, "registry");
        Intrinsics.checkNotNullParameter(activator, "activator");
        Intrinsics.checkNotNullParameter(config, "config");
        this.queue = queue;
        this.previousQueue = previousQueue;
        this.registry = registry;
        this.activator = activator;
        this.config = config;
        this.executionRepository = executionRepository;
        this.log = LoggerFactory.getLogger(this.getClass());
        this.pollOpsRateId = this.registry.createId("orca.nu.shovel.pollOpsRate");
        this.shoveledMessageId = this.registry.createId("orca.nu.shovel.pushedMessageRate");
        this.shovelErrorId = this.registry.createId("orca.nu.shovel.pushedMessageErrorRate");
    }

    // The static accessors below are compiler-generated bridges that the decompiled
    // callback class used to reach private members. They are no longer called from
    // within this class but are kept so the class's public surface is unchanged.

    /** Synthetic accessor for the private logger. */
    public static final /* synthetic */ Logger access$getLog$p(QueueShovel $this) {
        return $this.log;
    }

    /** Synthetic accessor that delegates to the private {@code transferOwnership}. */
    public static final /* synthetic */ void access$transferOwnership(QueueShovel $this, Message message) {
        $this.transferOwnership(message);
    }

    /** Synthetic accessor for the active queue. */
    public static final /* synthetic */ Queue access$getQueue$p(QueueShovel $this) {
        return $this.queue;
    }

    /** Synthetic accessor for the metrics registry. */
    public static final /* synthetic */ Registry access$getRegistry$p(QueueShovel $this) {
        return $this.registry;
    }

    /** Synthetic accessor for the shoveled-message counter id. */
    public static final /* synthetic */ Id access$getShoveledMessageId$p(QueueShovel $this) {
        return $this.shoveledMessageId;
    }

    /** Synthetic accessor for the shovel-error counter id. */
    public static final /* synthetic */ Id access$getShovelErrorId$p(QueueShovel $this) {
        return $this.shovelErrorId;
    }
}

