Skip to content

Commit 95e44a3

Browse files
committed
Implement a background service that auto-syncs package ops
between instances
1 parent c1efe89 commit 95e44a3

File tree

30 files changed

+1469
-842
lines changed

30 files changed

+1469
-842
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
2+
ALTER TABLE package_ops ADD COLUMN instance_id TEXT NULL
3+
REFERENCES instances(id) ON DELETE SET NULL;
4+
5+
-- Create index for querying ops by instance
6+
CREATE INDEX IF NOT EXISTS idx_package_ops_instance
7+
ON package_ops(instance_id) WHERE instance_id IS NOT NULL;

backend/src/BuiltinCli/Builtin.fs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ let builtins =
1414
Libs.File.builtins
1515
Libs.Execution.builtins
1616
Libs.Output.builtins
17+
Libs.Process.builtins
1718
Libs.Stdin.builtins
1819
Libs.Time.builtins
1920
Libs.Terminal.builtins ]

backend/src/BuiltinCli/BuiltinCli.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
<Compile Include="Libs/File.fs" />
1515
<Compile Include="Libs/Output.fs" />
1616
<Compile Include="Libs/Execution.fs" />
17+
<Compile Include="Libs/Process.fs" />
1718
<Compile Include="Libs/Stdin.fs" />
1819
<Compile Include="Libs/Time.fs" />
1920
<Compile Include="Libs/Terminal.fs" />
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/// Standard libraries for process management
2+
module BuiltinCli.Libs.Process
3+
4+
open System.Threading.Tasks
5+
open FSharp.Control.Tasks
6+
7+
open Prelude
8+
open LibExecution.RuntimeTypes
9+
10+
module Dval = LibExecution.Dval
11+
module Builtin = LibExecution.Builtin
12+
open Builtin.Shortcuts
13+
14+
let fns : List<BuiltInFn> =
15+
[ { name = fn "processSpawnBackground" 0
16+
typeParams = []
17+
parameters =
18+
[ Param.make "args" (TList TString) "Arguments to pass to the CLI" ]
19+
returnType = TypeReference.result TInt64 TString
20+
description =
21+
"Spawns the current CLI executable in the background with the given arguments. Returns the process ID (PID) on success."
22+
fn =
23+
(function
24+
| _state, _, _, [ DList(_vtTODO, args) ] ->
25+
uply {
26+
try
27+
let argStrings =
28+
args
29+
|> List.map (fun arg ->
30+
match arg with
31+
| DString s -> s
32+
| _ -> Exception.raiseInternal "Expected string arguments" [])
33+
34+
// Get the current executable path
35+
let currentExe =
36+
System.Diagnostics.Process.GetCurrentProcess().MainModule.FileName
37+
38+
let psi = System.Diagnostics.ProcessStartInfo()
39+
psi.FileName <- currentExe
40+
psi.UseShellExecute <- false
41+
psi.CreateNoWindow <- true
42+
43+
// Add arguments
44+
for arg in argStrings do
45+
psi.ArgumentList.Add(arg)
46+
47+
let proc = System.Diagnostics.Process.Start(psi)
48+
49+
if isNull proc then
50+
return
51+
Dval.resultError
52+
KTInt64
53+
KTString
54+
(DString "Failed to start background process")
55+
else
56+
return Dval.resultOk KTInt64 KTString (DInt64(int64 proc.Id))
57+
with ex ->
58+
return
59+
Dval.resultError
60+
KTInt64
61+
KTString
62+
(DString $"Error spawning process: {ex.Message}")
63+
}
64+
| _ -> incorrectArgs ())
65+
sqlSpec = NotYetImplemented
66+
previewable = Impure
67+
deprecated = NotDeprecated }
68+
69+
70+
{ name = fn "processGetPid" 0
71+
typeParams = []
72+
parameters = [ Param.make "unit" TUnit "" ]
73+
returnType = TInt64
74+
description = "Returns the current process ID (PID)."
75+
fn =
76+
(function
77+
| _, _, _, [ DUnit ] ->
78+
uply {
79+
let pid = System.Diagnostics.Process.GetCurrentProcess().Id
80+
return DInt64(int64 pid)
81+
}
82+
| _ -> incorrectArgs ())
83+
sqlSpec = NotYetImplemented
84+
previewable = Impure
85+
deprecated = NotDeprecated }
86+
87+
88+
{ name = fn "processIsRunning" 0
89+
typeParams = []
90+
parameters = [ Param.make "pid" TInt64 "Process ID to check" ]
91+
returnType = TBool
92+
description = "Checks if a process with the given PID is currently running."
93+
fn =
94+
(function
95+
| _, _, _, [ DInt64 pid ] ->
96+
uply {
97+
try
98+
let proc = System.Diagnostics.Process.GetProcessById(int pid)
99+
let isRunning = not proc.HasExited
100+
return DBool isRunning
101+
with
102+
| :? System.ArgumentException
103+
104+
| :? System.InvalidOperationException ->
105+
// Process doesn't exist or has exited
106+
return DBool false
107+
}
108+
| _ -> incorrectArgs ())
109+
sqlSpec = NotYetImplemented
110+
previewable = Impure
111+
deprecated = NotDeprecated }
112+
113+
114+
{ name = fn "processKill" 0
115+
typeParams = []
116+
parameters = [ Param.make "pid" TInt64 "Process ID to kill" ]
117+
returnType = TypeReference.result TUnit TString
118+
description =
119+
"Kills the process with the given PID. Returns unit on success, or an error message on failure."
120+
fn =
121+
(function
122+
| _state, _, _, [ DInt64 pid ] ->
123+
uply {
124+
try
125+
let proc = System.Diagnostics.Process.GetProcessById(int pid)
126+
proc.Kill()
127+
proc.WaitForExit(5000) |> ignore<bool>
128+
return Dval.resultOk KTUnit KTString DUnit
129+
with
130+
| :? System.ArgumentException ->
131+
return Dval.resultError KTUnit KTString (DString "Process not found")
132+
| ex ->
133+
return
134+
Dval.resultError
135+
KTUnit
136+
KTString
137+
(DString $"Error killing process: {ex.Message}")
138+
}
139+
| _ -> incorrectArgs ())
140+
sqlSpec = NotYetImplemented
141+
previewable = Impure
142+
deprecated = NotDeprecated } ]
143+
144+
145+
let builtins : Builtins = Builtin.make [] fns

