/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.loading;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.server.coordination.DataSegmentChangeCallback;
import org.apache.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.SegmentLoadDropHandler;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.loading.LoadPeonCallback;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.loading.SegmentHolder;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.CoordinatorStat;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;

public class HttpLoadQueuePeon
implements LoadQueuePeon {
    public static final TypeReference<List<DataSegmentChangeRequest>> REQUEST_ENTITY_TYPE_REF = new TypeReference<List<DataSegmentChangeRequest>>(){};
    public static final TypeReference<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>> RESPONSE_ENTITY_TYPE_REF = new TypeReference<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>(){};
    private static final EmittingLogger log = new EmittingLogger(HttpLoadQueuePeon.class);
    private final AtomicLong queuedSize = new AtomicLong(0L);
    private final CoordinatorRunStats stats = new CoordinatorRunStats();
    private final ConcurrentMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentHashMap<DataSegment, SegmentHolder>();
    private final ConcurrentMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentHashMap<DataSegment, SegmentHolder>();
    private final Set<DataSegment> segmentsMarkedToDrop = ConcurrentHashMap.newKeySet();
    private final Set<SegmentHolder> queuedSegments = new TreeSet<SegmentHolder>();
    private final Set<DataSegment> activeRequestSegments = new HashSet<DataSegment>();
    private final ScheduledExecutorService processingExecutor;
    private volatile boolean stopped = false;
    private final Object lock = new Object();
    private final DruidCoordinatorConfig config;
    private final ObjectMapper jsonMapper;
    private final HttpClient httpClient;
    private final URL changeRequestURL;
    private final String serverId;
    private final AtomicBoolean mainLoopInProgress = new AtomicBoolean(false);
    private final ExecutorService callBackExecutor;
    private final ObjectWriter requestBodyWriter;

    public HttpLoadQueuePeon(String baseUrl, ObjectMapper jsonMapper, HttpClient httpClient, DruidCoordinatorConfig config, ScheduledExecutorService processingExecutor, ExecutorService callBackExecutor) {
        this.jsonMapper = jsonMapper;
        this.requestBodyWriter = jsonMapper.writerWithType(REQUEST_ENTITY_TYPE_REF);
        this.httpClient = httpClient;
        this.config = config;
        this.processingExecutor = processingExecutor;
        this.callBackExecutor = callBackExecutor;
        this.serverId = baseUrl;
        try {
            this.changeRequestURL = new URL(new URL(baseUrl), StringUtils.nonStrictFormat((String)"druid-internal/v1/segments/changeRequests?timeout=%d", (Object[])new Object[]{config.getHttpLoadQueuePeonHostTimeout().getMillis()}));
        }
        catch (MalformedURLException ex) {
            throw new RuntimeException(ex);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doSegmentManagement() {
        if (this.stopped || !this.mainLoopInProgress.compareAndSet(false, true)) {
            log.trace("[%s]Ignoring tick. Either in-progress already or stopped.", new Object[]{this.serverId});
            return;
        }
        int batchSize = this.config.getHttpLoadQueuePeonBatchSize();
        ArrayList<DataSegmentChangeRequest> newRequests = new ArrayList<DataSegmentChangeRequest>(batchSize);
        Object object = this.lock;
        synchronized (object) {
            Iterator<SegmentHolder> queuedSegmentIterator = this.queuedSegments.iterator();
            long currentTimeMillis = System.currentTimeMillis();
            while (newRequests.size() < batchSize && queuedSegmentIterator.hasNext()) {
                SegmentHolder holder = queuedSegmentIterator.next();
                DataSegment segment = holder.getSegment();
                if (this.hasRequestTimedOut(holder, currentTimeMillis)) {
                    this.onRequestFailed(holder, "timed out");
                    queuedSegmentIterator.remove();
                    if (holder.isLoad()) {
                        this.segmentsToLoad.remove(segment);
                    } else {
                        this.segmentsToDrop.remove(segment);
                    }
                    this.activeRequestSegments.remove(segment);
                    continue;
                }
                newRequests.add(holder.getChangeRequest());
                holder.markRequestSentToServer();
                this.activeRequestSegments.add(segment);
            }
        }
        if (newRequests.size() == 0) {
            log.trace("[%s]Found no load/drop requests. SegmentsToLoad[%d], SegmentsToDrop[%d], batchSize[%d].", new Object[]{this.serverId, this.segmentsToLoad.size(), this.segmentsToDrop.size(), this.config.getHttpLoadQueuePeonBatchSize()});
            this.mainLoopInProgress.set(false);
            return;
        }
        try {
            log.trace("Sending [%d] load/drop requests to Server[%s].", new Object[]{newRequests.size(), this.serverId});
            final BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
            ListenableFuture future = this.httpClient.go(new Request(HttpMethod.POST, this.changeRequestURL).addHeader("Accept", "application/json").addHeader("Content-Type", "application/json").setContent(this.requestBodyWriter.writeValueAsBytes(newRequests)), (HttpResponseHandler)responseHandler, new Duration(this.config.getHttpLoadQueuePeonHostTimeout().getMillis() + 5000L));
            Futures.addCallback((ListenableFuture)future, (FutureCallback)new FutureCallback<InputStream>(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 * Enabled aggressive block sorting
                 * Enabled unnecessary exception pruning
                 * Enabled aggressive exception aggregation
                 */
                public void onSuccess(InputStream result) {
                    boolean scheduleNextRunImmediately = true;
                    try {
                        if (responseHandler.getStatus() == 204) {
                            log.trace("Received NO CONTENT reseponse from [%s]", new Object[]{HttpLoadQueuePeon.this.serverId});
                            return;
                        }
                        if (200 == responseHandler.getStatus()) {
                            try {
                                List statuses = (List)HttpLoadQueuePeon.this.jsonMapper.readValue(result, RESPONSE_ENTITY_TYPE_REF);
                                log.trace("Server[%s] returned status response [%s].", new Object[]{HttpLoadQueuePeon.this.serverId, statuses});
                                Object object = HttpLoadQueuePeon.this.lock;
                                synchronized (object) {
                                    if (HttpLoadQueuePeon.this.stopped) {
                                        log.trace("Ignoring response from Server[%s]. We are already stopped.", new Object[]{HttpLoadQueuePeon.this.serverId});
                                        scheduleNextRunImmediately = false;
                                        return;
                                    }
                                    Iterator iterator = statuses.iterator();
                                    block14: while (iterator.hasNext()) {
                                        SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e = (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus)iterator.next();
                                        switch (e.getStatus().getState()) {
                                            case SUCCESS: 
                                            case FAILED: {
                                                HttpLoadQueuePeon.this.handleResponseStatus(e.getRequest(), e.getStatus());
                                                continue block14;
                                            }
                                            case PENDING: {
                                                log.trace("Request[%s] is still pending on server[%s].", new Object[]{e.getRequest(), HttpLoadQueuePeon.this.serverId});
                                                continue block14;
                                            }
                                        }
                                        scheduleNextRunImmediately = false;
                                        log.error("Server[%s] returned unknown state in status[%s].", new Object[]{HttpLoadQueuePeon.this.serverId, e.getStatus()});
                                    }
                                    return;
                                }
                            }
                            catch (Exception ex) {
                                scheduleNextRunImmediately = false;
                                this.logRequestFailure(ex);
                                return;
                            }
                        }
                        scheduleNextRunImmediately = false;
                        this.logRequestFailure((Throwable)new RE("Unexpected Response Status.", new Object[0]));
                        return;
                    }
                    finally {
                        HttpLoadQueuePeon.this.mainLoopInProgress.set(false);
                        if (scheduleNextRunImmediately) {
                            HttpLoadQueuePeon.this.processingExecutor.execute(() -> HttpLoadQueuePeon.this.doSegmentManagement());
                        }
                    }
                }

                public void onFailure(Throwable t) {
                    try {
                        this.logRequestFailure(t);
                    }
                    finally {
                        HttpLoadQueuePeon.this.mainLoopInProgress.set(false);
                    }
                }

                private void logRequestFailure(Throwable t) {
                    log.error(t, "Request[%s] Failed with status[%s]. Reason[%s].", new Object[]{HttpLoadQueuePeon.this.changeRequestURL, responseHandler.getStatus(), responseHandler.getDescription()});
                }
            }, (Executor)this.processingExecutor);
        }
        catch (Throwable th) {
            log.error(th, "Error sending load/drop request to [%s].", new Object[]{this.serverId});
            this.mainLoopInProgress.set(false);
        }
    }

    private void handleResponseStatus(DataSegmentChangeRequest changeRequest, final SegmentLoadDropHandler.Status status) {
        changeRequest.go(new DataSegmentChangeHandler(){

            @Override
            public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) {
                this.updateSuccessOrFailureInHolder((SegmentHolder)HttpLoadQueuePeon.this.segmentsToLoad.remove(segment), status);
            }

            @Override
            public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) {
                this.updateSuccessOrFailureInHolder((SegmentHolder)HttpLoadQueuePeon.this.segmentsToDrop.remove(segment), status);
            }

            private void updateSuccessOrFailureInHolder(SegmentHolder holder, SegmentLoadDropHandler.Status status2) {
                if (holder == null) {
                    return;
                }
                HttpLoadQueuePeon.this.queuedSegments.remove(holder);
                HttpLoadQueuePeon.this.activeRequestSegments.remove(holder.getSegment());
                if (status2.getState() == SegmentLoadDropHandler.Status.STATE.FAILED) {
                    HttpLoadQueuePeon.this.onRequestFailed(holder, status2.getFailureCause());
                } else {
                    HttpLoadQueuePeon.this.onRequestCompleted(holder, RequestStatus.SUCCESS);
                }
            }
        }, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void start() {
        Object object = this.lock;
        synchronized (object) {
            if (this.stopped) {
                throw new ISE("Can't start.", new Object[0]);
            }
            ScheduledExecutors.scheduleAtFixedRate((ScheduledExecutorService)this.processingExecutor, (Duration)this.config.getHttpLoadQueuePeonRepeatDelay(), () -> {
                if (!this.stopped) {
                    this.doSegmentManagement();
                }
                if (this.stopped) {
                    return ScheduledExecutors.Signal.STOP;
                }
                return ScheduledExecutors.Signal.REPEAT;
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() {
        Object object = this.lock;
        synchronized (object) {
            if (this.stopped) {
                return;
            }
            log.info("Stopping load queue peon for server [%s].", new Object[]{this.serverId});
            this.stopped = true;
            this.queuedSegments.forEach(holder -> this.onRequestCompleted((SegmentHolder)holder, RequestStatus.CANCELLED));
            log.info("Cancelled [%d] requests queued on server [%s].", new Object[]{this.queuedSegments.size(), this.serverId});
            this.segmentsToDrop.clear();
            this.segmentsToLoad.clear();
            this.queuedSegments.clear();
            this.activeRequestSegments.clear();
            this.queuedSize.set(0L);
            this.stats.clear();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void loadSegment(DataSegment segment, SegmentAction action, LoadPeonCallback callback) {
        if (!action.isLoad()) {
            log.warn("Invalid load action [%s] for segment [%s] on server [%s].", new Object[]{action, segment.getId(), this.serverId});
            return;
        }
        Object object = this.lock;
        synchronized (object) {
            if (this.stopped) {
                log.warn("Server[%s] cannot load segment[%s] because load queue peon is stopped.", new Object[]{this.serverId, segment.getId()});
                if (callback != null) {
                    callback.execute(false);
                }
                return;
            }
            SegmentHolder holder = (SegmentHolder)this.segmentsToLoad.get(segment);
            if (holder == null) {
                log.trace("Server[%s] to load segment[%s] queued.", new Object[]{this.serverId, segment.getId()});
                this.queuedSize.addAndGet(segment.getSize());
                holder = new SegmentHolder(segment, action, callback);
                this.segmentsToLoad.put(segment, holder);
                this.queuedSegments.add(holder);
                this.processingExecutor.execute(this::doSegmentManagement);
                this.incrementStat(holder, RequestStatus.ASSIGNED);
            } else {
                holder.addCallback(callback);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void dropSegment(DataSegment segment, LoadPeonCallback callback) {
        Object object = this.lock;
        synchronized (object) {
            if (this.stopped) {
                log.warn("Server[%s] cannot drop segment[%s] because load queue peon is stopped.", new Object[]{this.serverId, segment.getId()});
                if (callback != null) {
                    callback.execute(false);
                }
                return;
            }
            SegmentHolder holder = (SegmentHolder)this.segmentsToDrop.get(segment);
            if (holder == null) {
                log.trace("Server[%s] to drop segment[%s] queued.", new Object[]{this.serverId, segment.getId()});
                holder = new SegmentHolder(segment, SegmentAction.DROP, callback);
                this.segmentsToDrop.put(segment, holder);
                this.queuedSegments.add(holder);
                this.processingExecutor.execute(this::doSegmentManagement);
                this.incrementStat(holder, RequestStatus.ASSIGNED);
            } else {
                holder.addCallback(callback);
            }
        }
    }

    @Override
    public Set<DataSegment> getSegmentsToLoad() {
        return Collections.unmodifiableSet(this.segmentsToLoad.keySet());
    }

    @Override
    public Set<DataSegment> getSegmentsToDrop() {
        return Collections.unmodifiableSet(this.segmentsToDrop.keySet());
    }

    @Override
    public Set<DataSegment> getTimedOutSegments() {
        return Collections.emptySet();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Set<SegmentHolder> getSegmentsInQueue() {
        HashSet<SegmentHolder> segmentsInQueue;
        Object object = this.lock;
        synchronized (object) {
            segmentsInQueue = new HashSet<SegmentHolder>(this.queuedSegments);
        }
        return segmentsInQueue;
    }

    @Override
    public long getSizeOfSegmentsToLoad() {
        return this.queuedSize.get();
    }

    @Override
    public CoordinatorRunStats getAndResetStats() {
        return this.stats.getSnapshotAndReset();
    }

    @Override
    public void markSegmentToDrop(DataSegment dataSegment) {
        this.segmentsMarkedToDrop.add(dataSegment);
    }

    @Override
    public void unmarkSegmentToDrop(DataSegment dataSegment) {
        this.segmentsMarkedToDrop.remove(dataSegment);
    }

    @Override
    public Set<DataSegment> getSegmentsMarkedToDrop() {
        return Collections.unmodifiableSet(this.segmentsMarkedToDrop);
    }

    private boolean hasRequestTimedOut(SegmentHolder holder, long currentTimeMillis) {
        return holder.isRequestSentToServer() && currentTimeMillis - holder.getFirstRequestMillis() > this.config.getLoadTimeoutDelay().getMillis();
    }

    private void onRequestFailed(SegmentHolder holder, String failureCause) {
        log.error("Server[%s] failed segment[%s] request[%s] with cause [%s].", new Object[]{this.serverId, holder.getSegment().getId(), holder.getAction(), failureCause});
        this.onRequestCompleted(holder, RequestStatus.FAILED);
    }

    private void onRequestCompleted(SegmentHolder holder, RequestStatus status) {
        SegmentAction action = holder.getAction();
        log.trace("Server[%s] completed request[%s] on segment[%s] with status[%s].", new Object[]{this.serverId, action, holder.getSegment().getId(), status});
        if (holder.isLoad()) {
            this.queuedSize.addAndGet(-holder.getSegment().getSize());
        }
        this.incrementStat(holder, status);
        this.executeCallbacks(holder, status == RequestStatus.SUCCESS);
    }

    private void incrementStat(SegmentHolder holder, RequestStatus status) {
        RowKey rowKey = RowKey.with(Dimension.DATASOURCE, holder.getSegment().getDataSource()).and(Dimension.DESCRIPTION, holder.getAction().name());
        this.stats.add(status.datasourceStat, rowKey, 1L);
    }

    private void executeCallbacks(SegmentHolder holder, boolean success) {
        this.callBackExecutor.execute(() -> {
            for (LoadPeonCallback callback : holder.getCallbacks()) {
                callback.execute(success);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean cancelOperation(DataSegment segment) {
        Object object = this.lock;
        synchronized (object) {
            SegmentHolder holder;
            if (this.activeRequestSegments.contains(segment)) {
                return false;
            }
            SegmentHolder segmentHolder = holder = this.segmentsToLoad.containsKey(segment) ? (SegmentHolder)this.segmentsToLoad.remove(segment) : (SegmentHolder)this.segmentsToDrop.remove(segment);
            if (holder == null) {
                return false;
            }
            this.queuedSegments.remove(holder);
            this.onRequestCompleted(holder, RequestStatus.CANCELLED);
            return true;
        }
    }

    private static enum RequestStatus {
        ASSIGNED(Stats.SegmentQueue.ASSIGNED_ACTIONS),
        SUCCESS(Stats.SegmentQueue.COMPLETED_ACTIONS),
        FAILED(Stats.SegmentQueue.FAILED_ACTIONS),
        CANCELLED(Stats.SegmentQueue.CANCELLED_ACTIONS);

        final CoordinatorStat datasourceStat;

        private RequestStatus(CoordinatorStat datasourceStat) {
            this.datasourceStat = datasourceStat;
        }
    }
}

