/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.seekablestream;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.RetryPolicy;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.IgnoreHttpResponseHandler;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceClosedException;
import org.apache.druid.rpc.ServiceLocation;
import org.apache.druid.rpc.ServiceLocations;
import org.apache.druid.rpc.ServiceLocator;
import org.apache.druid.rpc.ServiceNotAvailableException;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;

public abstract class SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, SequenceOffsetType>
implements SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetType> {
    // Logger used for the warnings and informational messages emitted by this client.
    private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTaskClientAsyncImpl.class);
    // Lower bound, in seconds, of the wait between retries. pauseAsync's status polling and
    // makeRetryPolicy use this same bound (the latter expressed in milliseconds).
    public static final int MIN_RETRY_WAIT_SECONDS = 2;
    // Upper bound, in seconds, of the wait between retries. pauseAsync's status polling and
    // makeRetryPolicy use this same bound (the latter expressed in milliseconds).
    public static final int MAX_RETRY_WAIT_SECONDS = 10;
    // Factory that makeClient uses to build a ServiceClient for each request.
    private final ServiceClientFactory serviceClientFactory;
    // Source of task status and task location; handed to SeekableStreamTaskLocator.
    private final TaskInfoProvider taskInfoProvider;
    // Mapper used to serialize JSON request bodies and to deserialize task responses.
    private final ObjectMapper jsonMapper;
    // Timeout set on every request issued by SeekableStreamRequestBuilder.go().
    private final Duration httpTimeout;
    // Retry budget: the max retry count for pauseAsync's status polling and, plus one,
    // the max number of attempts for a retried request.
    private final long httpRetries;
    // Scheduler (created via Execs.scheduledSingleThreaded) on which getOffsetsWhenPaused
    // schedules its delayed status re-checks; shut down by close().
    private final ScheduledExecutorService retryExec;

    public SeekableStreamIndexTaskClientAsyncImpl(String dataSource, ServiceClientFactory serviceClientFactory, TaskInfoProvider taskInfoProvider, ObjectMapper jsonMapper, Duration httpTimeout, long httpRetries) {
        this.serviceClientFactory = serviceClientFactory;
        this.taskInfoProvider = taskInfoProvider;
        this.jsonMapper = jsonMapper;
        this.httpTimeout = httpTimeout;
        this.httpRetries = httpRetries;
        this.retryExec = Execs.scheduledSingleThreaded((String)StringUtils.format((String)"%s-%s-%%d", (Object[])new Object[]{this.getClass().getSimpleName(), StringUtils.encodeForFormat((String)dataSource)}));
    }

    /**
     * Fetches the checkpoints of a task via {@code GET /checkpoints}.
     *
     * @param id    task to query
     * @param retry whether the request may be retried
     * @return future of the deserialized checkpoints, keyed by integer; resolves to an empty
     *         map when the task is not available
     */
    @Override
    public ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> getCheckpointsAsync(String id, boolean retry) {
        final RequestBuilder request = new RequestBuilder(HttpMethod.GET, "/checkpoints");

        return makeRequest(id, request)
            .handler(new BytesFullResponseHandler())
            .onSuccess(response -> {
                final TypeFactory typeFactory = jsonMapper.getTypeFactory();
                final JavaType offsetsType =
                    typeFactory.constructMapType(Map.class, getPartitionType(), getSequenceType());
                final JavaType checkpointsType =
                    typeFactory.constructMapType(TreeMap.class, typeFactory.constructType(Integer.class), offsetsType);
                return (TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>)
                    JacksonUtils.readValue(jsonMapper, response.getContent(), checkpointsType);
            })
            .onNotAvailable(e -> Either.value(new TreeMap<>()))
            .retry(retry)
            .go();
    }

    /**
     * Asks a task to stop via {@code POST /stop}.
     *
     * @param id      task to stop
     * @param publish when true, {@code ?publish=true} is appended to the request path
     * @return future resolving to {@code true} when the request succeeds or the task's service is
     *         already closed, and to {@code false} on an HTTP error response or when the task is
     *         not available; any other failure fails the future
     */
    @Override
    public ListenableFuture<Boolean> stopAsync(String id, boolean publish) {
        final String path = "/stop" + (publish ? "?publish=true" : "");

        return makeRequest(id, new RequestBuilder(HttpMethod.POST, path))
            .onSuccess(r -> true)
            .onHttpError(e -> {
                log.warn("Task [%s] couldn't be stopped because of http request failure [%s].", id, e.getMessage());
                return Either.value(false);
            })
            .onNotAvailable(e -> {
                log.warn("Task [%s] couldn't be stopped because it is not available.", id);
                return Either.value(false);
            })
            .onClosed(e -> {
                // A closed service means there is nothing left to stop, so this counts as success.
                log.warn("Task [%s] couldn't be stopped because it is no longer running.", id);
                return Either.value(true);
            })
            .go();
    }

    /**
     * Asks a task to resume via {@code POST /resume}.
     *
     * @param id task to resume
     * @return future resolving to {@code true} when the request succeeds and to {@code false} on
     *         any failure; the future itself never fails because every exception is mapped to
     *         {@code false}
     */
    @Override
    public ListenableFuture<Boolean> resumeAsync(String id) {
        return makeRequest(id, new RequestBuilder(HttpMethod.POST, "/resume"))
            .onSuccess(r -> true)
            .onException(e -> {
                // The caller only sees "false", so record the cause here before it is discarded.
                log.warn(e, "Task [%s] couldn't be resumed.", id);
                return Either.value(false);
            })
            .go();
    }

    /**
     * Fetches the current offsets of a task via {@code GET /offsets/current}.
     *
     * @param id    task to query
     * @param retry whether the request may be retried
     * @return future of the partition-to-offset map; resolves to an empty map when the task is
     *         not available
     */
    @Override
    public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getCurrentOffsetsAsync(String id, boolean retry) {
        final RequestBuilder request = new RequestBuilder(HttpMethod.GET, "/offsets/current");

        return makeRequest(id, request)
            .handler(new BytesFullResponseHandler())
            .onSuccess(response -> deserializeOffsetsMap(response.getContent()))
            .onNotAvailable(e -> Either.value(Collections.emptyMap()))
            .retry(retry)
            .go();
    }

    /**
     * Fetches the end offsets of a task via {@code GET /offsets/end}.
     *
     * @param id task to query
     * @return future of the partition-to-offset map; resolves to an empty map when the task is
     *         not available
     */
    @Override
    public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getEndOffsetsAsync(String id) {
        final RequestBuilder request = new RequestBuilder(HttpMethod.GET, "/offsets/end");

        return makeRequest(id, request)
            .handler(new BytesFullResponseHandler())
            .onSuccess(response -> deserializeOffsetsMap(response.getContent()))
            .onNotAvailable(e -> Either.value(Collections.emptyMap()))
            .go();
    }

    /**
     * Sends a pending segment record to a task via {@code POST /pendingSegmentVersion}, with the
     * record serialized as the JSON request body.
     *
     * @param taskId               task to notify
     * @param pendingSegmentRecord record to send
     * @return future resolving to {@code true} when the request succeeds; no failure is mapped
     *         to a value here, so failures fail the future
     */
    @Override
    public ListenableFuture<Boolean> registerNewVersionOfPendingSegmentAsync(String taskId, PendingSegmentRecord pendingSegmentRecord) {
        final RequestBuilder request = new RequestBuilder(HttpMethod.POST, "/pendingSegmentVersion");
        final RequestBuilder requestWithBody = request.jsonContent(jsonMapper, pendingSegmentRecord);

        return makeRequest(taskId, requestWithBody)
            .handler(IgnoreHttpResponseHandler.INSTANCE)
            .onSuccess(ignored -> true)
            .go();
    }

    /**
     * Sets the end offsets of a task via {@code POST /offsets/end?finish=...}, with the offsets
     * serialized as the JSON request body.
     *
     * @param id         task to update
     * @param endOffsets offsets to send
     * @param finalize   value placed in the {@code finish} query parameter
     * @return future resolving to {@code true} when the request succeeds; no failure is mapped
     *         to a value here, so failures fail the future
     */
    @Override
    public ListenableFuture<Boolean> setEndOffsetsAsync(String id, Map<PartitionIdType, SequenceOffsetType> endOffsets, boolean finalize) {
        final String path = StringUtils.format("/offsets/end?finish=%s", finalize);
        final RequestBuilder requestWithBody =
            new RequestBuilder(HttpMethod.POST, path).jsonContent(jsonMapper, endOffsets);

        return makeRequest(id, requestWithBody)
            .handler(IgnoreHttpResponseHandler.INSTANCE)
            .onSuccess(ignored -> true)
            .go();
    }

    /**
     * Fetches the runner status of a task via {@code GET /status}.
     *
     * @param id task to query
     * @return future of the deserialized status; resolves to {@code NOT_STARTED} when the task is
     *         not available
     */
    @Override
    public ListenableFuture<SeekableStreamIndexTaskRunner.Status> getStatusAsync(String id) {
        final RequestBuilder request = new RequestBuilder(HttpMethod.GET, "/status");

        return makeRequest(id, request)
            .handler(new BytesFullResponseHandler())
            .onSuccess(
                response -> JacksonUtils.readValue(
                    jsonMapper,
                    response.getContent(),
                    SeekableStreamIndexTaskRunner.Status.class
                )
            )
            .onNotAvailable(e -> Either.value(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .go();
    }

    /**
     * Fetches the start time of a task via {@code GET /time/start}.
     *
     * @param id task to query
     * @return future of the deserialized start time; resolves to {@code null} when the response
     *         body is null or empty, or when the task is not available
     */
    @Override
    public ListenableFuture<DateTime> getStartTimeAsync(String id) {
        final RequestBuilder request = new RequestBuilder(HttpMethod.GET, "/time/start");

        return makeRequest(id, request)
            .handler(new BytesFullResponseHandler())
            .onSuccess(response -> {
                final byte[] body = response.getContent();
                if (isNullOrEmpty(body)) {
                    return null;
                }
                return JacksonUtils.readValue(jsonMapper, body, DateTime.class);
            })
            .onNotAvailable(e -> Either.value(null))
            .go();
    }

    /**
     * Asks a task to pause via {@code POST /pause} and resolves to its offsets once paused.
     *
     * <p>A 200 response carries the offsets directly. A 202 response means the pause was accepted
     * but has not taken effect yet; in that case the task status is polled, with waits between
     * {@link #MIN_RETRY_WAIT_SECONDS} and {@link #MAX_RETRY_WAIT_SECONDS}, until it reports
     * PAUSED or the retry budget is exhausted.
     *
     * @param id task to pause
     * @return future of the partition-to-offset map; resolves to an empty map when the task is
     *         not available, and fails when the response status is neither 200 nor 202 or when
     *         the task does not reach PAUSED within the retry budget
     */
    @Override
    public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseAsync(String id) {
        final ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseFuture =
            makeRequest(id, new RequestBuilder(HttpMethod.POST, "/pause"))
                .handler(new BytesFullResponseHandler())
                .onSuccess(r -> {
                    if (r.getStatus().equals(HttpResponseStatus.OK)) {
                        log.info("Task [%s] paused successfully", id);
                        return deserializeOffsetsMap(r.getContent());
                    }
                    if (r.getStatus().equals(HttpResponseStatus.ACCEPTED)) {
                        // null is the marker that makes the second stage below poll for PAUSED.
                        return null;
                    }
                    throw new ISE("Pause request for task [%s] failed with response [%s]", id, r.getStatus());
                })
                .onNotAvailable(e -> Either.value(Collections.emptyMap()))
                .go();

        return FutureUtils.transformAsync(
            pauseFuture,
            result -> {
                if (result != null) {
                    return Futures.immediateFuture(result);
                }

                // Use the class-level bounds rather than repeating the raw numbers here.
                final RetryPolicyConfig retryConfig = new RetryPolicyConfig()
                    .setMinWait(Period.seconds(MIN_RETRY_WAIT_SECONDS))
                    .setMaxWait(Period.seconds(MAX_RETRY_WAIT_SECONDS))
                    .setMaxRetryCount(httpRetries);

                return getOffsetsWhenPaused(id, new RetryPolicyFactory(retryConfig).makeRetryPolicy());
            }
        );
    }

    /**
     * Fetches the row statistics of a task via {@code GET /rowStats}.
     *
     * @param id task to query
     * @return future of the deserialized map; resolves to {@code null} (after logging a warning)
     *         when the response body is null or empty, and to an empty map when the task is not
     *         available
     */
    @Override
    public ListenableFuture<Map<String, Object>> getMovingAveragesAsync(String id) {
        final RequestBuilder request = new RequestBuilder(HttpMethod.GET, "/rowStats");

        return makeRequest(id, request)
            .handler(new BytesFullResponseHandler())
            .onSuccess(response -> {
                final byte[] body = response.getContent();
                if (isNullOrEmpty(body)) {
                    log.warn("Got empty response when calling getMovingAverages, id[%s]", id);
                    return null;
                }
                return JacksonUtils.readValue(jsonMapper, body, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
            })
            .onNotAvailable(e -> Either.value(Collections.emptyMap()))
            .go();
    }

    /**
     * Fetches the parse-exception reports of a task via {@code GET /unparseableEvents}.
     *
     * @param id task to query
     * @return future of the deserialized list; resolves to {@code null} (after logging a warning)
     *         when the response body is null or empty, and to an empty list when the task is not
     *         available
     */
    @Override
    public ListenableFuture<List<ParseExceptionReport>> getParseErrorsAsync(String id) {
        final RequestBuilder request = new RequestBuilder(HttpMethod.GET, "/unparseableEvents");

        return makeRequest(id, request)
            .handler(new BytesFullResponseHandler())
            .onSuccess(response -> {
                final byte[] body = response.getContent();
                if (isNullOrEmpty(body)) {
                    log.warn("Got empty response when calling getParseErrors, id[%s]", id);
                    return null;
                }
                return JacksonUtils.readValue(jsonMapper, body, TYPE_REFERENCE_LIST_PARSE_EXCEPTION_REPORT);
            })
            .onNotAvailable(e -> Either.value(Collections.emptyList()))
            .go();
    }

    /**
     * Shuts down the retry scheduler immediately via {@code shutdownNow()}.
     */
    @Override
    public void close() {
        retryExec.shutdownNow();
    }

    /**
     * Starts a request builder for the given task. The builder begins with a handler that ignores
     * the response body and an identity transformer; callers replace them through
     * {@code handler(...)} and {@code onSuccess(...)}.
     *
     * @param taskId         task the request is addressed to
     * @param requestBuilder HTTP method and path (plus optional body) of the request
     * @return a new builder for the request
     */
    private SeekableStreamRequestBuilder<Void, Void, Void> makeRequest(String taskId, RequestBuilder requestBuilder) {
        final HttpResponseHandler<Void, Void> ignoreBody = IgnoreHttpResponseHandler.INSTANCE;
        final Function<Void, Void> passThrough = Function.identity();
        return new SeekableStreamRequestBuilder<>(taskId, requestBuilder, ignoreBody, passThrough);
    }

    /**
     * Deserializes a response body into a partition-to-offset map, using the key and value
     * classes reported by {@code getPartitionType()} and {@code getSequenceType()}.
     *
     * @param content raw response bytes
     * @return the deserialized map
     */
    private Map<PartitionIdType, SequenceOffsetType> deserializeOffsetsMap(byte[] content) {
        final TypeFactory typeFactory = jsonMapper.getTypeFactory();
        final JavaType offsetsType = typeFactory.constructMapType(Map.class, getPartitionType(), getSequenceType());
        return JacksonUtils.readValue(jsonMapper, content, offsetsType);
    }

    /**
     * Polls the status of a task until it reports PAUSED, then fetches its current offsets.
     *
     * <p>Each time the status is not PAUSED, the next delay is taken from {@code retryPolicy} and
     * this method is re-invoked on the retry scheduler after that delay. When the policy returns
     * no further delay, the returned future fails.
     *
     * @param taskId      task to poll
     * @param retryPolicy supplier of the successive delays; shared across the recursive calls
     * @return future of the offsets once the task is paused
     */
    private ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getOffsetsWhenPaused(String taskId, RetryPolicy retryPolicy) {
        return FutureUtils.transformAsync(
            getStatusAsync(taskId),
            status -> {
                if (status == SeekableStreamIndexTaskRunner.Status.PAUSED) {
                    return getCurrentOffsetsAsync(taskId, true);
                }

                // The policy object is reused by every recursive call, so take the next delay under its lock.
                final Duration delay;
                synchronized (retryPolicy) {
                    delay = retryPolicy.getAndIncrementRetryDelay();
                }

                if (delay == null) {
                    return Futures.immediateFailedFuture(
                        new ISE(
                            "Task [%s] failed to change its status from [%s] to [%s], aborting",
                            taskId,
                            status,
                            SeekableStreamIndexTaskRunner.Status.PAUSED
                        )
                    );
                }

                final long sleepTime = delay.getMillis();
                final SettableFuture<Map<PartitionIdType, SequenceOffsetType>> retVal = SettableFuture.create();

                // Re-check after the delay and relay the outcome of that attempt into retVal.
                retryExec.schedule(
                    () -> Futures.addCallback(
                        getOffsetsWhenPaused(taskId, retryPolicy),
                        new FutureCallback<Map<PartitionIdType, SequenceOffsetType>>() {
                            public void onSuccess(@Nullable Map<PartitionIdType, SequenceOffsetType> result) {
                                retVal.set(result);
                            }

                            public void onFailure(Throwable t) {
                                retVal.setException(t);
                            }
                        },
                        MoreExecutors.directExecutor()
                    ),
                    sleepTime,
                    TimeUnit.MILLISECONDS
                );

                return retVal;
            }
        );
    }

    /**
     * Tells whether a response body is missing.
     *
     * @param content bytes to check; may be null
     * @return {@code true} when {@code content} is null or has length zero
     */
    private static boolean isNullOrEmpty(@Nullable byte[] content) {
        if (content == null) {
            return true;
        }
        return content.length == 0;
    }

    /**
     * Service locator that resolves a single task to its chat endpoint, based on what the
     * {@link TaskInfoProvider} reports for that task.
     */
    static class SeekableStreamTaskLocator
    implements ServiceLocator {
        private static final String BASE_PATH = "/druid/worker/v1/chat";
        private final TaskInfoProvider taskInfoProvider;
        private final String taskId;

        SeekableStreamTaskLocator(TaskInfoProvider taskInfoProvider, String taskId) {
            this.taskInfoProvider = taskInfoProvider;
            this.taskId = taskId;
        }

        /**
         * Looks up the task's current location.
         *
         * @return an already-completed future holding: "closed" locations when the task has no
         *         status or is not runnable; an empty location set when the task is runnable but
         *         its location has no host; otherwise a single location whose base path is
         *         {@code BASE_PATH} followed by the URL-encoded task id
         */
        public ListenableFuture<ServiceLocations> locate() {
            final Optional<TaskStatus> status = taskInfoProvider.getTaskStatus(taskId);
            final boolean runnable = status.isPresent() && status.get().isRunnable();
            if (!runnable) {
                return Futures.immediateFuture(ServiceLocations.closed());
            }

            final TaskLocation location = taskInfoProvider.getTaskLocation(taskId);
            final String host = location.getHost();
            if (host == null) {
                // Runnable, but no host reported for it: report no locations rather than "closed".
                return Futures.immediateFuture(ServiceLocations.forLocations(Collections.emptySet()));
            }

            final ServiceLocation serviceLocation = new ServiceLocation(
                host,
                location.getPort(),
                location.getTlsPort(),
                StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId))
            );
            return Futures.immediateFuture(ServiceLocations.forLocation(serviceLocation));
        }

        /**
         * Does nothing; this locator holds no resources of its own.
         */
        public void close() {
        }
    }

    /**
     * Fluent builder for one request to one task. It collects the response handler, the success
     * transformer, the retry flag and an ordered list of exception mappers, and issues the
     * request in {@link #go()}.
     *
     * @param <IntermediateType> intermediate type of the response handler
     * @param <FinalType>        final type produced by the response handler
     * @param <T>                type the returned future resolves to
     */
    private class SeekableStreamRequestBuilder<IntermediateType, FinalType, T> {
        private final String taskId;
        private final RequestBuilder requestBuilder;
        // Applied in insertion order by go() when the request fails.
        private final List<Function<Throwable, Either<Throwable, T>>> exceptionMappers = new ArrayList<>();
        private HttpResponseHandler<IntermediateType, FinalType> responseHandler;
        private Function<FinalType, T> responseTransformer;
        private boolean retry = true;

        SeekableStreamRequestBuilder(String taskId, RequestBuilder requestBuilder, HttpResponseHandler<IntermediateType, FinalType> responseHandler, Function<FinalType, T> responseTransformer) {
            this.taskId = taskId;
            this.requestBuilder = requestBuilder;
            this.responseHandler = responseHandler;
            this.responseTransformer = responseTransformer;
        }

        /**
         * Replaces the response handler. The same builder instance is returned, re-typed to the
         * handler's type parameters.
         *
         * @param handler handler to use for the response
         * @return this builder
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        public <NewIntermediateType, NewFinalType> SeekableStreamRequestBuilder<NewIntermediateType, NewFinalType, T> handler(HttpResponseHandler<NewIntermediateType, NewFinalType> handler) {
            // The builder is mutated in place, so its type parameters can only be changed
            // through unchecked casts of the field and of "this".
            this.responseHandler = (HttpResponseHandler) handler;
            return (SeekableStreamRequestBuilder) this;
        }

        /**
         * Replaces the function applied to a successful response. The same builder instance is
         * returned, re-typed to the function's result type.
         *
         * @param responseTransformer function from the handler's final type to the result
         * @return this builder
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        public <NewT> SeekableStreamRequestBuilder<IntermediateType, FinalType, NewT> onSuccess(Function<FinalType, NewT> responseTransformer) {
            // Same in-place re-typing as handler().
            this.responseTransformer = (Function) responseTransformer;
            return (SeekableStreamRequestBuilder) this;
        }

        /**
         * Sets whether the request may be retried. Defaults to {@code true}.
         *
         * @param retry retry flag
         * @return this builder
         */
        public SeekableStreamRequestBuilder<IntermediateType, FinalType, T> retry(boolean retry) {
            this.retry = retry;
            return this;
        }

        /**
         * Appends a mapper that may turn a failure into a value or into another error.
         *
         * @param fn mapper; a null result leaves the current error unchanged
         * @return this builder
         */
        public SeekableStreamRequestBuilder<IntermediateType, FinalType, T> onException(Function<Throwable, Either<Throwable, T>> fn) {
            this.exceptionMappers.add(fn);
            return this;
        }

        /**
         * Appends a mapper that only handles {@link HttpResponseException}; other errors pass
         * through unchanged.
         *
         * @param fn mapper for HTTP error responses
         * @return this builder
         */
        public SeekableStreamRequestBuilder<IntermediateType, FinalType, T> onHttpError(Function<HttpResponseException, Either<Throwable, T>> fn) {
            return onException(e -> {
                if (e instanceof HttpResponseException) {
                    return fn.apply((HttpResponseException) e);
                }
                return Either.error(e);
            });
        }

        /**
         * Appends a mapper that only handles {@link ServiceNotAvailableException}; other errors
         * pass through unchanged.
         *
         * @param fn mapper for the not-available case
         * @return this builder
         */
        public SeekableStreamRequestBuilder<IntermediateType, FinalType, T> onNotAvailable(Function<ServiceNotAvailableException, Either<Throwable, T>> fn) {
            return onException(e -> {
                if (e instanceof ServiceNotAvailableException) {
                    return fn.apply((ServiceNotAvailableException) e);
                }
                return Either.error(e);
            });
        }

        /**
         * Appends a mapper that only handles {@link ServiceClosedException}; other errors pass
         * through unchanged.
         *
         * @param fn mapper for the closed case
         * @return this builder
         */
        public SeekableStreamRequestBuilder<IntermediateType, FinalType, T> onClosed(Function<ServiceClosedException, Either<Throwable, T>> fn) {
            return onException(e -> {
                if (e instanceof ServiceClosedException) {
                    return fn.apply((ServiceClosedException) e);
                }
                return Either.error(e);
            });
        }

        /**
         * Issues the request with the configured timeout, handler and transformer.
         *
         * @return future resolving to the transformed response on success. On failure the
         *         exception mappers run in order until one produces a value; the future then
         *         resolves to that value, or fails with the last error if none does.
         */
        public ListenableFuture<T> go() {
            final ServiceClient client = makeClient(taskId, retry);
            final SettableFuture<T> retVal = SettableFuture.create();

            Futures.addCallback(
                FutureUtils.transform(
                    client.asyncRequest(
                        requestBuilder.timeout(SeekableStreamIndexTaskClientAsyncImpl.this.httpTimeout),
                        responseHandler
                    ),
                    responseTransformer
                ),
                new FutureCallback<T>() {
                    public void onSuccess(@Nullable T result) {
                        retVal.set(result);
                    }

                    public void onFailure(Throwable t) {
                        Either<Throwable, T> either = Either.error(t);

                        for (final Function<Throwable, Either<Throwable, T>> exceptionMapper : exceptionMappers) {
                            if (!either.isError()) {
                                // A previous mapper already produced a value.
                                break;
                            }

                            try {
                                final Either<Throwable, T> nextEither = exceptionMapper.apply(either.error());
                                if (nextEither != null) {
                                    either = nextEither;
                                }
                            }
                            catch (Throwable e) {
                                // A throwing mapper is logged and skipped; the remaining mappers still run.
                                log.warn(e, "Failed to map exception encountered while contacting task [%s]", taskId);
                            }
                        }

                        if (either.isError()) {
                            retVal.setException(either.error());
                        } else {
                            retVal.set(either.valueOrThrow());
                        }
                    }
                },
                MoreExecutors.directExecutor()
            );

            return retVal;
        }

        /**
         * Builds the service client for one task, wiring a locator for that task and the retry
         * policy chosen by {@code retry}.
         */
        private ServiceClient makeClient(String taskId, boolean retry) {
            final ServiceRetryPolicy retryPolicy = makeRetryPolicy(taskId, retry);
            final SeekableStreamTaskLocator locator =
                new SeekableStreamTaskLocator(SeekableStreamIndexTaskClientAsyncImpl.this.taskInfoProvider, taskId);
            return SeekableStreamIndexTaskClientAsyncImpl.this.serviceClientFactory.makeClient(taskId, locator, retryPolicy);
        }

        /**
         * Builds the retry policy for one task: either no retries, or up to
         * {@code httpRetries + 1} attempts with waits between {@code MIN_RETRY_WAIT_SECONDS} and
         * {@code MAX_RETRY_WAIT_SECONDS}. The result is wrapped in a
         * {@link SpecificTaskRetryPolicy} for the task.
         */
        private ServiceRetryPolicy makeRetryPolicy(String taskId, boolean retry) {
            final StandardRetryPolicy baseRetryPolicy;

            if (retry) {
                baseRetryPolicy = StandardRetryPolicy.builder()
                    .maxAttempts(SeekableStreamIndexTaskClientAsyncImpl.this.httpRetries + 1L)
                    .minWaitMillis(MIN_RETRY_WAIT_SECONDS * 1000L)
                    .maxWaitMillis(MAX_RETRY_WAIT_SECONDS * 1000L)
                    .retryNotAvailable(false)
                    .build();
            } else {
                baseRetryPolicy = StandardRetryPolicy.noRetries();
            }

            return new SpecificTaskRetryPolicy(taskId, baseRetryPolicy);
        }
    }
}

