/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.util.event.handler;

import com.lmax.disruptor.EventHandler;
import io.siddhi.core.event.Event;
import io.siddhi.core.stream.StreamJunction;
import io.siddhi.core.util.event.handler.EventExchangeHolder;
import java.beans.ExceptionListener;
import java.util.LinkedList;
import java.util.List;
import org.apache.log4j.Logger;

public class StreamHandler
implements EventHandler<EventExchangeHolder> {
    private static final Logger log = Logger.getLogger(StreamHandler.class);
    private final String streamName;
    private final String siddhiAppName;
    private final StreamJunction faultStreamJunction;
    private final StreamJunction.OnErrorAction onErrorAction;
    private final ExceptionListener exceptionListener;
    private List<StreamJunction.Receiver> receivers;
    private int batchSize;
    private List<Event> eventBuffer = new LinkedList<Event>();

    public StreamHandler(List<StreamJunction.Receiver> receivers, int batchSize, String streamName, String siddhiAppName, StreamJunction faultStreamJunction, StreamJunction.OnErrorAction onErrorAction, ExceptionListener exceptionListener) {
        this.receivers = receivers;
        this.batchSize = batchSize;
        this.streamName = streamName;
        this.siddhiAppName = siddhiAppName;
        this.faultStreamJunction = faultStreamJunction;
        this.onErrorAction = onErrorAction;
        this.exceptionListener = exceptionListener;
    }

    public void onEvent(EventExchangeHolder eventExchangeHolder, long sequence, boolean endOfBatch) {
        boolean isProcessed = eventExchangeHolder.getAndSetIsProcessed(true);
        if (!isProcessed) {
            this.eventBuffer.add(eventExchangeHolder.getEvent());
            if (this.eventBuffer.size() == this.batchSize || endOfBatch) {
                for (StreamJunction.Receiver receiver : this.receivers) {
                    try {
                        receiver.receive(this.eventBuffer);
                    }
                    catch (Exception e) {
                        this.onError(this.eventBuffer, e);
                    }
                }
                this.eventBuffer.clear();
            }
        } else if (endOfBatch && this.eventBuffer.size() != 0) {
            for (StreamJunction.Receiver receiver : this.receivers) {
                try {
                    receiver.receive(this.eventBuffer);
                }
                catch (Exception e) {
                    this.onError(this.eventBuffer, e);
                }
            }
            this.eventBuffer.clear();
        }
    }

    private void onError(List<Event> eventBuffer, Exception e) {
        if (this.exceptionListener != null) {
            this.exceptionListener.exceptionThrown(e);
        }
        switch (this.onErrorAction) {
            case LOG: {
                for (Event event : eventBuffer) {
                    log.error((Object)("Error in SiddhiApp '" + this.siddhiAppName + "' after consuming events from Stream '" + this.streamName + "', " + e.getMessage() + ". Hence, dropping event '" + event.toString() + "'"), (Throwable)e);
                }
                break;
            }
            case STREAM: {
                for (Event event : eventBuffer) {
                    if (this.faultStreamJunction != null) {
                        this.faultStreamJunction.sendEvent(event);
                        continue;
                    }
                    log.error((Object)("Error in SiddhiApp '" + this.siddhiAppName + "' after consuming events from Stream '" + this.streamName + "', " + e.getMessage() + ". Siddhi Fault Stream for '" + this.streamName + "' is not defined. Hence dropping the event '" + event.toString() + "'"), (Throwable)e);
                }
                break;
            }
        }
    }
}

