/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.sqs.listener.sink;

import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.listener.TaskExecutorAware;
import io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipeline;
import io.awspring.cloud.sqs.listener.sink.MessageProcessingPipelineSink;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.StopWatch;

/**
 * Base class for {@link MessageProcessingPipelineSink} implementations that hand messages to a
 * {@link MessageProcessingPipeline} on a configured executor.
 *
 * <p>Subclasses implement {@link #doEmit(Collection, MessageProcessingContext)} to decide how a
 * batch is dispatched, typically by calling one of the {@code execute} overloads, which submit the
 * pipeline call to the executor, run the context's back-pressure release callback once processing
 * completes, and log the elapsed time at trace level.
 *
 * <p>Thread-safety: {@link #start()} and {@link #stop()} serialize on an internal monitor, and the
 * running flag is {@code volatile} so {@link #emit(Collection, MessageProcessingContext)} can read
 * it without locking.
 *
 * @param <T> the payload type of the messages handled by this sink
 */
public abstract class AbstractMessageProcessingPipelineSink<T>
        implements MessageProcessingPipelineSink<T>, TaskExecutorAware {

    private static final Logger logger = LoggerFactory.getLogger(AbstractMessageProcessingPipelineSink.class);

    /** Guards the start/stop state transitions. */
    private final Object lifecycleMonitor = new Object();

    private volatile boolean running;

    private Executor taskExecutor;

    private MessageProcessingPipeline<T> messageProcessingPipeline;

    /** Identifier used in log output; assigned when the sink is started. */
    private String id;

    /**
     * Sets the pipeline every emitted message is processed by.
     *
     * @param messageProcessingPipeline the pipeline, must not be {@code null}
     */
    @Override
    public void setMessagePipeline(MessageProcessingPipeline<T> messageProcessingPipeline) {
        Assert.notNull(messageProcessingPipeline, "messageProcessingPipeline must not be null.");
        this.messageProcessingPipeline = messageProcessingPipeline;
    }

    /**
     * Sets the executor on which pipeline invocations are submitted.
     *
     * @param taskExecutor the executor, must not be {@code null}
     */
    @Override
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        Assert.notNull(taskExecutor, "executor cannot be null");
        this.taskExecutor = taskExecutor;
    }

    /**
     * Emits the given messages through {@link #doEmit(Collection, MessageProcessingContext)}.
     *
     * <p>Returns an already-completed future without dispatching anything when the sink is not
     * running or when {@code messages} is empty.
     *
     * @param messages the messages to emit, must not be {@code null}
     * @param context the processing context passed along to the pipeline
     * @return the future returned by {@code doEmit}, or a completed future if nothing was emitted
     */
    @Override
    public CompletableFuture<Void> emit(Collection<Message<T>> messages, MessageProcessingContext<T> context) {
        Assert.notNull(messages, "messages cannot be null");
        if (!isRunning()) {
            logger.debug("{} {} not running, returning", getClass().getSimpleName(), this.id);
            return CompletableFuture.completedFuture(null);
        }
        if (messages.isEmpty()) {
            logger.debug("No messages provided for {} {}, returning.", getClass().getSimpleName(), this.id);
            return CompletableFuture.completedFuture(null);
        }
        return doEmit(messages, context);
    }

    /**
     * Dispatches a non-empty collection of messages. Only invoked by
     * {@link #emit(Collection, MessageProcessingContext)} while the sink is running.
     *
     * @param messages the messages to dispatch
     * @param context the processing context
     * @return a future representing the dispatch of the given messages
     */
    protected abstract CompletableFuture<Void> doEmit(Collection<Message<T>> messages,
            MessageProcessingContext<T> context);

    /**
     * Submits a single message to the pipeline on the configured executor.
     *
     * <p>Whether processing succeeds or fails, the context's back-pressure release callback is run
     * once and the elapsed time is then logged at trace level.
     *
     * @param message the message to process
     * @param context the processing context
     * @return a future that completes when processing and both completion callbacks have run
     */
    protected CompletableFuture<Void> execute(Message<T> message, MessageProcessingContext<T> context) {
        logger.trace("Executing message {}", MessageHeaderUtils.getId(message));
        StopWatch watch = getStartedWatch();
        return doExecute(() -> this.messageProcessingPipeline.process(message, context))
                .whenComplete((v, t) -> context.runBackPressureReleaseCallback())
                .whenComplete((v, t) -> measureExecution(watch, Collections.singletonList(message)));
    }

    /**
     * Submits a whole batch of messages to the pipeline as one unit on the configured executor.
     *
     * <p>Whether processing succeeds or fails, the context's back-pressure release callback is run
     * once per message in the batch and the elapsed time is then logged at trace level.
     *
     * @param messages the batch to process
     * @param context the processing context
     * @return a future that completes when processing and both completion callbacks have run
     */
    protected CompletableFuture<Void> execute(Collection<Message<T>> messages,
            MessageProcessingContext<T> context) {
        StopWatch watch = getStartedWatch();
        return doExecute(() -> this.messageProcessingPipeline.process(messages, context))
                // One release per message, not one per batch.
                .whenComplete((v, t) -> messages.forEach(msg -> context.runBackPressureReleaseCallback()))
                .whenComplete((v, t) -> measureExecution(watch, messages));
    }

    /**
     * Logs a processing failure for a single message.
     *
     * @param t the failure
     * @param msg the message being processed when the failure occurred
     * @return always {@code null}, so the method can be used where a {@code Void} result is expected
     */
    protected Void logError(Throwable t, Message<T> msg) {
        logger.error("Error processing message {}.", MessageHeaderUtils.getId(msg), t);
        return null;
    }

    /**
     * Logs a processing failure for a batch of messages.
     *
     * @param t the failure
     * @param msgs the messages being processed when the failure occurred
     * @return always {@code null}, so the method can be used where a {@code Void} result is expected
     */
    protected Void logError(Throwable t, Collection<Message<T>> msgs) {
        logger.error("Error processing message {}.", MessageHeaderUtils.getId(msgs), t);
        return null;
    }

    /** Creates a {@link StopWatch} that has already been started. */
    private StopWatch getStartedWatch() {
        StopWatch watch = new StopWatch();
        watch.start();
        return watch;
    }

    /** Stops the watch and, if trace logging is enabled, logs the elapsed time for the messages. */
    private void measureExecution(StopWatch watch, Collection<Message<T>> messages) {
        watch.stop();
        if (logger.isTraceEnabled()) {
            logger.trace("Messages {} processed in {}ms", MessageHeaderUtils.getId(messages),
                    watch.getTotalTimeMillis());
        }
    }

    /**
     * Runs the supplier on the configured executor and flattens the future it returns into a
     * {@code CompletableFuture<Void>}.
     *
     * <p>If the executor refuses the task, the rejection is reported through the returned future
     * rather than thrown, so that the completion callbacks attached by the {@code execute} methods
     * (notably the back-pressure release) still run.
     */
    private CompletableFuture<Void> doExecute(Supplier<CompletableFuture<?>> supplier) {
        try {
            return CompletableFuture.supplyAsync(supplier, this.taskExecutor)
                    .thenCompose(future -> future)
                    .thenRun(() -> {
                    });
        }
        catch (RejectedExecutionException e) {
            CompletableFuture<Void> rejected = new CompletableFuture<>();
            rejected.completeExceptionally(e);
            return rejected;
        }
    }

    /**
     * Starts the sink: verifies that the pipeline and executor have been set, assigns the sink id
     * and marks the sink as running. Does nothing if the sink is already running.
     *
     * <p>The running check is made while holding the lifecycle monitor so that concurrent callers
     * cannot both perform the start sequence.
     */
    public void start() {
        synchronized (this.lifecycleMonitor) {
            if (isRunning()) {
                logger.debug("{} {} already running", getClass().getSimpleName(), this.id);
                return;
            }
            Assert.notNull(this.messageProcessingPipeline, "messageProcessingPipeline not set");
            Assert.notNull(this.taskExecutor, "taskExecutor not set");
            this.id = getOrCreateId();
            logger.debug("Starting {} {}", getClass().getSimpleName(), this.id);
            this.running = true;
        }
    }

    /**
     * Resolves the id used in log output: the thread name prefix when the executor is a
     * {@link ThreadPoolTaskExecutor}, otherwise a random UUID.
     */
    private String getOrCreateId() {
        if (this.taskExecutor instanceof ThreadPoolTaskExecutor) {
            return ((ThreadPoolTaskExecutor) this.taskExecutor).getThreadNamePrefix();
        }
        return UUID.randomUUID().toString();
    }

    /**
     * Stops the sink by clearing the running flag. Does nothing if the sink is already stopped.
     *
     * <p>The running check is made while holding the lifecycle monitor so that it cannot interleave
     * with a concurrent {@link #start()}.
     */
    public void stop() {
        synchronized (this.lifecycleMonitor) {
            if (!isRunning()) {
                logger.debug("{} {} already stopped", getClass().getSimpleName(), this.id);
                return;
            }
            logger.debug("Stopping {} {}", getClass().getSimpleName(), this.id);
            this.running = false;
        }
    }

    /**
     * Reports whether the sink is currently running.
     *
     * @return {@code true} between a successful {@link #start()} and the following {@link #stop()}
     */
    public boolean isRunning() {
        return this.running;
    }
}

