/*
 * Decompiled with CFR 0.152.
 */
package com.uber.cadence.internal.worker;

import com.uber.cadence.Header;
import com.uber.cadence.PollForActivityTaskResponse;
import com.uber.cadence.RespondActivityTaskCanceledRequest;
import com.uber.cadence.RespondActivityTaskCompletedRequest;
import com.uber.cadence.RespondActivityTaskFailedRequest;
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.context.ContextPropagator;
import com.uber.cadence.internal.common.RpcRetryer;
import com.uber.cadence.internal.worker.ActivityPollTask;
import com.uber.cadence.internal.worker.ActivityTaskHandler;
import com.uber.cadence.internal.worker.PollTaskExecutor;
import com.uber.cadence.internal.worker.Poller;
import com.uber.cadence.internal.worker.PollerOptions;
import com.uber.cadence.internal.worker.SingleWorkerOptions;
import com.uber.cadence.internal.worker.SuspendableWorkerBase;
import com.uber.cadence.serviceclient.IWorkflowService;
import com.uber.m3.tally.Scope;
import com.uber.m3.tally.Stopwatch;
import com.uber.m3.util.Duration;
import com.uber.m3.util.ImmutableMap;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import org.apache.thrift.TBaseHelper;
import org.apache.thrift.TException;
import org.slf4j.MDC;

