/*
 * Decompiled with CFR 0.152.
 */
package io.druid.indexing.kafka;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.indexer.TaskLocation;
import io.druid.indexing.common.RetryPolicy;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.kafka.KafkaIndexTask;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.IOE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.http.client.HttpClient;
import io.druid.java.util.http.client.Request;
import io.druid.java.util.http.client.response.FullResponseHandler;
import io.druid.java.util.http.client.response.FullResponseHolder;
import io.druid.java.util.http.client.response.HttpResponseHandler;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;

public class KafkaIndexTaskClient {
    public static final int MAX_RETRY_WAIT_SECONDS = 10;
    private static final int MIN_RETRY_WAIT_SECONDS = 2;
    private static final EmittingLogger log = new EmittingLogger(KafkaIndexTaskClient.class);
    private static final String BASE_PATH = "/druid/worker/v1/chat";
    private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
    private static final TreeMap EMPTY_TREE_MAP = new TreeMap();
    private final HttpClient httpClient;
    private final ObjectMapper jsonMapper;
    private final TaskInfoProvider taskInfoProvider;
    private final Duration httpTimeout;
    private final RetryPolicyFactory retryPolicyFactory;
    private final ListeningExecutorService executorService;
    private final long numRetries;

    public KafkaIndexTaskClient(HttpClient httpClient, ObjectMapper jsonMapper, TaskInfoProvider taskInfoProvider, String dataSource, int numThreads, Duration httpTimeout, long numRetries) {
        this.httpClient = httpClient;
        this.jsonMapper = jsonMapper;
        this.taskInfoProvider = taskInfoProvider;
        this.httpTimeout = httpTimeout;
        this.numRetries = numRetries;
        this.retryPolicyFactory = this.createRetryPolicyFactory();
        this.executorService = MoreExecutors.listeningDecorator((ExecutorService)Execs.multiThreaded((int)numThreads, (String)StringUtils.format((String)"KafkaIndexTaskClient-%s-%%d", (Object[])new Object[]{dataSource})));
    }

    public void close() {
        this.executorService.shutdownNow();
    }

    public boolean stop(String id, boolean publish) {
        log.debug("Stop task[%s] publish[%s]", new Object[]{id, publish});
        try {
            FullResponseHolder response = this.submitRequest(id, HttpMethod.POST, "stop", publish ? "publish=true" : null, true);
            return response.getStatus().getCode() / 100 == 2;
        }
        catch (NoTaskLocationException e) {
            return false;
        }
        catch (TaskNotRunnableException e) {
            log.info("Task [%s] couldn't be stopped because it is no longer running", new Object[]{id});
            return true;
        }
        catch (Exception e) {
            log.warn((Throwable)e, "Exception while stopping task [%s]", new Object[]{id});
            return false;
        }
    }

    public boolean resume(String id) {
        log.debug("Resume task[%s]", new Object[]{id});
        try {
            FullResponseHolder response = this.submitRequest(id, HttpMethod.POST, "resume", null, true);
            return response.getStatus().getCode() / 100 == 2;
        }
        catch (NoTaskLocationException e) {
            return false;
        }
    }

    public Map<Integer, Long> pause(String id) {
        return this.pause(id, 0L);
    }

    public Map<Integer, Long> pause(String id, long timeout) {
        log.debug("Pause task[%s] timeout[%d]", new Object[]{id, timeout});
        try {
            FullResponseHolder response = this.submitRequest(id, HttpMethod.POST, "pause", timeout > 0L ? StringUtils.format((String)"timeout=%d", (Object[])new Object[]{timeout}) : null, true);
            if (response.getStatus().equals((Object)HttpResponseStatus.OK)) {
                log.info("Task [%s] paused successfully", new Object[]{id});
                return (Map)this.jsonMapper.readValue(response.getContent(), (TypeReference)new TypeReference<Map<Integer, Long>>(){});
            }
            RetryPolicy retryPolicy = this.retryPolicyFactory.makeRetryPolicy();
            while (true) {
                if (this.getStatus(id) == KafkaIndexTask.Status.PAUSED) {
                    return this.getCurrentOffsets(id, true);
                }
                Duration delay = retryPolicy.getAndIncrementRetryDelay();
                if (delay == null) {
                    log.error("Task [%s] failed to pause, aborting", new Object[]{id});
                    throw new ISE("Task [%s] failed to pause, aborting", new Object[]{id});
                }
                long sleepTime = delay.getMillis();
                log.info("Still waiting for task [%s] to pause; will try again in [%s]", new Object[]{id, new Duration(sleepTime).toString()});
                Thread.sleep(sleepTime);
            }
        }
        catch (NoTaskLocationException e) {
            log.error("Exception [%s] while pausing Task [%s]", new Object[]{e.getMessage(), id});
            return ImmutableMap.of();
        }
        catch (IOException | InterruptedException e) {
            log.error("Exception [%s] while pausing Task [%s]", new Object[]{e.getMessage(), id});
            throw Throwables.propagate((Throwable)e);
        }
    }

