From 5d55fdd54cbd4ca071f3ec7a5fedcf8e6fc7e758 Mon Sep 17 00:00:00 2001 From: waleed Date: Thu, 15 Jan 2026 18:29:24 -0800 Subject: [PATCH 1/2] improvement(presence): show presence for the same user in another tab, fix z-index of multiplayer cursor to fall behind panel,terminal,sidebar but above blocks, improved connection detection --- .../components/cursors/cursors.tsx | 14 +- .../workflow-item/avatars/avatars.tsx | 11 +- .../workspace/providers/socket-provider.tsx | 136 +++++++----------- 3 files changed, 63 insertions(+), 98 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/cursors/cursors.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/cursors/cursors.tsx index 1cd5831b69..efbc0741d5 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/cursors/cursors.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/cursors/cursors.tsx @@ -2,7 +2,6 @@ import { memo, useMemo } from 'react' import { useViewport } from 'reactflow' -import { useSession } from '@/lib/auth/auth-client' import { getUserColor } from '@/lib/workspaces/colors' import { usePreventZoom } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks' import { useSocket } from '@/app/workspace/providers/socket-provider' @@ -20,30 +19,31 @@ interface CursorRenderData { } const CursorsComponent = () => { - const { presenceUsers } = useSocket() + const { presenceUsers, currentSocketId } = useSocket() const viewport = useViewport() - const session = useSession() - const currentUserId = session.data?.user?.id const preventZoomRef = usePreventZoom() const cursors = useMemo(() => { return presenceUsers .filter((user): user is typeof user & { cursor: CursorPoint } => Boolean(user.cursor)) - .filter((user) => user.userId !== currentUserId) + .filter((user) => user.socketId !== currentSocketId) .map((user) => ({ id: user.socketId, name: user.userName?.trim() || 'Collaborator', cursor: user.cursor, color: getUserColor(user.userId), })) - }, [currentUserId, presenceUsers]) + }, [currentSocketId, presenceUsers]) if (!cursors.length) { return null } return ( -
+
{cursors.map(({ id, name, cursor, color }) => { const x = cursor.x * viewport.zoom + viewport.x const y = cursor.y * viewport.zoom + viewport.y diff --git a/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/workflow-list/components/workflow-item/avatars/avatars.tsx b/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/workflow-list/components/workflow-item/avatars/avatars.tsx index 7ffcf90859..fce2782406 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/workflow-list/components/workflow-item/avatars/avatars.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/components/sidebar/components/workflow-list/components/workflow-item/avatars/avatars.tsx @@ -2,7 +2,6 @@ import { type CSSProperties, useMemo } from 'react' import { Avatar, AvatarFallback, AvatarImage, Tooltip } from '@/components/emcn' -import { useSession } from '@/lib/auth/auth-client' import { getUserColor } from '@/lib/workspaces/colors' import { useSocket } from '@/app/workspace/providers/socket-provider' import { SIDEBAR_WIDTH } from '@/stores/constants' @@ -81,9 +80,7 @@ function UserAvatar({ user, index }: UserAvatarProps) { * @returns Avatar stack for workflow presence */ export function Avatars({ workflowId }: AvatarsProps) { - const { presenceUsers, currentWorkflowId } = useSocket() - const { data: session } = useSession() - const currentUserId = session?.user?.id + const { presenceUsers, currentWorkflowId, currentSocketId } = useSocket() const sidebarWidth = useSidebarStore((state) => state.sidebarWidth) /** @@ -99,14 +96,14 @@ export function Avatars({ workflowId }: AvatarsProps) { /** * Only show presence for the currently active workflow. - * Filter out the current user from the list. + * Filter out the current socket connection (allows same user's other tabs to appear). */ const workflowUsers = useMemo(() => { if (currentWorkflowId !== workflowId) { return [] } - return presenceUsers.filter((user) => user.userId !== currentUserId) - }, [presenceUsers, currentWorkflowId, workflowId, currentUserId]) + return presenceUsers.filter((user) => user.socketId !== currentSocketId) + }, [presenceUsers, currentWorkflowId, workflowId, currentSocketId]) /** * Calculate visible users and overflow count diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index 5f6d25cfb9..312036039b 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -36,6 +36,7 @@ interface SocketContextType { isConnected: boolean isConnecting: boolean currentWorkflowId: string | null + currentSocketId: string | null presenceUsers: PresenceUser[] joinWorkflow: (workflowId: string) => void leaveWorkflow: () => void @@ -55,7 +56,6 @@ interface SocketContextType { emitCursorUpdate: (cursor: { x: number; y: number } | null) => void emitSelectionUpdate: (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => void - // Event handlers for receiving real-time updates onWorkflowOperation: (handler: (data: any) => void) => void onSubblockUpdate: (handler: (data: any) => void) => void onVariableUpdate: (handler: (data: any) => void) => void @@ -75,6 +75,7 @@ const SocketContext = createContext({ isConnected: false, isConnecting: false, currentWorkflowId: null, + currentSocketId: null, presenceUsers: [], joinWorkflow: () => {}, leaveWorkflow: () => {}, @@ -108,14 +109,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) { const [isConnected, setIsConnected] = useState(false) const [isConnecting, setIsConnecting] = useState(false) const [currentWorkflowId, setCurrentWorkflowId] = useState(null) + const [currentSocketId, setCurrentSocketId] = useState(null) const [presenceUsers, setPresenceUsers] = useState([]) const initializedRef = useRef(false) - // Get current workflow ID from URL params const params = useParams() const urlWorkflowId = params?.workflowId as string | undefined - // Use refs to store event handlers to avoid stale closures const eventHandlers = useRef<{ workflowOperation?: (data: any) => void subblockUpdate?: (data: any) => void @@ -131,9 +131,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { operationFailed?: (data: any) => void }>({}) - // Helper function to generate a fresh socket token const generateSocketToken = async (): Promise => { - // Avoid overlapping token requests const res = await fetch('/api/auth/socket-token', { method: 'POST', credentials: 'include', @@ -146,11 +144,9 @@ export function SocketProvider({ children, user }: SocketProviderProps) { return token } - // Initialize socket when user is available - only once per session useEffect(() => { if (!user?.id) return - // Only initialize if we don't have a socket and aren't already connecting if (initializedRef.current || socket || isConnecting) { logger.info('Socket already exists or is connecting, skipping initialization') return @@ -171,12 +167,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }) const socketInstance = io(socketUrl, { - transports: ['websocket', 'polling'], // Keep polling fallback for reliability + transports: ['websocket', 'polling'], withCredentials: true, - reconnectionAttempts: Number.POSITIVE_INFINITY, // Socket.IO handles base reconnection - reconnectionDelay: 1000, // Start with 1 second delay - reconnectionDelayMax: 30000, // Max 30 second delay - timeout: 10000, // Back to original timeout + reconnectionAttempts: Number.POSITIVE_INFINITY, + reconnectionDelay: 1000, + reconnectionDelayMax: 30000, + timeout: 10000, auth: async (cb) => { try { const freshToken = await generateSocketToken() @@ -188,24 +184,21 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }, }) - // Connection events socketInstance.on('connect', () => { setIsConnected(true) setIsConnecting(false) + setCurrentSocketId(socketInstance.id ?? null) logger.info('Socket connected successfully', { socketId: socketInstance.id, connected: socketInstance.connected, transport: socketInstance.io.engine?.transport?.name, }) - // Automatically join the current workflow room based on URL - // This handles both initial connections and reconnections if (urlWorkflowId) { logger.info(`Joining workflow room after connection: ${urlWorkflowId}`) socketInstance.emit('join-workflow', { workflowId: urlWorkflowId, }) - // Update our internal state to match the URL setCurrentWorkflowId(urlWorkflowId) } }) @@ -213,12 +206,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) { socketInstance.on('disconnect', (reason) => { setIsConnected(false) setIsConnecting(false) + setCurrentSocketId(null) logger.info('Socket disconnected', { reason, }) - // Clear presence when disconnected setPresenceUsers([]) }) @@ -232,7 +225,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { transport: error.transport, }) - // Authentication errors now indicate either session expiry or token generation issues if ( error.message?.includes('Token validation failed') || error.message?.includes('Authentication failed') || @@ -241,19 +233,16 @@ export function SocketProvider({ children, user }: SocketProviderProps) { logger.warn( 'Authentication failed - this could indicate session expiry or token generation issues' ) - // The fresh token generation on each attempt should handle most cases automatically - // If this persists, user may need to refresh page or re-login } }) - // Socket.IO provides reconnection logging with attempt numbers socketInstance.on('reconnect', (attemptNumber) => { + setCurrentSocketId(socketInstance.id ?? null) logger.info('Socket reconnected successfully', { attemptNumber, socketId: socketInstance.id, transport: socketInstance.io.engine?.transport?.name, }) - // Note: Workflow rejoining is handled by the 'connect' event which fires for both initial connections and reconnections }) socketInstance.on('reconnect_attempt', (attemptNumber) => { @@ -276,32 +265,24 @@ export function SocketProvider({ children, user }: SocketProviderProps) { setIsConnecting(false) }) - // Presence events socketInstance.on('presence-update', (users: PresenceUser[]) => { setPresenceUsers(users) }) - // Note: user-joined and user-left events removed in favor of authoritative presence-update - - // Workflow operation events socketInstance.on('workflow-operation', (data) => { eventHandlers.current.workflowOperation?.(data) }) - // Subblock update events socketInstance.on('subblock-update', (data) => { eventHandlers.current.subblockUpdate?.(data) }) - // Variable update events socketInstance.on('variable-update', (data) => { eventHandlers.current.variableUpdate?.(data) }) - // Workflow deletion events socketInstance.on('workflow-deleted', (data) => { logger.warn(`Workflow ${data.workflowId} has been deleted`) - // Clear current workflow ID if it matches the deleted workflow if (currentWorkflowId === data.workflowId) { setCurrentWorkflowId(null) setPresenceUsers([]) @@ -309,19 +290,16 @@ export function SocketProvider({ children, user }: SocketProviderProps) { eventHandlers.current.workflowDeleted?.(data) }) - // Workflow revert events socketInstance.on('workflow-reverted', (data) => { logger.info(`Workflow ${data.workflowId} has been reverted to deployed state`) eventHandlers.current.workflowReverted?.(data) }) - // Shared function to rehydrate workflow stores const rehydrateWorkflowStores = async ( workflowId: string, workflowState: any, source: 'copilot' | 'workflow-state' ) => { - // Import stores dynamically const [ { useOperationQueueStore }, { useWorkflowRegistry }, @@ -336,14 +314,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) { import('@/stores/workflow-diff/store'), ]) - // Only proceed if this is the active workflow const { activeWorkflowId } = useWorkflowRegistry.getState() if (activeWorkflowId !== workflowId) { logger.info(`Skipping rehydration - workflow ${workflowId} is not active`) return false } - // Check for pending operations const hasPending = useOperationQueueStore .getState() .operations.some((op: any) => op.workflowId === workflowId && op.status !== 'confirmed') @@ -352,7 +328,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { return false } - // Extract subblock values from blocks const subblockValues: Record> = {} Object.entries(workflowState.blocks || {}).forEach(([blockId, block]) => { const blockState = block as any @@ -362,7 +337,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }) }) - // Replace local workflow store with authoritative server state useWorkflowStore.setState({ blocks: workflowState.blocks || {}, edges: workflowState.edges || [], @@ -372,7 +346,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { deploymentStatuses: workflowState.deploymentStatuses || {}, }) - // Replace subblock store values for this workflow useSubBlockStore.setState((state: any) => ({ workflowValues: { ...state.workflowValues, @@ -384,14 +357,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) { return true } - // Copilot workflow edit events (database has been updated, rehydrate stores) socketInstance.on('copilot-workflow-edit', async (data) => { logger.info( `Copilot edited workflow ${data.workflowId} - rehydrating stores from database` ) try { - // Fetch fresh workflow state directly from API const response = await fetch(`/api/workflows/${data.workflowId}`) if (response.ok) { const responseData = await response.json() @@ -408,39 +379,60 @@ export function SocketProvider({ children, user }: SocketProviderProps) { } }) - // Operation confirmation events socketInstance.on('operation-confirmed', (data) => { logger.debug('Operation confirmed', { operationId: data.operationId }) eventHandlers.current.operationConfirmed?.(data) }) - // Operation failure events socketInstance.on('operation-failed', (data) => { logger.warn('Operation failed', { operationId: data.operationId, error: data.error }) eventHandlers.current.operationFailed?.(data) }) - // Cursor update events socketInstance.on('cursor-update', (data) => { - setPresenceUsers((prev) => - prev.map((user) => - user.socketId === data.socketId ? { ...user, cursor: data.cursor } : user - ) - ) + setPresenceUsers((prev) => { + const existingIndex = prev.findIndex((user) => user.socketId === data.socketId) + if (existingIndex !== -1) { + return prev.map((user) => + user.socketId === data.socketId ? { ...user, cursor: data.cursor } : user + ) + } + return [ + ...prev, + { + socketId: data.socketId, + userId: data.userId, + userName: data.userName, + avatarUrl: data.avatarUrl, + cursor: data.cursor, + }, + ] + }) eventHandlers.current.cursorUpdate?.(data) }) - // Selection update events socketInstance.on('selection-update', (data) => { - setPresenceUsers((prev) => - prev.map((user) => - user.socketId === data.socketId ? { ...user, selection: data.selection } : user - ) - ) + setPresenceUsers((prev) => { + const existingIndex = prev.findIndex((user) => user.socketId === data.socketId) + if (existingIndex !== -1) { + return prev.map((user) => + user.socketId === data.socketId ? { ...user, selection: data.selection } : user + ) + } + return [ + ...prev, + { + socketId: data.socketId, + userId: data.userId, + userName: data.userName, + avatarUrl: data.avatarUrl, + selection: data.selection, + }, + ] + }) eventHandlers.current.selectionUpdate?.(data) }) - // Enhanced error handling for new server events socketInstance.on('error', (error) => { logger.error('Socket error:', error) }) @@ -451,7 +443,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { socketInstance.on('operation-forbidden', (error) => { logger.warn('Operation forbidden:', error) - // Could show a toast notification to user }) socketInstance.on('operation-confirmed', (data) => { @@ -477,10 +468,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) { } } - // Start the socket initialization initializeSocket() - // Cleanup on unmount only (not on user change since socket is session-level) return () => { positionUpdateTimeouts.current.forEach((timeoutId) => { clearTimeout(timeoutId) @@ -490,24 +479,20 @@ export function SocketProvider({ children, user }: SocketProviderProps) { } }, [user?.id]) - // Handle workflow room switching when URL changes (for navigation between workflows) useEffect(() => { if (!socket || !isConnected || !urlWorkflowId) return - // If we're already in the correct workflow room, no need to switch if (currentWorkflowId === urlWorkflowId) return logger.info( `URL workflow changed from ${currentWorkflowId} to ${urlWorkflowId}, switching rooms` ) - // Leave current workflow first if we're in one if (currentWorkflowId) { logger.info(`Leaving current workflow ${currentWorkflowId} before joining ${urlWorkflowId}`) socket.emit('leave-workflow') } - // Join the new workflow room logger.info(`Joining workflow room: ${urlWorkflowId}`) socket.emit('join-workflow', { workflowId: urlWorkflowId, @@ -515,7 +500,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { setCurrentWorkflowId(urlWorkflowId) }, [socket, isConnected, urlWorkflowId, currentWorkflowId]) - // Cleanup socket on component unmount useEffect(() => { return () => { if (socket) { @@ -525,7 +509,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { } }, []) - // Join workflow room const joinWorkflow = useCallback( (workflowId: string) => { if (!socket || !user?.id) { @@ -533,13 +516,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) { return } - // Prevent duplicate joins to the same workflow if (currentWorkflowId === workflowId) { logger.info(`Already in workflow ${workflowId}, skipping join`) return } - // Leave current workflow first if we're in one if (currentWorkflowId) { logger.info(`Leaving current workflow ${currentWorkflowId} before joining ${workflowId}`) socket.emit('leave-workflow') @@ -547,14 +528,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) { logger.info(`Joining workflow: ${workflowId}`) socket.emit('join-workflow', { - workflowId, // Server gets user info from authenticated session + workflowId, }) setCurrentWorkflowId(workflowId) }, [socket, user, currentWorkflowId] ) - // Leave current workflow room const leaveWorkflow = useCallback(() => { if (socket && currentWorkflowId) { logger.info(`Leaving workflow: ${currentWorkflowId}`) @@ -566,7 +546,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { setCurrentWorkflowId(null) setPresenceUsers([]) - // Clean up any pending position updates positionUpdateTimeouts.current.forEach((timeoutId) => { clearTimeout(timeoutId) }) @@ -575,18 +554,15 @@ export function SocketProvider({ children, user }: SocketProviderProps) { } }, [socket, currentWorkflowId]) - // Light throttling for position updates to ensure smooth collaborative movement const positionUpdateTimeouts = useRef>(new Map()) const pendingPositionUpdates = useRef>(new Map()) - // Emit workflow operations (blocks, edges, subflows) const emitWorkflowOperation = useCallback( (operation: string, target: string, payload: any, operationId?: string) => { if (!socket || !currentWorkflowId) { return } - // Apply light throttling only to position updates for smooth collaborative experience const isPositionUpdate = operation === 'update-position' && target === 'block' const { commit = true } = payload || {} @@ -631,30 +607,27 @@ export function SocketProvider({ children, user }: SocketProviderProps) { positionUpdateTimeouts.current.set(blockId, timeoutId) } } else { - // For all non-position updates, emit immediately socket.emit('workflow-operation', { operation, target, payload, timestamp: Date.now(), - operationId, // Include operation ID for queue tracking + operationId, }) } }, [socket, currentWorkflowId] ) - // Emit subblock value updates const emitSubblockUpdate = useCallback( (blockId: string, subblockId: string, value: any, operationId?: string) => { - // Only emit if socket is connected and we're in a valid workflow room if (socket && currentWorkflowId) { socket.emit('subblock-update', { blockId, subblockId, value, timestamp: Date.now(), - operationId, // Include operation ID for queue tracking + operationId, }) } else { logger.warn('Cannot emit subblock update: no socket connection or workflow room', { @@ -668,17 +641,15 @@ export function SocketProvider({ children, user }: SocketProviderProps) { [socket, currentWorkflowId] ) - // Emit variable value updates const emitVariableUpdate = useCallback( (variableId: string, field: string, value: any, operationId?: string) => { - // Only emit if socket is connected and we're in a valid workflow room if (socket && currentWorkflowId) { socket.emit('variable-update', { variableId, field, value, timestamp: Date.now(), - operationId, // Include operation ID for queue tracking + operationId, }) } else { logger.warn('Cannot emit variable update: no socket connection or workflow room', { @@ -692,7 +663,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { [socket, currentWorkflowId] ) - // Cursor throttling optimized for database connection health const lastCursorEmit = useRef(0) const emitCursorUpdate = useCallback( (cursor: { x: number; y: number } | null) => { @@ -708,7 +678,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { return } - // Reduced to 30fps (33ms) to reduce database load while maintaining smooth UX if (now - lastCursorEmit.current >= 33) { socket.emit('cursor-update', { cursor }) lastCursorEmit.current = now @@ -717,7 +686,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { [socket, currentWorkflowId] ) - // Emit selection updates const emitSelectionUpdate = useCallback( (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => { if (socket && currentWorkflowId) { @@ -727,7 +695,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { [socket, currentWorkflowId] ) - // Event handler registration functions const onWorkflowOperation = useCallback((handler: (data: any) => void) => { eventHandlers.current.workflowOperation = handler }, []) @@ -779,6 +746,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { isConnected, isConnecting, currentWorkflowId, + currentSocketId, presenceUsers, joinWorkflow, leaveWorkflow, From cd6e205d509455d7ddaf3f15fc7ddd289976db44 Mon Sep 17 00:00:00 2001 From: waleed Date: Thu, 15 Jan 2026 18:42:23 -0800 Subject: [PATCH 2/2] upsert users into presence list --- .../app/workspace/providers/socket-provider.tsx | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index 312036039b..f173864273 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -266,7 +266,21 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }) socketInstance.on('presence-update', (users: PresenceUser[]) => { - setPresenceUsers(users) + setPresenceUsers((prev) => { + const prevMap = new Map(prev.map((u) => [u.socketId, u])) + + return users.map((user) => { + const existing = prevMap.get(user.socketId) + if (existing) { + return { + ...user, + cursor: user.cursor ?? existing.cursor, + selection: user.selection ?? existing.selection, + } + } + return user + }) + }) }) socketInstance.on('workflow-operation', (data) => {