/*
 * Decompiled with CFR 0.152.
 */
package com.informix.jdbc.stream.impl;

import com.informix.jdbc.stream.api.RecordStream;
import com.informix.jdbc.stream.api.StreamEngine;
import com.informix.jdbc.stream.api.StreamListener;
import com.informix.jdbc.stream.api.StreamRecord;
import com.informix.jdbc.stream.impl.StreamException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

public class RecordStreamRunner
implements RecordStream {
    private final CopyOnWriteArrayList<StreamListener> listeners = new CopyOnWriteArrayList();
    private final StreamEngine engine;
    private final List<StreamException> exceptions = new ArrayList<StreamException>();
    private final AtomicBoolean stopEngine = new AtomicBoolean(false);
    private final boolean stopOnError;

    public RecordStreamRunner(StreamEngine engine) {
        this.engine = engine;
        this.stopOnError = true;
    }

    public RecordStreamRunner(StreamEngine engine, boolean stopOnError) {
        this.engine = engine;
        this.stopOnError = stopOnError;
    }

    @Override
    public void run() {
        try {
            this.engine.init();
        }
        catch (StreamException e) {
            this.exceptions.add(e);
            return;
        }
        catch (SQLException e) {
            this.exceptions.add(new StreamException("SQL error occurred processing stream records", e));
            return;
        }
        while (!this.stopEngine.get()) {
            try {
                StreamRecord r = this.engine.getRecord();
                this.listeners.forEach(listener -> {
                    try {
                        listener.accept(r);
                    }
                    catch (Exception e) {
                        this.exceptions.add(new StreamException("An exception occurred while listener [" + listener + "] was processing record [" + r + "]", e));
                    }
                });
            }
            catch (StreamException e) {
                this.exceptions.add(e);
            }
            catch (SQLException e) {
                this.exceptions.add(new StreamException("SQL error occurred processing stream records", e));
            }
            if (!Thread.currentThread().isInterrupted() && (!this.stopOnError || this.exceptions.isEmpty())) continue;
            break;
        }
    }

    @Override
    public List<StreamException> getExceptions() {
        return this.exceptions;
    }

    @Override
    public RecordStreamRunner addListener(StreamListener listener) {
        this.listeners.add(listener);
        return this;
    }

    @Override
    public void close() throws StreamException {
        this.stopEngine.set(true);
        this.engine.close();
    }
}

