/*
 * Decompiled with CFR 0.152.
 */
package io.temporal.internal.worker;

import com.google.protobuf.ByteString;
import com.uber.m3.tally.Scope;
import com.uber.m3.tally.Stopwatch;
import com.uber.m3.util.Duration;
import com.uber.m3.util.ImmutableMap;
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributesOrBuilder;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequest;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
import io.temporal.api.workflowservice.v1.RespondActivityTaskFailedRequest;
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
import io.temporal.internal.activity.ActivityPollResponseToInfo;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.internal.worker.ActivityPollTask;
import io.temporal.internal.worker.ActivityTask;
import io.temporal.internal.worker.ActivityTaskHandler;
import io.temporal.internal.worker.EagerActivityDispatcher;
import io.temporal.internal.worker.NoopWorker;
import io.temporal.internal.worker.PollTaskExecutor;
import io.temporal.internal.worker.Poller;
import io.temporal.internal.worker.PollerOptions;
import io.temporal.internal.worker.ShutdownManager;
import io.temporal.internal.worker.SingleWorkerOptions;
import io.temporal.internal.worker.SlotReservationData;
import io.temporal.internal.worker.SuspendableWorker;
import io.temporal.internal.worker.TrackingSlotSupplier;
import io.temporal.internal.worker.WorkerLifecycleState;
import io.temporal.internal.worker.WorkerThreadsNameHelper;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions;
import io.temporal.worker.WorkerMetricsTag;
import io.temporal.worker.tuning.ActivitySlotInfo;
import io.temporal.worker.tuning.SlotPermit;
import io.temporal.worker.tuning.SlotReleaseReason;
import io.temporal.worker.tuning.SlotSupplier;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

