diff --git a/.gitignore b/.gitignore index b6e4761..b10b0bd 100644 --- a/.gitignore +++ b/.gitignore @@ -127,3 +127,6 @@ dmypy.json # Pyre type checker .pyre/ + +# core dumps +coredumps/ diff --git a/.travis.yml b/.travis.yml index 4e4c72d..bee9494 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,11 +16,11 @@ install: - python setup.py install script: - - python tests/test.py -l javascript tests/stuff.js - - python tests/test.py -l python tests/stuff.py - - python tests/test.py -l ruby tests/stuff.rb - - python tests/test.py -l c tests/stuff.c - - python tests/test.py -l cpp tests/stuff.cpp - - python tests/test.py -l java tests/stuff.java - - python tests/test.py -l rust tests/stuff.rs - - python tests/test.py -l go tests/stuff.go + - wsyntree-collector file -l javascript tests/stuff.js + - wsyntree-collector file -l python tests/stuff.py + - wsyntree-collector file -l ruby tests/stuff.rb + - wsyntree-collector file -l c tests/stuff.c + - wsyntree-collector file -l cpp tests/stuff.cpp + - wsyntree-collector file -l java tests/stuff.java + - wsyntree-collector file -l rust tests/stuff.rs + - wsyntree-collector file -l go tests/stuff.go diff --git a/LICENSE.md b/LICENSE.md index c2a43a7..2efcca8 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -1,4 +1,4 @@ -Copyright (c) 2020-2021; +Copyright (c) 2020-2023; Ben Klein (robobenklein) *et al* (`git shortlog -sn`) diff --git a/README.md b/README.md index ca1eb6c..c70d5da 100644 --- a/README.md +++ b/README.md @@ -12,11 +12,8 @@ Scales to incredible size: the goal is to build a system capable of storing, par WorldSyntaxTree is built upon the following technologies: - Python: we chose it because it is quick to develop in and has a large ecosystem of scientific libraries we aim to integrate with. In our field of research, Python is the most popular language for quickly wrangling large and complex datasets. + - [NetworkX](https://networkx.org/): our graph wrangling library of choice - [Tree-Sitter](https://tree-sitter.github.io/tree-sitter/): the integral component that lets us generate concrete syntax trees from code quickly, while still producing a somewhat useful result even on broken code. -- [ArangoDB](https://www.arangodb.com/): our choice of database stemmed from the following requirements: - - Must be open source (free for use and improvement by all) - - Must support our incredibly large data size (many terabytes) - - Must have native/serverside graph processing capabilities - Git: the outer / top-level structure for the whole tree is based upon Git's structure of repositories, commits, and files, so we aren't currently exploring other VCS systems (though we might in the far future) For a full list of libraries used, check `setup.py` or `requirements.txt`.
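To make the flow behind the new `wsyntree-collector file` invocations above concrete, here is a minimal sketch using the modules this PR adds (illustrative only, not a stable API; the sample path is just the repo's own test fixture):

```python
# Parse one source file into a NetworkX DiGraph of its concrete syntax tree.
from pathlib import Path

from wsyntree.wrap_tree_sitter import TreeSitterAutoBuiltLanguage
from wsyntree_collector.file.parse_file_treesitter import build_networkx_graph

lang = TreeSitterAutoBuiltLanguage("python")  # clones/builds the tree-sitter grammar on first use
graph = build_networkx_graph(lang, Path("tests/stuff.py"), include_text=True)
print(graph)  # DiGraph: one node per tree-sitter node, edges parent -> child
```

This is roughly what `wsyntree-collector file -l python tests/stuff.py` does before serializing the result.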
@@ -54,9 +51,15 @@ Requirements: - Standard development tooling (git, pip, python-dev, setuptools, etc) - C++ compiler (ubuntu: `libc++-dev libc++abi-dev`, plus any other dependencies needed for WST to auto-compile Tree-Sitter languages) - Python 3.8+ -- Optional: an ArangoDB instance -Install steps: +Recommended: perform these steps in a python Virtual Environment: + +``` +virtualenv -p python3 venv +source venv/bin/activate # run again to activate the virtualenv again later +``` + +Install dependencies and wsyntree itself: ``` python -m pip install -r requirements.txt diff --git a/docker-compose.yml b/docker-compose.yml index 436aa72..a0a8ce4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,23 +2,26 @@ version: '3' services: - wst-test: - build: - context: . - dockerfile: docker/testing.dockerfile - depends_on: - - neo4j - restart: "no" - command: "wait-for-it neo4j:7687 -- /wst/docker/test-collector.sh" - environment: - - NEO4J_BOLT_URL=bolt://neo4j:pass@neo4j:7687 +# wst-test: +# build: +# context: . +# dockerfile: docker/testing.dockerfile +# depends_on: +# - neo4j +# restart: "no" +# command: "wait-for-it neo4j:7687 -- /wst/docker/test-collector.sh" +# environment: +# - NEO4J_BOLT_URL=bolt://neo4j:pass@neo4j:7687 neo4j: - image: neo4j:4.2 + image: neo4j:5 ports: - 127.0.0.1:9784:7474 - 127.0.0.1:9787:7687 environment: - NEO4J_AUTH=none + - "NEO4JLABS_PLUGINS=[\"apoc\"]" + - "NEO4J_dbms_security_procedures_unrestricted=apoc.\\*" + - "NEO4J_apoc_import_file_enabled=true" # volumes: # - wst_neo4j_data:/data diff --git a/requirements.txt b/requirements.txt index 5cf0483..420362b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ tree_sitter@git+https://github.com/utk-se/py-tree-sitter.git@master#egg=tree_sitter +grand-cypher@git+https://github.com/aplbrain/grand-cypher.git@master#egg=grand-cypher python-arango coloredlogs pygit2 @@ -11,3 +12,5 @@ enlighten==1.9.0 psutil bpython orjson>=3.0.0 +networkx[default] +redis[hiredis] diff --git a/setup.py b/setup.py index 79a6619..0830101 100755 --- a/setup.py +++ b/setup.py @@ -51,6 +51,7 @@ "psutil", "bpython", "orjson>=3.0.0", + "networkx[default]" ], entry_points={ 'console_scripts': [ diff --git a/tests/stuff.rb b/tests/stuff.rb index b85a042..07d461b 100644 --- a/tests/stuff.rb +++ b/tests/stuff.rb @@ -1 +1,6 @@ -puts "Hello World" + +def hello_world + puts "Hello World" +end + +hello_world diff --git a/woc-support/Dockerfile b/woc-support/Dockerfile new file mode 100644 index 0000000..d071433 --- /dev/null +++ b/woc-support/Dockerfile @@ -0,0 +1,18 @@ +FROM robobenklein/home:latest + +USER root +# enable sshd +RUN rm -f /etc/service/sshd/down +RUN /etc/my_init.d/00_regen_ssh_host_keys.sh +RUN install_clean python3-dev libghc-bzlib-dev +ARG UNAME=testuser +ARG UID=1000 +ARG GID=1000 +RUN groupadd -g $GID -o $UNAME +RUN useradd -m -u $UID -g $GID -G sudo -o -s /bin/zsh $UNAME +RUN usermod -aG docker_env $UNAME +RUN chown -R :docker_env /etc/container_environment /etc/container_environment.sh /etc/container_environment.json +RUN chmod -R g+rwX /etc/container_environment /etc/container_environment.sh /etc/container_environment.json +USER $UNAME +CMD sudo /sbin/my_init +WORKDIR /home/$UNAME diff --git a/woc-support/docker-compose.yml b/woc-support/docker-compose.yml new file mode 100644 index 0000000..cf3bf9d --- /dev/null +++ b/woc-support/docker-compose.yml @@ -0,0 +1,40 @@ + +version: '2' + +services: + wst-jobrunner: + build: + context: . 
+ args: + # remember to export these in the shell before build: + UNAME: ${USER} + GID: ${GID} + UID: ${UID} + #image: robobenklein/home:latest + restart: "no" + command: "sudo /sbin/my_init" + environment: + - OSCAR_TEST=1 + volumes: + - "/da0_data:/da0_data:ro" + - "/da1_data:/da1_data:ro" + - "/da2_data:/da2_data:ro" + - "/da3_data:/da3_data:ro" + - "/da4_data:/da4_data:ro" + - "/da4_fast:/da4_fast:ro" + - "/da5_data:/da5_data:ro" + - "/da5_fast:/da5_fast:ro" + - "/da7_data:/da7_data:ro" + - "/da7_data/WorldSyntaxTree:/da7_data/WorldSyntaxTree" # RW: output in here + - "/home/bklein3:/home/bklein3" + mem_limit: 512G + + wst-redis: + image: redis:7-alpine + mem_limit: 64G + + wst-telegraf: + image: telegraf:latest + volumes: + - "./telegraf.conf:/etc/telegraf/telegraf.conf:ro" + hostname: ${HOSTNAME:-nohost}-wst-telegraf diff --git a/wsyntree/constants.py b/wsyntree/constants.py index dd8e0ef..702c62c 100644 --- a/wsyntree/constants.py +++ b/wsyntree/constants.py @@ -23,10 +23,10 @@ "tsrepo": "https://github.com/tree-sitter/tree-sitter-ruby.git", "file_ext": "\.rb$", }, - # "csharp": { - # "tsrepo": "https://github.com/tree-sitter/tree-sitter-c-sharp.git", - # "file_ext": "\.cs$", - # }, + "c_sharp": { # name in compiled language uses underscore + "tsrepo": "https://github.com/tree-sitter/tree-sitter-c-sharp.git", + "file_ext": "\.cs$", + }, "c": { "tsrepo": "https://github.com/tree-sitter/tree-sitter-c.git", "file_ext": "\.(c|h)$", diff --git a/wsyntree/exceptions.py b/wsyntree/exceptions.py index 12a0d53..e9ac257 100644 --- a/wsyntree/exceptions.py +++ b/wsyntree/exceptions.py @@ -31,6 +31,9 @@ class UnhandledGitFileMode(ValueError, WSTBaseError): class DeduplicatedObjectMismatch(ValueError, WSTBaseError): pass +class RootTreeSitterNodeIsError(ValueError, WSTBaseError): + pass + def isArangoWriteWriteConflict(e: ArangoDocumentInsertError) -> bool: """Is an exception a Write-Write conflict?""" if isinstance(e, ArangoDocumentInsertError): diff --git a/wsyntree/hashtypes/__init__.py b/wsyntree/hashtypes/__init__.py new file mode 100644 index 0000000..74c8ef1 --- /dev/null +++ b/wsyntree/hashtypes/__init__.py @@ -0,0 +1,59 @@ +""" +# Why are there multiple node hash types? + +Multiple types are needed because different properties can be included or excluded +from the hash, e.g. to include or not to include named nodes. 
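+
+A rough usage sketch (illustrative only; the real command lives in
+`wsyntree_collector/commands/node_hash_v1.py`):
+
+    import networkx as nx
+    # `graph` comes from wsyntree_collector.file.parse_file_treesitter.build_networkx_graph
+    for n in nx.dfs_preorder_nodes(graph):
+        if graph.nodes[n]["type"] == "function_definition":
+            print(WSTNodeHashV1(graph, n)._get_sha512_hex())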
+ + +""" +from enum import Enum +import hashlib +import functools + +from wsyntree import log + +import orjson +import networkx as nx + +class WSTNodeHashV1(): + __included__ = ["named", "type"] + def __init__(self, G, node): + """""" + self._graph = G + self._node = node + self._nodes = [] + nodes = nx.dfs_preorder_nodes(G, source=node) + for node in nodes: + self._nodes.append(node) + + @functools.lru_cache(maxsize=None) # functools.cache added in 3.9 + def _get_hashable_repr(self): + s = bytearray(b"WSTNodeHashV1<") + nodedata = list(map( + lambda x: {k:v for k,v in x.items() if k in self.__included__}, + [self._graph.nodes[n] for n in self._nodes] + )) + # s += ",".join([f"{list(nd.items())}" for nd in nodedata]) + # we must sort keys here in case a python version is used that does not + # preserve dict ordering is used + s += orjson.dumps(nodedata, option=orjson.OPT_SORT_KEYS) + # log.debug(f"{self}, {nodedata}") + s += b">" + return s + + @property + def _node_props(self): + return self._graph.nodes[self._node] + + def _get_sha512_hex(self): + h = hashlib.sha512() + h.update(self._get_hashable_repr()) + return h.hexdigest() + + def __str__(self): + return f"WSTNodeHashV1" + +# Once defined here the behavior should not change (stable versions) +class WSTNodeHashType(Enum): + # V1 = WSTNodeHashV1 + pass diff --git a/wsyntree/log.py b/wsyntree/log.py index 2513b9d..af2bad2 100644 --- a/wsyntree/log.py +++ b/wsyntree/log.py @@ -69,7 +69,7 @@ def __exit__(self, type, value, traceback): logger.setLevel(self._prev_state) -class supress_stdout(): +class suppress_stdout(): """ Stops the logger's StreamHandlers temporarily. @@ -77,7 +77,7 @@ class supress_stdout(): or other terminal full-windows views. Use a 'with' statement: - with supress_stdout(): + with suppress_stdout(): # your code """ diff --git a/wsyntree/wrap_tree_sitter.py b/wsyntree/wrap_tree_sitter.py index bde056a..55f902b 100644 --- a/wsyntree/wrap_tree_sitter.py +++ b/wsyntree/wrap_tree_sitter.py @@ -4,6 +4,7 @@ import functools import re import time +import warnings import pebble from tree_sitter import Language, Parser, TreeCursor, Node @@ -18,6 +19,7 @@ class TreeSitterAutoBuiltLanguage(): def __init__(self, lang): + assert lang in wsyntree_langs, f"{lang} not found or not yet available in WorldSyntaxTree" self.lang = lang self.parser = None self.ts_language = None @@ -42,6 +44,7 @@ def _get_language_repo(self): if not repodir.exists(): repodir.mkdir(mode=0o770) log.debug(f"cloning treesitter repo for {self}") + warnings.warn(f"WorldSyntaxTree cloning parser repo for {self.lang}, this might be slow.") return git.clone_repository( wsyntree_langs[self.lang]["tsrepo"], repodir.resolve() @@ -56,25 +59,25 @@ def _get_language_repo(self): def _get_language_library(self): try: - self.ts_lang_cache_lock.acquire(timeout=300) lib = self._get_language_cache_dir() / "language.so" repo = self._get_language_repo() repodir = self._get_language_repo_path() - if not lib.exists(): - log.warn(f"building library for {self}, this could take a while...") - start = time.time() - Language.build_library( - str(lib.resolve()), - [repodir] - ) - log.debug(f"library build of {self} completed after {round(time.time() - start)} seconds") + with self.ts_lang_cache_lock.acquire(timeout=600): + if not lib.exists(): + log.warn(f"building library for {self}, this could take a while...") + start = time.time() + Language.build_library( + str(lib.resolve()), + [repodir] + ) + log.debug(f"library build of {self} completed after {round(time.time() - start)} 
seconds") return lib except filelock.Timeout as e: - log.error(f"Failed to acquire lock on TSABL {self}") + log.error(f"Failed to acquire lock on TSABL {self} (needed to build language lib)") log.debug(f"lock object is {self.ts_lang_cache_lock}") raise e - finally: - self.ts_lang_cache_lock.release() + #finally: + # self.ts_lang_cache_lock.release() def _get_ts_language(self): if self.ts_language is not None: diff --git a/wsyntree_collector/__main__.py b/wsyntree_collector/__main__.py index ae5528f..4a48b7e 100644 --- a/wsyntree_collector/__main__.py +++ b/wsyntree_collector/__main__.py @@ -30,6 +30,8 @@ from .jsonl_collector import WST_JSONLCollector from .batch_analyzer import set_batch_analyze_args +from . import commands + def analyze(args): @@ -248,6 +250,16 @@ def __main__(): help="Delete any existing data in the database", action="store_true", ) + + # run for a single file + cmd_file = subcmds.add_parser( + 'file', aliases=[], help="Run WST on a single file") + commands.file.set_args(cmd_file) + + cmd_node_hash_v1 = subcmds.add_parser( + 'node_hash_v1', aliases=['nhv1'], help="Hash syntax nodes (V1)") + commands.node_hash_v1.set_args(cmd_node_hash_v1) + args = parser.parse_args() if args.verbose: diff --git a/wsyntree_collector/commands/__init__.py b/wsyntree_collector/commands/__init__.py new file mode 100644 index 0000000..716ed88 --- /dev/null +++ b/wsyntree_collector/commands/__init__.py @@ -0,0 +1,3 @@ + +from . import file +from . import node_hash_v1 diff --git a/wsyntree_collector/commands/file.py b/wsyntree_collector/commands/file.py new file mode 100644 index 0000000..86d0361 --- /dev/null +++ b/wsyntree_collector/commands/file.py @@ -0,0 +1,60 @@ + +import sys +from pathlib import Path + +import networkx as nx +from networkx.readwrite import json_graph +import orjson + +from wsyntree import log +from wsyntree.wrap_tree_sitter import TreeSitterAutoBuiltLanguage, TreeSitterCursorIterator + +from ..file.parse_file_treesitter import build_networkx_graph + +def set_args(parser): + parser.set_defaults(func=run) + parser.add_argument("which_file", type=Path, help="Path to input file") + parser.add_argument("-l", "--lang", help="Language parser to use") + parser.add_argument("-o", "--output", help="File to write result to", type=Path) + +def run(args): + log.info(f"Running for file {args.which_file}") + + if not args.lang: + raise NotImplementedError(f"Automatic detection of file language not supported. 
Please specify a TreeSitter parser to use.") + + lang = TreeSitterAutoBuiltLanguage(args.lang) + + graph = build_networkx_graph(lang, args.which_file, include_text=True) + + log.info(f"Graph result: {graph}") + + # node_link_data = json_graph.node_link_data(graph) + # log.debug(f"Graph node_link_data: {node_link_data}") + + # adjacency_data = json_graph.adjacency_data(graph) + # log.debug(f"Graph adjacency_data: {adjacency_data}") + + # cytoscape_data = json_graph.cytoscape_data(graph) + # log.debug(f"Graph cytoscape_data: {cytoscape_data}") + + tree_data = json_graph.tree_data(graph, 0) + log.debug(f"Graph tree_data: {tree_data}") + + # jit_data = json_graph.jit_data(graph) + # log.debug(f"Graph jit_data: {jit_data}") + + if args.output: + if str(args.output) == "-": + #raise NotImplementedError(f"output to stdout not yet supported") + sys.stdout.buffer.write(orjson.dumps(tree_data, option=orjson.OPT_APPEND_NEWLINE)) + elif str(args.output).endswith(".graphml"): + log.info(f"Writing to {args.output} ...") + log.info("Writing GraphML") + nx.write_graphml(graph, args.output) + else: + log.info(f"Writing to {args.output} ...") + with args.output.open('wb') as f: + f.write(orjson.dumps( + tree_data, option=orjson.OPT_APPEND_NEWLINE + )) diff --git a/wsyntree_collector/commands/node_hash_v1.py b/wsyntree_collector/commands/node_hash_v1.py new file mode 100644 index 0000000..4d73bed --- /dev/null +++ b/wsyntree_collector/commands/node_hash_v1.py @@ -0,0 +1,68 @@ + +from pathlib import Path + +import networkx as nx +from networkx.readwrite import json_graph +import orjson + +from wsyntree import log +from wsyntree.wrap_tree_sitter import TreeSitterAutoBuiltLanguage, TreeSitterCursorIterator +from wsyntree.hashtypes import WSTNodeHashV1 + +from ..file.parse_file_treesitter import build_networkx_graph + +def set_args(parser): + parser.set_defaults(func=run) + parser.add_argument("which_file", type=Path, help="Path to input file") + parser.add_argument("what_node_type", type=str, help="What kind of nodes to hash") + parser.add_argument("-l", "--lang", help="Language parser to use") + parser.add_argument("-o", "--output", help="File to write result to", type=Path) + +def run(args): + log.info(f"Running for file {args.which_file}") + + if not args.lang: + raise NotImplementedError(f"Automatic detection of file language not supported. 
Please specify a TreeSitter parser to use.") + + lang = TreeSitterAutoBuiltLanguage(args.lang) + + graph = build_networkx_graph(lang, args.which_file, include_text=True) + + log.info(f"Graph result: {graph}") + + log.info(f"Searching for nodes of type {args.what_node_type}") + hashed_nodes = [] + for node in nx.dfs_preorder_nodes(graph): + if graph.nodes[node]['type'] == args.what_node_type: + log.debug(f"Node {node} (x1:{graph.nodes[node]['x1']}) matched type {args.what_node_type}, hashing...") + hashed_nodes.append(WSTNodeHashV1(graph, node)) + + # tree_data = json_graph.tree_data(graph, 0) + # log.debug(f"Graph tree_data: {tree_data}") + + if args.output: + if str(args.output) == "-": + raise NotImplementedError(f"output to stdout not yet supported") + + log.info(f"Writing to {args.output} ...") + with args.output.open('wb') as f: + for nh in hashed_nodes: + f.write(orjson.dumps({ + "sha512": nh._get_sha512_hex(), + "file": str(args.which_file), + # obviously we want coords to allow humans to quickly find the referenced code + "x1": nh._node_props['x1'], + "x2": nh._node_props['x2'], + "y1": nh._node_props['y1'], + # while the node's hash *is* unique, make it easier to find: + "type": nh._node_props['type'], + # TODO: "objectid": git-object-id, (when run in batch mode) + }, option=orjson.OPT_APPEND_NEWLINE)) + # if str(args.output).endswith(".graphml"): + # log.info("Writing GraphML") + # nx.write_graphml(graph, args.output) + # else: + # with args.output.open('wb') as f: + # f.write(orjson.dumps( + # tree_data, option=orjson.OPT_APPEND_NEWLINE + # )) diff --git a/wsyntree_collector/commands/woc/count_redis_stats.py b/wsyntree_collector/commands/woc/count_redis_stats.py new file mode 100644 index 0000000..54ae8f7 --- /dev/null +++ b/wsyntree_collector/commands/woc/count_redis_stats.py @@ -0,0 +1,81 @@ +""" +This "command" is only to be run as a job on the DA cluster, not used as a subcommand. +""" + +import argparse +import subprocess +import traceback +import time +import tempfile +import enum +import os +from typing import List, Optional +from pathlib import Path, PurePath, PurePosixPath +from collections import Counter, namedtuple, deque +from concurrent import futures +from itertools import islice + +import networkx as nx +from networkx.readwrite import json_graph +import orjson +import pebble +import tenacity +from pebble import concurrent +from tqdm import tqdm +from tqdm.contrib.logging import logging_redirect_tqdm +from pympler import tracker +import redis + +import wsyntree.exceptions +from wsyntree import log +from wsyntree.wrap_tree_sitter import ( + TreeSitterAutoBuiltLanguage, TreeSitterCursorIterator, get_TSABL_for_file, +) +from wsyntree.hashtypes import WSTNodeHashV1 + +from wsyntree_collector.file.parse_file_treesitter import build_networkx_graph +from wsyntree_collector.wociterators import all_blobs as all_blobs_iterator, BlobStatus + +#import oscar + + +tqdm_smoothing_factor = 0.01 + +redis_pool = redis.ConnectionPool( + host=os.environ.get("WST_REDIS_HOST", "wst-redis"), + port=6379, db=0, +) +redis_client = redis.Redis(connection_pool=redis_pool) +redis_decoded = redis.Redis(connection_pool=redis_pool, decode_responses=True) + +def batched(iterable, n): + "Batch data into lists of length n. The last batch may be shorter." 
+ # batched('ABCDEFG', 3) --> ABC DEF G + it = iter(iterable) + while True: + batch = list(islice(it, n)) + if not batch: + return + yield batch + +if __name__ == "__main__": + parser = argparse.ArgumentParser("WST Collector NHV1 BlobFuncs v0") + # parser.add_argument("-w", "--workers", type=int, default=4) + parser.add_argument("-v", "--verbose", action="store_true") + # parser.add_argument("--prescan", help="Scan output dir before attempting new blobs", action="store_true") + args = parser.parse_args() + if args.verbose: + log.setLevel(log.DEBUG) + + counter = Counter() + try: + with logging_redirect_tqdm(), log.suppress_stdout(): + key_it = tqdm(redis_decoded.scan_iter(count=5000), unit_scale=True) + for batch in batched(key_it, 5000): + #log.debug(f"batch size {len(batch)}") + vals = redis_decoded.mget(batch) + counter.update(vals) + except KeyboardInterrupt as e: + log.warn(f"Caught KeyboardInterrupt, stopping ...") + for m in counter.most_common(): + print(m) diff --git a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py new file mode 100644 index 0000000..cfd22b2 --- /dev/null +++ b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py @@ -0,0 +1,266 @@ +""" +This "command" is only to be run as a job on the DA cluster, not used as a subcommand. +""" + +import argparse +import subprocess +import traceback +import time +import tempfile +import enum +import os +from typing import List, Optional +from pathlib import Path, PurePath, PurePosixPath +from collections import Counter, namedtuple, deque +from concurrent import futures + +import networkx as nx +from networkx.readwrite import json_graph +import orjson +import pebble +import tenacity +from pebble import concurrent +from tqdm import tqdm +from tqdm.contrib.logging import logging_redirect_tqdm +from pympler import tracker +import redis + +import wsyntree.exceptions +from wsyntree import log +from wsyntree.wrap_tree_sitter import ( + TreeSitterAutoBuiltLanguage, TreeSitterCursorIterator, get_TSABL_for_file, +) +from wsyntree.hashtypes import WSTNodeHashV1 + +from wsyntree_collector.file.parse_file_treesitter import build_networkx_graph +from wsyntree_collector.wociterators import all_blobs as all_blobs_iterator, BlobStatus + +#import oscar + + +tqdm_smoothing_factor = 0.01 + +getValuesScript = Path("~/lookup/getValues").expanduser() +assert getValuesScript.exists(), f"Expected getValues at {getValuesScript}" +output_root_dir = Path("/da7_data/WorldSyntaxTree/nhv1/blobfuncs_v0") +assert output_root_dir.is_dir(), f"Output directory does not exist." 
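+# Output layout used by output_path_for_blob()/error_path_for_blob() below:
+# results land in <output_root_dir>/<blob[0:2]>/<blob[2:4]>/<blob>.jsonl and
+# failure reports under errors/ with the same two-level sharding.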
+errors_dir = output_root_dir / "errors" +errors_dir.mkdir(exist_ok=True) + +FUNCDEF_TYPES = ( + "function_definition", + "function_declaration", + "method_declaration", + "method", # ruby + "function_item", # rust +) + +redis_pool = redis.ConnectionPool( + host=os.environ.get("WST_REDIS_HOST", "wst-redis"), + port=6379, db=0, +) +redis_client = redis.Redis(connection_pool=redis_pool) +redis_decoded = redis.Redis(connection_pool=redis_pool, decode_responses=True) + +def wst_supports_fnames(fnames): + filenames = fnames + if not filenames: + return False + exts = Counter([x.suffix for x in filenames if x.suffix]) + primary_ext = exts.most_common(1) + if not primary_ext: + return False + primary_ext = primary_ext[0][0] + if get_TSABL_for_file(str(primary_ext)) is not None: + return True + #log.debug(f"WST {primary_ext} not supported: {filenames[0]}") + return False + +def output_path_for_blob(b): + outdir = output_root_dir / b[0:2] / b[2:4] + outdir.mkdir(parents=True, exist_ok=True) + return outdir / f"{b}.jsonl" + +def error_path_for_blob(b): + d = errors_dir / b[0:2] / b[2:4] + return d / f"{b}.txt" + +def blob_not_yet_processed(b): + if redis_decoded.exists(b): + return False + # these should be set in redis by the --prescan argument: + # no longer need to stat every single file over NFS + #p = output_path_for_blob(b) + #if p.is_file(): + # return False + #fail_file = error_path_for_blob(b) + #if fail_file.is_file(): + # return False + return True + +@tenacity.retry( + # OSError: [Errno 14] Bad address + # not sure what is causing this, but likely related to poor/outdated NFS + # or unreliable UTK network + retry=tenacity.retry_if_exception_type(OSError), + stop=tenacity.stop_after_attempt(3), + reraise=True, +) +def run_blob(blob, content, filenames): + """""" + #blob, content, filenames = blob_pair + if not filenames: + return (None, "NO_FILENAMES") + exts = Counter([x.suffix for x in filenames if x.suffix]) + primary_ext = exts.most_common(1) + if not primary_ext: + return (None, "NO_FILENAME_SUFFIX") + primary_ext = primary_ext[0][0] + tsabl = get_TSABL_for_file(str(primary_ext)) # pebble synchronized cache + #log.debug(f"Start parsing {blob} with {tsabl}") + + with tempfile.TemporaryDirectory(prefix="wst_") as tempdir: + log.debug(f"{blob}: working in {tempdir}") + outfile = output_path_for_blob(blob) + tempdir = Path(tempdir) + codepath = (tempdir / f"file{primary_ext}") + with codepath.open("wb") as f: + f.write(content) + graph = build_networkx_graph(tsabl, codepath, include_text=False) + hashed_nodes = [] + for node in nx.dfs_preorder_nodes(graph): + if graph.nodes[node]['type'] in FUNCDEF_TYPES: + hashed_nodes.append(WSTNodeHashV1(graph, node)) + + with outfile.open('wb') as f: + for nh in hashed_nodes: + f.write(orjson.dumps({ + "sha512": nh._get_sha512_hex(), + "blob": blob, + "x1": nh._node_props['x1'], + "x2": nh._node_props['x2'], + "y1": nh._node_props['y1'], + "type": nh._node_props['type'], + "lang": tsabl.lang, + }, option=orjson.OPT_APPEND_NEWLINE)) + + log.debug(f"{blob}: finished") + redis_decoded.set(blob, BlobStatus.done) + return (blob, len(hashed_nodes), outfile) + +@concurrent.process(name="wst-nhv1-blobfuncs", timeout=60*60) +#@concurrent.thread(name="wst-nhv1-blobfuncs") +# @tenacity.retry( # retry here is not working: the C-level 'munmap' error exits the whole python process +# retry=tenacity.retry_if_exception_type(pebble.common.ProcessExpired), +# stop=tenacity.stop_after_attempt(5), +# reraise=True, +# ) +def _rb(*a, **kwa): + try: + return run_blob(*a, 
**kwa) + #return None + except wsyntree.exceptions.RootTreeSitterNodeIsError as e: + log.debug(f"skip {a[0]}: {e}") + raise e + except Exception as e: + log.trace(log.error, traceback.format_exc()) + log.error(f"{e}") + raise e + +blobjob = namedtuple("BlobJob", ["result", "args", "kwargs", "retry"]) + +def record_failure(job, e): + blob = job.args[0] + fail_file = error_path_for_blob(blob) + fail_file.parent.mkdir(parents=True, exist_ok=True) + with fail_file.open("at") as f: + f.write(f"JOB FAILURE REPORT {blob}:\n") + f.write(f"Current traceback:\n") + f.write(traceback.format_exc()) + f.write("\n") + f.write(str(e)) + f.write("\n") + log.warn(f"Wrote failure report to {fail_file}") + redis_decoded.set(blob, BlobStatus.errored) + return fail_file + +if __name__ == "__main__": + parser = argparse.ArgumentParser("WST Collector NHV1 BlobFuncs v0") + parser.add_argument("-w", "--workers", type=int, default=4) + parser.add_argument("-v", "--verbose", action="store_true") + parser.add_argument("--prescan", help="Scan output dir before attempting new blobs", action="store_true") + args = parser.parse_args() + if args.verbose: + log.setLevel(log.DEBUG) + + if args.prescan: + log.info(f"running prescan ...") + for blobfile in tqdm( + output_root_dir.glob("*/*/*.jsonl"), + desc="scanning existing outputs", unit="blobs", + smoothing=tqdm_smoothing_factor, + ): + redis_decoded.set(blobfile.stem, BlobStatus.done) + for errfile in tqdm( + errors_dir.glob("*/*/*.txt"), desc="scanning errors", + smoothing=tqdm_smoothing_factor, + ): + redis_decoded.set(errfile.stem, BlobStatus.error) + + log.info(f"pool workers {args.workers}") + q = deque(maxlen=args.workers) + try: + blobs = tqdm(all_blobs_iterator( + blobfilter=blob_not_yet_processed, + filefilter=lambda f: wst_supports_fnames(f), + tqdm_position=1, + redis_cl=redis_decoded, + ), desc="blobs processed", unit="blobs", smoothing=tqdm_smoothing_factor, unit_scale=True) + tr = tracker.SummaryTracker() + def collect_or_retry(job: blobjob): + q.remove(job) + try: + job.result.result() + except wsyntree.exceptions.RootTreeSitterNodeIsError as e: + # no retry + log.warn(f"{job.args[0]}: unable to parse: {e}") + record_failure(job, e) + except pebble.common.ProcessExpired as e: + if job.retry > 5: + record_failure(job, e) + return + log.error(f"{job.args[0]} attempt {job.retry} failed: {e}") + q.append(blobjob( + _rb(*job.args), + job.args, + {}, + job.retry + 1, + )) + except concurrent.futures.TimeoutError as e: + log.error(f"{job.args[0]} process timed out") + record_failure(job, e) + + with logging_redirect_tqdm(), log.suppress_stdout(): + #log.logger.addHandler(log.OutputHandler(tqdm.write)) + for blob_pair in blobs: + if len(q) >= args.workers: + # block until slots available + done, not_done = futures.wait([j.result for j in q], return_when=futures.FIRST_COMPLETED) + done = list(done) + for job in list(q): # done jobs + if job.result in done: + collect_or_retry(job) + done.remove(job.result) + assert len(done) == 0 + + jargs = [*blob_pair] + q.append(blobjob(_rb(*jargs), jargs, {}, 0)) + # finish up: + for job in list(q): + collect_or_retry(job) + except KeyboardInterrupt as e: + log.warn(f"Caught KeyboardInterrupt, stopping ...") + tr.print_diff() + for j in q: + j.result.cancel() + tr.print_diff() diff --git a/wsyntree_collector/file/parse_file_treesitter.py b/wsyntree_collector/file/parse_file_treesitter.py index b437c59..7ed69ae 100644 --- a/wsyntree_collector/file/parse_file_treesitter.py +++ b/wsyntree_collector/file/parse_file_treesitter.py @@ 
-1,29 +1,97 @@ -from dask import dataframe as dd -from dask import bag as db +from itertools import chain +from pathlib import Path + +# from dask import dataframe as dd +# from dask import bag as db + +import networkx as nx from wsyntree import log +from wsyntree.exceptions import RootTreeSitterNodeIsError +from wsyntree.utils import dotdict from wsyntree.wrap_tree_sitter import TreeSitterAutoBuiltLanguage, TreeSitterCursorIterator -def build_dask_dataframe_for_file(lang: TreeSitterAutoBuiltLanguage, file: str): +# def build_dask_dataframe_for_file(lang: TreeSitterAutoBuiltLanguage, file: str): +# tree = lang.parse_file(file) +# cur = tree.walk() +# # cur = TreeSitterCursorIterator(cur, nodefilter=lambda x: x.is_named) +# cur = TreeSitterCursorIterator(cur) +# +# log.debug(f"{cur}") +# +# cols = ["repo", "file", "x1", "y1", "x2", "y2", "type", "text"] +# +# nl = [] +# +# for node in cur: +# # log.trace(log.debug, f"{node.type}: {node.text.tobytes().decode('utf-8')}") +# nl.append( +# [-1, file, *node.start_point, *node.end_point, node.type, node.text.tobytes()] +# ) +# +# ndb = db.from_sequence(nl) +# ndf = ndb.to_dataframe(columns=cols) +# +# return ndf.persist().repartition(1) + +def build_networkx_graph( + lang: TreeSitterAutoBuiltLanguage, + file: Path, + only_named_nodes: bool = False, + include_text: bool = False, + node_name_prefix="", + ): tree = lang.parse_file(file) cur = tree.walk() - # cur = TreeSitterCursorIterator(cur, nodefilter=lambda x: x.is_named) - cur = TreeSitterCursorIterator(cur) - log.debug(f"{cur}") + if only_named_nodes: + cursor = TreeSitterCursorIterator(cur, nodefilter=lambda x: x.is_named) + else: + cursor = TreeSitterCursorIterator(cur) + + G = nx.DiGraph(lang=lang.lang) + + parent_stack = [] + ts_id_to_preorder = {} + + root = cursor.peek() + if root.type == "ERROR": + raise RootTreeSitterNodeIsError(f"the file content or language is likely wrong for this '{lang.lang}' parser") + # ts_id_to_preorder[root.id] = 0 + + for cur_node in chain([root], cursor): + preorder = cursor._preorder + + nn = dotdict({ + # preorder=preorder, + "id": cur_node.id, + "named": cur_node.is_named, + "type": cur_node.type, + }) + (nn.x1,nn.y1) = cur_node.start_point + (nn.x2,nn.y2) = cur_node.end_point - cols = ["repo", "file", "x1", "y1", "x2", "y2", "type", "text"] + ts_id_to_preorder[cur_node.id] = preorder + parent_order = parent_stack[-1] if parent_stack else None - nl = [] + if include_text: + try: + nn.text = cur_node.text.decode() + except: + log.warn(f"Cannot decode text.") - for node in cur: - # log.trace(log.debug, f"{node.type}: {node.text.tobytes().decode('utf-8')}") - nl.append( - [-1, file, *node.start_point, *node.end_point, node.type, node.text.tobytes()] - ) + #log.debug(f"adding node {preorder}: {nn}") + # insert node and it's data + G.add_node(preorder, **nn) - ndb = db.from_sequence(nl) - ndf = ndb.to_dataframe(columns=cols) + # add the edge + if cur_node.parent is not None: + parent_preorder = ts_id_to_preorder[cur_node.parent.id] + #log.debug(f"connecting node {preorder}, to {parent_preorder}") + G.add_edge( + parent_preorder, + preorder + ) - return ndf.persist().repartition(1) + return G diff --git a/wsyntree_collector/wociterators.py b/wsyntree_collector/wociterators.py new file mode 100644 index 0000000..3a43eb8 --- /dev/null +++ b/wsyntree_collector/wociterators.py @@ -0,0 +1,145 @@ +import sys +import os +import gzip +import enum +from pathlib import Path, PurePosixPath +from collections import namedtuple + +from tqdm import tqdm + +from wsyntree 
import log +import oscar + +blob_sections = 128 +all_blob_sections = range(0, blob_sections) + +blob_fbase = "/da5_data/All.blobs/blob_{section}.{ext}" + +blobresult = namedtuple("BlobResult", ["hash", "content", "filenames"]) + +KIBIBYTE = 2**10 +MEBIBYTE = 2**20 +GIBIBYTE = 2**30 + +class BlobStatus(str, enum.Enum): # enum.StrEnum added in 3.11 + done = "done" + not_supported = "not_supported" + too_large = "too_large" + errored = "errored" + skipped = "skipped" + +def binary_line_iterator(open_file, max_buffer_size=GIBIBYTE): + """only yields lines that are decodable""" + # magic performance number: wanting the average buffer.split() size + # idxf_1: 10849129529/97563749 = 111.2 avg bytes per line + chunksize = 256 + lineno = 0 + buffer = bytearray() + for block in iter(lambda: open_file.read(chunksize), b""): + buffer += block + while b"\n" in buffer: + line, buffer = buffer.split(b"\n", 1) + lineno += 1 + try: + yield line.decode() + except UnicodeDecodeError: + log.warn(f"cannot decode line {lineno} of {open_file.name}") + if len(buffer) >= max_buffer_size: + # skip this line, it's too large + log.warn(f"line {lineno} greater than {max_buffer_size} bytes long, skipping to next line...") + buffer = bytearray() + for block in iter(lambda: open_file.read(chunksize), b""): + if b"\n" in block: + # found next line + _, new_line_start = block.split(b"\n", 1) + lineno += 1 + buffer.extend(new_line_start) + break + else: # no newline before end of file: + log.warn(f"line {lineno} was last in file") + # loop will exit naturally + if buffer: + for line in buffer.split(b"\n"): + try: + yield line.decode() + except UnicodeDecodeError: + log.warn(f"cannot decode line {lineno} of {open_file.name}") + lineno += 1 # does this go before or after? not too important? 
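+# Rough sketch of how binary_line_iterator is meant to be driven (iter_blobs
+# below does exactly this with the World of Code index files; path illustrative):
+#
+#     with gzip.open("blob_0.idxf", "rb") as idx_f:
+#         for line in binary_line_iterator(idx_f):
+#             fields = line.rstrip().split(";")
+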
+ +def iter_blobs( + section, + blobfilter=lambda b: True, + filefilter=lambda fnames: True, + max_blob_size=(MEBIBYTE * 32), + tqdm_position=None, + redis_cl=None, + ): + """ + blobfilter(str) -> bool + filefilter(List[PurePosixPath]) -> bool + + all provided filters must pass + """ + p_idx = Path(blob_fbase.format(section=section, ext="idxf")) + p_bin = Path(blob_fbase.format(section=section, ext="bin")) + + # open the index in binary as well, encoding not specified + with gzip.open(p_idx, "rb") as idx_f, p_bin.open("rb") as bin_f: + if tqdm_position is None: + bin_line_it = binary_line_iterator(idx_f) + else: + bin_line_it = tqdm( + binary_line_iterator(idx_f), + position=tqdm_position, + desc="index lines", + unit="lines", + unit_scale=True, + smoothing=0.01, + ) + for idx_line in bin_line_it: + fields = idx_line.rstrip().split(";") + _hash = fields[3] + if not blobfilter(_hash): + continue + offset = int(fields[1]) + length = int(fields[2]) + if length >= max_blob_size: + log.warn(f"compressed blob too large: skip {_hash} of lzf size {length}") + if redis_cl: + redis_cl.set(_hash, BlobStatus.too_large) + continue + filenames = tuple(PurePosixPath(x) for x in fields[4:]) + if not filefilter(filenames): + if redis_cl: + redis_cl.set(_hash, BlobStatus.not_supported) + continue + bin_f.seek(offset, os.SEEK_SET) + lzf_data = bin_f.read(length) + if not lzf_data: + log.warn(f"no data in blob: skip {_hash}") + continue + if lzf_data[0] == 0: + # data not compressed, will be handled by oscar.decomp + pass + else: + header_size, uncompressed_content_length = oscar.lzf_length(lzf_data) + if uncompressed_content_length >= max_blob_size: + log.warn(f"uncompressed blob too large: skip {_hash} of size {uncompressed_content_length}") + if redis_cl: + redis_cl.set(_hash, BlobStatus.too_large) + continue + val = oscar.decomp(lzf_data) + yield blobresult(_hash, val, filenames) + +def all_blobs(**kwargs): + for sec in all_blob_sections: + log.info(f"iter all blob sections: start section {sec}") + for i in iter_blobs(sec, **kwargs): + yield i + +if __name__ == "__main__": + it = all_blobs() + + for _ in range(5): + n = next(it) + print(f"Blob {n[0]}: content length {len(n[1])}")
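+
+    # Illustrative only: the same iterator restricted by a filename filter,
+    # similar to how nhv1_blobfuncs_v0 limits blobs to supported file suffixes.
+    py_only = all_blobs(filefilter=lambda names: any(f.suffix == ".py" for f in names))
+    print(f"First Python-looking blob: {next(py_only)[0]}")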