/*
 * Decompiled with CFR 0.152.
 */
package de.tschumacher.sqsservice.consumer;

import com.amazonaws.services.sqs.model.Message;
import de.tschumacher.sqsservice.SQSQueue;
import de.tschumacher.sqsservice.consumer.SQSMessageHandler;
import de.tschumacher.sqsservice.supplier.SQSMessageFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls an {@link SQSQueue} from a fixed pool of worker threads and hands every
 * received message to an {@link SQSMessageHandler}.
 *
 * <p>The receiver starts itself at the end of construction. Each worker loops
 * while the receiver is running: it fetches a message, converts its body with the
 * {@link SQSMessageFactory}, passes the result to the handler and deletes the
 * message from the queue. If conversion, handling or deletion throws, the failure
 * is logged and the message's visibility timeout is set to
 * {@link #RETRY_SECONDS} instead.
 *
 * @param <F> type of the payload produced by the factory and consumed by the handler
 */
public class SQSMessageReceiver<F> {
    private static final Logger logger = LoggerFactory.getLogger(SQSMessageReceiver.class);

    /** Size of the thread pool and number of worker tasks submitted by {@link #start()}. */
    private static final int WORKER_COUNT = 5;

    /** Not referenced inside this class; kept for subclasses that may rely on it. */
    protected static final int MAX_RETRYS = 5;

    /** Visibility timeout value applied to a message whose processing failed. */
    protected static final int RETRY_SECONDS = 120;

    private final SQSMessageHandler<F> handler;
    private final ExecutorService executorService;

    // Written by the thread calling start()/stop() and read by every worker thread;
    // volatile guarantees the workers observe the change and leave their loop.
    private volatile boolean running = false;

    private final Runnable worker;
    final SQSMessageFactory<F> factory;

    /**
     * Creates the receiver and immediately starts its workers.
     *
     * @param queue   queue to receive messages from
     * @param handler callback invoked for every successfully converted message
     * @param factory converts a raw message body into the payload type {@code F}
     */
    public SQSMessageReceiver(SQSQueue queue, SQSMessageHandler<F> handler, SQSMessageFactory<F> factory) {
        this.handler = handler;
        this.worker = this.newWorker(queue);
        this.executorService = Executors.newFixedThreadPool(WORKER_COUNT);
        this.factory = factory;
        // Must stay last: the workers read handler and factory as soon as they run.
        this.start();
    }

    /**
     * Marks the receiver as running and submits {@link #WORKER_COUNT} worker tasks
     * to the executor. Every call submits a fresh set of tasks.
     */
    public void start() {
        this.running = true;
        for (int i = 0; i < WORKER_COUNT; ++i) {
            this.executorService.submit(this.worker);
        }
    }

    /**
     * Asks the workers to stop by clearing the running flag. A worker checks the
     * flag at the top of each loop iteration, so it finishes its current iteration
     * first. This method neither waits for the workers nor shuts down the executor
     * service.
     *
     * @throws InterruptedException declared for API compatibility; this
     *         implementation does not throw it
     */
    public void stop() throws InterruptedException {
        this.running = false;
    }

    /**
     * Builds the task executed by every pool thread.
     *
     * @param queue queue the task receives from, deletes from and changes visibility on
     * @return a runnable that loops until the receiver is no longer running
     */
    private Runnable newWorker(final SQSQueue queue) {
        return new Runnable(){

            @Override
            public void run() {
                while (SQSMessageReceiver.this.running) {
                    try {
                        Message receiveMessage = queue.receiveMessage();
                        if (receiveMessage == null) {
                            // Nothing to process in this iteration.
                            continue;
                        }
                        try {
                            SQSMessageReceiver.this.handler.receivedMessage(queue, SQSMessageReceiver.this.factory.createMessage(receiveMessage.getBody()));
                            // Only reached when the handler returned normally.
                            queue.deleteMessage(receiveMessage.getReceiptHandle());
                        }
                        catch (Throwable e) {
                            logger.error("could not process message", e);
                            // Leave the message on the queue with the retry timeout applied.
                            queue.changeMessageVisibility(receiveMessage.getReceiptHandle(), RETRY_SECONDS);
                        }
                    }
                    catch (Throwable e) {
                        // Covers failures of receiveMessage() and of the visibility change
                        // above; the worker keeps looping instead of dying.
                        logger.error("could not handle message", e);
                    }
                }
            }
        };
    }

    /**
     * Clears the running flag when the instance is finalized.
     */
    @Override
    protected void finalize() throws Throwable {
        this.stop();
        super.finalize();
    }
}