final class ActivityWorker
implements SuspendableWorker {
    private static final Logger log = LoggerFactory.getLogger(ActivityWorker.class);
    private SuspendableWorker poller = new NoopWorker();
    private PollTaskExecutor<ActivityTask> pollTaskExecutor;
    private final ActivityTaskHandler handler;
    private final WorkflowServiceStubs service;
    private final String namespace;
    private final String taskQueue;
    private final SingleWorkerOptions options;
    private final double taskQueueActivitiesPerSecond;
    private final PollerOptions pollerOptions;
    private final Scope workerMetricsScope;
    private final GrpcRetryer grpcRetryer;
    private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
    private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;

    public ActivityWorker(@Nonnull WorkflowServiceStubs service, @Nonnull String namespace, @Nonnull String taskQueue, double taskQueueActivitiesPerSecond, @Nonnull SingleWorkerOptions options, @Nonnull ActivityTaskHandler handler, @Nonnull SlotSupplier<ActivitySlotInfo> slotSupplier) {
        this.service = Objects.requireNonNull(service);
        this.namespace = Objects.requireNonNull(namespace);
        this.taskQueue = Objects.requireNonNull(taskQueue);
        this.handler = Objects.requireNonNull(handler);
        this.taskQueueActivitiesPerSecond = taskQueueActivitiesPerSecond;
        this.options = Objects.requireNonNull(options);
        this.pollerOptions = this.getPollerOptions(options);
        this.workerMetricsScope = MetricsTag.tagged((Scope)options.getMetricsScope(), (MetricsTag.TagValue)WorkerMetricsTag.WorkerType.ACTIVITY_WORKER);
        this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
        this.replyGrpcRetryerOptions = new GrpcRetryer.GrpcRetryerOptions(DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
        this.slotSupplier = new TrackingSlotSupplier<ActivitySlotInfo>(slotSupplier, this.workerMetricsScope);
    }

    @Override
    public boolean start() {
        if (this.handler.isAnyTypeSupported()) {
            this.pollTaskExecutor = new PollTaskExecutor<ActivityTask>(this.namespace, this.taskQueue, this.options.getIdentity(), new TaskHandlerImpl(this.handler), this.pollerOptions, this.slotSupplier.maximumSlots().orElse(Integer.MAX_VALUE), this.options.isUsingVirtualThreads());
            this.poller = new Poller<ActivityTask>(this.options.getIdentity(), new ActivityPollTask(this.service, this.namespace, this.taskQueue, this.options.getIdentity(), this.options.getBuildId(), this.options.isUsingBuildIdForVersioning(), this.taskQueueActivitiesPerSecond, this.slotSupplier, this.workerMetricsScope, this.service.getServerCapabilities()), this.pollTaskExecutor, this.pollerOptions, this.workerMetricsScope);
            this.poller.start();
            this.workerMetricsScope.counter("temporal_worker_start").inc(1L);
            return true;
        }
        return false;
    }

    @Override
    public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
        String supplierName = this + "#executorSlots";
        return ((CompletableFuture)((CompletableFuture)this.poller.shutdown(shutdownManager, interruptTasks).thenCompose(ignore -> !interruptTasks ? shutdownManager.waitForSupplierPermitsReleasedUnlimited(this.slotSupplier, supplierName) : CompletableFuture.completedFuture(null))).thenCompose(ignore -> this.pollTaskExecutor != null ? this.pollTaskExecutor.shutdown(shutdownManager, interruptTasks) : CompletableFuture.completedFuture(null))).exceptionally(e -> {
            log.error("Unexpected exception during shutdown", e);
            return null;
        });
    }

    @Override
    public void awaitTermination(long timeout, TimeUnit unit) {
        long timeoutMillis = ShutdownManager.awaitTermination(this.poller, unit.toMillis(timeout));
        ShutdownManager.awaitTermination(this.pollTaskExecutor, timeoutMillis);
    }

    @Override
    public void suspendPolling() {
        this.poller.suspendPolling();
    }

    @Override
    public void resumePolling() {
        this.poller.resumePolling();
    }

    @Override
    public boolean isShutdown() {
        return this.poller.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return this.poller.isTerminated() && (this.pollTaskExecutor == null || this.pollTaskExecutor.isTerminated());
    }

    @Override
    public boolean isSuspended() {
        return this.poller.isSuspended();
    }

    @Override
    public WorkerLifecycleState getLifecycleState() {
        return this.poller.getLifecycleState();
    }

    public EagerActivityDispatcher getEagerActivityDispatcher() {
        return new EagerActivityDispatcherImpl();
    }

    private PollerOptions getPollerOptions(SingleWorkerOptions options) {
        PollerOptions pollerOptions = options.getPollerOptions();
        if (pollerOptions.getPollThreadNamePrefix() == null) {
            pollerOptions = PollerOptions.newBuilder(pollerOptions).setPollThreadNamePrefix(WorkerThreadsNameHelper.getActivityPollerThreadPrefix(this.namespace, this.taskQueue)).build();
        }
        return pollerOptions;
    }

    public String toString() {
        return String.format("ActivityWorker{identity=%s, namespace=%s, taskQueue=%s}", this.options.getIdentity(), this.namespace, this.taskQueue);
    }

    private final class EagerActivityDispatcherImpl
    implements EagerActivityDispatcher {
        private EagerActivityDispatcherImpl() {
        }

        @Override
        public Optional<SlotPermit> tryReserveActivitySlot(ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes) {
            if (!WorkerLifecycleState.ACTIVE.equals((Object)ActivityWorker.this.getLifecycleState()) || !Objects.equals(commandAttributes.getTaskQueue().getName(), ActivityWorker.this.taskQueue)) {
                return Optional.empty();
            }
            return ActivityWorker.this.slotSupplier.tryReserveSlot(new SlotReservationData(ActivityWorker.this.taskQueue, ActivityWorker.this.options.getIdentity(), ActivityWorker.this.options.getBuildId()));
        }

        @Override
        public void releaseActivitySlotReservations(Iterable<SlotPermit> permits) {
            for (SlotPermit permit : permits) {
                ActivityWorker.this.slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
            }
        }

        @Override
        public void dispatchActivity(PollActivityTaskQueueResponse activity, SlotPermit permit) {
            ActivityWorker.this.pollTaskExecutor.process(new ActivityTask((PollActivityTaskQueueResponseOrBuilder)activity, permit, () -> ActivityWorker.this.slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit)));
        }
    }

    private class TaskHandlerImpl
    implements PollTaskExecutor.TaskHandler<ActivityTask> {
        final ActivityTaskHandler handler;

        private TaskHandlerImpl(ActivityTaskHandler handler) {
            this.handler = handler;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handle(ActivityTask task) throws Exception {
            PollActivityTaskQueueResponseOrBuilder pollResponse = task.getResponse();
            ActivityWorker.this.slotSupplier.markSlotUsed(new ActivitySlotInfo(ActivityPollResponseToInfo.toActivityInfoImpl(pollResponse, ActivityWorker.this.namespace, ActivityWorker.this.taskQueue, false), ActivityWorker.this.options.getIdentity(), ActivityWorker.this.options.getBuildId()), task.getPermit());
            Scope metricsScope = ActivityWorker.this.workerMetricsScope.tagged((Map)ImmutableMap.of((Object)"activity_type", (Object)pollResponse.getActivityType().getName(), (Object)"workflow_type", (Object)pollResponse.getWorkflowType().getName()));
            MDC.put((String)"ActivityId", (String)pollResponse.getActivityId());
            MDC.put((String)"ActivityType", (String)pollResponse.getActivityType().getName());
            MDC.put((String)"WorkflowId", (String)pollResponse.getWorkflowExecution().getWorkflowId());
            MDC.put((String)"WorkflowType", (String)pollResponse.getWorkflowType().getName());
            MDC.put((String)"RunId", (String)pollResponse.getWorkflowExecution().getRunId());
            MDC.put((String)"Attempt", (String)Integer.toString(pollResponse.getAttempt()));
            ActivityTaskHandler.Result result = null;
            try {
                result = this.handleActivity(task, metricsScope);
            }
            finally {
                MDC.remove((String)"ActivityId");
                MDC.remove((String)"ActivityType");
                MDC.remove((String)"WorkflowId");
                MDC.remove((String)"WorkflowType");
                MDC.remove((String)"RunId");
                MDC.remove((String)"Attempt");
                if (result == null || !result.isManualCompletion()) {
                    task.getCompletionCallback().apply();
                }
            }
            if (result.getTaskFailed() != null && result.getTaskFailed().getFailure() instanceof Error) {
                throw (Error)result.getTaskFailed().getFailure();
            }
        }

        private ActivityTaskHandler.Result handleActivity(ActivityTask task, Scope metricsScope) {
            ActivityTaskHandler.Result result;
            PollActivityTaskQueueResponseOrBuilder pollResponse = task.getResponse();
            ByteString taskToken = pollResponse.getTaskToken();
            metricsScope.timer("temporal_activity_schedule_to_start_latency").record(ProtobufTimeUtils.toM3Duration(pollResponse.getStartedTime(), pollResponse.getCurrentAttemptScheduledTime()));
            Stopwatch sw = metricsScope.timer("temporal_activity_execution_latency").start();
            try {
                result = this.handler.handle(task, metricsScope, false);
            }
            catch (Throwable ex) {
                log.error("[BUG] Code that expected to never throw an exception threw an exception", ex);
                throw ex;
            }
            finally {
                sw.stop();
            }
            try {
                this.sendReply(taskToken, result, metricsScope);
            }
            catch (Exception e) {
                this.logExceptionDuringResultReporting(e, pollResponse, result);
                throw e;
            }
            if (result.getTaskCompleted() != null) {
                Duration e2eDuration = ProtobufTimeUtils.toM3DurationSinceNow(pollResponse.getScheduledTime());
                metricsScope.timer("temporal_activity_succeed_endtoend_latency").record(e2eDuration);
            }
            return result;
        }

        @Override
        public Throwable wrapFailure(ActivityTask t, Throwable failure) {
            PollActivityTaskQueueResponseOrBuilder response = t.getResponse();
            WorkflowExecution execution = response.getWorkflowExecution();
            return new RuntimeException("Failure processing activity response. WorkflowId=" + execution.getWorkflowId() + ", RunId=" + execution.getRunId() + ", ActivityType=" + response.getActivityType().getName() + ", ActivityId=" + response.getActivityId(), failure);
        }

        private void sendReply(ByteString taskToken, ActivityTaskHandler.Result response, Scope metricsScope) {
            RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
            if (taskCompleted != null) {
                RespondActivityTaskCompletedRequest request = taskCompleted.toBuilder().setTaskToken(taskToken).setIdentity(ActivityWorker.this.options.getIdentity()).setNamespace(ActivityWorker.this.namespace).setWorkerVersion(ActivityWorker.this.options.workerVersionStamp()).build();
                ActivityWorker.this.grpcRetryer.retry(() -> ((WorkflowServiceGrpc.WorkflowServiceBlockingStub)((WorkflowServiceGrpc.WorkflowServiceBlockingStub)ActivityWorker.this.service.blockingStub()).withOption(MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY, (Object)metricsScope)).respondActivityTaskCompleted(request), ActivityWorker.this.replyGrpcRetryerOptions);
            } else {
                ActivityTaskHandler.Result.TaskFailedResult taskFailed = response.getTaskFailed();
                if (taskFailed != null) {
                    RespondActivityTaskFailedRequest request = taskFailed.getTaskFailedRequest().toBuilder().setTaskToken(taskToken).setIdentity(ActivityWorker.this.options.getIdentity()).setNamespace(ActivityWorker.this.namespace).setWorkerVersion(ActivityWorker.this.options.workerVersionStamp()).build();
                    ActivityWorker.this.grpcRetryer.retry(() -> ((WorkflowServiceGrpc.WorkflowServiceBlockingStub)((WorkflowServiceGrpc.WorkflowServiceBlockingStub)ActivityWorker.this.service.blockingStub()).withOption(MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY, (Object)metricsScope)).respondActivityTaskFailed(request), ActivityWorker.this.replyGrpcRetryerOptions);
                } else {
                    RespondActivityTaskCanceledRequest taskCanceled = response.getTaskCanceled();
                    if (taskCanceled != null) {
                        RespondActivityTaskCanceledRequest request = taskCanceled.toBuilder().setTaskToken(taskToken).setIdentity(ActivityWorker.this.options.getIdentity()).setNamespace(ActivityWorker.this.namespace).setWorkerVersion(ActivityWorker.this.options.workerVersionStamp()).build();
                        ActivityWorker.this.grpcRetryer.retry(() -> ((WorkflowServiceGrpc.WorkflowServiceBlockingStub)((WorkflowServiceGrpc.WorkflowServiceBlockingStub)ActivityWorker.this.service.blockingStub()).withOption(MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY, (Object)metricsScope)).respondActivityTaskCanceled(request), ActivityWorker.this.replyGrpcRetryerOptions);
                    }
                }
            }
        }

        private void logExceptionDuringResultReporting(Exception e, PollActivityTaskQueueResponseOrBuilder pollResponse, ActivityTaskHandler.Result result) {
            MDC.put((String)"ActivityId", (String)pollResponse.getActivityId());
            MDC.put((String)"ActivityType", (String)pollResponse.getActivityType().getName());
            MDC.put((String)"WorkflowId", (String)pollResponse.getWorkflowExecution().getWorkflowId());
            MDC.put((String)"RunId", (String)pollResponse.getWorkflowExecution().getRunId());
            if (log.isDebugEnabled()) {
                log.debug("Failure during reporting of activity result to the server. ActivityId = {}, ActivityType = {}, WorkflowId={}, WorkflowType={}, RunId={}, ActivityResult={}", new Object[]{pollResponse.getActivityId(), pollResponse.getActivityType().getName(), pollResponse.getWorkflowExecution().getWorkflowId(), pollResponse.getWorkflowType().getName(), pollResponse.getWorkflowExecution().getRunId(), result, e});
            } else {
                log.warn("Failure during reporting of activity result to the server. ActivityId = {}, ActivityType = {}, WorkflowId={}, WorkflowType={}, RunId={}", new Object[]{pollResponse.getActivityId(), pollResponse.getActivityType().getName(), pollResponse.getWorkflowExecution().getWorkflowId(), pollResponse.getWorkflowType().getName(), pollResponse.getWorkflowExecution().getRunId(), e});
            }
        }
    }
}

