/*
 * Decompiled with CFR 0.152.
 */
package org.zalando.nakadi.service;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zalando.nakadi.domain.ConsumedEvent;
import org.zalando.nakadi.repository.EventConsumer;
import org.zalando.nakadi.service.EventStreamConfig;

public class EventStream {
    private static final Logger LOG = LoggerFactory.getLogger(EventStream.class);
    public static final String BATCH_SEPARATOR = "\n";
    public static final Charset UTF8 = Charset.forName("UTF-8");
    private final OutputStream outputStream;
    private final EventConsumer eventConsumer;
    private final EventStreamConfig config;

    public EventStream(EventConsumer eventConsumer, OutputStream outputStream, EventStreamConfig config) {
        this.eventConsumer = eventConsumer;
        this.outputStream = outputStream;
        this.config = config;
    }

    public void streamEvents(AtomicBoolean connectionReady) {
        try {
            int messagesRead = 0;
            Map<String, Integer> keepAliveInARow = this.createMapWithPartitionKeys(partition -> 0);
            Map<String, List> currentBatches = this.createMapWithPartitionKeys(partition -> Lists.newArrayList());
            HashMap latestOffsets = Maps.newHashMap(this.config.getCursors());
            long start = System.currentTimeMillis();
            Map<String, Long> batchStartTimes = this.createMapWithPartitionKeys(partition -> start);
            while (connectionReady.get()) {
                boolean keepAliveLimitReachedForAllPartitions;
                Optional<ConsumedEvent> eventOrEmpty = this.eventConsumer.readEvent();
                if (eventOrEmpty.isPresent()) {
                    ConsumedEvent event = eventOrEmpty.get();
                    latestOffsets.put(event.getPartition(), event.getOffset());
                    currentBatches.get(event.getPartition()).add(event.getEvent());
                    ++messagesRead;
                    keepAliveInARow.put(event.getPartition(), 0);
                }
                for (String partition2 : this.config.getCursors().keySet()) {
                    long timeSinceBatchStart = System.currentTimeMillis() - batchStartTimes.get(partition2);
                    if ((long)(this.config.getBatchTimeout() * 1000) > timeSinceBatchStart && currentBatches.get(partition2).size() < this.config.getBatchLimit()) continue;
                    this.sendBatch(partition2, (String)latestOffsets.get(partition2), currentBatches.get(partition2));
                    if (currentBatches.get(partition2).size() == 0) {
                        keepAliveInARow.put(partition2, keepAliveInARow.get(partition2) + 1);
                    }
                    currentBatches.get(partition2).clear();
                    batchStartTimes.put(partition2, System.currentTimeMillis());
                }
                if (this.config.getStreamKeepAliveLimit() == 0 || !(keepAliveLimitReachedForAllPartitions = keepAliveInARow.values().stream().allMatch(keepAlives -> keepAlives >= this.config.getStreamKeepAliveLimit()))) {
                    long timeSinceStart = System.currentTimeMillis() - start;
                    if ((this.config.getStreamTimeout() == 0 || timeSinceStart < (long)(this.config.getStreamTimeout() * 1000)) && (this.config.getStreamLimit() == 0 || messagesRead < this.config.getStreamLimit())) continue;
                    for (String partition3 : this.config.getCursors().keySet()) {
                        if (currentBatches.get(partition3).size() <= 0) continue;
                        this.sendBatch(partition3, (String)latestOffsets.get(partition3), currentBatches.get(partition3));
                    }
                }
                break;
            }
        }
        catch (IOException e) {
            LOG.info("I/O error occurred when streaming events (possibly client closed connection)", (Throwable)e);
        }
        catch (IllegalStateException e) {
            LOG.info("Error occurred when streaming events (possibly server closed connection)", (Throwable)e);
        }
        catch (KafkaException e) {
            LOG.error("Error occurred when polling events from kafka", (Throwable)e);
        }
    }

    private <T> Map<String, T> createMapWithPartitionKeys(Function<String, T> valueFunction) {
        return this.config.getCursors().keySet().stream().collect(Collectors.toMap(Function.identity(), valueFunction));
    }

    public static String createStreamEvent(String partition, String offset, List<String> events, Optional<String> topology) {
        StringBuilder builder = new StringBuilder().append("{\"cursor\":{\"partition\":\"").append(partition).append("\",\"offset\":\"").append(offset).append("\"}");
        if (!events.isEmpty()) {
            builder.append(",\"events\":[");
            events.stream().forEach(event -> builder.append((String)event).append(","));
            builder.deleteCharAt(builder.length() - 1).append("]");
        }
        builder.append("}").append(BATCH_SEPARATOR);
        return builder.toString();
    }

    private void sendBatch(String partition, String offset, List<String> currentBatch) throws IOException {
        String streamEvent = EventStream.createStreamEvent(partition, offset, currentBatch, Optional.empty());
        this.outputStream.write(streamEvent.getBytes(UTF8));
        this.outputStream.flush();
    }
}

