/*
 * Decompiled with CFR 0.152.
 */
package io.dapr.durabletask;

import com.google.protobuf.StringValue;
import io.dapr.durabletask.DataConverter;
import io.dapr.durabletask.DurableTaskGrpcWorkerBuilder;
import io.dapr.durabletask.FailureDetails;
import io.dapr.durabletask.JacksonDataConverter;
import io.dapr.durabletask.TaskActivityExecutor;
import io.dapr.durabletask.TaskActivityFactory;
import io.dapr.durabletask.TaskOrchestrationExecutor;
import io.dapr.durabletask.TaskOrchestrationFactory;
import io.dapr.durabletask.TaskOrchestratorResult;
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class DurableTaskGrpcWorker
implements AutoCloseable {
    private static final int DEFAULT_PORT = 4001;
    private static final Logger logger = Logger.getLogger(DurableTaskGrpcWorker.class.getPackage().getName());
    private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration.ofDays(3L);
    private final HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap();
    private final HashMap<String, TaskActivityFactory> activityFactories = new HashMap();
    private final ManagedChannel managedSidecarChannel;
    private final DataConverter dataConverter;
    private final Duration maximumTimerInterval;
    private final ExecutorService workerPool;
    private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient;
    private final boolean isExecutorServiceManaged;
    private volatile boolean isNormalShutdown = false;

    DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
        Channel sidecarGrpcChannel;
        this.orchestrationFactories.putAll(builder.orchestrationFactories);
        this.activityFactories.putAll(builder.activityFactories);
        if (builder.channel != null) {
            this.managedSidecarChannel = null;
            sidecarGrpcChannel = builder.channel;
        } else {
            int port = 4001;
            if (builder.port > 0) {
                port = builder.port;
            }
            this.managedSidecarChannel = ManagedChannelBuilder.forAddress((String)"localhost", (int)port).usePlaintext().build();
            sidecarGrpcChannel = this.managedSidecarChannel;
        }
        this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
        this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
        this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
        this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool();
        this.isExecutorServiceManaged = builder.executorService == null;
    }

    public void start() {
        new Thread(this::startAndBlock).start();
    }

    @Override
    public void close() {
        this.isNormalShutdown = true;
        this.shutDownWorkerPool();
        this.closeSideCarChannel();
    }

    public void startAndBlock() {
        logger.log(Level.INFO, "Durable Task worker is connecting to sidecar at {0}.", this.getSidecarAddress());
        TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(this.orchestrationFactories, this.dataConverter, this.maximumTimerInterval, logger);
        TaskActivityExecutor taskActivityExecutor = new TaskActivityExecutor(this.activityFactories, this.dataConverter, logger);
        while (true) {
            try {
                block5: while (true) {
                    OrchestratorService.GetWorkItemsRequest getWorkItemsRequest = OrchestratorService.GetWorkItemsRequest.newBuilder().build();
                    Iterator<OrchestratorService.WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
                    while (true) {
                        if (!workItemStream.hasNext()) continue block5;
                        OrchestratorService.WorkItem workItem = workItemStream.next();
                        OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase();
                        if (requestType == OrchestratorService.WorkItem.RequestCase.ORCHESTRATORREQUEST) {
                            OrchestratorService.OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();
                            this.workerPool.submit(() -> {
                                TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(orchestratorRequest.getPastEventsList(), orchestratorRequest.getNewEventsList());
                                OrchestratorService.OrchestratorResponse response = OrchestratorService.OrchestratorResponse.newBuilder().setInstanceId(orchestratorRequest.getInstanceId()).addAllActions(taskOrchestratorResult.getActions()).setCustomStatus(StringValue.of((String)taskOrchestratorResult.getCustomStatus())).setCompletionToken(workItem.getCompletionToken()).build();
                                try {
                                    this.sidecarClient.completeOrchestratorTask(response);
                                }
                                catch (StatusRuntimeException e) {
                                    if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
                                        logger.log(Level.WARNING, "The sidecar at address {0} is unavailable while completing the orchestrator task.", this.getSidecarAddress());
                                    }
                                    if (e.getStatus().getCode() == Status.Code.CANCELLED) {
                                        logger.log(Level.WARNING, "Durable Task worker has disconnected from {0} while completing the orchestrator task.", this.getSidecarAddress());
                                    }
                                    logger.log(Level.WARNING, "Unexpected failure completing the orchestrator task at {0}.", this.getSidecarAddress());
                                }
                            });
                            continue;
                        }
                        if (requestType == OrchestratorService.WorkItem.RequestCase.ACTIVITYREQUEST) {
                            OrchestratorService.ActivityRequest activityRequest = workItem.getActivityRequest();
                            this.workerPool.submit(() -> {
                                String output = null;
                                OrchestratorService.TaskFailureDetails failureDetails = null;
                                try {
                                    output = taskActivityExecutor.execute(activityRequest.getName(), activityRequest.getInput().getValue(), activityRequest.getTaskId());
                                }
                                catch (Throwable e) {
                                    failureDetails = OrchestratorService.TaskFailureDetails.newBuilder().setErrorType(e.getClass().getName()).setErrorMessage(e.getMessage()).setStackTrace(StringValue.of((String)FailureDetails.getFullStackTrace(e))).build();
                                }
                                OrchestratorService.ActivityResponse.Builder responseBuilder = OrchestratorService.ActivityResponse.newBuilder().setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId()).setTaskId(activityRequest.getTaskId()).setCompletionToken(workItem.getCompletionToken());
                                if (output != null) {
                                    responseBuilder.setResult(StringValue.of((String)output));
                                }
                                if (failureDetails != null) {
                                    responseBuilder.setFailureDetails(failureDetails);
                                }
                                try {
                                    this.sidecarClient.completeActivityTask(responseBuilder.build());
                                }
                                catch (StatusRuntimeException e) {
                                    if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
                                        logger.log(Level.WARNING, "The sidecar at address {0} is unavailable while completing the activity task.", this.getSidecarAddress());
                                    }
                                    if (e.getStatus().getCode() == Status.Code.CANCELLED) {
                                        logger.log(Level.WARNING, "Durable Task worker has disconnected from {0} while completing the activity task.", this.getSidecarAddress());
                                    }
                                    logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", this.getSidecarAddress());
                                }
                            });
                            continue;
                        }
                        if (requestType == OrchestratorService.WorkItem.RequestCase.HEALTHPING) continue;
                        logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", (Object)requestType);
                    }
                    break;
                }
            }
            catch (StatusRuntimeException e) {
                if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
                    logger.log(Level.INFO, "The sidecar at address {0} is unavailable. Will continue retrying.", this.getSidecarAddress());
                } else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
                    logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress());
                } else {
                    logger.log(Level.WARNING, String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e);
                }
                try {
                    Thread.sleep(5000L);
                }
                catch (InterruptedException ex) {
                    return;
                }
            }
        }
    }

    public void stop() {
        this.close();
    }

    private void closeSideCarChannel() {
        if (this.managedSidecarChannel != null) {
            try {
                this.managedSidecarChannel.shutdownNow().awaitTermination(5L, TimeUnit.SECONDS);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    private void shutDownWorkerPool() {
        if (this.isExecutorServiceManaged) {
            if (!this.isNormalShutdown) {
                logger.log(Level.WARNING, "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted");
            }
            this.workerPool.shutdown();
            try {
                if (!this.workerPool.awaitTermination(60L, TimeUnit.SECONDS)) {
                    this.workerPool.shutdownNow();
                }
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private String getSidecarAddress() {
        return this.sidecarClient.getChannel().authority();
    }
}

