/*
 * Decompiled with CFR 0.152.
 */
package org.zalando.nakadi.controller;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
import org.zalando.nakadi.exceptions.NakadiException;
import org.zalando.nakadi.service.ClosedConnectionsCrutch;
import org.zalando.nakadi.service.subscription.StreamParameters;
import org.zalando.nakadi.service.subscription.SubscriptionOutput;
import org.zalando.nakadi.service.subscription.SubscriptionStreamer;
import org.zalando.nakadi.service.subscription.SubscriptionStreamerFactory;
import org.zalando.nakadi.util.FeatureToggleService;
import org.zalando.problem.Problem;

/**
 * REST controller that exposes {@code GET /subscriptions/{subscription_id}/events} and writes the
 * subscription stream directly to the HTTP response through a {@link StreamingResponseBody}.
 *
 * <p>The actual streaming is delegated to a {@link SubscriptionStreamer} obtained from the
 * {@link SubscriptionStreamerFactory}; this class only wires the request parameters, the
 * connection-close flag and the response-writing callbacks ({@link SubscriptionOutputImpl}).
 */
@RestController
public class SubscriptionStreamController {
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionStreamController.class);

    private final SubscriptionStreamerFactory subscriptionStreamerFactory;
    private final FeatureToggleService featureToggleService;
    private final ObjectMapper jsonMapper;
    private final ClosedConnectionsCrutch closedConnectionsCrutch;

    /**
     * Creates the controller with its collaborators.
     *
     * @param subscriptionStreamerFactory builds the streamer for a given subscription
     * @param featureToggleService        consulted to check whether the high level API is enabled
     * @param objectMapper                used to serialize {@link Problem} bodies on failure
     * @param closedConnectionsCrutch     supplies the per-request connection flag handed to the streamer
     */
    @Autowired
    public SubscriptionStreamController(SubscriptionStreamerFactory subscriptionStreamerFactory, FeatureToggleService featureToggleService, ObjectMapper objectMapper, ClosedConnectionsCrutch closedConnectionsCrutch) {
        this.subscriptionStreamerFactory = subscriptionStreamerFactory;
        this.featureToggleService = featureToggleService;
        this.jsonMapper = objectMapper;
        this.closedConnectionsCrutch = closedConnectionsCrutch;
    }

    /**
     * Returns a response body that streams the given subscription to the client.
     *
     * <p>All work happens when the returned body is written: if the
     * {@code HIGH_LEVEL_API} feature is disabled the response status is set to 501 and nothing is
     * streamed; otherwise a streamer is built and run until it returns or fails. The output stream
     * is always closed once streaming was attempted.
     *
     * <p>The numeric parameters are forwarded unchanged to {@link StreamParameters#of}; their units
     * and exact semantics are defined there, not in this class.
     *
     * @param subscriptionId       value of the {@code subscription_id} path variable
     * @param windowSize           {@code window_size} query parameter, defaults to 100
     * @param commitTimeout        {@code commit_timeout} query parameter, defaults to 30
     * @param batchLimit           {@code batch_limit} query parameter, defaults to 1
     * @param streamLimit          optional {@code stream_limit} query parameter, may be {@code null}
     * @param batchTimeout         {@code batch_flush_timeout} query parameter, defaults to 30
     * @param streamTimeout        optional {@code stream_timeout} query parameter, may be {@code null}
     * @param streamKeepAliveLimit optional {@code stream_keep_alive_limit} query parameter, may be {@code null}
     * @param request              current request, used to obtain the connection flag
     * @param response             current response, used to set status, content type and headers
     * @return the streaming body; never {@code null}
     * @throws IOException declared for interface compatibility; I/O errors surface while the body is written
     */
    @RequestMapping(value={"/subscriptions/{subscription_id}/events"}, method={RequestMethod.GET})
    public StreamingResponseBody streamEvents(@PathVariable(value="subscription_id") String subscriptionId, @RequestParam(value="window_size", required=false, defaultValue="100") int windowSize, @RequestParam(value="commit_timeout", required=false, defaultValue="30") int commitTimeout, @RequestParam(value="batch_limit", required=false, defaultValue="1") int batchLimit, @Nullable @RequestParam(value="stream_limit", required=false) Long streamLimit, @RequestParam(value="batch_flush_timeout", required=false, defaultValue="30") int batchTimeout, @Nullable @RequestParam(value="stream_timeout", required=false) Long streamTimeout, @Nullable @RequestParam(value="stream_keep_alive_limit", required=false) Integer streamKeepAliveLimit, HttpServletRequest request, HttpServletResponse response) throws IOException {
        return outputStream -> {
            if (!this.featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.HIGH_LEVEL_API)) {
                response.setStatus(HttpStatus.NOT_IMPLEMENTED.value());
                return;
            }

            // Flag obtained per request and handed to the streamer; presumably it reflects whether
            // the client connection is still open (semantics live in ClosedConnectionsCrutch).
            final AtomicBoolean connectionReady = this.closedConnectionsCrutch.listenForConnectionClose(request);
            final SubscriptionOutputImpl output = new SubscriptionOutputImpl(response, outputStream);

            // Declared outside the try so the interrupt handler can include it in the log line;
            // it is still null if the interruption happens before the factory returns.
            SubscriptionStreamer streamer = null;
            try {
                final StreamParameters streamParameters = StreamParameters.of(batchLimit, streamLimit, batchTimeout, streamTimeout, streamKeepAliveLimit, windowSize, commitTimeout);
                streamer = this.subscriptionStreamerFactory.build(subscriptionId, streamParameters, output, connectionReady);
                streamer.stream();
            } catch (InterruptedException ex) {
                // Trailing Throwable is logged with its stack trace by SLF4J (1.6+).
                LOG.warn("Interrupted while streaming with {}", streamer, ex);
                // Restore the interrupt flag for whoever owns this thread.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // Let the output decide whether a problem response can still be sent.
                output.onException(e);
            } finally {
                outputStream.close();
            }
        };
    }

    /**
     * {@link SubscriptionOutput} that writes to the servlet response of a single request.
     *
     * <p>Tracks in {@code headersSent} whether the response has already been started (by
     * initialization, by streamed data, or by an error response) so that a problem body is only
     * written while the status and content type can still be chosen.
     */
    private class SubscriptionOutputImpl
    implements SubscriptionOutput {
        private boolean headersSent;
        private final HttpServletResponse response;
        private final OutputStream out;

        /**
         * @param response servlet response whose status, content type and headers are set
         * @param out      stream the response body is written to
         */
        SubscriptionOutputImpl(HttpServletResponse response, OutputStream out) {
            this.response = response;
            this.out = out;
            this.headersSent = false;
        }

        /**
         * Starts a successful response: status 200, content type
         * {@code application/x-json-stream} and the {@code X-Nakadi-SessionId} header, then flushes.
         * Does nothing if the response was already started.
         *
         * @param sessionId value written to the {@code X-Nakadi-SessionId} header
         * @throws IOException if flushing the output stream fails
         */
        @Override
        public void onInitialized(String sessionId) throws IOException {
            if (this.headersSent) {
                return;
            }
            this.headersSent = true;
            this.response.setStatus(HttpStatus.OK.value());
            this.response.setContentType("application/x-json-stream");
            this.response.setHeader("X-Nakadi-SessionId", sessionId);
            this.out.flush();
        }

        /**
         * Reports a streaming failure. The exception is always logged; a problem response is
         * written only if the response has not been started yet. A {@link NakadiException} is
         * rendered via its own problem, anything else as 503 "Failed to continue streaming".
         * Failures while writing the problem are logged and not rethrown.
         *
         * @param ex the exception that interrupted streaming
         */
        @Override
        public void onException(Exception ex) {
            LOG.warn("Exception occurred while streaming", ex);
            if (this.headersSent) {
                // Status line and headers are already chosen; nothing useful can be sent anymore.
                LOG.warn("Exception found while streaming, but no data could be provided to client", ex);
                return;
            }
            this.headersSent = true;
            try {
                if (ex instanceof NakadiException) {
                    this.writeProblemResponse(((NakadiException) ex).asProblem());
                } else {
                    this.writeProblemResponse(Problem.valueOf(Response.Status.SERVICE_UNAVAILABLE, "Failed to continue streaming"));
                }
            } catch (IOException e) {
                LOG.error("Failed to write exception to response", e);
            }
        }

        /**
         * Writes the given problem as the response: its status code, content type
         * {@code application/problem+json} and the JSON-serialized problem as body.
         *
         * @param problem problem to send to the client
         * @throws IOException if serializing or writing the body fails
         */
        void writeProblemResponse(Problem problem) throws IOException {
            this.response.setStatus(problem.getStatus().getStatusCode());
            this.response.setContentType("application/problem+json");
            SubscriptionStreamController.this.jsonMapper.writer().writeValue(this.out, problem);
        }

        /**
         * Writes a chunk of stream data and flushes it immediately. Marks the response as started
         * so that later failures no longer attempt to send a problem response.
         *
         * @param data raw bytes to write to the client
         * @throws IOException if writing or flushing fails
         */
        @Override
        public void streamData(byte[] data) throws IOException {
            this.headersSent = true;
            this.out.write(data);
            this.out.flush();
        }
    }
}

