Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions backend/migrations/20251119_000000_multi_branch_sync.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

ALTER TABLE package_ops ADD COLUMN instance_id TEXT NULL
REFERENCES instances(id) ON DELETE SET NULL;

-- Create index for querying ops by instance
CREATE INDEX IF NOT EXISTS idx_package_ops_instance
ON package_ops(instance_id) WHERE instance_id IS NOT NULL;
1 change: 1 addition & 0 deletions backend/src/BuiltinCli/Builtin.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ let builtins =
Libs.File.builtins
Libs.Execution.builtins
Libs.Output.builtins
Libs.Process.builtins
Libs.Stdin.builtins
Libs.Time.builtins
Libs.Terminal.builtins ]
Expand Down
1 change: 1 addition & 0 deletions backend/src/BuiltinCli/BuiltinCli.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<Compile Include="Libs/File.fs" />
<Compile Include="Libs/Output.fs" />
<Compile Include="Libs/Execution.fs" />
<Compile Include="Libs/Process.fs" />
<Compile Include="Libs/Stdin.fs" />
<Compile Include="Libs/Time.fs" />
<Compile Include="Libs/Terminal.fs" />
Expand Down
131 changes: 131 additions & 0 deletions backend/src/BuiltinCli/Libs/Process.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/// Standard libraries for process management
module BuiltinCli.Libs.Process

open System.Threading.Tasks
open FSharp.Control.Tasks

open Prelude
open LibExecution.RuntimeTypes

module Dval = LibExecution.Dval
module Builtin = LibExecution.Builtin
open Builtin.Shortcuts

let fns : List<BuiltInFn> =
[ { name = fn "processSpawnBackground" 0
typeParams = []
parameters =
[ Param.make "args" (TList TString) "Arguments to pass to the CLI" ]
returnType = TypeReference.result TInt64 TString
description =
"Spawns the current CLI executable in the background with the given arguments. Returns the process ID (PID) on success."
fn =
(function
| _state, _, _, [ DList(_vtTODO, args) ] ->
uply {
try
let argStrings =
args
|> List.map (fun arg ->
match arg with
| DString s -> s
| _ -> Exception.raiseInternal "Expected string arguments" [])

// Get the current executable path
let currentExe =
System.Diagnostics.Process.GetCurrentProcess().MainModule.FileName

let psi = System.Diagnostics.ProcessStartInfo()
psi.FileName <- currentExe
psi.UseShellExecute <- false
psi.CreateNoWindow <- true
// Redirect to prevent inheriting parent's streams
psi.RedirectStandardOutput <- true
psi.RedirectStandardError <- true
psi.RedirectStandardInput <- true

// Add arguments
for arg in argStrings do
psi.ArgumentList.Add(arg)

let proc = System.Diagnostics.Process.Start(psi)

if isNull proc then
return
Dval.resultError
KTInt64
KTString
(DString "Failed to start background process")
else
return Dval.resultOk KTInt64 KTString (DInt64(int64 proc.Id))
with ex ->
return
Dval.resultError
KTInt64
KTString
(DString $"Error spawning process: {ex.Message}")
}
| _ -> incorrectArgs ())
sqlSpec = NotYetImplemented
previewable = Impure
deprecated = NotDeprecated }


{ name = fn "processIsRunning" 0
typeParams = []
parameters = [ Param.make "pid" TInt64 "Process ID to check" ]
returnType = TBool
description = "Checks if a process with the given PID is currently running."
fn =
(function
| _, _, _, [ DInt64 pid ] ->
uply {
try
let proc = System.Diagnostics.Process.GetProcessById(int pid)
let isRunning = not proc.HasExited
return DBool isRunning
with
| :? System.ArgumentException

| :? System.InvalidOperationException ->
// Process doesn't exist or has exited
return DBool false
}
| _ -> incorrectArgs ())
sqlSpec = NotYetImplemented
previewable = Impure
deprecated = NotDeprecated }


{ name = fn "processKill" 0
typeParams = []
parameters = [ Param.make "pid" TInt64 "Process ID to kill" ]
returnType = TypeReference.result TUnit TString
description =
"Kills the process with the given PID. Returns unit on success, or an error message on failure."
fn =
(function
| _state, _, _, [ DInt64 pid ] ->
uply {
try
let proc = System.Diagnostics.Process.GetProcessById(int pid)
proc.Kill()
proc.WaitForExit(5000) |> ignore<bool>
return Dval.resultOk KTUnit KTString DUnit
with
| :? System.ArgumentException ->
return Dval.resultError KTUnit KTString (DString "Process not found")
| ex ->
return
Dval.resultError
KTUnit
KTString
(DString $"Error killing process: {ex.Message}")
}
| _ -> incorrectArgs ())
sqlSpec = NotYetImplemented
previewable = Impure
deprecated = NotDeprecated } ]


let builtins : Builtins = Builtin.make [] fns
66 changes: 49 additions & 17 deletions backend/src/BuiltinPM/Libs/PackageOps.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,38 @@ open Builtin.Shortcuts
let packageOpTypeName =
FQTypeName.fqPackage PackageIDs.Type.LanguageTools.ProgramTypes.packageOp

