Batch processing for Longest Listening Session #555
base: master
Conversation
Hello, thanks a lot for this contribution. I am wondering, and maybe this is a nitpick, but am I right in saying that this would not take into account listening sessions spanning New Year's Eve, since the batching would cut such a session in two? Also, setting aside the issue of Mongo holding too much data in memory, do you have a comparison of the HTTP request time between the old and the new implementation? Many thanks again for this pull request.
Yep, for sure, but here are some numbers:
It's faster because it's built with async requests: all requests are sent at the same time and Mongo is able to process them at the same time.
/api/spotify/top/sessions?start=2020-09-14T00:00:00.000Z&end=2025-09-14T00:00:00.000Z
New method for 5 years: ~3 sec (2.8-3.2)
Sending small async requests to Mongo will always be faster and less resource hungry.
Hi @Yooooomi, just a reminder in case you forgot.
Hoping this will be pulled. Nice.
Pull request overview
This PR introduces batch processing for the longest listening session query to handle large datasets that previously caused timeouts. The implementation splits the date range into 1-year batches, processes each batch independently, and aggregates the results to return the top 5 longest sessions.
Key changes:
- Refactored the aggregation pipeline into a reusable buildPipeline function that processes data in yearly batches (a condensed sketch follows this list)
- Added date validation and window creation logic to split the processing into manageable chunks
- Implemented post-aggregation sorting and limiting to combine results from all batches
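Below is a condensed sketch of that flow, assuming the names used in the diff (InfosModel, buildPipeline, sessionLength) and omitting the date-validation details; it is a simplification, not the exact implementation.

```typescript
// Sketch only: InfosModel, buildPipeline and the sessionLength field are
// assumed to behave as in the diff excerpts below.
async function getLongestSessionsBatched(
  startDate: Date,
  endDate: Date,
): Promise<any[]> {
  const yearsStep = 1;

  // Split [startDate, endDate) into windows of at most one year.
  const windows: { from: Date; to: Date }[] = [];
  let cursor = new Date(startDate);
  while (cursor < endDate) {
    const next = new Date(cursor);
    next.setFullYear(next.getFullYear() + yearsStep);
    if (next > endDate) next.setTime(endDate.getTime());
    windows.push({ from: new Date(cursor), to: new Date(next) });
    cursor = next;
  }

  // Run the same aggregation once per window, then keep the 5 longest sessions.
  const allSessions: any[] = [];
  for (const w of windows) {
    const chunkSessions = await InfosModel.aggregate(buildPipeline(w.from, w.to));
    allSessions.push(...chunkSessions);
  }
  allSessions.sort((a, b) => b.sessionLength - a.sessionLength);
  return allSessions.slice(0, 5);
}
```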
```typescript
}

const allSessions: any[] = [];
```
Copilot AI · Jan 3, 2026
The 'buildPipeline' function lacks documentation explaining its purpose, parameters, and the complex MongoDB aggregation logic it contains. Consider adding a JSDoc comment that describes the function's purpose, the meaning of the 'from' and 'to' parameters, and a brief explanation of the session detection algorithm (e.g., sessions are detected when there's a gap greater than sessionBreakThreshold between consecutive tracks).
Suggested change:

```typescript
/**
 * Builds a MongoDB aggregation pipeline that finds listening sessions for a given
 * time window. The pipeline filters tracks between the provided dates, orders
 * them chronologically, and uses a gap-based algorithm to split them into
 * sessions.
 *
 * A new session is started whenever the gap between the end of one track and
 * the start of the next track is greater than `sessionBreakThreshold`
 * (computed as the time between `$$value.last.played_at + durationMs` and
 * `$$this.played_at`). Consecutive tracks with a gap less than or equal to the
 * threshold are considered part of the same session.
 *
 * @param from - Start of the time window (inclusive) for which sessions are
 *   computed.
 * @param to - End of the time window (exclusive or clamped to this boundary)
 *   for which sessions are computed.
 * @returns A MongoDB aggregation pipeline array that, when run, returns the
 *   detected sessions and their associated tracks within the given window.
 */
```
```typescript
const yearsStep = 1;

const item = { subtract, info: "$$this" };
const startDate = new Date(start);
const endDate = new Date(end);

const longestSessions = await InfosModel.aggregate([
  ...basicMatch(userId, start, end),
  { $sort: { played_at: 1 } },
  {
    $group: {
      _id: "$owner",
      infos: { $push: "$$ROOT" },
    },
  },
  {
    $addFields: {
      distanceToLast: {
        $reduce: {
          input: "$infos",
          initialValue: { distance: [], current: [] },
          in: {
            distance: {
              $concatArrays: [
                "$$value.distance",
                {
                  $cond: {
                    if: {
                      $gt: [subtract, sessionBreakThreshold],

if (isNaN(startDate.getTime()) || isNaN(endDate.getTime()) || startDate >= endDate) {
  return [];
}

const windows: { from: Date; to: Date }[] = [];
let cursor = new Date(startDate);
while (cursor < endDate) {
  const next = new Date(cursor);
  next.setFullYear(next.getFullYear() + yearsStep);
  if (next > endDate) next.setTime(endDate.getTime());
  windows.push({ from: new Date(cursor), to: new Date(next) });
  cursor = next;
}
```
Copilot AI · Jan 3, 2026
The batching logic lacks documentation explaining why data is processed in 1-year chunks. Consider adding a comment that explains this design decision, particularly noting that it's a performance optimization to handle large datasets (as mentioned in the PR description, it fixes timeout issues for datasets larger than 7 years).
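One possible wording for such a comment, placed next to the batch-size constant (the phrasing below is only a suggestion, not part of the diff):

```typescript
// Process the range in 1-year windows: aggregating the whole history in a
// single pipeline forces Mongo to hold every listened track in memory and
// times out for datasets longer than ~7 years (see #499). Smaller windows keep
// each aggregation bounded; results are merged and re-sorted afterwards.
const yearsStep = 1;
```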
```typescript
allSessions.sort((a, b) => b.sessionLength - a.sessionLength);
return allSessions.slice(0, 5);
```
Copilot AI · Jan 3, 2026
The magic number '5' is hardcoded in two places (lines 788 and 813) representing the number of top sessions to return. Consider extracting this into a named constant at the function level (e.g., 'TOP_SESSIONS_LIMIT = 5') to improve maintainability and make it clear that these two values should be kept in sync.
Suggested change:

```typescript
const TOP_SESSIONS_LIMIT = 5;
allSessions.sort((a, b) => b.sessionLength - a.sessionLength);
return allSessions.slice(0, TOP_SESSIONS_LIMIT);
```
```typescript
    sessionBreakThreshold + 1,
  ],
};
const yearsStep = 1;
```
Copilot AI · Jan 3, 2026
The variable name 'yearsStep' is defined but could be more descriptive. Consider renaming it to 'batchSizeYears' or 'yearsBatchSize' to better convey that it represents the batch size in years for processing the data.
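For illustration, the rename would only touch the constant and its single use in the windowing loop (a fragment, not part of the diff):

```typescript
// Purely illustrative rename: same value, clearer intent.
const batchSizeYears = 1;
// ...inside the windowing loop shown below:
next.setFullYear(next.getFullYear() + batchSizeYears);
```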
```typescript
const allSessions: any[] = [];

const buildPipeline = (from: Date, to: Date): any[] => {
```
Copilot AI · Jan 3, 2026
The pipeline array type is declared as 'any[]' which bypasses TypeScript's type checking. Consider defining a proper type for the MongoDB aggregation pipeline stages to improve type safety and code maintainability.
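One way to do that is with Mongoose's built-in pipeline type. A sketch assuming Mongoose ≥ 6 (which exports PipelineStage) and that basicMatch returns compatible stages; the stage objects themselves stay as in the diff:

```typescript
import type { PipelineStage } from "mongoose";

const buildPipeline = (from: Date, to: Date): PipelineStage[] => [
  ...basicMatch(userId, from, to), // assumed to return PipelineStage[]
  { $sort: { played_at: 1 } },
  // ...remaining stages from the diff, unchanged...
];
```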
```typescript
  cursor = next;
}

const allSessions: any[] = [];
```
Copilot AI · Jan 3, 2026
The 'allSessions' array is typed as 'any[]' which bypasses TypeScript's type checking. Consider defining a proper interface for session objects to improve type safety and code maintainability.
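A minimal interface for the session results could look like the sketch below; field names beyond sessionLength and full_tracks are guesses from the surrounding code and would need to match what the pipeline actually projects.

```typescript
// Hypothetical shape; full_tracks is re-keyed by track id after aggregation.
interface ListeningSession {
  sessionLength: number;
  full_tracks: Record<string, unknown>;
  [key: string]: unknown;
}

const allSessions: ListeningSession[] = [];
```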
```typescript
const next = new Date(cursor);
next.setFullYear(next.getFullYear() + yearsStep);
if (next > endDate) next.setTime(endDate.getTime());
windows.push({ from: new Date(cursor), to: new Date(next) });
```
Copilot AI · Jan 3, 2026
Sessions that span across year boundaries may be incorrectly split into separate sessions. When a listening session starts in one year batch and continues into the next, the current implementation will treat them as two separate sessions instead of one continuous session. Consider implementing logic to handle cross-boundary sessions by either checking the last session of a batch against the first session of the next batch, or by overlapping the batch windows by the sessionBreakThreshold duration.
Suggested change:

```typescript
// Overlap windows by sessionBreakThreshold to avoid splitting sessions
const isFirstWindow = windows.length === 0;
const overlapFrom = isFirstWindow
  ? new Date(cursor)
  : new Date(Math.max(startDate.getTime(), cursor.getTime() - sessionBreakThreshold));
windows.push({ from: overlapFrom, to: new Date(next) });
```
```typescript
for (const w of windows) {
  const chunkSessions = await InfosModel.aggregate(buildPipeline(w.from, w.to));
  chunkSessions.forEach((s: any) => {
    s.full_tracks = Object.fromEntries(
      s.full_tracks.map((t: any) => [t.id, t]),
    );
    allSessions.push(s);
  });
}
```
Copilot AI · Jan 3, 2026
The aggregation queries are executed sequentially in a loop, which means the total processing time will be the sum of all individual query times. Consider using Promise.all to execute all batch queries in parallel, which could significantly reduce the overall processing time especially for large date ranges spanning multiple years.
Suggested change:

```typescript
const sessionPromises = windows.map((w) =>
  InfosModel.aggregate(buildPipeline(w.from, w.to)),
);
const chunkSessionsArrays = await Promise.all(sessionPromises);
chunkSessionsArrays.forEach((chunkSessions) => {
  chunkSessions.forEach((s: any) => {
    s.full_tracks = Object.fromEntries(
      s.full_tracks.map((t: any) => [t.id, t]),
    );
    allSessions.push(s);
  });
});
```
Split the processing into ranges of 1 year, using exactly the same pipeline, but after looping over every year, aggregate the results and return the top 5 based on sessionLength.

Docker image (based on LSIO) using this code:
docker pull ghcr.io/webysther/your_spotify:nightly

Build:
docker build --build-arg BUILD_DATE=2025-09-12 --build-arg VERSION=nightly --pull -t ghcr.io/webysther/your_spotify:nightly .

Performance:
GET /api/spotify/top/sessions?start=2015-12-13T03:12:42.000Z&end=2025-09-12T19:49:25.889Z: ~6 secs / 3 MB

Fixes #499

PS: Without this, it only worked for ~7 years of data.
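For reference, a minimal sketch of calling the endpoint from the web app, assuming the browser already attaches the session cookie; the dates are simply the ones from the timing above.

```typescript
// Sketch: returns at most 5 sessions, longest first, per the batching logic above.
async function fetchTopSessions(): Promise<unknown[]> {
  const res = await fetch(
    "/api/spotify/top/sessions" +
      "?start=2015-12-13T03:12:42.000Z&end=2025-09-12T19:49:25.889Z",
  );
  return res.json();
}
```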