    public KafkaIndexTask.Status getStatus(String id) {
        log.debug("GetStatus task[%s]", new Object[]{id});
        try {
            FullResponseHolder response = this.submitRequest(id, HttpMethod.GET, "status", null, true);
            return (KafkaIndexTask.Status)((Object)this.jsonMapper.readValue(response.getContent(), KafkaIndexTask.Status.class));
        }
        catch (NoTaskLocationException e) {
            return KafkaIndexTask.Status.NOT_STARTED;
        }
        catch (IOException e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    public DateTime getStartTime(String id) {
        log.debug("GetStartTime task[%s]", new Object[]{id});
        try {
            FullResponseHolder response = this.submitRequest(id, HttpMethod.GET, "time/start", null, true);
            return response.getContent() == null || response.getContent().isEmpty() ? null : (DateTime)this.jsonMapper.readValue(response.getContent(), DateTime.class);
        }
        catch (NoTaskLocationException e) {
            return null;
        }
        catch (IOException e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    public Map<Integer, Long> getCurrentOffsets(String id, boolean retry) {
        log.debug("GetCurrentOffsets task[%s] retry[%s]", new Object[]{id, retry});
        try {
            FullResponseHolder response = this.submitRequest(id, HttpMethod.GET, "offsets/current", null, retry);
            return (Map)this.jsonMapper.readValue(response.getContent(), (TypeReference)new TypeReference<Map<Integer, Long>>(){});
        }
        catch (NoTaskLocationException e) {
            return ImmutableMap.of();
        }
        catch (IOException e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    public TreeMap<Integer, Map<Integer, Long>> getCheckpoints(String id, boolean retry) {
        log.debug("GetCheckpoints task[%s] retry[%s]", new Object[]{id, retry});
        try {
            FullResponseHolder response = this.submitRequest(id, HttpMethod.GET, "checkpoints", null, retry);
            return (TreeMap)this.jsonMapper.readValue(response.getContent(), (TypeReference)new TypeReference<TreeMap<Integer, TreeMap<Integer, Long>>>(){});
        }
        catch (NoTaskLocationException e) {
            return EMPTY_TREE_MAP;
        }
        catch (IOException e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    public ListenableFuture<TreeMap<Integer, Map<Integer, Long>>> getCheckpointsAsync(String id, boolean retry) {
        return this.executorService.submit(() -> this.getCheckpoints(id, retry));
    }

    public Map<Integer, Long> getEndOffsets(String id) {
        log.debug("GetEndOffsets task[%s]", new Object[]{id});
        try {
            FullResponseHolder response = this.submitRequest(id, HttpMethod.GET, "offsets/end", null, true);
            return (Map)this.jsonMapper.readValue(response.getContent(), (TypeReference)new TypeReference<Map<Integer, Long>>(){});
        }
        catch (NoTaskLocationException e) {
            return ImmutableMap.of();
        }
        catch (IOException e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    public boolean setEndOffsets(String id, Map<Integer, Long> endOffsets, boolean resume, boolean finalize) {
        log.debug("SetEndOffsets task[%s] endOffsets[%s] resume[%s] finalize[%s]", new Object[]{id, endOffsets, resume, finalize});
        try {
            FullResponseHolder response = this.submitRequest(id, HttpMethod.POST, "offsets/end", StringUtils.format((String)"resume=%s&finish=%s", (Object[])new Object[]{resume, finalize}), this.jsonMapper.writeValueAsBytes(endOffsets), true);
            return response.getStatus().getCode() / 100 == 2;
        }
        catch (NoTaskLocationException e) {
            return false;
        }
        catch (IOException e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    public ListenableFuture<Boolean> stopAsync(final String id, final boolean publish) {
        return this.executorService.submit((Callable)new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                return KafkaIndexTaskClient.this.stop(id, publish);
            }
        });
    }

    public ListenableFuture<Boolean> resumeAsync(final String id) {
        return this.executorService.submit((Callable)new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                return KafkaIndexTaskClient.this.resume(id);
            }
        });
    }

    public ListenableFuture<Map<Integer, Long>> pauseAsync(String id) {
        return this.pauseAsync(id, 0L);
    }

    public ListenableFuture<Map<Integer, Long>> pauseAsync(final String id, final long timeout) {
        return this.executorService.submit((Callable)new Callable<Map<Integer, Long>>(){

            @Override
            public Map<Integer, Long> call() throws Exception {
                return KafkaIndexTaskClient.this.pause(id, timeout);
            }
        });
    }

    public ListenableFuture<KafkaIndexTask.Status> getStatusAsync(final String id) {
        return this.executorService.submit((Callable)new Callable<KafkaIndexTask.Status>(){

            @Override
            public KafkaIndexTask.Status call() throws Exception {
                return KafkaIndexTaskClient.this.getStatus(id);
            }
        });
    }

    public ListenableFuture<DateTime> getStartTimeAsync(final String id) {
        return this.executorService.submit((Callable)new Callable<DateTime>(){

            @Override
            public DateTime call() throws Exception {
                return KafkaIndexTaskClient.this.getStartTime(id);
            }
        });
    }

    public ListenableFuture<Map<Integer, Long>> getCurrentOffsetsAsync(final String id, final boolean retry) {
        return this.executorService.submit((Callable)new Callable<Map<Integer, Long>>(){

            @Override
            public Map<Integer, Long> call() throws Exception {
                return KafkaIndexTaskClient.this.getCurrentOffsets(id, retry);
            }
        });
    }

    public ListenableFuture<Map<Integer, Long>> getEndOffsetsAsync(final String id) {
        return this.executorService.submit((Callable)new Callable<Map<Integer, Long>>(){

            @Override
            public Map<Integer, Long> call() throws Exception {
                return KafkaIndexTaskClient.this.getEndOffsets(id);
            }
        });
    }

    public ListenableFuture<Boolean> setEndOffsetsAsync(final String id, final Map<Integer, Long> endOffsets, final boolean resume, final boolean finalize) {
        return this.executorService.submit((Callable)new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                return KafkaIndexTaskClient.this.setEndOffsets(id, endOffsets, resume, finalize);
            }
        });
    }

    @VisibleForTesting
    RetryPolicyFactory createRetryPolicyFactory() {
        return new RetryPolicyFactory(new RetryPolicyConfig().setMinWait(Period.seconds((int)2)).setMaxWait(Period.seconds((int)10)).setMaxRetryCount(this.numRetries));
    }

    @VisibleForTesting
    void checkConnection(String host, int port) throws IOException {
        new Socket(host, port).close();
    }

    private FullResponseHolder submitRequest(String id, HttpMethod method, String pathSuffix, String query, boolean retry) {
        return this.submitRequest(id, method, pathSuffix, query, new byte[0], retry);
    }

    private FullResponseHolder submitRequest(String id, HttpMethod method, String pathSuffix, String query, byte[] content, boolean retry) {
        RetryPolicy retryPolicy = this.retryPolicyFactory.makeRetryPolicy();
        while (true) {
            FullResponseHolder response = null;
            Request request = null;
            TaskLocation location = TaskLocation.unknown();
            String path = StringUtils.format((String)"%s/%s/%s", (Object[])new Object[]{BASE_PATH, id, pathSuffix});
            Optional status = this.taskInfoProvider.getTaskStatus(id);
            if (!status.isPresent() || !((TaskStatus)status.get()).isRunnable()) {
                throw new TaskNotRunnableException(StringUtils.format((String)"Aborting request because task [%s] is not runnable", (Object[])new Object[]{id}));
            }
            String host = location.getHost();
            String scheme = "";
            int port = -1;
            try {
                location = this.taskInfoProvider.getTaskLocation(id);
                if (location.equals((Object)TaskLocation.unknown())) {
                    throw new NoTaskLocationException(StringUtils.format((String)"No TaskLocation available for task [%s]", (Object[])new Object[]{id}));
                }
                host = location.getHost();
                scheme = location.getTlsPort() >= 0 ? "https" : "http";
                port = location.getTlsPort() >= 0 ? location.getTlsPort() : location.getPort();
                this.checkConnection(host, port);
                try {
                    URI serviceUri = new URI(scheme, null, host, port, path, query, null);
                    request = new Request(method, serviceUri.toURL());
                    request.addHeader("X-Druid-Task-Id", id);
                    if (content.length > 0) {
                        request.setContent("application/json", content);
                    }
                    log.debug("HTTP %s: %s", new Object[]{method.getName(), serviceUri.toString()});
                    response = (FullResponseHolder)this.httpClient.go(request, (HttpResponseHandler)new FullResponseHandler(Charsets.UTF_8), this.httpTimeout).get();
                }
                catch (Exception e) {
                    Throwables.propagateIfInstanceOf((Throwable)e.getCause(), IOException.class);
                    Throwables.propagateIfInstanceOf((Throwable)e.getCause(), ChannelException.class);
                    throw Throwables.propagate((Throwable)e);
                }
                int responseCode = response.getStatus().getCode();
                if (responseCode / 100 == 2) {
                    return response;
                }
                if (responseCode == 400) {
                    throw new IAE("Received 400 Bad Request with body: %s", new Object[]{response.getContent()});
                }
                throw new IOE("Received status [%d]", new Object[]{responseCode});
            }
            catch (IOException | ChannelException e) {
                String urlForLog;
                Duration delay;
                if (response != null && response.getStatus().equals((Object)HttpResponseStatus.NOT_FOUND)) {
                    String headerId = response.getResponse().headers().get("X-Druid-Task-Id");
                    if (headerId != null && !headerId.equals(id)) {
                        log.warn("Expected worker to have taskId [%s] but has taskId [%s], will retry in [%d]s", new Object[]{id, headerId, 5});
                        delay = Duration.standardSeconds((long)5L);
                    } else {
                        delay = retryPolicy.getAndIncrementRetryDelay();
                    }
                } else {
                    delay = retryPolicy.getAndIncrementRetryDelay();
                }
                String string = urlForLog = request != null ? request.getUrl().toString() : StringUtils.format((String)"%s://%s:%d%s", (Object[])new Object[]{scheme, host, port, path});
                if (!retry) {
                    log.info("submitRequest failed for [%s], with message [%s]", new Object[]{urlForLog, e.getMessage()});
                    Throwables.propagate((Throwable)e);
                    continue;
                }
                if (delay == null) {
                    log.warn(e, "Retries exhausted for [%s], last exception:", new Object[]{urlForLog});
                    Throwables.propagate((Throwable)e);
                    continue;
                }
                try {
                    long sleepTime = delay.getMillis();
                    log.debug("Bad response HTTP [%s] from [%s]; will try again in [%s] (body/exception: [%s])", new Object[]{response != null ? Integer.valueOf(response.getStatus().getCode()) : "no response", urlForLog, new Duration(sleepTime).toString(), response != null ? response.getContent() : e.getMessage()});
                    Thread.sleep(sleepTime);
                }
                catch (InterruptedException e2) {
                    Throwables.propagate((Throwable)e2);
                }
                continue;
            }
            catch (NoTaskLocationException e) {
                log.info("No TaskLocation available for task [%s], this task may not have been assigned to a worker yet or may have already completed", new Object[]{id});
                throw e;
            }
            catch (Exception e) {
                log.warn((Throwable)e, "Exception while sending request", new Object[0]);
                throw e;
            }
            break;
        }
    }

    public static class TaskNotRunnableException
    extends RuntimeException {
        public TaskNotRunnableException(String message) {
            super(message);
        }
    }

    public static class NoTaskLocationException
    extends RuntimeException {
        public NoTaskLocationException(String message) {
            super(message);
        }
    }
}

