Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/uipath-core/src/uipath/core/triggers/trigger.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Module defining resume trigger types and data models."""

from datetime import datetime
from enum import Enum
from typing import Any

Expand Down Expand Up @@ -87,6 +88,7 @@ class UiPathResumeTrigger(BaseModel):
integration_resume: UiPathIntegrationTrigger | None = Field(
default=None, alias="integrationResume"
)
resume_time: datetime | str | None = Field(default=None, alias="resumeTime")
folder_path: str | None = Field(default=None, alias="folderPath")
folder_key: str | None = Field(default=None, alias="folderKey")
payload: Any | None = Field(default=None, alias="interruptObject", exclude=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
WaitJobRaw,
WaitSystemAgent,
WaitTask,
WaitTimeTrigger,
)
from .paging import PagedResult

Expand Down Expand Up @@ -92,6 +93,7 @@
"DocumentExtractionValidation",
"WaitDocumentExtractionValidation",
"WaitIntegrationEvent",
"WaitTimeTrigger",
"RequestSpec",
"Endpoint",
"UiPathUrl",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Models for interrupt operations in UiPath platform."""

from datetime import datetime
from typing import Annotated, Any

from pydantic import BaseModel, ConfigDict, Field, model_validator
Expand Down Expand Up @@ -35,6 +36,7 @@ class InvokeProcess(BaseModel):
process_folder_key: str | None = None
input_arguments: dict[str, Any] | None
attachments: list[Attachment] | None = None
timeout: float | None = None


class WaitJob(BaseModel):
Expand Down Expand Up @@ -279,3 +281,11 @@ class WaitIntegrationEvent(BaseModel):
object_name: str
filter_expression: str | None = None
parameters: dict[str, str] | None = None


class WaitTimeTrigger(BaseModel):
"""Model representing a wait on an Orchestrator time trigger."""

resume_time: datetime | str = Field(alias="resumeTime")

model_config = ConfigDict(validate_by_name=True)
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import os
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any

from uipath.core.errors import (
Expand Down Expand Up @@ -49,6 +50,7 @@
WaitJobRaw,
WaitSystemAgent,
WaitTask,
WaitTimeTrigger,
)
from uipath.platform.connections import EventArguments
from uipath.platform.context_grounding import DeepRagStatus, IndexStatus
Expand All @@ -68,6 +70,9 @@
TriggerMarker,
)

_TIMEOUT_TRIGGER_KIND = "timeout"
_TIMEOUT_SOURCE_INVOKE_PROCESS = "InvokeProcess"


def _try_convert_to_json_format(value: str | None) -> Any:
"""Attempts to parse a string as JSON and returns the parsed object or original string.
Expand Down Expand Up @@ -129,6 +134,23 @@ async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any | None:
UiPathRuntimeError: If reading fails, job failed, API connection failed,
trigger type is unknown, or HITL feedback retrieval failed.
"""
if trigger.trigger_type == UiPathResumeTriggerType.TIMER:
if (
isinstance(trigger.payload, dict)
and trigger.payload.get("kind") == _TIMEOUT_TRIGGER_KIND
):
return {
"timedOut": True,
"timeout": trigger.payload.get("timeout"),
"jobKey": trigger.payload.get("jobKey"),
"processName": trigger.payload.get("processName"),
PropertyName.INTERNAL.value: TriggerMarker.NO_CONTENT.value,
}
return {
"resumeTime": trigger.resume_time,
PropertyName.INTERNAL.value: TriggerMarker.NO_CONTENT.value,
}

uipath = UiPath()

match trigger.trigger_type:
Expand Down Expand Up @@ -439,6 +461,26 @@ class UiPathResumeTriggerCreator:
Implements UiPathResumeTriggerCreatorProtocol.
"""

async def create_triggers(self, suspend_value: Any) -> list[UiPathResumeTrigger]:
"""Create resume triggers from a suspend value.