public class ActivityWorker
extends SuspendableWorkerBase {
    protected final SingleWorkerOptions options;
    private final ActivityTaskHandler handler;
    private final IWorkflowService service;
    private final String domain;
    private final String taskList;

    public ActivityWorker(IWorkflowService service, String domain, String taskList, SingleWorkerOptions options, ActivityTaskHandler handler) {
        this(service, domain, taskList, options, handler, "Activity Poller taskList=");
    }

    public ActivityWorker(IWorkflowService service, String domain, String taskList, SingleWorkerOptions options, ActivityTaskHandler handler, String pollThreadNamePrefix) {
        this.service = Objects.requireNonNull(service);
        this.domain = Objects.requireNonNull(domain);
        this.taskList = Objects.requireNonNull(taskList);
        this.handler = handler;
        PollerOptions pollerOptions = options.getPollerOptions();
        if (pollerOptions.getPollThreadNamePrefix() == null) {
            pollerOptions = PollerOptions.newBuilder(pollerOptions).setPollThreadNamePrefix(pollThreadNamePrefix + "\"" + taskList + "\", domain=\"" + domain + "\"").build();
        }
        this.options = SingleWorkerOptions.newBuilder(options).setPollerOptions(pollerOptions).build();
    }

    @Override
    public void start() {
        if (this.handler.isAnyTypeSupported()) {
            Poller<PollForActivityTaskResponse> poller = new Poller<PollForActivityTaskResponse>(this.options.getIdentity(), this.getOrCreateActivityPollTask(), new PollTaskExecutor<PollForActivityTaskResponse>(this.domain, this.taskList, this.options, new TaskHandlerImpl(this.handler)), this.options.getPollerOptions(), this.options.getMetricsScope());
            poller.start();
            this.setPoller(poller);
            this.options.getMetricsScope().counter("cadence-worker-start").inc(1L);
        }
    }

    protected Poller.PollTask<PollForActivityTaskResponse> getOrCreateActivityPollTask() {
        return new ActivityPollTask(this.service, this.domain, this.taskList, this.options);
    }

    private class TaskHandlerImpl
    implements PollTaskExecutor.TaskHandler<PollForActivityTaskResponse> {
        final ActivityTaskHandler handler;

        private TaskHandlerImpl(ActivityTaskHandler handler) {
            this.handler = handler;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handle(PollForActivityTaskResponse task) throws Exception {
            Scope metricsScope = ActivityWorker.this.options.getMetricsScope().tagged((Map)ImmutableMap.of((Object)"ActivityType", (Object)task.getActivityType().getName(), (Object)"WorkflowType", (Object)task.getWorkflowType().getName()));
            metricsScope.timer("cadence-activity-scheduled-to-start-latency").record(Duration.ofNanos((long)(task.getStartedTimestamp() - task.getScheduledTimestampOfThisAttempt())));
            MDC.put((String)"ActivityID", (String)task.getActivityId());
            MDC.put((String)"ActivityType", (String)task.getActivityType().getName());
            MDC.put((String)"WorkflowID", (String)task.getWorkflowExecution().getWorkflowId());
            MDC.put((String)"RunID", (String)task.getWorkflowExecution().getRunId());
            this.propagateContext(task);
            try {
                Stopwatch sw = metricsScope.timer("cadence-activity-execution-latency").start();
                ActivityTaskHandler.Result response = this.handler.handle(task, metricsScope, false);
                sw.stop();
                sw = metricsScope.timer("cadence-activity-response-latency").start();
                this.sendReply(task, response, metricsScope);
                sw.stop();
                long nanoTime = TimeUnit.NANOSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                Duration duration = Duration.ofNanos((long)(nanoTime - task.getScheduledTimestampOfThisAttempt()));
                metricsScope.timer("cadence-activity-endtoend-latency").record(duration);
            }
            catch (CancellationException e) {
                RespondActivityTaskCanceledRequest cancelledRequest = new RespondActivityTaskCanceledRequest();
                cancelledRequest.setDetails(String.valueOf(e.getMessage()).getBytes(StandardCharsets.UTF_8));
                Stopwatch sw = metricsScope.timer("cadence-activity-response-latency").start();
                this.sendReply(task, new ActivityTaskHandler.Result(null, null, cancelledRequest), metricsScope);
                sw.stop();
            }
            finally {
                MDC.remove((String)"ActivityID");
                MDC.remove((String)"ActivityType");
                MDC.remove((String)"WorkflowID");
                MDC.remove((String)"RunID");
            }
        }

        void propagateContext(PollForActivityTaskResponse response) {
            if (ActivityWorker.this.options.getContextPropagators() == null || ActivityWorker.this.options.getContextPropagators().isEmpty()) {
                return;
            }
            Header headers = response.getHeader();
            if (headers == null) {
                return;
            }
            HashMap<String, byte[]> headerData = new HashMap<String, byte[]>();
            headers.getFields().forEach((k, v) -> headerData.put((String)k, TBaseHelper.byteBufferToByteArray((ByteBuffer)v)));
            for (ContextPropagator propagator : ActivityWorker.this.options.getContextPropagators()) {
                propagator.setCurrentContext(propagator.deserializeContext(headerData));
            }
        }

        @Override
        public Throwable wrapFailure(PollForActivityTaskResponse task, Throwable failure) {
            WorkflowExecution execution = task.getWorkflowExecution();
            return new RuntimeException("Failure processing activity task. WorkflowID=" + execution.getWorkflowId() + ", RunID=" + execution.getRunId() + ", ActivityType=" + task.getActivityType().getName() + ", ActivityID=" + task.getActivityId(), failure);
        }

        private void sendReply(PollForActivityTaskResponse task, ActivityTaskHandler.Result response, Scope metricsScope) throws TException {
            RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
            if (taskCompleted != null) {
                taskCompleted.setTaskToken(task.getTaskToken());
                taskCompleted.setIdentity(ActivityWorker.this.options.getIdentity());
                RpcRetryer.retry(() -> ActivityWorker.this.service.RespondActivityTaskCompleted(taskCompleted));
                metricsScope.counter("cadence-activity-task-completed").inc(1L);
            } else if (response.getTaskFailedResult() != null) {
                RespondActivityTaskFailedRequest taskFailed = response.getTaskFailedResult().getTaskFailedRequest();
                taskFailed.setTaskToken(task.getTaskToken());
                taskFailed.setIdentity(ActivityWorker.this.options.getIdentity());
                RpcRetryer.retry(() -> ActivityWorker.this.service.RespondActivityTaskFailed(taskFailed));
                metricsScope.counter("cadence-activity-task-failed").inc(1L);
            } else {
                RespondActivityTaskCanceledRequest taskCancelled = response.getTaskCancelled();
                if (taskCancelled != null) {
                    taskCancelled.setTaskToken(task.getTaskToken());
                    taskCancelled.setIdentity(ActivityWorker.this.options.getIdentity());
                    RpcRetryer.retry(() -> ActivityWorker.this.service.RespondActivityTaskCanceled(taskCancelled));
                    metricsScope.counter("cadence-activity-task-canceled").inc(1L);
                }
            }
        }
    }
}