let packageOpBatchTypeName =
FQTypeName.fqPackage PackageIDs.Type.LanguageTools.ProgramTypes.packageOpBatch


// TODO: review/reconsider the accessibility of these fns
let fns : List<BuiltInFn> =
[ { name = fn "scmAddOps" 0
typeParams = []
parameters =
[ Param.make "branchID" (TypeReference.option TUuid) ""
[ Param.make "instanceID" (TypeReference.option TUuid) ""
Param.make "branchID" (TypeReference.option TUuid) ""
Param.make "ops" (TList(TCustomType(Ok packageOpTypeName, []))) "" ]
returnType = TypeReference.result TInt64 TString
description =
"Add package ops to the database and apply them to projections.
Returns Ok(insertedCount) on success (duplicates are skipped), or Error with message on failure."
Pass None for instanceID for local ops, or Some(uuid) for ops from remote instances.
Returns the number of inserted ops on success (duplicates are skipped), or an error message on failure."
fn =
let resultOk = Dval.resultOk KTInt64 KTString
let resultError = Dval.resultError KTInt64 KTString
(function
| _, _, _, [ branchID; DList(_vtTODO, ops) ] ->
| _, _, _, [ instanceID; branchID; DList(_vtTODO, ops) ] ->
uply {
try
// Deserialize dvals
let branchID = C2DT.Option.fromDT D.uuid branchID
let instanceID = C2DT.Option.fromDT D.uuid instanceID
let ops = ops |> List.choose PT2DT.PackageOp.fromDT

// Insert ops with deduplication, get count of actually inserted ops
let! insertedCount =
LibPackageManager.Inserts.insertAndApplyOps branchID ops
LibPackageManager.Inserts.insertAndApplyOps instanceID branchID ops

return resultOk (DInt64 insertedCount)
with ex ->
Expand Down Expand Up @@ -110,23 +116,49 @@ let fns : List<BuiltInFn> =
{ name = fn "scmGetOpsSince" 0
typeParams = []
parameters =
[ Param.make "branchID" (TypeReference.option TUuid) ""
[ Param.make "targetInstanceID" (TypeReference.option TUuid) ""
Param.make "since" TDateTime "" ]
returnType = TList(TCustomType(Ok packageOpTypeName, []))
description = "Get package ops created since the given timestamp."
returnType = TList(TCustomType(Ok packageOpBatchTypeName, []))
description =
"Get all package ops (from ALL branches) created since the given timestamp, grouped by branch and instance.
Optionally filters for a target instance (pass None to get all ops, or Some(uuid) to exclude ops from that target instance).
Returns a list of PackageOpBatch, where each batch contains ops from one branch with the same instanceID."
fn =
function
| _, _, _, [ branchID; DDateTime since ] ->
| _, _, _, [ targetInstanceID; DDateTime since ] ->
uply {
let branchID = C2DT.Option.fromDT D.uuid branchID

let! ops = LibPackageManager.Queries.getOpsSince branchID since

return
DList(
VT.customType PT2DT.PackageOp.typeName [],
ops |> List.map PT2DT.PackageOp.toDT
)
let targetID = C2DT.Option.fromDT D.uuid targetInstanceID

let! opsWithMetadata =
LibPackageManager.Queries.getAllOpsSince targetID since

// Group by (branchID, instanceID)
let grouped =
opsWithMetadata
|> List.groupBy (fun (_, branchID, instanceID) ->
(branchID, instanceID))
|> Map.toList

// Convert each group to a PackageOpBatch record
let batches =
grouped
|> List.map (fun ((branchID, instanceID), ops) ->
let opsList =
ops
|> List.map (fun (op, _, _) -> PT2DT.PackageOp.toDT op)
|> fun opDvals ->
DList(VT.customType packageOpTypeName [], opDvals)

let fields =
[ ("branchID", branchID |> Option.map DUuid |> Dval.option KTUuid)
("instanceID",
instanceID |> Option.map DUuid |> Dval.option KTUuid)
("ops", opsList) ]
|> Map

DRecord(packageOpBatchTypeName, packageOpBatchTypeName, [], fields))

return DList(VT.customType packageOpBatchTypeName [], batches)
}
| _ -> incorrectArgs ()
sqlSpec = NotQueryable
Expand Down
2 changes: 2 additions & 0 deletions backend/src/LibExecution/PackageIDs.fs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ module Type =
p [] "SearchResults" "0660f9dc-a816-4185-9e5c-f936325f83d5"

let packageOp = p [] "PackageOp" "7d8e9f0a-1b2c-3d4e-5f6a-7b8c9d0e1f2a"
let packageOpBatch =
p [] "PackageOpBatch" "9f1a2b3c-4d5e-6f7a-8b9c-0d1e2f3a4b5c"

let secret = p [] "Secret" "37427120-d71d-41f2-b094-68757570bc41"
let db = p [] "DB" "7f219668-f8ac-4b17-a404-1171985dadf9"
Expand Down
1 change: 1 addition & 0 deletions backend/src/LibExecution/ProgramTypes.fs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ Short answer: while Ops are played out


type BranchID = uuid
type InstanceID = uuid

/// A package entity paired with its location
type LocatedItem<'T> = { entity : 'T; location : PackageLocation }
Expand Down
11 changes: 9 additions & 2 deletions backend/src/LibPackageManager/Inserts.fs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ let computeOpHash (op : PT.PackageOp) : System.Guid =

/// Insert PackageOps into the package_ops table and apply them to projection tables
/// Returns the count of ops actually inserted (duplicates are skipped via INSERT OR IGNORE)

// CLEANUP: The 'applied' flag is currently always set to true and all ops are applied immediately
let insertAndApplyOps
(instanceID : Option<PT.InstanceID>)
(branchID : Option<PT.BranchID>)
(ops : List<PT.PackageOp>)
: Task<int64> =
Expand All @@ -60,12 +63,16 @@ let insertAndApplyOps

let sql =
"""
INSERT OR IGNORE INTO package_ops (id, branch_id, op_blob, applied)
VALUES (@id, @branch_id, @op_blob, @applied)
INSERT OR IGNORE INTO package_ops (id, instance_id, branch_id, op_blob, applied)
VALUES (@id, @instance_id, @branch_id, @op_blob, @applied)
"""

let parameters =
[ "id", Sql.uuid opId
"instance_id",
(match instanceID with
| Some id -> Sql.uuid id
| None -> Sql.dbnull)
"branch_id",
(match branchID with
| Some id -> Sql.uuid id
Expand Down
52 changes: 33 additions & 19 deletions backend/src/LibPackageManager/Queries.fs
Original file line number Diff line number Diff line change
Expand Up @@ -73,45 +73,59 @@ let getRecentOpsAllBranches (limit : int64) : Task<List<PT.PackageOp>> =
}


/// Get package ops created since the specified datetime
let getOpsSince
(branchID : Option<PT.BranchID>)
/// Get all package ops (from ALL branches) created since the specified datetime
/// Returns ops with their branch IDs for multi-branch sync
///
/// targetInstanceID: Optional target instance ID to filter results for:
/// - None: Return all ops
/// - Some(uuid): Exclude ops where instance_id = uuid (used when pushing to target to avoid sending ops back to their source)
let getAllOpsSince
(targetInstanceID : Option<PT.InstanceID>)
(since : LibExecution.DarkDateTime.T)
: Task<List<PT.PackageOp>> =
: Task<List<PT.PackageOp * Option<PT.BranchID> * Option<PT.InstanceID>>> =
task {
let sinceStr = LibExecution.DarkDateTime.toIsoString since

match branchID with
| Some id ->
// Query specific branch only
match targetInstanceID with
| Some targetID ->
return!
Sql.query
"""
SELECT id, op_blob
SELECT id, op_blob, branch_id, instance_id
FROM package_ops
WHERE branch_id = @branch_id
AND datetime(created_at) > datetime(@since)
ORDER BY created_at ASC
WHERE datetime(created_at) > datetime(@since)
AND (instance_id IS NULL OR instance_id != @target_instance_id)
ORDER BY
CASE WHEN branch_id IS NULL THEN 0 ELSE 1 END,
created_at ASC
"""
|> Sql.parameters [ "branch_id", Sql.uuid id; "since", Sql.string sinceStr ]
|> Sql.parameters
[ "since", Sql.string sinceStr; "target_instance_id", Sql.uuid targetID ]
|> Sql.executeAsync (fun read ->
let opId = read.uuid "id"
let opBlob = read.bytes "op_blob"
BinarySerialization.PT.PackageOp.deserialize opId opBlob)
let branchID = read.uuidOrNone "branch_id"
let instanceID = read.uuidOrNone "instance_id"
let op = BinarySerialization.PT.PackageOp.deserialize opId opBlob
(op, branchID, instanceID))
| None ->
// Query main branch only (branch_id IS NULL)
return!
Sql.query
"""
SELECT id, op_blob
SELECT id, op_blob, branch_id, instance_id
FROM package_ops
WHERE branch_id IS NULL
AND datetime(created_at) > datetime(@since)
ORDER BY created_at ASC
WHERE datetime(created_at) > datetime(@since)
ORDER BY
CASE WHEN branch_id IS NULL THEN 0 ELSE 1 END,
created_at ASC
"""
|> Sql.parameters [ "since", Sql.string sinceStr ]
|> Sql.executeAsync (fun read ->
let opId = read.uuid "id"
let opBlob = read.bytes "op_blob"
BinarySerialization.PT.PackageOp.deserialize opId opBlob)
let branchID = read.uuidOrNone "branch_id"
let instanceID = read.uuidOrNone "instance_id"
let op = BinarySerialization.PT.PackageOp.deserialize opId opBlob
(op, branchID, instanceID))

}
Loading