/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.segment.realtime.firehose;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CountingInputStream;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.druid.concurrent.Threads;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.metrics.EventReceiverFirehoseMetric;
import org.apache.druid.server.metrics.EventReceiverFirehoseRegister;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.joda.time.DateTime;

@Deprecated
public class EventReceiverFirehoseFactory
implements FirehoseFactory<InputRowParser<Map<String, Object>>> {
    // Logger shared by the factory and by the firehoses it creates.
    private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
    // Limit on distinct producer ids; the producer-sequence check in this file compares the
    // size of its producer map against the same value (10000).
    public static final int MAX_FIREHOSE_PRODUCERS = 10000;
    // The constructor falls back to this same value (100000) when bufferSize is null or non-positive.
    private static final int DEFAULT_BUFFER_SIZE = 100000;
    // Marker object that close() puts on a firehose's buffer; hasMore() compares against it by
    // identity to detect that no more rows will arrive.
    private static final Object FIREHOSE_CLOSED = new Object();
    // Name under which each created firehose is registered; rejected by the constructor when null.
    private final String serviceName;
    // Capacity of the row buffer of each created firehose.
    private final int bufferSize;
    // Idle timeout in milliseconds; Long.MAX_VALUE is the value the constructor stores when no
    // positive timeout was supplied, and the firehose constructor skips its idle closer for it.
    private final long maxIdleTimeMillis;
    // May be null: connect() and close() both check for null before using it.
    private final ChatHandlerProvider chatHandlerProvider;
    // Mapper used by addAll() for requests that are not Smile-encoded.
    private final ObjectMapper jsonMapper;
    // Mapper used by addAll() when the request content type is "application/x-jackson-smile".
    private final ObjectMapper smileMapper;
    // Registry in which connect() registers, and close() unregisters, the firehose by service name.
    private final EventReceiverFirehoseRegister eventReceiverFirehoseRegister;
    // Passed to AuthorizationUtils.authorizeResourceAction by the firehose's HTTP endpoints.
    private final AuthorizerMapper authorizerMapper;

    /**
     * Jackson creator for the factory.
     *
     * @param serviceName name the firehose is registered under; must not be null
     * @param bufferSize row buffer capacity; {@code null} or a non-positive value selects
     *        {@link #DEFAULT_BUFFER_SIZE}
     * @param maxIdleTimeMillis idle timeout in milliseconds; {@code null} or a non-positive value is
     *        stored as {@link Long#MAX_VALUE}, for which the firehose does not start an idle closer
     * @param chatHandlerProvider injected provider; {@code connect} tolerates {@code null}
     * @param jsonMapper injected mapper for JSON requests
     * @param smileMapper injected mapper for Smile requests
     * @param eventReceiverFirehoseRegister injected registry of firehoses
     * @param authorizerMapper injected mapper used to authorize HTTP requests
     * @throws NullPointerException if {@code serviceName} is null
     */
    @JsonCreator
    public EventReceiverFirehoseFactory(
        @JsonProperty("serviceName") String serviceName,
        @JsonProperty("bufferSize") Integer bufferSize,
        @JsonProperty("maxIdleTime") @Nullable Long maxIdleTimeMillis,
        @JacksonInject ChatHandlerProvider chatHandlerProvider,
        @JacksonInject @Json ObjectMapper jsonMapper,
        @JacksonInject @Smile ObjectMapper smileMapper,
        @JacksonInject EventReceiverFirehoseRegister eventReceiverFirehoseRegister,
        @JacksonInject AuthorizerMapper authorizerMapper
    ) {
        Preconditions.checkNotNull(serviceName, "serviceName");
        this.serviceName = serviceName;

        if (bufferSize == null || bufferSize <= 0) {
            this.bufferSize = DEFAULT_BUFFER_SIZE;
        } else {
            this.bufferSize = bufferSize;
        }

        if (maxIdleTimeMillis == null || maxIdleTimeMillis <= 0L) {
            this.maxIdleTimeMillis = Long.MAX_VALUE;
        } else {
            this.maxIdleTimeMillis = maxIdleTimeMillis;
        }

        this.chatHandlerProvider = chatHandlerProvider;
        this.jsonMapper = jsonMapper;
        this.smileMapper = smileMapper;
        this.eventReceiverFirehoseRegister = eventReceiverFirehoseRegister;
        this.authorizerMapper = authorizerMapper;
    }

    /**
     * Creates a new {@link EventReceiverFirehose} and registers it.
     *
     * <p>When a chat handler provider is present the firehose is registered under the full service
     * name and, if the name contains a {@code ':'} after its first character, also under the text
     * following the last {@code ':'}. The firehose is always registered with the firehose register.
     *
     * @param firehoseParser parser handed to the new firehose
     * @param temporaryDirectory not used by this implementation
     * @return the newly created firehose
     */
    public Firehose connect(InputRowParser<Map<String, Object>> firehoseParser, File temporaryDirectory) {
        log.info("Connecting firehose: %s", serviceName);
        EventReceiverFirehose receiver = new EventReceiverFirehose(firehoseParser);

        if (chatHandlerProvider == null) {
            log.warn("No chathandler detected");
        } else {
            log.info("Found chathandler of class[%s]", chatHandlerProvider.getClass().getName());
            chatHandlerProvider.register(serviceName, receiver);
            int colonAt = serviceName.lastIndexOf(':');
            if (colonAt > 0) {
                String alias = serviceName.substring(colonAt + 1);
                chatHandlerProvider.register(alias, receiver);
            }
        }

        eventReceiverFirehoseRegister.register(serviceName, receiver);
        return receiver;
    }

    /** @return the service name this factory was configured with (never null) */
    @JsonProperty
    public String getServiceName() {
        return serviceName;
    }

    /** @return the effective row buffer capacity (the default when none or a non-positive one was given) */
    @JsonProperty
    public int getBufferSize() {
        return bufferSize;
    }

    /**
     * @return the effective idle timeout in milliseconds, serialized as {@code maxIdleTime};
     *         {@link Long#MAX_VALUE} when no positive timeout was configured
     */
    @JsonProperty("maxIdleTime")
    public long getMaxIdleTimeMillis() {
        return maxIdleTimeMillis;
    }

    @VisibleForTesting
    public class EventReceiverFirehose
    implements ChatHandler,
    Firehose,
    EventReceiverFirehoseMetric {
        // Background closer thread; assigned by createDelayedCloseExecutor() and null until then.
        @Nullable
        @GuardedBy(value="this")
        private Thread delayedCloseExecutor;
        // Bounded queue of parsed rows; close() additionally enqueues the FIREHOSE_CLOSED marker.
        private final BlockingQueue<Object> buffer;
        // Parser applied to every incoming event map in addAll().
        private final InputRowParser<Map<String, Object>> parser;
        // Set to true exactly once, by close(); read by addRows() and the closer thread.
        private volatile boolean closed = false;
        // Row taken from the buffer by hasMore() that nextRow() has not handed out yet.
        @Nullable
        private InputRow nextRow = null;
        // Becomes true when hasMore() takes the FIREHOSE_CLOSED marker from the buffer.
        private boolean rowsRunOut = false;
        // Running total of request-body bytes read by addAll().
        private final AtomicLong bytesReceived = new AtomicLong(0L);
        // System.nanoTime() of the last "Failed to add event" warning; addRows() uses it to log
        // that warning at most once per 10 seconds.
        private final AtomicLong lastBufferAddFailLoggingTimeNs = new AtomicLong(System.nanoTime());
        // Last accepted sequence number per producer id (see checkProducerSequence()).
        private final ConcurrentHashMap<String, Long> producerSequences = new ConcurrentHashMap();
        // System.nanoTime()-based deadline for the idle close; set in the constructor when an idle
        // timeout is configured and rewritten at the start of every addAll() call.
        @Nullable
        private volatile Long idleCloseTimeNs = null;
        // System.nanoTime()-based deadline set by shutdown(); null until a shutdown is requested.
        @Nullable
        private volatile Long requestedShutdownTimeNs = null;

        /**
         * Creates the firehose with an empty buffer of the factory's configured capacity.
         *
         * <p>When the factory has an idle timeout (anything other than {@link Long#MAX_VALUE}) the
         * first idle deadline is computed and the closer thread is started immediately.
         *
         * @param parser parser used to turn incoming event maps into rows
         */
        EventReceiverFirehose(InputRowParser<Map<String, Object>> parser) {
            this.buffer = new ArrayBlockingQueue<>(bufferSize);
            this.parser = parser;

            boolean idleTimeoutConfigured = maxIdleTimeMillis != Long.MAX_VALUE;
            if (idleTimeoutConfigured) {
                long idleTimeoutNs = TimeUnit.MILLISECONDS.toNanos(maxIdleTimeMillis);
                this.idleCloseTimeNs = System.nanoTime() + idleTimeoutNs;
                // createDelayedCloseExecutor() writes a field guarded by this object's monitor.
                synchronized (this) {
                    createDelayedCloseExecutor();
                }
            }
        }

        /**
         * Test accessor for the closer thread.
         *
         * @return the closer thread, or {@code null} if none has been created yet
         */
        @Nullable
        @VisibleForTesting
        synchronized Thread getDelayedCloseExecutor() {
            return delayedCloseExecutor;
        }

        /**
         * Creates, records and starts the daemon thread that closes this firehose once the idle
         * deadline or a requested shutdown deadline has passed.
         *
         * <p>The thread re-evaluates both deadlines roughly once per second until {@code closed} is
         * set. Callers in this class hold this object's monitor while invoking the method.
         *
         * @return the started thread
         */
        @GuardedBy("this")
        private Thread createDelayedCloseExecutor() {
            Runnable closeWatcher = () -> {
                while (!closed) {
                    if (idleCloseTimeNs == null && requestedShutdownTimeNs == null) {
                        // The thread is only created after one of the two deadlines was assigned,
                        // so this state is reported rather than acted upon.
                        log.error("Either idleCloseTimeNs or requestedShutdownTimeNs must be non-null. Please file a bug at https://github.com/apache/druid/issues");
                    }

                    boolean idleDeadlinePassed = idleCloseTimeNs != null && idleCloseTimeNs - System.nanoTime() <= 0L;
                    if (idleDeadlinePassed) {
                        log.info("Firehose has been idle for %d ms, closing.", maxIdleTimeMillis);
                        close();
                    } else if (requestedShutdownTimeNs != null && requestedShutdownTimeNs - System.nanoTime() <= 0L) {
                        log.info("Closing Firehose after a shutdown request");
                        close();
                    }

                    try {
                        Threads.sleepFor(1L, TimeUnit.SECONDS);
                    }
                    catch (InterruptedException ignored) {
                        // close() and shutdown() interrupt this thread; the interrupt only cuts the
                        // sleep short so the loop condition and deadlines are re-checked at once.
                    }
                }
            };

            Thread closer = new Thread(closeWatcher, "event-receiver-firehose-closer");
            closer.setDaemon(true);
            this.delayedCloseExecutor = closer;
            closer.start();
            return closer;
        }

        /**
         * HTTP endpoint that parses a batch of events from the request body and appends the
         * resulting rows to the buffer.
         *
         * <p>Steps, in order: the idle deadline is refreshed; the caller is authorized for WRITE on
         * the state resource (403 otherwise); the producer id/sequence headers are checked, which may
         * short-circuit with its own response; the body is decoded with the Smile mapper when the
         * request content type is exactly {@code application/x-jackson-smile} and with the JSON
         * mapper otherwise; every event is parsed before any row is added to the buffer.
         *
         * @param in request body containing a collection of event maps
         * @param req the HTTP request (used for authorization, content type and producer headers)
         * @return 200 with {@code {"eventCount": n}} on success, 403 when not authorized, a server
         *         error with an {@code "error"} entity when the body cannot be read, or the response
         *         produced by the producer-sequence check
         * @throws JsonProcessingException if the success response cannot be serialized
         * @throws IllegalStateException if the firehose is closed before all rows were buffered
         * @throws RuntimeException wrapping {@link InterruptedException} if interrupted while
         *         buffering; the interrupt flag is restored first
         */
        @POST
        @Path("/push-events")
        @Consumes({"application/json", "application/x-jackson-smile"})
        @Produces({"application/json", "application/x-jackson-smile"})
        public Response addAll(InputStream in, @Context HttpServletRequest req) throws JsonProcessingException {
            // Refreshed before authorization, so any request to this endpoint counts as activity.
            idleCloseTimeNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxIdleTimeMillis);

            Access accessResult = AuthorizationUtils.authorizeResourceAction(
                req,
                new ResourceAction(Resource.STATE_RESOURCE, Action.WRITE),
                authorizerMapper
            );
            if (!accessResult.isAllowed()) {
                return Response.status(403).build();
            }

            boolean isSmile = "application/x-jackson-smile".equals(req.getContentType());
            String contentType = isSmile ? "application/x-jackson-smile" : "application/json";
            ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;

            // Pass the resolved response type rather than the raw request header: the header may be
            // null or carry parameters, and the entity is written with objectMapper, whose encoding
            // corresponds to contentType.
            Response producerSequenceResponse = checkProducerSequence(req, contentType, objectMapper);
            if (producerSequenceResponse != null) {
                return producerSequenceResponse;
            }

            CountingInputStream countingInputStream = new CountingInputStream(in);
            Collection<Map<String, Object>> events;
            try {
                events = objectMapper.readValue(countingInputStream, new TypeReference<Collection<Map<String, Object>>>(){});
            }
            catch (IOException e) {
                // ImmutableMap rejects null values, and an IOException may have no message.
                String message = e.getMessage() != null ? e.getMessage() : e.toString();
                return Response.serverError().entity(ImmutableMap.of("error", message)).build();
            }
            finally {
                // Count whatever was consumed, even when decoding failed part-way through.
                bytesReceived.addAndGet(countingInputStream.getCount());
            }
            log.debug("Adding %,d events to firehose: %s", events.size(), serviceName);

            // Parse everything up front so a parse failure happens before any row is buffered.
            ArrayList<InputRow> rows = new ArrayList<InputRow>();
            for (Map<String, Object> event : events) {
                rows.addAll(parser.parseBatch(event));
            }

            try {
                addRows(rows);
                return Response.ok(
                    objectMapper.writeValueAsString(ImmutableMap.of("eventCount", events.size())),
                    contentType
                ).build();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        /**
         * Reports whether another row is available, blocking on the buffer until either a row or the
         * end-of-stream marker arrives.
         *
         * @return {@code true} if {@code nextRow()} can return a row; {@code false} once the
         *         {@code FIREHOSE_CLOSED} marker has been taken from the buffer
         * @throws RuntimeException wrapping {@link InterruptedException} if interrupted while
         *         waiting; the interrupt flag is restored first
         */
        public boolean hasMore() {
            if (rowsRunOut) {
                return false;
            }
            if (nextRow == null) {
                final Object taken;
                try {
                    taken = buffer.take();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                if (taken == FIREHOSE_CLOSED) {
                    rowsRunOut = true;
                    return false;
                }
                nextRow = (InputRow) taken;
            }
            return true;
        }

        /**
         * Hands out the row previously fetched by {@code hasMore()} and clears it.
         *
         * @return the pending row
         * @throws NoSuchElementException if no row is pending
         */
        @Nullable
        public InputRow nextRow() {
            final InputRow pending = nextRow;
            if (pending != null) {
                nextRow = null;
                return pending;
            }
            throw new NoSuchElementException();
        }

        /** @return the number of elements currently held in the buffer */
        @Override
        public int getCurrentBufferSize() {
            return buffer.size();
        }

        /** @return the buffer capacity configured on the enclosing factory */
        @Override
        public int getCapacity() {
            return bufferSize;
        }

        /** @return the total number of request-body bytes read so far */
        @Override
        public long getBytesReceived() {
            return bytesReceived.get();
        }

        /**
         * Closes this firehose; only the first call has any effect.
         *
         * <p>Sets the {@code closed} flag, enqueues the end-of-stream marker, removes the firehose
         * from the firehose register and from the chat handler provider (under the service name and
         * under the alias that {@code connect} registers), and wakes the closer thread so it can
         * observe the flag and exit.
         */
        public synchronized void close() {
            if (closed) {
                return;
            }
            // The flag is set before the marker is queued, so addRows() stops retrying for rows
            // that would otherwise be queued behind the marker. The put may block while the
            // buffer is full.
            closed = true;
            log.info("Firehose closing.");
            Uninterruptibles.putUninterruptibly(buffer, FIREHOSE_CLOSED);

            eventReceiverFirehoseRegister.unregister(serviceName);
            if (chatHandlerProvider != null) {
                chatHandlerProvider.unregister(serviceName);
                // connect() also registers this firehose under the text after the last ':' of the
                // service name; remove that registration too so it does not outlive the firehose.
                int lastIndexOfColon = serviceName.lastIndexOf(':');
                if (lastIndexOfColon > 0) {
                    chatHandlerProvider.unregister(serviceName.substring(lastIndexOfColon + 1));
                }
            }

            // No need to interrupt when the closer thread itself is the one closing.
            if (delayedCloseExecutor != null && !delayedCloseExecutor.equals(Thread.currentThread())) {
                delayedCloseExecutor.interrupt();
            }
        }

        /**
         * Appends the rows to the buffer in iteration order, retrying each row in 500 ms attempts
         * while the buffer is full and the firehose is open.
         *
         * <p>While retrying, a warning with the current buffer size is logged, at most once every
         * 10 seconds across all callers.
         *
         * @param rows rows to buffer
         * @throws InterruptedException if interrupted while waiting for buffer space
         * @throws IllegalStateException if the firehose is closed before a row could be buffered
         */
        @VisibleForTesting
        void addRows(Iterable<InputRow> rows) throws InterruptedException {
            final long warnIntervalNs = TimeUnit.SECONDS.toNanos(10L);
            for (InputRow row : rows) {
                boolean added = false;
                while (!closed && !added) {
                    added = buffer.offer(row, 500L, TimeUnit.MILLISECONDS);
                    if (!added) {
                        long currTimeNs = System.nanoTime();
                        long lastTimeNs = lastBufferAddFailLoggingTimeNs.get();
                        // The compare-and-set lets only one of several concurrent callers log.
                        if (currTimeNs - lastTimeNs > warnIntervalNs
                            && lastBufferAddFailLoggingTimeNs.compareAndSet(lastTimeNs, currTimeNs)) {
                            log.warn("Failed to add event to buffer with current size [%s] . Retrying...", buffer.size());
                        }
                    }
                }
                if (!added) {
                    throw new IllegalStateException("Cannot add events to closed firehose!");
                }
            }
        }

        /**
         * HTTP endpoint that schedules this firehose to close at the given time.
         *
         * <p>The requested time is converted into a {@code System.nanoTime()}-based deadline (a time
         * in the past means "now"). If no closer thread exists one is created; otherwise the existing
         * one is interrupted so that it re-checks its deadlines immediately.
         *
         * @param shutoffTimeMillis value of the {@code shutoffTime} query parameter, parsed with
         *        {@code DateTimes.of}; {@code null} means the current time
         * @param req the HTTP request, used for authorization
         * @return 200 on success, 403 when not authorized, or 400 with an {@code "error"} entity
         *         when an {@link IllegalArgumentException} is raised while handling the request
         */
        @POST
        @Path("/shutdown")
        @Consumes({"application/json", "application/x-jackson-smile"})
        @Produces({"application/json", "application/x-jackson-smile"})
        public Response shutdown(@QueryParam("shutoffTime") String shutoffTimeMillis, @Context HttpServletRequest req) {
            Access accessResult = AuthorizationUtils.authorizeResourceAction(
                req,
                new ResourceAction(Resource.STATE_RESOURCE, Action.WRITE),
                authorizerMapper
            );
            if (!accessResult.isAllowed()) {
                return Response.status(403).build();
            }

            try {
                final DateTime shutoffAt;
                if (shutoffTimeMillis == null) {
                    shutoffAt = DateTimes.nowUtc();
                } else {
                    shutoffAt = DateTimes.of(shutoffTimeMillis);
                }
                log.info("Setting Firehose shutoffTime to %s", shutoffTimeMillis);

                long remainingMillis = Math.max(shutoffAt.getMillis() - System.currentTimeMillis(), 0L);
                requestedShutdownTimeNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(remainingMillis);

                Thread closer;
                boolean createdJustNow = false;
                synchronized (this) {
                    closer = this.delayedCloseExecutor;
                    if (closer == null) {
                        closer = createDelayedCloseExecutor();
                        createdJustNow = true;
                    }
                }
                // A thread that already existed may be asleep; wake it so the new deadline is seen.
                if (!createdJustNow) {
                    closer.interrupt();
                }
                return Response.ok().build();
            }
            catch (IllegalArgumentException e) {
                return Response.status(Response.Status.BAD_REQUEST)
                               .entity(ImmutableMap.of("error", e.getMessage()))
                               .build();
            }
        }

        /** @return {@code true} once {@code close()} has run */
        @VisibleForTesting
        boolean isClosed() {
            return closed;
        }

        /**
         * Applies the optional producer id / sequence protocol carried in the
         * {@code X-Firehose-Producer-Id} and {@code X-Firehose-Producer-Seq} headers.
         *
         * <p>A request is accepted only when its sequence number is strictly greater than the last
         * one accepted for the same producer id; otherwise it is answered as skipped.
         *
         * <p>A producer id that is not yet known is admitted only while fewer than
         * {@code MAX_FIREHOSE_PRODUCERS - 1} ids are registered, i.e. the id that would bring the
         * registry to {@code MAX_FIREHOSE_PRODUCERS} entries is refused. Refused ids and ids sent
         * with a malformed sequence are not registered, and producers that are already registered
         * keep working after the limit has been reached. The size check and the registration are not
         * one atomic step, so concurrent first requests may overshoot the limit slightly.
         *
         * @param req request whose headers are inspected
         * @param responseContentType content type to put on a "skipped" response
         * @param responseMapper mapper used to serialize a "skipped" response
         * @return {@code null} when the request should be processed (no producer id header, or a
         *         new highest sequence was recorded); otherwise the response to send: 400 for a
         *         missing or non-numeric sequence, 403 when too many producer ids are registered,
         *         or 200 with {@code {"eventCount": 0, "skipped": true}} for a stale sequence
         * @throws RuntimeException wrapping {@link JsonProcessingException} if the "skipped"
         *         response cannot be serialized
         */
        @Nullable
        private Response checkProducerSequence(HttpServletRequest req, String responseContentType, ObjectMapper responseMapper) {
            String producerId = req.getHeader("X-Firehose-Producer-Id");
            if (producerId == null) {
                return null;
            }

            String sequenceValue = req.getHeader("X-Firehose-Producer-Seq");
            if (sequenceValue == null) {
                return Response.status(Response.Status.BAD_REQUEST)
                               .entity(ImmutableMap.of("error", "Producer sequence value is missing"))
                               .build();
            }

            // Check the limit before registering anything, and only for ids that are not known yet.
            Long producerSequence = producerSequences.get(producerId);
            if (producerSequence == null && producerSequences.size() >= MAX_FIREHOSE_PRODUCERS - 1) {
                return Response.status(Response.Status.FORBIDDEN)
                               .entity(ImmutableMap.of("error", "Too many individual producer IDs for this firehose.  Max is " + MAX_FIREHOSE_PRODUCERS))
                               .build();
            }

            // Validate the header before the id is registered so a malformed request uses no slot.
            long newSequence;
            try {
                newSequence = Long.parseLong(sequenceValue);
            }
            catch (NumberFormatException ex) {
                return Response.status(Response.Status.BAD_REQUEST)
                               .entity(ImmutableMap.of("error", "Producer sequence must be a number"))
                               .build();
            }

            if (producerSequence == null) {
                producerSequence = producerSequences.computeIfAbsent(producerId, key -> Long.MIN_VALUE);
            }

            try {
                // Compare-and-replace loop: retry with the fresh value if another request for the
                // same producer id updated the entry in between.
                while (true) {
                    if (newSequence <= producerSequence) {
                        return Response.ok(
                            responseMapper.writeValueAsString(ImmutableMap.of("eventCount", 0, "skipped", true)),
                            responseContentType
                        ).build();
                    }
                    if (producerSequences.replace(producerId, producerSequence, newSequence)) {
                        return null;
                    }
                    producerSequence = producerSequences.get(producerId);
                }
            }
            catch (JsonProcessingException ex) {
                throw new RuntimeException(ex);
            }
        }
    }
}