Most values create a single trigger. `InvokeProcess(timeout=...)` creates
both the process-completion trigger and a timer trigger so whichever
condition happens first resumes the same interrupt.
"""
resume_trigger = await self.create_trigger(suspend_value)
if (
isinstance(suspend_value, InvokeProcess)
and not isinstance(suspend_value, InvokeProcessRaw)
and suspend_value.timeout is not None
):
timeout_trigger = self._create_invoke_process_timeout_trigger(
suspend_value, resume_trigger
)
return [resume_trigger, timeout_trigger]

return [resume_trigger]

async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger:
"""Create a resume trigger from a suspend value.

Expand Down Expand Up @@ -484,6 +526,9 @@ async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger:
case UiPathResumeTriggerType.INBOX:
await self._handle_inbox_trigger(suspend_value, resume_trigger)

case UiPathResumeTriggerType.TIMER:
self._handle_time_trigger(suspend_value, resume_trigger)

case UiPathResumeTriggerType.DEEP_RAG:
await self._handle_deep_rag_job_trigger(
suspend_value, resume_trigger
Expand Down Expand Up @@ -570,6 +615,8 @@ def _determine_trigger_type(self, value: Any) -> UiPathResumeTriggerType:
return UiPathResumeTriggerType.IXP_VS_ESCALATION
if isinstance(value, WaitIntegrationEvent):
return UiPathResumeTriggerType.INBOX
if isinstance(value, WaitTimeTrigger):
return UiPathResumeTriggerType.TIMER
# default to API trigger
return UiPathResumeTriggerType.API

Expand Down Expand Up @@ -606,6 +653,8 @@ def _determine_trigger_name(self, value: Any) -> UiPathResumeTriggerName:
return UiPathResumeTriggerName.EXTRACTION
if isinstance(value, WaitIntegrationEvent):
return UiPathResumeTriggerName.INBOX
if isinstance(value, WaitTimeTrigger):
return UiPathResumeTriggerName.TIMER
# default to API trigger
return UiPathResumeTriggerName.API

Expand Down Expand Up @@ -979,6 +1028,44 @@ async def _handle_inbox_trigger(
inbox_id=str(uuid.uuid4()),
)

def _handle_time_trigger(
self, value: WaitTimeTrigger, resume_trigger: UiPathResumeTrigger
) -> None:
"""Handle Timer-type resume triggers.

Orchestrator expects timer resume triggers as a top-level
`resumeTime` value on the resume trigger DTO.

Args:
value: The suspend value (WaitTimeTrigger)
resume_trigger: The resume trigger to populate
"""
resume_trigger.resume_time = value.resume_time

def _create_invoke_process_timeout_trigger(
self, value: InvokeProcess, job_trigger: UiPathResumeTrigger
) -> UiPathResumeTrigger:
"""Create the timer side of an InvokeProcess timeout race."""
timeout = value.timeout
if timeout is None:
raise ValueError("InvokeProcess timeout is required.")
if timeout <= 0:
raise ValueError("InvokeProcess timeout must be greater than zero.")

resume_time = datetime.now(timezone.utc) + timedelta(seconds=timeout)
return UiPathResumeTrigger(
trigger_type=UiPathResumeTriggerType.TIMER,
trigger_name=UiPathResumeTriggerName.TIMER,
resume_time=resume_time,
payload={
"kind": _TIMEOUT_TRIGGER_KIND,
"source": _TIMEOUT_SOURCE_INVOKE_PROCESS,
"timeout": timeout,
"jobKey": job_trigger.item_key,
"processName": value.name,
},
)


class UiPathResumeTriggerHandler:
"""Combined handler for creating and reading resume triggers.
Expand All @@ -1005,6 +1092,10 @@ async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger:
"""
return await self._creator.create_trigger(suspend_value)

async def create_triggers(self, suspend_value: Any) -> list[UiPathResumeTrigger]:
"""Create resume triggers from a suspend value."""
return await self._creator.create_triggers(suspend_value)

async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any | None:
"""Read a resume trigger and convert it to runtime-compatible input.

Expand Down
Loading