build(om): wire messaging package CI#35
Conversation
egil
left a comment
There was a problem hiding this comment.
PR review findings
I reviewed the full PR diff against origin/main, including the new package, tests, workflow, docs, and the compiled public API surface. I also checked current Orleans streaming docs/API shape for implicit vs explicit subscriptions: Orleans implicit subscriptions are selected by [ImplicitStreamSubscription] + grain identity/namespace, but the grain still attaches observer logic; explicit subscriptions are durable handles that must be resumed to avoid duplicate subscriptions. See Microsoft docs: https://learn.microsoft.com/en-us/dotnet/orleans/streaming/streams-programming-apis.
Local validation during review: dotnet build Egil.Orleans.Messaging\Egil.Orleans.Messaging.slnx -c Release --no-restore passes.
Findings
P1 - Stream handler exceptions are swallowed, so failed messages can be accepted/lost
StreamManager.OnNextAsync catches every handler exception, records telemetry, invokes/logs onError, and then returns a successfully completed task (Egil.Orleans.Messaging/src/Egil.Orleans.Messaging/Streams/StreamManager.cs:343, :362, :370). Since StreamObserver<T>.OnNextAsync just returns that task (:523), Orleans sees the stream delivery as completed even when the user handler failed. That is surprising compared to Orleans' IAsyncObserver<T>.OnNextAsync contract, where the returned task is the natural way to surface a failed delivery. It also means a grain can fail before updating MessageTracker, while the stream provider still advances past the event.
The existing test Default_error_handler_does_not_break_subscription_when_handler_throws (Egil.Orleans.Messaging/test/Egil.Orleans.Messaging.Tests/Streams/StreamManagerTests.cs:126) locks in the swallow-and-continue behavior, but I think this should be an explicit error policy, not the default. Suggested shape: default to rethrow after telemetry/logging, and add an opt-in StreamErrorPolicy/disposition callback for users who intentionally want "log and continue" semantics. If onError remains, consider making it async and able to choose whether the exception is handled.
P1 - Missing implicit subscription configuration only logs and returns
If a grain has [ImplicitStreamSubscription] but forgets to call ConfigureImplicitSubscription(...), or configures a different namespace, IStreamManagerComponent.OnSubscribedAsync logs a warning and returns successfully (Egil.Orleans.Messaging/src/Egil.Orleans.Messaging/Streams/StreamManager.cs:207-:218). That makes a namespace mismatch a runtime log-only problem, and the stream activation can proceed without attaching any observer. This is the same class of misconfiguration as a missing StreamManager component, which already throws in IImplicitStreamGrain (Egil.Orleans.Messaging/src/Egil.Orleans.Messaging/Streams/IImplicitStreamGrain.cs:13-:20).
I would fail fast here with an InvalidOperationException that includes the provider and stream id/namespace. A warning is too easy to miss for a message-delivery path.
P2 - ConfigureExplicitSubscription looks like Orleans explicit streams, but only supports grain-keyed streams
The public API takes only (streamProviderName, streamNamespace, handler) (Egil.Orleans.Messaging/src/Egil.Orleans.Messaging/Streams/StreamManager.cs:124-:148). Internally, it always derives the stream id from the owning grain (:317-:323, :439-:452). That is narrower than Orleans explicit subscriptions, where the caller can subscribe to any StreamId from an IStreamProvider. It also makes the API hard to use for common explicit-stream cases: subscribing to a stream keyed by an order/customer id, fan-in from multiple streams, or using an explicit stream id unrelated to the grain id.
Two concrete API options:
- Add overloads that accept
StreamIdorFunc<TGrain, StreamId>/Func<StreamId>. - Rename the current overload to communicate the convention, e.g.
ConfigureGrainKeyedExplicitSubscriptionorConfigureExplicitSubscriptionForGrainKey, and layer the general Orleans-like overload beside it.
Also note that CreateStreamId discards the key extension returned by TryGetGuidKey/TryGetIntegerKey (:441-:449), so compound-key grains cannot round-trip their Orleans identity into the stream id convention.
P2 - C# 14 extension blocks leak generated public nested types into the package API
The extension block declarations (Egil.Orleans.Messaging/src/Egil.Orleans.Messaging/Outboxes/OutboxProcessorExtensions.cs:11, Streams/StreamManagerExtensions.cs:11, State/StateManagerExtensions.cs:11, State/StateManagerRegistrationExtensions.cs:11, etc.) compile into exported public nested types with generated names. Reflection over the Release assembly shows public types like:
Orleans.StreamManagerExtensions+<G>$5588D94CEEAB9159D96D3F2B24A5C17A...Microsoft.Extensions.DependencyInjection.OutboxPostmanServiceCollectionExtensions+<G>$7B30185E57B8298C6754F16C0BCF4773...
That is a public API surface concern for a NuGet package with package validation enabled: docs/reflection tools can see these names, and future compiler/lowering changes could look like API churn. If that exposure is acceptable, I would document that decision. If not, use classic extension methods while keeping the owner namespaces (Orleans, Orleans.Hosting, Microsoft.Extensions.DependencyInjection) for discoverability.
P2 - Postman dispatch has ambiguous subtype/base-type behavior
FindPostman returns the first registration whose filter matches (Egil.Orleans.Messaging/src/Egil.Orleans.Messaging/Outboxes/OutboxProcessor.cs:364-:371). If a processor registers both AddPostman<IOrderEvent>(...) and AddPostman<OrderSubmitted>(...), the selected handler depends entirely on registration order. The API design doc mentions first-registered-wins (Egil.Orleans.Messaging/api-design.md:1438-:1439), but the public API/README do not make that easy to discover, and it differs from the expectation many users will have from type-specific handler APIs.
I would either reject ambiguous registrations, dispatch to the most-specific registered message type, or document the rule in XML + README directly beside AddPostman<TSub>. The current behavior is easy to misuse when adding a catch-all postman later.
P2 - Duplicate keyed postman registrations are not surfaced
AddOutboxPostman<TMessage, TPostman> and the multi-contract registration path always call AddKeyedScoped (Egil.Orleans.Messaging/src/Egil.Orleans.Messaging/Outboxes/OutboxPostmanServiceCollectionExtensions.cs:47-:51, :82-:84). If two postmen register the same (postmanName, IPostman<TMessage>), the container will hold multiple keyed registrations and OutboxProcessor.AddPostman<TSub>(string) resolves a single required service at activation (Egil.Orleans.Messaging/src/Egil.Orleans.Messaging/Outboxes/OutboxProcessor.cs:104-:112). That makes a name collision order-dependent instead of a clear configuration error.
Given that postmanName is part of the public routing key, duplicate key+message registrations should probably fail during service registration or at least be detected during validation.
P2 - Public docs promise an outbox maximum that does not exist
The Outbox<T> XML docs say unbounded growth is mitigated by a configurable max via OutboxProcessorOptions<TOutbox>, with oldest messages auto-dropped FIFO (Egil.Orleans.Messaging/src/Egil.Orleans.Messaging/Outboxes/Outbox.cs:48-:51). The API design doc says the same (Egil.Orleans.Messaging/api-design.md:450-:458). OutboxProcessorOptions<TOutbox> has no max/dead-letter/drop property (Egil.Orleans.Messaging/src/Egil.Orleans.Messaging/Outboxes/OutboxProcessorOptions.cs:30-:83).
This is a public contract mismatch. Either implement the cap explicitly, or remove the promise from XML/design docs and state that cap/dead-letter policy remains the grain's responsibility.
P3 - ThreadPool execution mode is under-documented in XML docs
The API design correctly warns that ThreadPool postmen must not read or mutate activation-local grain state (Egil.Orleans.Messaging/api-design.md:1249-:1254), but the public XML docs only say it executes callbacks on the .NET thread pool (Egil.Orleans.Messaging/src/Egil.Orleans.Messaging/Outboxes/OutboxProcessorOptions.cs:14-:18). Since this is a runtime safety boundary in Orleans, that warning should be in XML docs on OutboxPostmanExecutionMode.ThreadPool and probably in README near the outbox processor examples.
P3 - The Event Hubs capability is a hard dependency for every consumer
The package references Microsoft.Orleans.Streaming.EventHubs directly (Egil.Orleans.Messaging/src/Egil.Orleans.Messaging/Egil.Orleans.Messaging.csproj:41). That means a consumer using only State, Outboxes, or Tracking still takes the Event Hubs dependency graph. The API design says one package is intentional for now, so this may be acceptable, but it is the main package/runtime footprint tradeoff I see. If dependency weight matters, Streams.EventHubs is the clean future split.
P3 - The PR includes unrelated agent skill files
The diff adds .agents/skills/aspire, .agents/skills/aspireify, .agents/skills/dotnet-inspect, and .agents/skills/playwright-cli alongside the messaging package. These do not appear tied to Egil.Orleans.Messaging behavior or CI. If they are intentional repo tooling, they should probably be split out; otherwise they add noise and maintenance surface to this package PR.
Test gaps I would add before merging
- Implicit subscription namespace mismatch throws instead of log-only no-op.
- Stream handler exception propagation/default policy, including tracker not being advanced.
- Explicit subscription to a non-grain-key
StreamId. - Compound grain key stream-id derivation, if grain-keyed explicit subscriptions remain.
- Base/subtype postman ambiguity.
- Duplicate keyed postman name+message registration.
- Public API snapshot or package inspection to catch generated extension-block surface if C# 14 extension blocks stay.
fe165eb to
591fcde
Compare
| if (current is RequestFailedException requestFailed | ||
| && requestFailed.Status is 412) |
There was a problem hiding this comment.
I think we can do better here. The question: When talking to azure storage (table/blob) what exceptions indicates that the server received the put request but without a doubt did not complete the transaction, vs what exceptions means we do not know, e.g., because the server did not respond with a status code?
Short answer: in the Azure SDK for .NET, "the server told us no" exceptions are always RequestFailedException carrying an HTTP Status and a storage ErrorCode. "We don't know" exceptions are the ones where no status code ever came back — i.e. transport/timeout exceptions, plus a small set of ambiguous 5xx codes where the server may have started processing before failing.
Definitively did NOT complete
These are RequestFailedException (Azure.Core) with a non‑zero Status and an ErrorCode. The server received the PUT, parsed it, and rejected it before committing. You can safely treat the operation as not applied.
- 400 Bad Request —
InvalidInput,InvalidXmlDocument,InvalidHeaderValue,OutOfRangeInput,InvalidUri,PropertyValueTooLarge,EntityTooLarge,InvalidPk/InvalidRketc. Request was rejected before being applied. - 401 Unauthorized —
AuthenticationFailed,InvalidAuthenticationInfo. Not applied. - 403 Forbidden —
AuthorizationFailure,InsufficientAccountPermissions,AccountIsDisabled,KeyVaultEncryptionKeyNotFound. Not applied. - 404 Not Found —
ContainerNotFound,BlobNotFound,TableNotFound,ResourceNotFound. Not applied. - 405 / 406 / 411 / 413 / 414 / 415 / 416 — payload / header / content-length / range validation failures. Not applied.
- 409 Conflict —
BlobAlreadyExists,ContainerAlreadyExists,TableAlreadyExists,EntityAlreadyExists,LeaseIdMismatchWithBlobOperation,LeaseAlreadyPresent. Your write was not applied (though something else is there — important nuance for idempotent retries: if you just retried, the prior attempt may actually have succeeded; see below). - 412 Precondition Failed —
ConditionNotMet,LeaseLost, ETag/If‑Match/If‑None‑Match mismatch. Not applied. This is the canonical "use optimistic concurrency to detect the retry-after-success case" code. - 428 Precondition Required, 431 — not applied.
- 503 Service Unavailable with
ServerBusy— throttled; the server rejected the request without processing it. Treat as not applied. (ErrorCode == "ServerBusy".) - 507 Insufficient Storage — not applied.
"We don't know" (no status, or ambiguous server-side)
You cannot conclude either way. The PUT may have been received, durably committed, and the ack lost on the way back. Retries must be idempotent (use If-None-Match: * for create, ETag/If-Match for update, or read-back to verify).
- No HTTP response received — pre‑response network/transport failures. In the modern SDK these surface as:
RequestFailedExceptionwithStatus == 0(no response) wrapping an inner exception, or- the raw transport exception:
HttpRequestException,IOException,SocketException(ConnectionReset,ConnectionAborted,TimedOut),System.Net.Http.HttpIOException. TaskCanceledException/OperationCanceledExceptionwhere theCancellationTokenyou passed was not cancelled (i.e. it's actually an HttpClient/per‑try timeout). If your own token was cancelled, you also don't know whether the bytes made it.AuthenticationException/ SSL/TLS errors mid‑handshake — unknown if anything was sent.
- 500 Internal Server Error (
InternalError,OperationTimedOut) — the server hit an internal failure; the write may or may not have been applied. Treat as unknown. - 502 Bad Gateway — front‑end/proxy error; backend state is unknown.
- 503 Service Unavailable without
ServerBusy(e.g. plain unavailable, no error code) — ambiguous; safest to treat as unknown. - 504 Gateway Timeout — request reached the service but the response was lost; unknown.
- 404 on a follow‑up read after a retry of a create that previously got an unknown error — still unknown for the original attempt, but if the resource is absent now you can conclude nothing was committed.
Practical guidance for Orleans‑style providers
When implementing Azure Table/Blob persistence, membership, reminders, or queue providers:
- Catch
RequestFailedExceptionand branch onex.Status/ex.ErrorCode:- 4xx and 503/
ServerBusy→ not applied, surface as a definite error to the caller (no idempotency concerns). - 500 / 502 / 503‑unknown / 504 → unknown, retry with ETag /
If-Match/If-None-Match: *so a duplicate commit is detected as 409 or 412.
- 4xx and 503/
- Anything not a
RequestFailedException(rawHttpRequestException,IOException,SocketException,TimeoutException,TaskCanceledExceptionfrom a per‑try timeout) → unknown, same idempotent‑retry rule. - On a retry that returns 409
*AlreadyExistsor 412ConditionNotMetafter a prior unknown attempt, treat that as evidence the prior attempt actually succeeded — read back and reconcile (and capture the new ETag) rather than failing the operation. - Don't conflate
OperationCanceledExceptionfrom a user‑supplied token (definitely cancelled, but the write state on the server is still unknown) with one from an internal HttpClient timeout. Either way, the server-side outcome is unknown.
Rule of thumb: RequestFailedException with a 4xx (or 503/ServerBusy) = definitive "no". Everything else that throws on a write = "unknown, must be retried idempotently".
| { | ||
| var tokenSet = tokens.ToHashSet(); | ||
| if (tokenSet.Count == 0) | ||
| { | ||
| return this; | ||
| } | ||
|
|
||
| var removeCount = 0; | ||
| while (removeCount < items.Length && tokenSet.Contains(items[removeCount].Token)) | ||
| { | ||
| removeCount++; | ||
| } | ||
|
|
||
| var remaining = items.RemoveRange(0, removeCount); | ||
| if (remaining.Length == items.Length) | ||
| { | ||
| return this; | ||
| } |
There was a problem hiding this comment.
The assumption is that tokens are in the same order in items and there are no skipped items.
It does not well handled if one token in the middle ran into a transient error and needs to be retried and is not included.
Something like this should cover holes in tokens:
| { | |
| var tokenSet = tokens.ToHashSet(); | |
| if (tokenSet.Count == 0) | |
| { | |
| return this; | |
| } | |
| var removeCount = 0; | |
| while (removeCount < items.Length && tokenSet.Contains(items[removeCount].Token)) | |
| { | |
| removeCount++; | |
| } | |
| var remaining = items.RemoveRange(0, removeCount); | |
| if (remaining.Length == items.Length) | |
| { | |
| return this; | |
| } | |
| { | |
| var itemBuilder = items.ToBuilder(); | |
| int startIndex = 0; | |
| int removeLength = 0; | |
| foreach(var removeToken in tokens) | |
| { | |
| for(int i = startIndex; i < itemBuilder.Count; i++) | |
| { | |
| if(removeToken == itemBuilder[i].Token) | |
| { | |
| removeLength++; | |
| } | |
| else | |
| { | |
| if(removeLength > 0) | |
| { | |
| itemBuilder.RemoveRange(startIndex, removeLength); | |
| } | |
| removeLength = 0; | |
| startIndex = 1; | |
| } | |
| } | |
| } |
| private static bool IsAmbiguousOrTransient(RequestFailedException exception) | ||
| { | ||
| return exception.Status is 0 | ||
| or (int)HttpStatusCode.RequestTimeout | ||
| or 429 | ||
| or >= 500 | ||
| || IsErrorCode( | ||
| exception, | ||
| BlobErrorCode.OperationTimedOut.ToString(), | ||
| BlobErrorCode.ServerBusy.ToString(), | ||
| BlobErrorCode.InternalError.ToString(), | ||
| TableErrorCode.OperationTimedOut.ToString(), | ||
| "ServerBusy", | ||
| "InternalError", | ||
| "AccountIOPSLimitExceeded"); | ||
| } | ||
|
|
||
| private static bool IsRejectedBeforePersistence(RequestFailedException exception) | ||
| { | ||
| return exception.Status is >= 400 and < 500 | ||
| || IsErrorCode( | ||
| exception, | ||
| BlobErrorCode.ConditionNotMet.ToString(), | ||
| BlobErrorCode.AppendPositionConditionNotMet.ToString(), | ||
| BlobErrorCode.MaxBlobSizeConditionNotMet.ToString(), | ||
| BlobErrorCode.SequenceNumberConditionNotMet.ToString(), | ||
| BlobErrorCode.SourceConditionNotMet.ToString(), | ||
| BlobErrorCode.TargetConditionNotMet.ToString(), | ||
| BlobErrorCode.BlobAlreadyExists.ToString(), | ||
| BlobErrorCode.BlobNotFound.ToString(), | ||
| BlobErrorCode.ContainerNotFound.ToString(), | ||
| BlobErrorCode.ResourceAlreadyExists.ToString(), | ||
| BlobErrorCode.ResourceNotFound.ToString(), | ||
| TableErrorCode.UpdateConditionNotSatisfied.ToString(), | ||
| TableErrorCode.EntityAlreadyExists.ToString(), | ||
| TableErrorCode.EntityNotFound.ToString(), | ||
| TableErrorCode.ResourceNotFound.ToString(), | ||
| TableErrorCode.TableNotFound.ToString()); | ||
| } |
There was a problem hiding this comment.
lets use a switch instead searching through an array
| private sealed record OutboxMessageEnvelopeJsonModel<T>( | ||
| OutboxSequenceToken? Token, | ||
| T Message); |
There was a problem hiding this comment.
Why do we need an jsonmodel object when it has the exact same signature as OutboxMessageEnvelope itself. Redundant?
There was a problem hiding this comment.
This does not follow the recommended pattern from Microsoft: https://learn.microsoft.com/en-us/dotnet/standard/serialization/system-text-json/converters-how-to#sample-factory-pattern-converter
egil
left a comment
There was a problem hiding this comment.
-
for orleans serializable types, dont use primary ctor properties, use regular required init only properties. add empty ctor and ctor that takes all properties and sets properties for convenience.
-
For custom json converters, lets drop using "json payload types" and instead follow conventions established in https://learn.microsoft.com/en-us/dotnet/standard/serialization/system-text-json/converters-how-to#sample-factory-pattern-converter.
-
follow code comment skill and ensure all files have appropriate code comments.
| public sealed record OutboxMessageEnvelope<T>( | ||
| [property: Id(0)] OutboxSequenceToken Token, | ||
| [property: Id(1)] T Message); No newline at end of file |
There was a problem hiding this comment.
For orleans serializable types it is better not to use primary ctor properties but have init only normal properties. An empty ctor and a ctor that takes both required properties is nice to have too.
There was a problem hiding this comment.
lets follow recommend pattern from here for this scenario: https://learn.microsoft.com/en-us/dotnet/standard/serialization/system-text-json/converters-how-to#sample-factory-pattern-converter
| public Func<ImmutableArray<(TOutbox Item, Exception Error, int Attempt)>, | ||
| CancellationToken, ValueTask>? ReconcileFailedAsync | ||
| { get; init; } |
There was a problem hiding this comment.
| public Func<ImmutableArray<(TOutbox Item, Exception Error, int Attempt)>, | |
| CancellationToken, ValueTask>? ReconcileFailedAsync | |
| { get; init; } | |
| public Func<ImmutableArray<(TOutbox Item, Exception Error, int Attempt)>, CancellationToken, ValueTask>? ReconcileFailedAsync { get; init; } |
one-liner
| public required Func<ImmutableArray<TOutbox>, CancellationToken, ValueTask> | ||
| AcknowledgePostedAsync | ||
| { get; init; } |
There was a problem hiding this comment.
| public required Func<ImmutableArray<TOutbox>, CancellationToken, ValueTask> | |
| AcknowledgePostedAsync | |
| { get; init; } | |
| public required Func<ImmutableArray<TOutbox>, CancellationToken, ValueTask> AcknowledgePostedAsync { get; init; } |
one liner
| /// Whether acknowledgement and failure reconciliation callbacks may | ||
| /// interleave with other grain calls when posting runs in the background. |
There was a problem hiding this comment.
lets use to point to the callbacks in question above to be explicit
| /// <summary> | ||
| /// Registers a custom keyed open-generic state manager factory on the silo builder. | ||
| /// </summary> | ||
| public ISiloBuilder AddStateManagerFactory( | ||
| string storageName, | ||
| Type openGenericFactoryType) | ||
| { | ||
| ArgumentNullException.ThrowIfNull(builder); | ||
| StateManagerRegistrationExtensions.ValidateStorageName(storageName); | ||
| StateManagerRegistrationExtensions.ValidateOpenGenericFactoryType(openGenericFactoryType); | ||
|
|
||
| builder.ConfigureServices(services => | ||
| services.AddStateManagerFactory(storageName, openGenericFactoryType)); | ||
| return builder; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Registers a custom keyed state manager factory for a specific state type on the silo builder. | ||
| /// </summary> | ||
| public ISiloBuilder AddStateManagerFactory<TState, TFactory>(string storageName) | ||
| where TState : class, IEquatable<TState> | ||
| where TFactory : class, IStateManagerFactory<TState> | ||
| { | ||
| ArgumentNullException.ThrowIfNull(builder); | ||
| StateManagerRegistrationExtensions.ValidateStorageName(storageName); | ||
|
|
||
| builder.ConfigureServices(services => | ||
| services.AddStateManagerFactory<TState, TFactory>(storageName)); | ||
| return builder; | ||
| } | ||
| } |
There was a problem hiding this comment.
| /// <summary> | |
| /// Registers a custom keyed open-generic state manager factory on the silo builder. | |
| /// </summary> | |
| public ISiloBuilder AddStateManagerFactory( | |
| string storageName, | |
| Type openGenericFactoryType) | |
| { | |
| ArgumentNullException.ThrowIfNull(builder); | |
| StateManagerRegistrationExtensions.ValidateStorageName(storageName); | |
| StateManagerRegistrationExtensions.ValidateOpenGenericFactoryType(openGenericFactoryType); | |
| builder.ConfigureServices(services => | |
| services.AddStateManagerFactory(storageName, openGenericFactoryType)); | |
| return builder; | |
| } | |
| /// <summary> | |
| /// Registers a custom keyed state manager factory for a specific state type on the silo builder. | |
| /// </summary> | |
| public ISiloBuilder AddStateManagerFactory<TState, TFactory>(string storageName) | |
| where TState : class, IEquatable<TState> | |
| where TFactory : class, IStateManagerFactory<TState> | |
| { | |
| ArgumentNullException.ThrowIfNull(builder); | |
| StateManagerRegistrationExtensions.ValidateStorageName(storageName); | |
| builder.ConfigureServices(services => | |
| services.AddStateManagerFactory<TState, TFactory>(storageName)); | |
| return builder; | |
| } | |
| } | |
| /// <summary> | |
| /// Registers a custom keyed open-generic state manager factory on the silo builder. | |
| /// </summary> | |
| public ISiloBuilder AddStateManagerFactory( | |
| string storageName, | |
| Type openGenericFactoryType) | |
| { | |
| ArgumentNullException.ThrowIfNull(builder); | |
| StateManagerRegistrationExtensions.ValidateStorageName(storageName); | |
| StateManagerRegistrationExtensions.ValidateOpenGenericFactoryType(openGenericFactoryType); | |
| builder.ConfigureServices(services => | |
| services.AddStateManagerFactory(storageName, openGenericFactoryType)); | |
| return builder; | |
| } | |
| /// <summary> | |
| /// Registers a custom keyed state manager factory for a specific state type on the silo builder. | |
| /// </summary> | |
| public ISiloBuilder AddStateManagerFactory<TFactory>(string storageName) | |
| where TFactory : class, IStateManagerFactory | |
| { | |
| ArgumentNullException.ThrowIfNull(builder); | |
| StateManagerRegistrationExtensions.ValidateStorageName(storageName); | |
| builder.ConfigureServices(services => | |
| services.AddStateManagerFactory<TFactory>(storageName)); | |
| return builder; | |
| } | |
| } |
| [Id(0)] | ||
| [JsonInclude] | ||
| public Guid Version { get; internal set; } = Guid.CreateVersion7(); |
There was a problem hiding this comment.
Potentially more flexible if we make this a string, and trim - from guid. still fine to use a guid as the base though.
| /// <summary> | ||
| /// Attempts to extract the broker-side enqueue time from the token. | ||
| /// </summary> | ||
| bool TryGetEnqueuedTime(out DateTimeOffset enqueuedTime); |
There was a problem hiding this comment.
is enqueued too eventhub specific or general enough?
There was a problem hiding this comment.
should these extension methods be on a JsonSerializationOption type instead. That way, they are added to a specific one, not to a static type somewhere. Seems more correct.
| public bool ProcessMessage( | ||
| string streamNamespace, | ||
| StreamSequenceToken? token, | ||
| out MessageTracker next) |
There was a problem hiding this comment.
it does not make sense to track if StreamSequenceToken is null. So, method signature should require that. Same elsewhere in this class, or is it better to allow it to be null and just return true in case of null?
|
Addressed the actionable review items in the latest push:
Validation after the fixes:
Open design questions I did not change yet:
|
Comprehensive API design document covering all 8 design sections: - §0: Scope, packaging (one NuGet, split-ready), name settled as Egil.Orleans.Messaging - §1: IStateManager<T> atomic writes with in-flight recovery - §2: Outbox<T> storage placement (co-located on grain state) - §3: Outbox<T> sealed class shape, epoch semantics, fingerprint equality, configurable max depth - §3a: VersionedState<TSelf> for ImmutableArray equality trap - §4: MessageTracker sealed class, dual-dictionary dedup - §5: StreamManager fluent builder facade - §6: Opinionated functional-grain pattern (guidance, not enforced) - §7: OutboxProcessor<T> grain-scoped timer+reminder dispatch with callback-based postmen, per-attempt error tracking - §8: Naming (flat namespace), serialization ([Alias] with fully qualified prefix, sequential [Id]), telemetry conventions All decisions settled via structured grilling sessions. Directory still named Egil.Orleans.CQRS — rename deferred to project scaffold. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Rename directory from Egil.Orleans.CQRS to Egil.Orleans.Messaging. Add solution file, Directory.Packages.props, version.json, global.json, src csproj (library), and test csproj. Package description: toolbox of composable building blocks — not a framework. Pick what you need, leave the rest. [skip notes] Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Limit StreamCursorJsonConverter to Orleans built-in stream sequence token types and remove custom token adapter serialization paths. Unsupported token kinds now throw a NotSupportedException with an actionable message that explains the stability rationale and points users to the GitHub issues page for feature requests. Update StreamCursorJsonConverterTests to assert unsupported custom tokens and unknown token kinds fail with the new guidance message while keeping built-in token round-trip coverage. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Enable JSON converter attribute on OutboxSequenceToken and implement OutboxSequenceTokenJsonConverter read/write paths. The converter now round-trips sequence, sender GrainId shape, timestamp, and epoch without requiring caller-side serializer options. Add focused tests for converter attribute registration, round-trip behavior (including multiple GrainId key shapes), and payload validation for missing sender. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Enable JSON converter factory attribute on OutboxMessageEnvelope<T> and implement OutboxMessageEnvelopeJsonConverterFactory with closed generic converters for read/write round-trip serialization. Add tests for attribute wiring, CanConvert behavior, round-trip serialization, and missing-token payload validation. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Implement StateManager<T> state cache, read/write/clear operations, force-write ETag override, and conflict handling that reconciles equivalent persisted state before rethrowing non-equivalent failures. Add focused tests for read refresh, successful writes, write-failure reconciliation behavior, rollback on conflicting writes, rollback when post-failure read fails, and VersionedState version stamping. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Implement AsStateManager<T> to validate input and wrap IPersistentState<T> with StateManager<T> for grain-facing usage. Add focused tests for successful wrapping and null-argument validation. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Enable StreamCursor JSON converter attribute and implement TryGetEnqueuedTime, TryGetStreamProviderName, and TryGetTraceParent to expose enrichment metadata when backed by EnrichedEventHubSequenceToken. Add focused tests for positive and negative cases across enqueued time, stream provider name, and traceparent access. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Implement UseEnrichedDataAdapter extension to validate input and register EnrichedEventHubAdapter through the Event Hub configurator using Orleans Serializer from DI. Add focused tests for null-argument validation and configurator callback registration. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Implement EnrichedEventHubAdapter overrides to stamp traceparent on queued messages, create EnrichedEventHubSequenceToken from cached Event Hub messages, and return enriched stream positions that preserve offset/sequence/event index and optional traceparent. Add focused tests for traceparent stamping behavior and enriched token construction from cached message metadata. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Add InitializeStateManager(...) that resolves keyed IStateManagerFactory<T> by storage name, with IServiceCollection and ISiloBuilder helper APIs for keyed singleton registrations. Refactor state management to StateManagerBase<T>/DefaultStateManager<T>, remove WritePolicy and force-write behavior, and drop reflection-based ETag mutation. Update state manager tests and API design docs to match the new factory wiring and write semantics. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
StreamManager now separates subscription configuration from activation-time subscription setup via RegisterStreamManager, AddSubscription, and SubscribeAsync. This lets grains await Orleans stream subscription establishment without blocking activation and keeps subscription failures visible. MessageTracker now records receive-lag histograms for accepted enriched stream cursors and outbox tokens, with shared telemetry plumbing kept in MessagingTelemetry. The Messaging test suite adds an in-process Orleans cluster with memory streams covering ValueTask, Task, default error handling, and resume-token behavior. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Rename InitializeStateManager to RegisterStateManager so state and stream activation helpers use the same registration language before the package ships. Update state manager docs and tests to match the new API name. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Capture the current OutboxProcessor design direction, including background posting, timer interleaving options, postman execution modes, and related telemetry guidance. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Implements the outbox processor registration and draining path, including postman dispatch, success acknowledgement, failure reconciliation, retry timer/reminder scheduling, and stream-backed integration coverage. Adds outbox and stream receive telemetry plus focused Orleans in-process tests for stream delivery, error paths, resume behavior, JSON converter branches, and tracker semantics. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Adds targeted tests for high-risk converter and tracker branches, including stream cursor token variants, malformed persisted JSON, and same-count MessageTracker equality mismatches. These tests raise branch coverage around runtime serialization and dedup semantics without padding low-value argument validation paths. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Stream cursors now carry namespace and provider metadata instead of using the Orleans stream id as the persisted tracking identity. Provider names are populated from subscription handles, with enriched Event Hub tokens taking precedence when present. This keeps tracking grain-local while still supporting provider-aware deduplication for non-conventional multi-provider namespace usage. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Document the Messaging package entry points, add it to the repository package list, and register its conventional commit scope. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Add the Messaging package workflow, include the package README in the NuGet artifact, and make Messaging tests consume Egil.Orleans.Testing as a package dependency instead of a sibling project reference. [skip notes] Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Capture the StreamManager implicit/explicit subscription split and the capability namespace layout before the refactor continues. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Move messaging tools into capability namespaces and matching test folders, with Outboxes used to avoid a namespace/type name collision. Replace StreamManager's AddSubscription/SubscribeAsync surface with explicit implicit/explicit configuration APIs and IImplicitStreamGrain forwarding through an attached grain component. BREAKING CHANGE: Messaging types moved from Egil.Orleans.Messaging into capability namespaces such as Egil.Orleans.Messaging.State, Egil.Orleans.Messaging.Outboxes, Egil.Orleans.Messaging.Tracking, and Egil.Orleans.Messaging.Streams. StreamManager now uses ConfigureImplicitSubscription, ConfigureExplicitSubscription, ResumeExplicitSubscriptionsAsync, and EnsureExplicitSubscriptionsAsync instead of AddSubscription and SubscribeAsync. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Use the activation-time MessageTracker snapshot to provide provider-aware resume tokens when StreamManager attaches implicit handles, resumes explicit handles, or creates missing explicit subscriptions. Add focused tests for provider-specific cursor lookup and the Orleans stream APIs that receive those tokens. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Throw clear configuration errors when implicit stream and outbox marker interfaces receive callbacks before their corresponding components have been attached. This avoids silently swallowing stream subscription or reminder callbacks when a grain implements the marker interface but forgets to call the registration helper during activation. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Allow grains to register StreamManager without providing a MessageTracker when they do not persist stream high-water marks. The tracked overloads remain source-compatible and still supply resume tokens when a tracker snapshot is provided. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Keep provider selection at the subscription boundary: implicit subscriptions get the provider from Orleans handle factories, and explicit subscriptions specify it when configured. This removes redundant RegisterStreamManager provider overloads and avoids carrying a provider default through StreamManager registration. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Align XML documentation indentation inside the C# extension block. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Run dotnet format against the messaging solution after the capability namespace refactor. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Make MessageTracker fully optional for StreamManager registration and let each implicit or explicit subscription opt out of passing the tracked resume token to Orleans. Remove the StreamCursor constructor that accepted StreamId so cursors are built from the stream namespace directly. Add tests and docs for the resume-token policy and include the outbox postman API plan. BREAKING CHANGE: StreamCursor no longer exposes a constructor that accepts StreamId; construct it with the stream namespace string instead. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Allow MessageTracker callers to process stream sequence tokens directly with explicit stream namespace or provider-qualified namespace, and expose latest stream sequence token lookup for resume-token use cases. Clarify stream manager tests by using distinct provider-name and namespace constants for explicit subscriptions. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Move DefaultStateManager and WriteFailureKind into their own files without changing state manager behavior. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Add IPostman<TMessage>, named postman attributes, IServiceCollection registration helpers, and an OutboxProcessor.AddPostman overload that resolves keyed postman services from the grain activation service provider. Cover keyed registration, scoped multi-contract resolution, successful keyed dispatch, and keyed dispatch failure reconciliation. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Add keyed outbox postman tests for caller cancellation and processor timeout, verifying both paths leave the outbox item pending and do not acknowledge or reconcile it as a per-item failure. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Add built-in stream and grain postman helpers so common Orleans fan-out targets can use the outbox processor without hand-written dispatch callbacks. Cover helper behavior through Orleans tests and fill validation coverage for keyed postman registration edge cases. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Fail fast when Orleans invokes an implicit stream subscription without a configured handler, and add explicit StreamId subscription overloads so explicit streams are not limited to the grain-keyed convention. Clarify outbox ownership, first-match postman dispatch, thread-pool postman safety, and future provider-specific package boundaries in public docs. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Move Event Hubs stream enrichment into Egil.Orleans.Messaging.Streams.EventHubs so the core package no longer carries the Event Hubs dependency. Add Egil.Orleans.Messaging.State.AzureStorage with Azure Table/Blob storage-aware state manager registration and failure classification for optimistic concurrency failures. Rename WriteFailureKind to StorageFailureKind so write and clear recovery can share the same provider classification model. Update CI to pack all messaging packages and document the companion package model. BREAKING CHANGE: Event Hubs enrichment types moved out of Egil.Orleans.Messaging into Egil.Orleans.Messaging.Streams.EventHubs, and WriteFailureKind was renamed to StorageFailureKind. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Classify deterministic Azure Storage failures as non-persistence outcomes and keep ambiguous transient results on the read-back recovery path. This lets StateManager skip unnecessary recovery reads for rejected mutations while preserving correctness for throttling, timeouts, and service failures. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Dispatch pending outbox items concurrently on the Orleans activation scheduler and remove the thread-pool execution mode. Background posting now allows delivery to interleave by default while durable acknowledgement and failure reconciliation remain non-interleaving unless explicitly opted in. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Add an explicit JsonConverter registry for concrete StreamSequenceToken types and route StreamCursor and MessageTracker token payloads through that registry. Register Event Hubs token converters from the enriched adapter setup so enriched checkpoints round-trip without downcasting provider metadata. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Document the Azure Storage classification behavior, outbox background posting model, and explicit stream token JSON converter registry. Keep the public README focused on observable behavior while leaving Orleans scheduling rationale in the API design notes. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Use a non-generic IStateManagerFactory with a generic Create<T> method so keyed registrations are easier to reason about and provider-specific factories do not need open generic registrations. Also simplifies Azure Storage failure code matching and documents the ambiguous clear recovery path.
Remove posted outbox tokens by identity so successful items after a failed gap can still be acknowledged. Also makes outbox token/envelope types Orleans-friendly with required init properties and rewrites the STJ converters to delegate through typed converters while preserving the existing JSON shape.
Do not persist a stream tracking entry when Orleans delivers a null StreamSequenceToken. Without a token there is no high-water mark to compare, so tokenless stream messages are accepted without changing the tracker instead of causing later tokenless messages to look like duplicates.
Replace the outbox processor drain flow with a processor-owned postman registry, envelope-aware postmen, and activation-local postman caching. Outboxes can now validate new items against the registered postmen immediately, while dispatch still resolves postmen lazily after activation or deserialization. Add optional reminder recovery through UseReminders and update docs/tests to use Outbox<T>.Empty, direct Outbox<T> pending snapshots, and AddPostman/AddStreamPostman/AddGrainPostman helpers.
db79089 to
6efdd12
Compare
What changed
Egil.Orleans.Messagingand listed it in the repository README.omConventional Commit scope for the Messaging package.egil-orleans-messagingGitHub Actions workflow.Egil.Orleans.Testingproject reference to the published NuGet package dependency.Why
The Messaging package should build and validate as an independent NuGet package boundary. Its tests should consume
Egil.Orleans.Testingthe same way external consumers do, and CI should validate that boundary on GitHub.Validation
dotnet test Egil.Orleans.Messaging\Egil.Orleans.Messaging.slnx -c Releasepassed: 123 tests.dotnet pack Egil.Orleans.Messaging\src\Egil.Orleans.Messaging\Egil.Orleans.Messaging.csproj -c Release --no-restore -p:ContinuousIntegrationBuild=true --output Egil.Orleans.Messaging\artifactssucceeded.dotnet meziantou.validate-nuget-package (Get-ChildItem "Egil.Orleans.Messaging\artifacts\*.nupkg") --excluded-rules IconMustBeSetreturnedIsValid: true.