-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfix.patch
More file actions
executable file
·69 lines (66 loc) · 2.92 KB
/
Copy pathfix.patch
File metadata and controls
executable file
·69 lines (66 loc) · 2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
--- a/src/services/billing_delta.rs
+++ b/src/services/billing_delta.rs
@@ -82,43 +82,27 @@
info!("Syncing {} traffic deltas to database...", updates.len());
- let mut tasks = tokio::task::JoinSet::new();
- // Limit concurrency to avoid overloading the DB
- let semaphore = Arc::new(tokio::sync::Semaphore::new(50));
-
- for ((tag, email), delta) in updates {
- let client_clone = self.db.client.clone();
- let sem_clone = semaphore.clone();
-
- tasks.spawn(async move {
- let _permit = sem_clone.acquire().await.unwrap_or_else(|_| unreachable!());
- // optimized SurrealDB update using atomic increments
- // This query finds the user in the clients array and updates their traffic counters.
- // It uses the += operator for atomic persistence without read-modify-write cycles.
- let query = "
- UPDATE inbound
- SET settings.clients[WHERE email = $email].up += $up,
- settings.clients[WHERE email = $email].down += $down
- WHERE tag = $tag;
- ";
-
- let vars = serde_json::json!({
- "tag": tag,
- "email": email,
- "up": delta.up,
- "down": delta.down,
- });
-
- if let Err(e) = client_clone.query(query).bind(vars).await {
- error!("Atomic billing sync failed for user {} in tag {}: {}", email, tag, e);
- // In production, we might want to put these back in a retry buffer
- }
- });
- }
-
- while let Some(res) = tasks.join_next().await {
- if let Err(e) = res {
- error!("Task execution failed during billing delta sync: {}", e);
- }
+ let mut query_string = String::from("BEGIN TRANSACTION;\n");
+ let mut vars = serde_json::Map::new();
+
+ for (i, ((tag, email), delta)) in updates.into_iter().enumerate() {
+ query_string.push_str(&format!(
+ "UPDATE inbound
+ SET settings.clients[WHERE email = $email_{i}].up += $up_{i},
+ settings.clients[WHERE email = $email_{i}].down += $down_{i}
+ WHERE tag = $tag_{i};\n"
+ ));
+ vars.insert(format!("tag_{i}"), serde_json::json!(tag));
+ vars.insert(format!("email_{i}"), serde_json::json!(email));
+ vars.insert(format!("up_{i}"), serde_json::json!(delta.up));
+ vars.insert(format!("down_{i}"), serde_json::json!(delta.down));
}
+ query_string.push_str("COMMIT TRANSACTION;");
+
+ if let Err(e) = self.db.client.query(query_string).bind(serde_json::Value::Object(vars)).await {
+ error!("Atomic billing sync batch failed: {}", e);
+ }
+
Ok(())
}
}