/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.process.test.engine;

import io.camunda.zeebe.engine.processing.streamprocessor.JobStreamer;
import io.camunda.zeebe.process.test.engine.CommandWriter;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob;
import io.camunda.zeebe.protocol.impl.stream.job.JobActivationProperties;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Predicate;
import org.agrona.DirectBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link JobStreamer} implementation that keeps all job streams in memory.
 *
 * <p>At most one stream is registered per job type. Every stream carries the activation
 * properties it was created with and a set of {@link JobConsumer}s. A pushed job is offered to the
 * consumers of the stream in random order, one after the other, until one of them accepts it; if
 * none does, a {@code JobIntent.YIELD} command is written for the job through the configured
 * {@link CommandWriter}.
 */
final class InMemoryJobStreamer implements JobStreamer {
    private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryJobStreamer.class);

    /** Registered streams, keyed by job type. */
    private final ConcurrentMap<DirectBuffer, InMemoryJobStream> streams = new ConcurrentHashMap<>();

    /** Writer used to emit the yield command for jobs no consumer accepted. */
    private final CommandWriter yieldWriter;

    /**
     * @param yieldWriter writer that receives the yield command for jobs which could not be pushed
     */
    InMemoryJobStreamer(CommandWriter yieldWriter) {
        this.yieldWriter = yieldWriter;
    }

    /**
     * Looks up the stream registered for the given job type.
     *
     * @param jobType the job type to look up
     * @param filter predicate applied to the properties of the registered stream; it is only
     *     evaluated when a stream exists for {@code jobType}
     * @return the registered stream if one exists and its properties satisfy {@code filter},
     *     otherwise an empty optional
     */
    @Override
    public Optional<JobStreamer.JobStream> streamFor(DirectBuffer jobType, Predicate<JobActivationProperties> filter) {
        InMemoryJobStream registered = this.streams.get(jobType);
        if (registered == null || !filter.test(registered.properties())) {
            return Optional.empty();
        }
        return Optional.of(registered);
    }

    /**
     * Registers a consumer for the given job type, creating the stream when none exists yet.
     *
     * <p>When a stream already exists for {@code jobType}, the consumer is added to it and the
     * given {@code properties} are not used.
     *
     * @param jobType the job type the consumer is interested in
     * @param properties activation properties for a newly created stream
     * @param consumer the consumer to register
     */
    void addStream(DirectBuffer jobType, JobActivationProperties properties, JobConsumer consumer) {
        this.streams.compute(jobType, (type, existing) -> {
            InMemoryJobStream target = existing;
            if (target == null) {
                target = new InMemoryJobStream(properties, new CopyOnWriteArraySet<>());
            }
            target.consumers.add(consumer);
            return target;
        });
    }

    /**
     * Unregisters a consumer from the stream of the given job type. The stream itself is dropped
     * once its last consumer has been removed. Does nothing when no stream is registered.
     *
     * @param jobType the job type the consumer was registered for
     * @param consumer the consumer to unregister
     */
    void removeStream(DirectBuffer jobType, JobConsumer consumer) {
        this.streams.computeIfPresent(jobType, (type, existing) -> {
            existing.consumers.remove(consumer);
            // Mapping to null removes the entry from the map.
            return existing.consumers.isEmpty() ? null : existing;
        });
    }

    /** Writes a {@code JobIntent.YIELD} command for the given job, keyed by the job key. */
    private void yieldJob(ActivatedJob job) {
        RecordMetadata yieldMetadata = new RecordMetadata()
            .intent(JobIntent.YIELD)
            .recordType(RecordType.COMMAND)
            .valueType(ValueType.JOB);
        this.yieldWriter.writeCommandWithKey(job.jobKey(), job.jobRecord(), yieldMetadata);
    }

    /** A stream for a single job type: its activation properties plus the attached consumers. */
    private final class InMemoryJobStream implements JobStreamer.JobStream {
        private final JobActivationProperties properties;
        private final Set<JobConsumer> consumers;

        InMemoryJobStream(JobActivationProperties properties, Set<JobConsumer> consumers) {
            this.properties = properties;
            this.consumers = consumers;
        }

        @Override
        public JobActivationProperties properties() {
            return this.properties;
        }

        /**
         * Offers the job to the consumers of this stream, trying them in random order.
         *
         * @param payload the job to push
         */
        @Override
        public void push(ActivatedJob payload) {
            // Work on a shuffled snapshot so later changes to the consumer set don't affect this push.
            LinkedList<JobConsumer> candidates = new LinkedList<>(this.consumers);
            Collections.shuffle(candidates);
            this.push(candidates, payload);
        }

        /**
         * Hands the job to the next remaining consumer. Yields the job when no consumer is left.
         *
         * @param remaining consumers which have not been tried yet; the head is removed and tried
         * @param job the job to push
         */
        private void push(Queue<JobConsumer> remaining, ActivatedJob job) {
            JobConsumer next = remaining.poll();
            if (next == null) {
                LOGGER.debug("Failed to push job to clients, exhausted all known clients");
                InMemoryJobStreamer.this.yieldJob(job);
                return;
            }

            try {
                // The outcome is handled on the stage's default asynchronous executor.
                next.consumeJob(job)
                    .whenCompleteAsync((status, failure) -> this.onPushCompleted(remaining, job, status, failure));
            } catch (Exception e) {
                // The consumer failed before handing back a stage; treat it like a failed push.
                this.onPushError(remaining, job, e);
            }
        }

        /**
         * Reacts to the outcome of a single push attempt: a failure or a {@link PushStatus#BLOCKED}
         * status moves on to the next consumer, any other status requires no further action.
         */
        private void onPushCompleted(
                Queue<JobConsumer> remaining, ActivatedJob job, PushStatus status, Throwable failure) {
            if (failure != null) {
                this.onPushError(remaining, job, failure);
                return;
            }
            if (status == PushStatus.BLOCKED) {
                LOGGER.trace("Underlying stream or client is blocked, retrying with next consumer");
                this.retryWithNextConsumer(remaining, job);
            }
        }

        /** Logs the failed attempt and retries with the next remaining consumer. */
        private void onPushError(Queue<JobConsumer> remaining, ActivatedJob job, Throwable failure) {
            LOGGER.debug("Failed to push job to client, retrying with next consumer", failure);
            this.retryWithNextConsumer(remaining, job);
        }

        /** Schedules the next push attempt asynchronously. */
        private void retryWithNextConsumer(Queue<JobConsumer> remaining, ActivatedJob job) {
            CompletableFuture.runAsync(() -> this.push(remaining, job));
        }
    }

    /** Receiver of pushed jobs. */
    interface JobConsumer {
        /**
         * Consumes the given job.
         *
         * @param var1 the job to consume
         * @return a stage completing with the outcome of the push
         */
        CompletionStage<PushStatus> consumeJob(ActivatedJob var1);
    }

    /** Outcome reported by a {@link JobConsumer} for a pushed job. */
    enum PushStatus {
        PUSHED,
        /** Causes the streamer to retry the job with the next consumer. */
        BLOCKED
    }
}

