/*
 * Decompiled with CFR 0.152.
 */
package com.informix.stream.impl;

import com.informix.stream.api.IfmxStreamEngine;
import com.informix.stream.api.IfmxStreamListener;
import com.informix.stream.api.IfmxStreamRecord;
import com.informix.stream.api.IfxStream;
import com.informix.stream.impl.IfxStreamException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

public class IfxStreamRunner
implements IfxStream {
    private final CopyOnWriteArrayList<IfmxStreamListener> listeners = new CopyOnWriteArrayList();
    private final IfmxStreamEngine engine;
    private final List<IfxStreamException> exceptions = new ArrayList<IfxStreamException>();
    private final AtomicBoolean stopEngine = new AtomicBoolean(false);

    public IfxStreamRunner(IfmxStreamEngine engine) {
        this.engine = engine;
    }

    @Override
    public void run() {
        try {
            this.engine.init();
        }
        catch (IfxStreamException e) {
            this.exceptions.add(e);
            return;
        }
        catch (SQLException e) {
            this.exceptions.add(new IfxStreamException("SQL error occurred processing stream records", e));
            return;
        }
        while (!this.stopEngine.get()) {
            IfmxStreamRecord r = null;
            try {
                r = this.engine.getRecord();
                for (IfmxStreamListener listener : this.listeners) {
                    try {
                        listener.accept(r);
                    }
                    catch (Exception e) {
                        this.exceptions.add(new IfxStreamException("An exception occurred while listener [" + listener + "] was processing record [" + r + "]", e));
                    }
                }
            }
            catch (IfxStreamException e) {
                this.exceptions.add(e);
            }
            catch (SQLException e) {
                this.exceptions.add(new IfxStreamException("SQL error occurred processing stream records", e));
            }
            if (!Thread.currentThread().isInterrupted()) continue;
            break;
        }
    }

    @Override
    public List<IfxStreamException> getExceptions() {
        return this.exceptions;
    }

    @Override
    public IfxStreamRunner addListener(IfmxStreamListener listener) {
        this.listeners.add(listener);
        return this;
    }

    @Override
    public void close() throws IfxStreamException {
        this.stopEngine.set(true);
        this.engine.close();
    }
}