backend/src/BuiltinPM/Libs/PackageOps.fs

Lines changed: 49 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,32 +21,38 @@ open Builtin.Shortcuts
2121
let packageOpTypeName =
2222
FQTypeName.fqPackage PackageIDs.Type.LanguageTools.ProgramTypes.packageOp
2323

24+
let packageOpBatchTypeName =
25+
FQTypeName.fqPackage PackageIDs.Type.LanguageTools.ProgramTypes.packageOpBatch
26+
2427

2528
// TODO: review/reconsider the accessibility of these fns
2629
let fns : List<BuiltInFn> =
2730
[ { name = fn "scmAddOps" 0
2831
typeParams = []
2932
parameters =
30-
[ Param.make "branchID" (TypeReference.option TUuid) ""
33+
[ Param.make "instanceID" (TypeReference.option TUuid) ""
34+
Param.make "branchID" (TypeReference.option TUuid) ""
3135
Param.make "ops" (TList(TCustomType(Ok packageOpTypeName, []))) "" ]
3236
returnType = TypeReference.result TInt64 TString
3337
description =
3438
"Add package ops to the database and apply them to projections.
35-
Returns Ok(insertedCount) on success (duplicates are skipped), or Error with message on failure."
39+
Pass None for instanceID for local ops, or Some(uuid) for ops from remote instances.
40+
Returns the number of inserted ops on success (duplicates are skipped), or an error message on failure."
3641
fn =
3742
let resultOk = Dval.resultOk KTInt64 KTString
3843
let resultError = Dval.resultError KTInt64 KTString
3944
(function
40-
| _, _, _, [ branchID; DList(_vtTODO, ops) ] ->
45+
| _, _, _, [ instanceID; branchID; DList(_vtTODO, ops) ] ->
4146
uply {
4247
try
4348
// Deserialize dvals
4449
let branchID = C2DT.Option.fromDT D.uuid branchID
50+
let instanceID = C2DT.Option.fromDT D.uuid instanceID
4551
let ops = ops |> List.choose PT2DT.PackageOp.fromDT
4652

4753
// Insert ops with deduplication, get count of actually inserted ops
4854
let! insertedCount =
49-
LibPackageManager.Inserts.insertAndApplyOps branchID ops
55+
LibPackageManager.Inserts.insertAndApplyOps instanceID branchID ops
5056

5157
return resultOk (DInt64 insertedCount)
5258
with ex ->
@@ -110,23 +116,49 @@ let fns : List<BuiltInFn> =
110116
{ name = fn "scmGetOpsSince" 0
111117
typeParams = []
112118
parameters =
113-
[ Param.make "branchID" (TypeReference.option TUuid) ""
119+
[ Param.make "targetInstanceID" (TypeReference.option TUuid) ""
114120
Param.make "since" TDateTime "" ]
115-
returnType = TList(TCustomType(Ok packageOpTypeName, []))
116-
description = "Get package ops created since the given timestamp."
121+
returnType = TList(TCustomType(Ok packageOpBatchTypeName, []))
122+
description =
123+
"Get all package ops (from ALL branches) created since the given timestamp, grouped by branch and instance.
124+
Optionally filters for a target instance (pass None to get all ops, or Some(uuid) to exclude ops from that target instance).
125+
Returns a list of PackageOpBatch, where each batch contains ops from one branch with the same instanceID."
117126
fn =
118127
function
119-
| _, _, _, [ branchID; DDateTime since ] ->
128+
| _, _, _, [ targetInstanceID; DDateTime since ] ->
120129
uply {
121-
let branchID = C2DT.Option.fromDT D.uuid branchID
122-
123-
let! ops = LibPackageManager.Queries.getOpsSince branchID since
124-
125-
return
126-
DList(
127-
VT.customType PT2DT.PackageOp.typeName [],
128-
ops |> List.map PT2DT.PackageOp.toDT
129-
)
130+
let targetID = C2DT.Option.fromDT D.uuid targetInstanceID
131+
132+
let! opsWithMetadata =
133+
LibPackageManager.Queries.getAllOpsSince targetID since
134+
135+
// Group by (branchID, instanceID)
136+
let grouped =
137+
opsWithMetadata
138+
|> List.groupBy (fun (_, branchID, instanceID) ->
139+
(branchID, instanceID))
140+
|> Map.toList
141+
142+
// Convert each group to a PackageOpBatch record
143+
let batches =
144+
grouped
145+
|> List.map (fun ((branchID, instanceID), ops) ->
146+
let opsList =
147+
ops
148+
|> List.map (fun (op, _, _) -> PT2DT.PackageOp.toDT op)
149+
|> fun opDvals ->
150+
DList(VT.customType packageOpTypeName [], opDvals)
151+
152+
let fields =
153+
[ ("branchID", branchID |> Option.map DUuid |> Dval.option KTUuid)
154+
("instanceID",
155+
instanceID |> Option.map DUuid |> Dval.option KTUuid)
156+
("ops", opsList) ]
157+
|> Map
158+
159+
DRecord(packageOpBatchTypeName, packageOpBatchTypeName, [], fields))
160+
161+
return DList(VT.customType packageOpBatchTypeName [], batches)
130162
}
131163
| _ -> incorrectArgs ()
132164
sqlSpec = NotQueryable

backend/src/LibExecution/PackageIDs.fs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,8 @@ module Type =
313313
p [] "SearchResults" "0660f9dc-a816-4185-9e5c-f936325f83d5"
314314

315315
let packageOp = p [] "PackageOp" "7d8e9f0a-1b2c-3d4e-5f6a-7b8c9d0e1f2a"
316+
let packageOpBatch =
317+
p [] "PackageOpBatch" "9f1a2b3c-4d5e-6f7a-8b9c-0d1e2f3a4b5c"
316318

317319
let secret = p [] "Secret" "37427120-d71d-41f2-b094-68757570bc41"
318320
let db = p [] "DB" "7f219668-f8ac-4b17-a404-1171985dadf9"

backend/src/LibPackageManager/Inserts.fs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@ let computeOpHash (op : PT.PackageOp) : System.Guid =
3838

3939
/// Insert PackageOps into the package_ops table and apply them to projection tables
4040
/// Returns the count of ops actually inserted (duplicates are skipped via INSERT OR IGNORE)
41+
/// CLEANUP: The 'applied' flag is currently always set to true and all ops are applied
42+
/// immediately. Should we reconsider this?
4143
let insertAndApplyOps
44+
(instanceID : Option<System.Guid>)
4245
(branchID : Option<PT.BranchID>)
4346
(ops : List<PT.PackageOp>)
4447
: Task<int64> =
@@ -60,8 +63,8 @@ let insertAndApplyOps
6063

6164
let sql =
6265
"""
63-
INSERT OR IGNORE INTO package_ops (id, branch_id, op_blob, applied)
64-
VALUES (@id, @branch_id, @op_blob, @applied)
66+
INSERT OR IGNORE INTO package_ops (id, branch_id, op_blob, applied, instance_id)
67+
VALUES (@id, @branch_id, @op_blob, @applied, @instance_id)
6568
"""
6669

6770
let parameters =
@@ -71,7 +74,11 @@ let insertAndApplyOps
7174
| Some id -> Sql.uuid id
7275
| None -> Sql.dbnull)
7376
"op_blob", Sql.bytes opBlob
74-
"applied", Sql.bool true ]
77+
"applied", Sql.bool true
78+
"instance_id",
79+
(match instanceID with
80+
| Some id -> Sql.uuid id
81+
| None -> Sql.dbnull) ]
7582

7683
(sql, [ parameters ]))
7784

0 commit comments

Comments
 (0)