diff --git a/volatility3/framework/automagic/symbol_cache.py b/volatility3/framework/automagic/symbol_cache.py index 327575e969..ff75e86c6d 100644 --- a/volatility3/framework/automagic/symbol_cache.py +++ b/volatility3/framework/automagic/symbol_cache.py @@ -239,7 +239,16 @@ def find_location( results = self._database.cursor().execute(statement, parameters).fetchall() result = None for row in results: - result = row["location"] + local_filepath = self._get_local_filepath(row["location"]) + if not ( + local_filepath is None + or local_filepath.startswith(tuple(constants.SYMBOL_BASEPATHS)) + ): + vollog.debug( + f"Location {row['location']} found but outside of the registered symbol paths" + ) + else: + result = row["location"] return result def get_local_locations(self) -> Generator[str, None, None]: @@ -249,7 +258,11 @@ def get_local_locations(self) -> Generator[str, None, None]: .fetchall() ) for row in result: - yield row["location"] + local_filepath = self._get_local_filepath(row["location"]) + if local_filepath and local_filepath.startswith( + tuple(constants.SYMBOL_BASEPATHS) + ): + yield row["location"] def is_url_local(self, url: str) -> bool: """Determines whether an url is local or not""" @@ -296,6 +309,20 @@ def get_hash(self, location: str) -> Optional[str]: return row["hash"] return None + def _get_local_filepath( + self, location: str, local_only: bool = True + ) -> Optional[str]: + # See if the file is a local URL type we can handle: + parsed = urllib.parse.urlparse(location) + pathname = location if not local_only else None + if parsed.scheme == "file": + pathname = parsed.path + if parsed.scheme == "jar": + inner_url = urllib.parse.urlparse(parsed.path) + if inner_url.scheme == "file": + pathname = inner_url.path.split("!")[0] + return pathname + def update(self, progress_callback=None): """Locates all files under the symbol directories. Updates the cache with additions, modifications and removals. This also updates remote locations based on a cache timeout. @@ -340,15 +367,7 @@ def dummy_progress(*args, **kargs) -> None: timestamp = stored_timestamp # Default to requiring update # See if the file is a local URL type we can handle: - parsed = urllib.parse.urlparse(location) - pathname = None - if parsed.scheme == "file": - pathname = urllib.request.url2pathname(parsed.path) - if parsed.scheme == "jar": - inner_url = urllib.parse.urlparse(parsed.path) - if inner_url.scheme == "file": - pathname = inner_url.path.split("!")[0] - + pathname = self._get_local_filepath(location) if pathname and os.path.exists(pathname): timestamp = datetime.datetime.fromtimestamp( os.stat(pathname).st_mtime @@ -461,7 +480,7 @@ def dummy_progress(*args, **kargs) -> None: def get_identifier_dictionary( self, operating_system: Optional[str] = None, local_only: bool = False ) -> Dict[bytes, str]: - output = {} + output: Dict[bytes, str] = {} additions = [] statement = "SELECT location, identifier FROM cache" if local_only: @@ -476,7 +495,15 @@ def get_identifier_dictionary( vollog.debug( f"Duplicate entry for identifier {row['identifier']}: {row['location']} and {output[row['identifier']]}" ) - output[row["identifier"]] = row["location"] + local_filepath = self._get_local_filepath(row["location"]) + if local_filepath and not local_filepath.startswith( + tuple(constants.SYMBOL_BASEPATHS) + ): + vollog.debug( + f"Location {row['location']} was not in the registered symbol paths and therefore not in the identifier dictionary" + ) + else: + output[row["identifier"]] = row["location"] return output def get_identifiers(self, operating_system: Optional[str]) -> List[bytes]: @@ -533,7 +560,10 @@ def __init__(self, *args, **kwargs): def __call__(self, context, config_path, configurable, progress_callback=None): """Runs the automagic over the configurable.""" - self._cache.update(progress_callback) + try: + self._cache.update(progress_callback) + except Exception as excp: + vollog.debug(f"Excption during cache update: {excp}") @classmethod def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: