Chunked trajectory download #172
base: master
Conversation
…datasets in the data page.
- Add loading indicators to "Export QR Codes" and "Download ZIP" buttons for improved UX.
- Refactor ZIP generation for tokens and trajectories to optimize performance and memory usage.
- Update `DB_HOST` and `STUDY_CONFIG` in `docker-compose-dev.yml` for the NREL Commute study configuration.
…ta, including summaries per day and overall.
@Tvpower
Your latest commit is pretty good. However, I do advise squashing/removing commits that are just cleanup/mistake fixes that happened during development, such as 17d8f43 or 2bfedf4. A good example of a commit is @JGreenlee's "add config-update workflow and update_admin_access script".
In query_trajectories:
- Collect the first chunk of 250k entries and save it as a list. When the limit is hit at 249,999, record the timestamp of the last entry in that query.
- Using that last timestamp, request the trajectories again from that date to the selected end date.
- Add the next chunk of up to 250k entries to the zip and check whether it contains fewer than 250k entries; if it does, stop there. If it contains 250k or more, query again for the next set of trajectories, adjusting each subsequent query's start date, until the end date is reached.

(See the sketch of this pagination loop below.)
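A minimal sketch of that pagination loop, assuming a hypothetical `fetch_chunk` callable that stands in for the actual trajectory query and returns entries sorted ascending by `data.ts` (names and the 250k default are illustrative, not the PR's actual code):

```python
# Sketch only: `paginate_by_timestamp` and `fetch_chunk` are hypothetical names.
from typing import Callable, Dict, List

def paginate_by_timestamp(fetch_chunk: Callable[[float, float, int], List[Dict]],
                          start_ts: float, end_ts: float,
                          chunk_limit: int = 250_000) -> List[Dict]:
    """Collect all entries between start_ts and end_ts, chunk_limit at a time."""
    all_entries: List[Dict] = []
    current_start_ts = start_ts
    while current_start_ts < end_ts:
        chunk = fetch_chunk(current_start_ts, end_ts, chunk_limit)
        all_entries.extend(chunk)
        if len(chunk) < chunk_limit:
            # Fewer entries than the limit: we have reached the selected end date.
            break
        # Limit was hit: restart the next query at the last entry's timestamp.
        # Entries sharing that exact timestamp may be fetched twice, so the
        # caller should deduplicate (e.g. by _id) if that matters.
        current_start_ts = chunk[-1]["data"]["ts"]
    return all_entries
```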
Better format to keep in mind to finish this:
- Why it's a problem:
- Solution:
…g support for large datasets with timestamp-based pagination and detailed summaries.
utils/db_utils.py
Outdated
```python
# Build MongoDB query for this chunk
mongo_query = {
    "metadata.key": {"$in": key_list},
    "data.ts": {"$gte": current_start_ts, "$lt": end_ts}
}

# Add UUID exclusion to the query
if excluded_uuids:
    excluded_uuid_objects = [UUID(uuid) for uuid in excluded_uuids]
    mongo_query["user_id"] = {"$nin": excluded_uuid_objects}

# Query this chunk with limit
db = edb.get_analysis_timeseries_db()
cursor = db.find(mongo_query).sort("data.ts", 1).limit(chunk_limit)
```
Please use methods from https://github.com/e-mission/e-mission-server/blob/master/emission/storage/timeseries/abstract_timeseries.py instead of raw Mongo queries
You can use the original query_trajectories as an example; it calls ts.find_entries.
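For illustration, the raw Mongo snippet above maps roughly onto the timeseries abstraction like this. This is a sketch: the key and timestamps are example values, and the UUID exclusion is passed through `extra_query_list` the same way the raw query used `$nin`.

```python
# Sketch of the same query expressed through the timeseries abstraction
# instead of a raw Mongo find(); values below are examples only.
from uuid import UUID

import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.timeseries.timequery as estt

key_list = ["analysis/recreated_location"]                 # example trajectory key
current_start_ts, end_ts = 1_700_000_000, 1_700_086_400    # example unix timestamps
excluded_uuids = []                                        # example: no exclusions

extra_query_list = None
if excluded_uuids:
    # Same $nin exclusion as the raw query, passed as an extra query fragment
    extra_query_list = [{"user_id": {"$nin": [UUID(u) for u in excluded_uuids]}}]

ts = esta.TimeSeries.get_aggregate_time_series()
entries = list(ts.find_entries(
    key_list=key_list,
    time_query=estt.TimeQuery("data.ts", current_start_ts, end_ts),
    extra_query_list=extra_query_list,
))
```

Note that find_entries itself takes no limit argument, which is what the chunking discussion below works around.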
```python
)

# Stage 2: Iterate through chunks using timestamp pagination
while current_start_ts < end_ts:
```
Everything inside the while seems to be the core of the chunking and I think that can be extracted to a generic function that will allow us to work around this 250k entry limit for ANY type of entry (i.e. not specific to trajectories), and not specific to Plotly Dash (i.e. it could be used outside the admin dashboard)
This function will basically be a wrapper around ts.find_entries with the chunking logic around it. It should essentially take the same arguments as find_entries:
- key_list
  - for trajectories, that key was analysis/recreated_location or background/location. In your generic function, it could be anything, and it gets passed to ts.find_entries
- time_query
- geo_query
- extra_query_list
- the query limit (default = 250k)

Return a list (or iterator) of all the combined entries (which may be > 250k). (A sketch of such a wrapper follows below.)
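Roughly what such a wrapper could look like, as a sketch only: the function name is a placeholder, the aggregate timeseries is assumed to be the right source, and entries are assumed to come back sorted by `time_query.timeType` (which is what the `sort_key` discussion further down addresses).

```python
# Hypothetical generic chunked wrapper around ts.find_entries; the name,
# chunk-advancing strategy, and defaults are illustrative only.
import itertools

import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.timeseries.timequery as estt

def find_entries_chunked(key_list, time_query, geo_query=None,
                         extra_query_list=None, chunk_limit=250_000):
    """Yield every matching entry, fetching at most chunk_limit per query."""
    ts = esta.TimeSeries.get_aggregate_time_series()
    field, subfield = time_query.timeType.split(".")   # e.g. "data", "ts"
    current_start = time_query.startTs
    while current_start < time_query.endTs:
        chunk_tq = estt.TimeQuery(time_query.timeType,
                                  current_start, time_query.endTs)
        cursor = ts.find_entries(key_list=key_list, time_query=chunk_tq,
                                 geo_query=geo_query,
                                 extra_query_list=extra_query_list)
        # Take at most chunk_limit entries from this query before re-querying
        chunk = list(itertools.islice(cursor, chunk_limit))
        yield from chunk
        if len(chunk) < chunk_limit:
            break   # reached the end of the requested time range
        # Advance to the last timestamp seen; ties at that exact timestamp
        # may be returned twice and would need deduplication downstream.
        current_start = chunk[-1][field][subfield]
```

A caller that needs a plain list can simply do `list(find_entries_chunked(...))`.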
…ries_chunked`, introducing adaptive time windows and cleaner emission library integration.
… to use `date_query` parameter, remove adaptive time windows, and simplify chunked data processing with a fixed record limit. I was able to get 499k entries with this, which is weird. I might need a reminder on how many there were originally.
```python
    return esds.cleaned2inferred_section_list(sections)


def query_entries_chunked(key_list, date_query,
```
The current version of query_entries_chunked does not meet the requirements:
This function will basically be a wrapper around ts.find_entries with the chunking logic around it. It should essentially take the same arguments as find_entries:
- key_list
  - for trajectories, that key was analysis/recreated_location or background/location. In your generic function, it could be anything, and it gets passed to ts.find_entries
- time_query
- geo_query
- extra_query_list
- the query limit (default = 250k)

Return a list (or iterator) of all the combined entries (which may be > 250k).
Namely, it has date_query instead of time_query, and it yields and returns dataframes instead of simply returning a list.
@Tvpower I found out why this approach is not working.
However, this seems to be a deliberate decision made in e-mission/e-mission-server@453b6a8, so I think we should keep this as the default behavior. But we can add an optional `sort_key` parameter:

```diff
diff --git a/emission/storage/timeseries/builtin_timeseries.py b/emission/storage/timeseries/builtin_timeseries.py
index 93fd46bc..058dcbdb 100644
--- a/emission/storage/timeseries/builtin_timeseries.py
+++ b/emission/storage/timeseries/builtin_timeseries.py
@@ -200,8 +200,9 @@ class BuiltinTimeSeries(esta.TimeSeries):
         return (orig_ts_db_keys, analysis_ts_db_keys)
 
     def find_entries(self, key_list = None, time_query = None, geo_query = None,
-                     extra_query_list=None):
-        sort_key = self._get_sort_key(time_query)
+                     extra_query_list=None, sort_key=None):
+        if sort_key is None:
+            sort_key = self._get_sort_key(time_query)
         logging.debug("curr_query = %s, sort_key = %s" %
                       (self._get_query(key_list, time_query, geo_query,
                                        extra_query_list), sort_key))
```
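With that optional parameter, a chunked caller could request sorted results explicitly. A rough sketch of the call site follows; it is only meaningful with the patch above applied, and the key and time range are example values.

```python
# Sketch: request explicit sorting so that timestamp-based chunking works
# against the aggregate timeseries; requires the sort_key patch above.
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.timeseries.timequery as estt

ts = esta.TimeSeries.get_aggregate_time_series()
time_query = estt.TimeQuery("data.ts", 1_700_000_000, 1_700_086_400)  # example range

cursor = ts.find_entries(key_list=["analysis/recreated_location"],    # example key
                         time_query=time_query,
                         sort_key="data.ts")
```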
With this fix applied, I wrote a basic implementation of `query_entries_chunked`. @Tvpower Let me know whether you want to keep working on this.
@JGreenlee