@@ -29,7 +29,9 @@ import U.Codebase.Sqlite.Orphans ()
2929import Unison.Debug qualified as Debug
3030import Unison.Hash32 (Hash32 )
3131import Unison.Share.API.Hash (HashJWT , HashJWTClaims (.. ))
32+ import Unison.Share.API.Hash qualified as HashJWT
3233import Unison.SyncV3.Types
34+ import Unison.SyncV3.Utils (entityDependencies )
3335import Unison.Util.Websockets (Queues (.. ), withQueues )
3436import UnliftIO qualified
3537import UnliftIO.STM
@@ -90,6 +92,7 @@ doSyncEmitter mayCallerUserId conn = do
9092 syncState <- case initMsg of
9193 ReceiverInitStream initMsg -> initialize onErr mayCallerUserId initMsg
9294 other -> onErr $ InitializationError (" Expected ReceiverInitStream message, got: " <> tShow other)
95+ Debug. debugLogM Debug. Temp " Initialized sync state, starting sync process."
9396 lift (shareEmitter syncState q)
9497 >>= maybe (pure () ) (onErr)
9598 where
@@ -102,24 +105,29 @@ doSyncEmitter mayCallerUserId conn = do
102105 Right <$> action (fmap absurd . cc . Left )
103106 -- If we get an error, send it to the client then shut down.
104107 handleErr ::
108+ (Show err ) =>
105109 Queues (MsgOrError err a ) o ->
106110 WebApp (Either err () ) ->
107111 WebApp ()
108112 handleErr (Queues {send, shutdown}) action = do
109113 action >>= \ case
110114 Left err -> do
115+ Debug. debugM Debug. Temp " Sync error, shutting down: " err
111116 atomically $ do
112117 send (Err err)
113118 liftIO $ shutdown
114119 Right r -> pure r
115120
116121initialize :: (forall x . SyncError -> SyncM x ) -> (Maybe UserId ) -> InitMsg HashJWT -> SyncM (SyncState sh Hash32 )
117122initialize onErr caller InitMsg {initMsgRootCausal, initMsgBranchRef} = do
123+ let decoded = HashJWT. decodeHashJWT initMsgRootCausal
124+ Debug. debugM Debug. Temp " Decoded root causal hash jwt" decoded
125+ Debug. debugM Debug. Temp " Caller: " caller
118126 HashJWTClaims {hash = initialCausalHash} <-
119127 lift (HashJWT. verifyHashJWT caller initMsgRootCausal) >>= \ case
120128 Right ch -> pure ch
121129 Left err -> onErr $ HashJWTVerificationError (AuthN. authErrMsg err)
122- validRequestsVar <- newTVarIO Set. empty
130+ validRequestsVar <- newTVarIO ( Set. singleton ( CausalEntity , initialCausalHash))
123131 requestedEntitiesVar <- newTVarIO (Set. singleton (CausalEntity , initialCausalHash))
124132 entitiesAlreadySentVar <- newTVarIO Set. empty
125133 (lift . runExceptT $ codebaseForBranchRef initMsgBranchRef) >>= \ case
@@ -160,47 +168,45 @@ shareEmitter SyncState {requestedEntitiesVar, entitiesAlreadySentVar, validReque
160168 let onErrSTM :: SyncError -> STM ()
161169 onErrSTM e = do
162170 UnliftIO. putTMVar errVar e
171+
172+ Debug. debugLogM Debug. Temp " Launching workers"
163173 Ki. fork scope $ sendWorker onErrSTM
164174 Ki. fork scope $ receiveWorker onErrSTM
175+ Debug. debugLogM Debug. Temp " Waiting on errors or completion..."
165176 atomically ((Ki. awaitAll scope $> Nothing ) <|> (Just <$> UnliftIO. takeTMVar errVar))
166177 where
167178 sendWorker :: (SyncError -> STM () ) -> WebApp ()
168179 sendWorker onErrSTM = forever $ do
169- reqs <- atomically $ do
180+ (validRequests, reqs) <- atomically $ do
170181 reqs <- readTVar requestedEntitiesVar
182+ writeTVar requestedEntitiesVar Set. empty
183+ alreadySent <- readTVar entitiesAlreadySentVar
184+ Debug. debugM Debug. Temp " Processing Requested entities: " reqs
171185 validRequests <- readTVar validRequestsVar
172- let forbiddenRequests = Set. difference reqs validRequests
173- validRequests <-
174- if not (Set. null forbiddenRequests)
175- then do
176- onErrSTM (ForbiddenEntityRequest forbiddenRequests)
177- pure $ Set. difference validRequests forbiddenRequests
178- else do
179- pure validRequests
180- guard (not $ Set. null validRequests)
181- -- TODO: Add reasonable batch sizes
182- modifyTVar' requestedEntitiesVar (const Set. empty)
183- sent <- readTVar entitiesAlreadySentVar
184- let unsent = Set. difference reqs sent
185- guard (not $ Set. null unsent)
186- pure unsent
187- newEntities <- fetchEntities codebase reqs
188- -- let hashMappings :: Map HashTag Hash32
189- -- hashMappings =
190- -- newEntities
191- -- & toListOf (folded . entityHashesGetter_)
192- -- & Map.fromList
193- -- atomically $ do
194- -- alreadyMapped <- readTVar mappedHashesVar
195- -- let newMappings = Map.difference hashMappings alreadyMapped
196- -- modifyTVar' mappedHashesVar (Map.union newMappings)
197- -- send (HashMappingsMsg (HashMappings (newMappings)))
198-
199- atomically $ do
200- let newHashes = setOf (folded . to (entityKind &&& entityHash)) newEntities
201- modifyTVar' entitiesAlreadySentVar (Set. union newHashes)
202- for newEntities \ entity -> do
203- send $ Msg (EmitterEntityMsg entity)
186+ pure (validRequests, reqs `Set.difference` alreadySent)
187+ let forbiddenRequests = Set. difference reqs validRequests
188+ validatedRequests <-
189+ if not (Set. null forbiddenRequests)
190+ then do
191+ atomically (onErrSTM (ForbiddenEntityRequest forbiddenRequests))
192+ pure $ Set. difference validRequests forbiddenRequests
193+ else do
194+ pure validRequests
195+ Debug. debugM Debug. Temp " Validated requests: " validatedRequests
196+ when (not $ Set. null validatedRequests) $ do
197+ Debug. debugM Debug. Temp " Fetching Entities." validatedRequests
198+ newEntities <- fetchEntities codebase validatedRequests
199+ Debug. debugM Debug. Temp " Fetched entities: " (length newEntities)
200+ -- Do work outside of transactions to avoid conflicts
201+ deps <- UnliftIO. evaluate $ foldMap entityDependencies newEntities
202+ Debug. debugLogM Debug. Temp " Adding new valid requests"
203+ atomically $ modifyTVar' validRequestsVar (\ s -> Set. union s deps)
204+ Debug. debugM Debug. Temp " Sending entities: " (length newEntities)
205+ atomically $ do
206+ let newHashes = setOf (folded . to (entityKind &&& entityHash)) newEntities
207+ modifyTVar' entitiesAlreadySentVar (Set. union newHashes)
208+ for_ newEntities \ entity -> do
209+ send $ Msg (EmitterEntityMsg entity)
204210
205211 receiveWorker :: (SyncError -> STM () ) -> WebApp ()
206212 receiveWorker onErrSTM = forever $ do
@@ -209,8 +215,12 @@ shareEmitter SyncState {requestedEntitiesVar, entitiesAlreadySentVar, validReque
209215 Err err -> onErrSTM err
210216 Msg (ReceiverInitStream {}) -> onErrSTM (InitializationError " Received duplicate ReceiverInitStream message" )
211217 Msg (ReceiverEntityRequest (EntityRequestMsg {hashes})) -> do
218+ Debug. debugM Debug. Temp " Got new entity requests" hashes
212219 modifyTVar' requestedEntitiesVar (\ s -> Set. union s (Set. fromList hashes))
213220
214221fetchEntities :: CodebaseEnv -> Set (EntityKind , Hash32 ) -> WebApp (Vector (Entity Hash32 Text ))
215222fetchEntities codebase reqs = do
216223 PG. runTransaction $ Q. fetchSerialisedEntities codebase reqs
224+
225+ -- entityDependencies :: Entity hash text -> Set (EntityKind, Hash32)
226+ -- entityDependencies (Entity {entityData}) = do
0 commit comments