diff --git a/packages/uipath-core/src/uipath/core/triggers/trigger.py b/packages/uipath-core/src/uipath/core/triggers/trigger.py index c897acd28..4f6cc31b1 100644 --- a/packages/uipath-core/src/uipath/core/triggers/trigger.py +++ b/packages/uipath-core/src/uipath/core/triggers/trigger.py @@ -1,5 +1,6 @@ """Module defining resume trigger types and data models.""" +from datetime import datetime from enum import Enum from typing import Any @@ -87,6 +88,7 @@ class UiPathResumeTrigger(BaseModel): integration_resume: UiPathIntegrationTrigger | None = Field( default=None, alias="integrationResume" ) + resume_time: datetime | str | None = Field(default=None, alias="resumeTime") folder_path: str | None = Field(default=None, alias="folderPath") folder_key: str | None = Field(default=None, alias="folderKey") payload: Any | None = Field(default=None, alias="interruptObject", exclude=True) diff --git a/packages/uipath-platform/src/uipath/platform/common/__init__.py b/packages/uipath-platform/src/uipath/platform/common/__init__.py index 2407263ee..d5105b5cd 100644 --- a/packages/uipath-platform/src/uipath/platform/common/__init__.py +++ b/packages/uipath-platform/src/uipath/platform/common/__init__.py @@ -53,6 +53,7 @@ WaitJobRaw, WaitSystemAgent, WaitTask, + WaitTimeTrigger, ) from .paging import PagedResult @@ -92,6 +93,7 @@ "DocumentExtractionValidation", "WaitDocumentExtractionValidation", "WaitIntegrationEvent", + "WaitTimeTrigger", "RequestSpec", "Endpoint", "UiPathUrl", diff --git a/packages/uipath-platform/src/uipath/platform/common/interrupt_models.py b/packages/uipath-platform/src/uipath/platform/common/interrupt_models.py index 3b2468551..4e6e3b75a 100644 --- a/packages/uipath-platform/src/uipath/platform/common/interrupt_models.py +++ b/packages/uipath-platform/src/uipath/platform/common/interrupt_models.py @@ -1,5 +1,6 @@ """Models for interrupt operations in UiPath platform.""" +from datetime import datetime from typing import Annotated, Any from pydantic import BaseModel, ConfigDict, Field, model_validator @@ -35,6 +36,7 @@ class InvokeProcess(BaseModel): process_folder_key: str | None = None input_arguments: dict[str, Any] | None attachments: list[Attachment] | None = None + timeout: float | None = None class WaitJob(BaseModel): @@ -279,3 +281,11 @@ class WaitIntegrationEvent(BaseModel): object_name: str filter_expression: str | None = None parameters: dict[str, str] | None = None + + +class WaitTimeTrigger(BaseModel): + """Model representing a wait on an Orchestrator time trigger.""" + + resume_time: datetime | str = Field(alias="resumeTime") + + model_config = ConfigDict(validate_by_name=True) diff --git a/packages/uipath-platform/src/uipath/platform/resume_triggers/_protocol.py b/packages/uipath-platform/src/uipath/platform/resume_triggers/_protocol.py index b2dbae787..ce131af01 100644 --- a/packages/uipath-platform/src/uipath/platform/resume_triggers/_protocol.py +++ b/packages/uipath-platform/src/uipath/platform/resume_triggers/_protocol.py @@ -3,6 +3,7 @@ import json import os import uuid +from datetime import datetime, timedelta, timezone from typing import Any from uipath.core.errors import ( @@ -49,6 +50,7 @@ WaitJobRaw, WaitSystemAgent, WaitTask, + WaitTimeTrigger, ) from uipath.platform.connections import EventArguments from uipath.platform.context_grounding import DeepRagStatus, IndexStatus @@ -68,6 +70,9 @@ TriggerMarker, ) +_TIMEOUT_TRIGGER_KIND = "timeout" +_TIMEOUT_SOURCE_INVOKE_PROCESS = "InvokeProcess" + def _try_convert_to_json_format(value: str | None) -> Any: """Attempts to parse a string as JSON and returns the parsed object or original string. @@ -129,6 +134,23 @@ async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any | None: UiPathRuntimeError: If reading fails, job failed, API connection failed, trigger type is unknown, or HITL feedback retrieval failed. """ + if trigger.trigger_type == UiPathResumeTriggerType.TIMER: + if ( + isinstance(trigger.payload, dict) + and trigger.payload.get("kind") == _TIMEOUT_TRIGGER_KIND + ): + return { + "timedOut": True, + "timeout": trigger.payload.get("timeout"), + "jobKey": trigger.payload.get("jobKey"), + "processName": trigger.payload.get("processName"), + PropertyName.INTERNAL.value: TriggerMarker.NO_CONTENT.value, + } + return { + "resumeTime": trigger.resume_time, + PropertyName.INTERNAL.value: TriggerMarker.NO_CONTENT.value, + } + uipath = UiPath() match trigger.trigger_type: @@ -439,6 +461,26 @@ class UiPathResumeTriggerCreator: Implements UiPathResumeTriggerCreatorProtocol. """ + async def create_triggers(self, suspend_value: Any) -> list[UiPathResumeTrigger]: + """Create resume triggers from a suspend value. + + Most values create a single trigger. `InvokeProcess(timeout=...)` creates + both the process-completion trigger and a timer trigger so whichever + condition happens first resumes the same interrupt. + """ + resume_trigger = await self.create_trigger(suspend_value) + if ( + isinstance(suspend_value, InvokeProcess) + and not isinstance(suspend_value, InvokeProcessRaw) + and suspend_value.timeout is not None + ): + timeout_trigger = self._create_invoke_process_timeout_trigger( + suspend_value, resume_trigger + ) + return [resume_trigger, timeout_trigger] + + return [resume_trigger] + async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger: """Create a resume trigger from a suspend value. @@ -484,6 +526,9 @@ async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger: case UiPathResumeTriggerType.INBOX: await self._handle_inbox_trigger(suspend_value, resume_trigger) + case UiPathResumeTriggerType.TIMER: + self._handle_time_trigger(suspend_value, resume_trigger) + case UiPathResumeTriggerType.DEEP_RAG: await self._handle_deep_rag_job_trigger( suspend_value, resume_trigger @@ -570,6 +615,8 @@ def _determine_trigger_type(self, value: Any) -> UiPathResumeTriggerType: return UiPathResumeTriggerType.IXP_VS_ESCALATION if isinstance(value, WaitIntegrationEvent): return UiPathResumeTriggerType.INBOX + if isinstance(value, WaitTimeTrigger): + return UiPathResumeTriggerType.TIMER # default to API trigger return UiPathResumeTriggerType.API @@ -606,6 +653,8 @@ def _determine_trigger_name(self, value: Any) -> UiPathResumeTriggerName: return UiPathResumeTriggerName.EXTRACTION if isinstance(value, WaitIntegrationEvent): return UiPathResumeTriggerName.INBOX + if isinstance(value, WaitTimeTrigger): + return UiPathResumeTriggerName.TIMER # default to API trigger return UiPathResumeTriggerName.API @@ -979,6 +1028,44 @@ async def _handle_inbox_trigger( inbox_id=str(uuid.uuid4()), ) + def _handle_time_trigger( + self, value: WaitTimeTrigger, resume_trigger: UiPathResumeTrigger + ) -> None: + """Handle Timer-type resume triggers. + + Orchestrator expects timer resume triggers as a top-level + `resumeTime` value on the resume trigger DTO. + + Args: + value: The suspend value (WaitTimeTrigger) + resume_trigger: The resume trigger to populate + """ + resume_trigger.resume_time = value.resume_time + + def _create_invoke_process_timeout_trigger( + self, value: InvokeProcess, job_trigger: UiPathResumeTrigger + ) -> UiPathResumeTrigger: + """Create the timer side of an InvokeProcess timeout race.""" + timeout = value.timeout + if timeout is None: + raise ValueError("InvokeProcess timeout is required.") + if timeout <= 0: + raise ValueError("InvokeProcess timeout must be greater than zero.") + + resume_time = datetime.now(timezone.utc) + timedelta(seconds=timeout) + return UiPathResumeTrigger( + trigger_type=UiPathResumeTriggerType.TIMER, + trigger_name=UiPathResumeTriggerName.TIMER, + resume_time=resume_time, + payload={ + "kind": _TIMEOUT_TRIGGER_KIND, + "source": _TIMEOUT_SOURCE_INVOKE_PROCESS, + "timeout": timeout, + "jobKey": job_trigger.item_key, + "processName": value.name, + }, + ) + class UiPathResumeTriggerHandler: """Combined handler for creating and reading resume triggers. @@ -1005,6 +1092,10 @@ async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger: """ return await self._creator.create_trigger(suspend_value) + async def create_triggers(self, suspend_value: Any) -> list[UiPathResumeTrigger]: + """Create resume triggers from a suspend value.""" + return await self._creator.create_triggers(suspend_value) + async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any | None: """Read a resume trigger and convert it to runtime-compatible input.