From 6b287304b46e56a488dad18dc209520154eb2140 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Mon, 4 Apr 2022 08:20:34 -0400 Subject: [PATCH 01/29] using node.id and networkx to construct json out --- requirements.txt | 1 + setup.py | 1 + wsyntree_collector/__main__.py | 8 ++ wsyntree_collector/commands/__init__.py | 2 + wsyntree_collector/commands/file.py | 53 +++++++++++ .../file/parse_file_treesitter.py | 91 +++++++++++++++---- 6 files changed, 140 insertions(+), 16 deletions(-) create mode 100644 wsyntree_collector/commands/__init__.py create mode 100644 wsyntree_collector/commands/file.py diff --git a/requirements.txt b/requirements.txt index 5cf0483..a08d128 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ enlighten==1.9.0 psutil bpython orjson>=3.0.0 +networkx[default] diff --git a/setup.py b/setup.py index 79a6619..0830101 100755 --- a/setup.py +++ b/setup.py @@ -51,6 +51,7 @@ "psutil", "bpython", "orjson>=3.0.0", + "networkx[default]" ], entry_points={ 'console_scripts': [ diff --git a/wsyntree_collector/__main__.py b/wsyntree_collector/__main__.py index ae5528f..4ca4880 100644 --- a/wsyntree_collector/__main__.py +++ b/wsyntree_collector/__main__.py @@ -30,6 +30,8 @@ from .jsonl_collector import WST_JSONLCollector from .batch_analyzer import set_batch_analyze_args +from . import commands + def analyze(args): @@ -248,6 +250,12 @@ def __main__(): help="Delete any existing data in the database", action="store_true", ) + + # run for a single file + cmd_file = subcmds.add_parser( + 'file', aliases=[], help="Run WST on a single file") + commands.file.set_args(cmd_file) + args = parser.parse_args() if args.verbose: diff --git a/wsyntree_collector/commands/__init__.py b/wsyntree_collector/commands/__init__.py new file mode 100644 index 0000000..930a3f3 --- /dev/null +++ b/wsyntree_collector/commands/__init__.py @@ -0,0 +1,2 @@ + +from . import file diff --git a/wsyntree_collector/commands/file.py b/wsyntree_collector/commands/file.py new file mode 100644 index 0000000..72d1661 --- /dev/null +++ b/wsyntree_collector/commands/file.py @@ -0,0 +1,53 @@ + +from pathlib import Path + +from networkx.readwrite import json_graph +import orjson + +from wsyntree import log +from wsyntree.wrap_tree_sitter import TreeSitterAutoBuiltLanguage, TreeSitterCursorIterator + +from ..file.parse_file_treesitter import build_networkx_graph + +def set_args(parser): + parser.set_defaults(func=run) + parser.add_argument("which_file", type=Path, help="Path to input file") + parser.add_argument("-l", "--lang", help="Language parser to use") + parser.add_argument("-o", "--output", help="File to write result to", type=Path) + +def run(args): + log.info(f"Running for file {args.which_file}") + + if not args.lang: + raise NotImplementedError(f"Automatic detection of file language not supported. 
Please specify a TreeSitter parser to use.") + + lang = TreeSitterAutoBuiltLanguage(args.lang) + + graph = build_networkx_graph(lang, args.which_file) + + log.info(f"Graph result: {graph}") + + # node_link_data = json_graph.node_link_data(graph) + # log.debug(f"Graph node_link_data: {node_link_data}") + # + # adjacency_data = json_graph.adjacency_data(graph) + # log.debug(f"Graph adjacency_data: {adjacency_data}") + + # cytoscape_data = json_graph.cytoscape_data(graph) + # log.debug(f"Graph cytoscape_data: {cytoscape_data}") + + tree_data = json_graph.tree_data(graph, 0) + log.debug(f"Graph tree_data: {tree_data}") + + # jit_data = json_graph.jit_data(graph) + # log.debug(f"Graph jit_data: {jit_data}") + + if args.output: + if str(args.output) == "-": + raise NotImplementedError(f"output to stdout not yet supported") + + log.info(f"Writing to {args.output} ...") + with args.output.open('wb') as f: + f.write(orjson.dumps( + tree_data, option=orjson.OPT_APPEND_NEWLINE + )) diff --git a/wsyntree_collector/file/parse_file_treesitter.py b/wsyntree_collector/file/parse_file_treesitter.py index b437c59..9f6f326 100644 --- a/wsyntree_collector/file/parse_file_treesitter.py +++ b/wsyntree_collector/file/parse_file_treesitter.py @@ -1,29 +1,88 @@ -from dask import dataframe as dd -from dask import bag as db +from itertools import chain +from pathlib import Path + +# from dask import dataframe as dd +# from dask import bag as db + +import networkx as nx from wsyntree import log +from wsyntree.utils import dotdict from wsyntree.wrap_tree_sitter import TreeSitterAutoBuiltLanguage, TreeSitterCursorIterator -def build_dask_dataframe_for_file(lang: TreeSitterAutoBuiltLanguage, file: str): +# def build_dask_dataframe_for_file(lang: TreeSitterAutoBuiltLanguage, file: str): +# tree = lang.parse_file(file) +# cur = tree.walk() +# # cur = TreeSitterCursorIterator(cur, nodefilter=lambda x: x.is_named) +# cur = TreeSitterCursorIterator(cur) +# +# log.debug(f"{cur}") +# +# cols = ["repo", "file", "x1", "y1", "x2", "y2", "type", "text"] +# +# nl = [] +# +# for node in cur: +# # log.trace(log.debug, f"{node.type}: {node.text.tobytes().decode('utf-8')}") +# nl.append( +# [-1, file, *node.start_point, *node.end_point, node.type, node.text.tobytes()] +# ) +# +# ndb = db.from_sequence(nl) +# ndf = ndb.to_dataframe(columns=cols) +# +# return ndf.persist().repartition(1) + +def build_networkx_graph( + lang: TreeSitterAutoBuiltLanguage, file: Path, + only_named_nodes: bool = False, + include_text: bool = False, + ): tree = lang.parse_file(file) cur = tree.walk() - # cur = TreeSitterCursorIterator(cur, nodefilter=lambda x: x.is_named) - cur = TreeSitterCursorIterator(cur) - log.debug(f"{cur}") + if only_named_nodes: + cursor = TreeSitterCursorIterator(cur, nodefilter=lambda x: x.is_named) + else: + cursor = TreeSitterCursorIterator(cur) + + G = nx.DiGraph(lang=lang.lang) + + parent_stack = [] + ts_id_to_preorder = {} + + root = cursor.peek() + # ts_id_to_preorder[root.id] = 0 + + for cur_node in chain([root], cursor): + preorder = cursor._preorder + + nn = dotdict({ + # preorder=preorder, + "id": cur_node.id, + "named": cur_node.is_named, + "type": cur_node.type, + }) + (nn.x1,nn.y1) = cur_node.start_point + (nn.x2,nn.y2) = cur_node.end_point - cols = ["repo", "file", "x1", "y1", "x2", "y2", "type", "text"] + ts_id_to_preorder[cur_node.id] = preorder + parent_order = parent_stack[-1] if parent_stack else None - nl = [] + if include_text: + raise NotImplementedError(f"text decoding TODO") - for node in cur: - # 
log.trace(log.debug, f"{node.type}: {node.text.tobytes().decode('utf-8')}") - nl.append( - [-1, file, *node.start_point, *node.end_point, node.type, node.text.tobytes()] - ) + log.debug(f"adding node {preorder}: {nn}") + # insert node and it's data + G.add_node(preorder, **nn) - ndb = db.from_sequence(nl) - ndf = ndb.to_dataframe(columns=cols) + # add the edge + if cur_node.parent is not None: + log.debug(f"connecting node {preorder}, to id {cur_node.parent.id}") + G.add_edge( + ts_id_to_preorder[cur_node.parent.id], + preorder + ) - return ndf.persist().repartition(1) + return G From a0a0215cf035f13745a4f9d8182f6076bbfb4170 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Sat, 21 May 2022 14:57:58 -0400 Subject: [PATCH 02/29] disable test docker container --- docker-compose.yml | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 436aa72..38e3e2f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,19 +2,19 @@ version: '3' services: - wst-test: - build: - context: . - dockerfile: docker/testing.dockerfile - depends_on: - - neo4j - restart: "no" - command: "wait-for-it neo4j:7687 -- /wst/docker/test-collector.sh" - environment: - - NEO4J_BOLT_URL=bolt://neo4j:pass@neo4j:7687 +# wst-test: +# build: +# context: . +# dockerfile: docker/testing.dockerfile +# depends_on: +# - neo4j +# restart: "no" +# command: "wait-for-it neo4j:7687 -- /wst/docker/test-collector.sh" +# environment: +# - NEO4J_BOLT_URL=bolt://neo4j:pass@neo4j:7687 neo4j: - image: neo4j:4.2 + image: neo4j:4.4 ports: - 127.0.0.1:9784:7474 - 127.0.0.1:9787:7687 From 6de9fd668ce271dc6831c6cedb1341e10775faa7 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Sat, 21 May 2022 18:43:18 -0400 Subject: [PATCH 03/29] can export text in graphml --- docker-compose.yml | 1 + wsyntree_collector/commands/file.py | 17 +++++++++++------ .../file/parse_file_treesitter.py | 5 ++++- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 38e3e2f..9e9ec76 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,5 +20,6 @@ services: - 127.0.0.1:9787:7687 environment: - NEO4J_AUTH=none + - "NEO4JLABS_PLUGINS=[\"apoc\"]" # volumes: # - wst_neo4j_data:/data diff --git a/wsyntree_collector/commands/file.py b/wsyntree_collector/commands/file.py index 72d1661..f1f90e9 100644 --- a/wsyntree_collector/commands/file.py +++ b/wsyntree_collector/commands/file.py @@ -1,6 +1,7 @@ from pathlib import Path +import networkx as nx from networkx.readwrite import json_graph import orjson @@ -23,13 +24,13 @@ def run(args): lang = TreeSitterAutoBuiltLanguage(args.lang) - graph = build_networkx_graph(lang, args.which_file) + graph = build_networkx_graph(lang, args.which_file, include_text=True) log.info(f"Graph result: {graph}") # node_link_data = json_graph.node_link_data(graph) # log.debug(f"Graph node_link_data: {node_link_data}") - # + # adjacency_data = json_graph.adjacency_data(graph) # log.debug(f"Graph adjacency_data: {adjacency_data}") @@ -47,7 +48,11 @@ def run(args): raise NotImplementedError(f"output to stdout not yet supported") log.info(f"Writing to {args.output} ...") - with args.output.open('wb') as f: - f.write(orjson.dumps( - tree_data, option=orjson.OPT_APPEND_NEWLINE - )) + if str(args.output).endswith(".graphml"): + log.info("Writing GraphML") + nx.write_graphml(graph, args.output) + else: + with args.output.open('wb') as f: + f.write(orjson.dumps( + tree_data, option=orjson.OPT_APPEND_NEWLINE + )) diff 
--git a/wsyntree_collector/file/parse_file_treesitter.py b/wsyntree_collector/file/parse_file_treesitter.py index 9f6f326..37cc009 100644 --- a/wsyntree_collector/file/parse_file_treesitter.py +++ b/wsyntree_collector/file/parse_file_treesitter.py @@ -71,7 +71,10 @@ def build_networkx_graph( parent_order = parent_stack[-1] if parent_stack else None if include_text: - raise NotImplementedError(f"text decoding TODO") + try: + nn.text = cur_node.text.decode() + except: + log.warn(f"Cannot decode text.") log.debug(f"adding node {preorder}: {nn}") # insert node and it's data From 8de110baa4ca156d6a549c71a665d68adc2568c1 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Sun, 22 May 2022 14:54:19 -0400 Subject: [PATCH 04/29] allow prefixing nodes in nx export --- docker-compose.yml | 2 ++ wsyntree_collector/file/parse_file_treesitter.py | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index 9e9ec76..82f33a4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,5 +21,7 @@ services: environment: - NEO4J_AUTH=none - "NEO4JLABS_PLUGINS=[\"apoc\"]" + - "NEO4J_dbms_security_procedures_unrestricted=apoc.\\*" + - "NEO4J_apoc_import_file_enabled=true" # volumes: # - wst_neo4j_data:/data diff --git a/wsyntree_collector/file/parse_file_treesitter.py b/wsyntree_collector/file/parse_file_treesitter.py index 37cc009..81b4928 100644 --- a/wsyntree_collector/file/parse_file_treesitter.py +++ b/wsyntree_collector/file/parse_file_treesitter.py @@ -38,6 +38,7 @@ def build_networkx_graph( lang: TreeSitterAutoBuiltLanguage, file: Path, only_named_nodes: bool = False, include_text: bool = False, + node_name_prefix="", ): tree = lang.parse_file(file) cur = tree.walk() @@ -78,7 +79,7 @@ def build_networkx_graph( log.debug(f"adding node {preorder}: {nn}") # insert node and it's data - G.add_node(preorder, **nn) + G.add_node(node_name_prefix + str(preorder), **nn) # add the edge if cur_node.parent is not None: From 525059bd7160770d1b3179a49032300bb1d0e4d8 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Mon, 13 Feb 2023 13:43:05 -0500 Subject: [PATCH 05/29] cleanup before new idea --- .travis.yml | 16 ++++++++-------- LICENSE.md | 2 +- README.md | 15 +++++++++------ 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/.travis.yml b/.travis.yml index 4e4c72d..bee9494 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,11 +16,11 @@ install: - python setup.py install script: - - python tests/test.py -l javascript tests/stuff.js - - python tests/test.py -l python tests/stuff.py - - python tests/test.py -l ruby tests/stuff.rb - - python tests/test.py -l c tests/stuff.c - - python tests/test.py -l cpp tests/stuff.cpp - - python tests/test.py -l java tests/stuff.java - - python tests/test.py -l rust tests/stuff.rs - - python tests/test.py -l go tests/stuff.go + - wsyntree-collector file -l javascript tests/stuff.js + - wsyntree-collector file -l python tests/stuff.py + - wsyntree-collector file -l ruby tests/stuff.rb + - wsyntree-collector file -l c tests/stuff.c + - wsyntree-collector file -l cpp tests/stuff.cpp + - wsyntree-collector file -l java tests/stuff.java + - wsyntree-collector file -l rust tests/stuff.rs + - wsyntree-collector file -l go tests/stuff.go diff --git a/LICENSE.md b/LICENSE.md index c2a43a7..2efcca8 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -1,4 +1,4 @@ -Copyright (c) 2020-2021; +Copyright (c) 2020-2023; Ben Klein (robobenklein) *et al* (`git shortlog -sn`) diff --git a/README.md b/README.md index ca1eb6c..c70d5da 100644 --- 
a/README.md +++ b/README.md @@ -12,11 +12,8 @@ Scales to incredible size: the goal is to build a system capable of storing, par WorldSyntaxTree is built upon the following technologies: - Python: we chose this as it is quick to develop and has a large ecosystem of scientific libraries that we aim to be able to support integration with. In our field of research Python is the most popular to use for quickly wrangling large and complex datasets. + - [NetworkX](https://networkx.org/): graph wrangling library of choice - [Tree-Sitter](https://tree-sitter.github.io/tree-sitter/): this is the integral component to enable us to generate the concrete syntax trees from code quickly while still generating a somewhat useful result even on broken code. -- [ArangoDB](https://www.arangodb.com/): our choice of database stemmed from the following requirements: - - Must be open source (free for use and improvement by all) - - Must support our incredibly large data size (many terabytes) - - Must have native/serverside graph processing capabilities - Git: the outer / top-level structure for the whole tree is based upon Git's structure of repositories, commits, and files, thus we aren't currently exploring other VCS systems (though we might in the far future) For a full list of libraries used, check the `setup.py` or `requirements.txt`. @@ -54,9 +51,15 @@ Requirements: - Standard development tooling (git, pip, python-dev, setuptools, etc) - C++ compiler (ubuntu: `libc++-dev libc++abi-dev`, plus any other dependencies needed for WST to auto-compile Tree-Sitter languages) - Python 3.8+ -- Optional: an ArangoDB instance -Install steps: +Recommended: perform these steps in a python Virtual Environment: + +``` +virtualenv -p python3 venv +source venv/bin/activate # run again to activate the virtualenv again later +``` + +Install dependencies and wsyntree itself: ``` python -m pip install -r requirements.txt From 9a573018e9a2794dc1bbf3245045bb32714f58e1 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Mon, 13 Feb 2023 19:45:53 -0500 Subject: [PATCH 06/29] one arg one line --- wsyntree_collector/file/parse_file_treesitter.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/wsyntree_collector/file/parse_file_treesitter.py b/wsyntree_collector/file/parse_file_treesitter.py index 81b4928..2f8f02a 100644 --- a/wsyntree_collector/file/parse_file_treesitter.py +++ b/wsyntree_collector/file/parse_file_treesitter.py @@ -35,7 +35,8 @@ # return ndf.persist().repartition(1) def build_networkx_graph( - lang: TreeSitterAutoBuiltLanguage, file: Path, + lang: TreeSitterAutoBuiltLanguage, + file: Path, only_named_nodes: bool = False, include_text: bool = False, node_name_prefix="", From b5cd3cd74c7768730f6589eb14c0793a2bb79e82 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Mon, 13 Feb 2023 21:46:27 -0500 Subject: [PATCH 07/29] initial attempt at node hashing --- wsyntree/hashtypes/__init__.py | 59 ++++++++++++++++++ wsyntree_collector/__main__.py | 4 ++ wsyntree_collector/commands/__init__.py | 1 + wsyntree_collector/commands/node_hash_v1.py | 61 +++++++++++++++++++ .../file/parse_file_treesitter.py | 7 ++- 5 files changed, 129 insertions(+), 3 deletions(-) create mode 100644 wsyntree/hashtypes/__init__.py create mode 100644 wsyntree_collector/commands/node_hash_v1.py diff --git a/wsyntree/hashtypes/__init__.py b/wsyntree/hashtypes/__init__.py new file mode 100644 index 0000000..f39fff1 --- /dev/null +++ b/wsyntree/hashtypes/__init__.py @@ -0,0 +1,59 @@ +""" +# Why are there multiple node hash types? 
+ +Multiple types are needed because different properties can be included or excluded +from the hash, e.g. to include or not to include named nodes. + + +""" +from enum import Enum +import hashlib +import functools + +from wsyntree import log + +import orjson +import networkx as nx + +class WSTNodeHashV1(): + __included__ = ["named", "type"] + def __init__(self, G, node): + """""" + self._graph = G + self._node = node + self._nodes = [] + nodes = nx.dfs_preorder_nodes(G, source=node) + for node in nodes: + self._nodes.append(node) + + @functools.cache + def _get_hashable_repr(self): + s = bytearray(b"WSTNodeHashV1<") + nodedata = list(map( + lambda x: {k:v for k,v in x.items() if k in self.__included__}, + [self._graph.nodes[n] for n in self._nodes] + )) + # s += ",".join([f"{list(nd.items())}" for nd in nodedata]) + # we must sort keys here in case a python version is used that does not + # preserve dict ordering is used + s += orjson.dumps(nodedata, option=orjson.OPT_SORT_KEYS) + # log.debug(f"{self}, {nodedata}") + s += b">" + return s + + @property + def _node_props(self): + return self._graph.nodes[self._node] + + def _get_sha512_hex(self): + h = hashlib.sha512() + h.update(self._get_hashable_repr()) + return h.hexdigest() + + def __str__(self): + return f"WSTNodeHashV1" + +# Once defined here the behavior should not change (stable versions) +class WSTNodeHashType(Enum): + # V1 = WSTNodeHashV1 + pass diff --git a/wsyntree_collector/__main__.py b/wsyntree_collector/__main__.py index 4ca4880..4a48b7e 100644 --- a/wsyntree_collector/__main__.py +++ b/wsyntree_collector/__main__.py @@ -256,6 +256,10 @@ def __main__(): 'file', aliases=[], help="Run WST on a single file") commands.file.set_args(cmd_file) + cmd_node_hash_v1 = subcmds.add_parser( + 'node_hash_v1', aliases=['nhv1'], help="Hash syntax nodes (V1)") + commands.node_hash_v1.set_args(cmd_node_hash_v1) + args = parser.parse_args() if args.verbose: diff --git a/wsyntree_collector/commands/__init__.py b/wsyntree_collector/commands/__init__.py index 930a3f3..716ed88 100644 --- a/wsyntree_collector/commands/__init__.py +++ b/wsyntree_collector/commands/__init__.py @@ -1,2 +1,3 @@ from . import file +from . import node_hash_v1 diff --git a/wsyntree_collector/commands/node_hash_v1.py b/wsyntree_collector/commands/node_hash_v1.py new file mode 100644 index 0000000..7992af5 --- /dev/null +++ b/wsyntree_collector/commands/node_hash_v1.py @@ -0,0 +1,61 @@ + +from pathlib import Path + +import networkx as nx +from networkx.readwrite import json_graph +import orjson + +from wsyntree import log +from wsyntree.wrap_tree_sitter import TreeSitterAutoBuiltLanguage, TreeSitterCursorIterator +from wsyntree.hashtypes import WSTNodeHashV1 + +from ..file.parse_file_treesitter import build_networkx_graph + +def set_args(parser): + parser.set_defaults(func=run) + parser.add_argument("which_file", type=Path, help="Path to input file") + parser.add_argument("what_node_type", type=str, help="What kind of nodes to hash") + parser.add_argument("-l", "--lang", help="Language parser to use") + parser.add_argument("-o", "--output", help="File to write result to", type=Path) + +def run(args): + log.info(f"Running for file {args.which_file}") + + if not args.lang: + raise NotImplementedError(f"Automatic detection of file language not supported. 
Please specify a TreeSitter parser to use.") + + lang = TreeSitterAutoBuiltLanguage(args.lang) + + graph = build_networkx_graph(lang, args.which_file, include_text=True) + + log.info(f"Graph result: {graph}") + + log.info(f"Searching for nodes of type {args.what_node_type}") + hashed_nodes = [] + for node in nx.dfs_preorder_nodes(graph): + if graph.nodes[node]['type'] == args.what_node_type: + log.debug(f"Node {node} (x1:{graph.nodes[node]['x1']}) matched type {args.what_node_type}, hashing...") + hashed_nodes.append(WSTNodeHashV1(graph, node)) + + # tree_data = json_graph.tree_data(graph, 0) + # log.debug(f"Graph tree_data: {tree_data}") + + if args.output: + if str(args.output) == "-": + raise NotImplementedError(f"output to stdout not yet supported") + + log.info(f"Writing to {args.output} ...") + with args.output.open('wb') as f: + for nh in hashed_nodes: + f.write(orjson.dumps({ + "sha512": nh._get_sha512_hex(), + "file": str(args.which_file), + }, option=orjson.OPT_APPEND_NEWLINE)) + # if str(args.output).endswith(".graphml"): + # log.info("Writing GraphML") + # nx.write_graphml(graph, args.output) + # else: + # with args.output.open('wb') as f: + # f.write(orjson.dumps( + # tree_data, option=orjson.OPT_APPEND_NEWLINE + # )) diff --git a/wsyntree_collector/file/parse_file_treesitter.py b/wsyntree_collector/file/parse_file_treesitter.py index 2f8f02a..6e53ce8 100644 --- a/wsyntree_collector/file/parse_file_treesitter.py +++ b/wsyntree_collector/file/parse_file_treesitter.py @@ -80,13 +80,14 @@ def build_networkx_graph( log.debug(f"adding node {preorder}: {nn}") # insert node and it's data - G.add_node(node_name_prefix + str(preorder), **nn) + G.add_node(preorder, **nn) # add the edge if cur_node.parent is not None: - log.debug(f"connecting node {preorder}, to id {cur_node.parent.id}") + parent_preorder = ts_id_to_preorder[cur_node.parent.id] + log.debug(f"connecting node {preorder}, to {parent_preorder}") G.add_edge( - ts_id_to_preorder[cur_node.parent.id], + parent_preorder, preorder ) From 5b84da560076545ad7061f4cb776ce263f09f373 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Mon, 13 Feb 2023 21:55:43 -0500 Subject: [PATCH 08/29] coords in jsonl output --- wsyntree_collector/commands/node_hash_v1.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/wsyntree_collector/commands/node_hash_v1.py b/wsyntree_collector/commands/node_hash_v1.py index 7992af5..e3e5ca2 100644 --- a/wsyntree_collector/commands/node_hash_v1.py +++ b/wsyntree_collector/commands/node_hash_v1.py @@ -50,6 +50,10 @@ def run(args): f.write(orjson.dumps({ "sha512": nh._get_sha512_hex(), "file": str(args.which_file), + # obviously we want coords to allow humans to quickly find the referenced code + "x1": nh._node_props['x1'], + "y1": nh._node_props['y1'], + # TODO: "objectid": git-object-id, (when run in batch mode) }, option=orjson.OPT_APPEND_NEWLINE)) # if str(args.output).endswith(".graphml"): # log.info("Writing GraphML") From c48ab8756a69091c1220950b023fdd4b77b57872 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Fri, 10 Mar 2023 14:43:46 -0500 Subject: [PATCH 09/29] type of node and x2 --- wsyntree_collector/commands/node_hash_v1.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/wsyntree_collector/commands/node_hash_v1.py b/wsyntree_collector/commands/node_hash_v1.py index e3e5ca2..4d73bed 100644 --- a/wsyntree_collector/commands/node_hash_v1.py +++ b/wsyntree_collector/commands/node_hash_v1.py @@ -52,7 +52,10 @@ def run(args): "file": str(args.which_file), # obviously we want coords to allow humans to 
quickly find the referenced code "x1": nh._node_props['x1'], + "x2": nh._node_props['x2'], "y1": nh._node_props['y1'], + # while the node's hash *is* unique, make it easier to find: + "type": nh._node_props['type'], # TODO: "objectid": git-object-id, (when run in batch mode) }, option=orjson.OPT_APPEND_NEWLINE)) # if str(args.output).endswith(".graphml"): From c6760689663601a7f9297149edc0938002f9fde9 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Fri, 24 Mar 2023 00:24:27 -0400 Subject: [PATCH 10/29] reenable c_sharp and fix the root node ERROR --- docker-compose.yml | 2 +- requirements.txt | 1 + wsyntree/constants.py | 8 ++++---- wsyntree/exceptions.py | 3 +++ wsyntree/wrap_tree_sitter.py | 3 +++ wsyntree_collector/file/parse_file_treesitter.py | 3 +++ 6 files changed, 15 insertions(+), 5 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 82f33a4..a0a8ce4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,7 +14,7 @@ services: # - NEO4J_BOLT_URL=bolt://neo4j:pass@neo4j:7687 neo4j: - image: neo4j:4.4 + image: neo4j:5 ports: - 127.0.0.1:9784:7474 - 127.0.0.1:9787:7687 diff --git a/requirements.txt b/requirements.txt index a08d128..683864f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ tree_sitter@git+https://github.com/utk-se/py-tree-sitter.git@master#egg=tree_sitter +grand-cypher@git+https://github.com/aplbrain/grand-cypher.git@master#egg=grand-cypher python-arango coloredlogs pygit2 diff --git a/wsyntree/constants.py b/wsyntree/constants.py index dd8e0ef..702c62c 100644 --- a/wsyntree/constants.py +++ b/wsyntree/constants.py @@ -23,10 +23,10 @@ "tsrepo": "https://github.com/tree-sitter/tree-sitter-ruby.git", "file_ext": "\.rb$", }, - # "csharp": { - # "tsrepo": "https://github.com/tree-sitter/tree-sitter-c-sharp.git", - # "file_ext": "\.cs$", - # }, + "c_sharp": { # name in compiled language uses underscore + "tsrepo": "https://github.com/tree-sitter/tree-sitter-c-sharp.git", + "file_ext": "\.cs$", + }, "c": { "tsrepo": "https://github.com/tree-sitter/tree-sitter-c.git", "file_ext": "\.(c|h)$", diff --git a/wsyntree/exceptions.py b/wsyntree/exceptions.py index 12a0d53..e9ac257 100644 --- a/wsyntree/exceptions.py +++ b/wsyntree/exceptions.py @@ -31,6 +31,9 @@ class UnhandledGitFileMode(ValueError, WSTBaseError): class DeduplicatedObjectMismatch(ValueError, WSTBaseError): pass +class RootTreeSitterNodeIsError(ValueError, WSTBaseError): + pass + def isArangoWriteWriteConflict(e: ArangoDocumentInsertError) -> bool: """Is an exception a Write-Write conflict?""" if isinstance(e, ArangoDocumentInsertError): diff --git a/wsyntree/wrap_tree_sitter.py b/wsyntree/wrap_tree_sitter.py index bde056a..66d3153 100644 --- a/wsyntree/wrap_tree_sitter.py +++ b/wsyntree/wrap_tree_sitter.py @@ -4,6 +4,7 @@ import functools import re import time +import warnings import pebble from tree_sitter import Language, Parser, TreeCursor, Node @@ -18,6 +19,7 @@ class TreeSitterAutoBuiltLanguage(): def __init__(self, lang): + assert lang in wsyntree_langs, f"{lang} not found or not yet available in WorldSyntaxTree" self.lang = lang self.parser = None self.ts_language = None @@ -42,6 +44,7 @@ def _get_language_repo(self): if not repodir.exists(): repodir.mkdir(mode=0o770) log.debug(f"cloning treesitter repo for {self}") + warnings.warn(f"WorldSyntaxTree cloning parser repo for {self.lang}, this might be slow.") return git.clone_repository( wsyntree_langs[self.lang]["tsrepo"], repodir.resolve() diff --git a/wsyntree_collector/file/parse_file_treesitter.py 
b/wsyntree_collector/file/parse_file_treesitter.py index 6e53ce8..cf489d9 100644 --- a/wsyntree_collector/file/parse_file_treesitter.py +++ b/wsyntree_collector/file/parse_file_treesitter.py @@ -8,6 +8,7 @@ import networkx as nx from wsyntree import log +from wsyntree.exceptions import RootTreeSitterNodeIsError from wsyntree.utils import dotdict from wsyntree.wrap_tree_sitter import TreeSitterAutoBuiltLanguage, TreeSitterCursorIterator @@ -55,6 +56,8 @@ def build_networkx_graph( ts_id_to_preorder = {} root = cursor.peek() + if root.type == "ERROR": + raise RootTreeSitterNodeIsError(f"the file content or language is likely wrong for this parser") # ts_id_to_preorder[root.id] = 0 for cur_node in chain([root], cursor): From 7d74e10f07b9530205a1f36eab903ea03a285ba7 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Sun, 9 Apr 2023 22:05:56 -0400 Subject: [PATCH 11/29] wip wociterators and nhv1_blobfuncs_v0 --- .../commands/woc/nhv1_blobfuncs_v0.py | 43 +++++++++++++++++++ wsyntree_collector/wociterators.py | 38 ++++++++++++++++ 2 files changed, 81 insertions(+) create mode 100644 wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py create mode 100644 wsyntree_collector/wociterators.py diff --git a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py new file mode 100644 index 0000000..aefa907 --- /dev/null +++ b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py @@ -0,0 +1,43 @@ +""" +This "command" is only to be run as a job on the DA cluster, not used as a subcommand. +""" + +import argparse +import subprocess +from typing import List, Optional +from pathlib import Path + +import networkx as nx +from networkx.readwrite import json_graph +import orjson + +from wsyntree import log +from wsyntree.wrap_tree_sitter import TreeSitterAutoBuiltLanguage, TreeSitterCursorIterator +from wsyntree.hashtypes import WSTNodeHashV1 + +from wsyntree_collector.file.parse_file_treesitter import build_networkx_graph +from wsyntree_collector.wociterators import all_blobs as all_blobs_iterator + +#import oscar + + +output_root_dir = Path("/da7_data/WorldSyntaxTree/nhv1/blobfuncs_v0") +assert output_root_dir.is_dir(), f"Output directory does not exist." 
+ + +def get_filenames_for_blob(b: str) -> List[Path]: + """Inefficient version: calls lookup b2f because I am lazy to rewrite it in python""" + c = subprocess.run( + ["bash", getValuesScript, "b2f"], + stdin=f"{b}\n", + text=True, + capture_output=True, + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser("WST Collector NHV1 BlobFuncs v0") + + args = parser.parser_args() + + print(get_filenames_for_blob(0194e44034e001afd2bdb3b54e6c11a288e25806)) diff --git a/wsyntree_collector/wociterators.py b/wsyntree_collector/wociterators.py new file mode 100644 index 0000000..b670c09 --- /dev/null +++ b/wsyntree_collector/wociterators.py @@ -0,0 +1,38 @@ +import sys +import os +from pathlib import Path + +import oscar + +blob_sections = 128 +all_blob_sections = range(0, blob_sections) + +blob_fbase = "/da5_data/All.blobs/blob_{section}.{ext}" + +def iter_blobs(section): + p_idx = Path(blob_fbase.format(section=section, ext="idx")) + p_bin = Path(blob_fbase.format(section=section, ext="bin")) + + with p_idx.open("rt") as idx_f, p_bin.open("rb") as bin_f: + for idx_line in idx_f: + fields = idx_line.rstrip().split(";") + _hash = fields[3] + if len(fields) > 4: + _hash = fields[4] + offset = int(fields[1]) + length = int(fields[2]) + bin_f.seek(offset, os.SEEK_SET) + val = oscar.decomp(bin_f.read(length)) + yield (_hash, val) + +def all_blobs(): + for sec in all_blob_sections: + for i in iter_blobs(sec): + yield i + +if __name__ == "__main__": + it = all_blobs() + + for _ in range(5): + n = next(it) + print(f"Blob {n[0]}: content length {len(n[1])}") From d60060a19ceef411c1e341aff2c3ab604599c0c9 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Mon, 10 Apr 2023 01:22:22 -0400 Subject: [PATCH 12/29] iterating blobs with filename filter --- woc-support/Dockerfile | 11 +++ woc-support/docker-compose.yml | 29 ++++++++ .../commands/woc/nhv1_blobfuncs_v0.py | 74 ++++++++++++++++--- wsyntree_collector/wociterators.py | 31 +++++--- 4 files changed, 124 insertions(+), 21 deletions(-) create mode 100644 woc-support/Dockerfile create mode 100644 woc-support/docker-compose.yml diff --git a/woc-support/Dockerfile b/woc-support/Dockerfile new file mode 100644 index 0000000..67846cd --- /dev/null +++ b/woc-support/Dockerfile @@ -0,0 +1,11 @@ +FROM robobenklein/home:latest + +USER root +RUN install_clean python3-dev libghc-bzlib-dev +ARG UNAME=testuser +ARG UID=1000 +ARG GID=1000 +RUN groupadd -g $GID -o $UNAME +RUN useradd -m -u $UID -g $GID -G sudo -o -s /bin/zsh $UNAME +USER $UNAME +WORKDIR /home/$UNAME diff --git a/woc-support/docker-compose.yml b/woc-support/docker-compose.yml new file mode 100644 index 0000000..e604f89 --- /dev/null +++ b/woc-support/docker-compose.yml @@ -0,0 +1,29 @@ + +version: '2' + +services: + wst-jobrunner: + build: + context: . 
+ args: + # remember to export these in the shell before build: + UNAME: ${USER} + GID: ${GID} + UID: ${UID} + #image: robobenklein/home:latest + restart: "no" + environment: + - OSCAR_TEST=1 + volumes: + - "/da0_data:/da0_data:ro" + - "/da1_data:/da1_data:ro" + - "/da2_data:/da2_data:ro" + - "/da3_data:/da3_data:ro" + - "/da4_data:/da4_data:ro" + - "/da4_fast:/da4_fast:ro" + - "/da5_data:/da5_data:ro" + - "/da5_fast:/da5_fast:ro" + - "/da7_data:/da7_data:ro" + - "/da7_data/WorldSyntaxTree:/da7_data/WorldSyntaxTree" # RW: output in here + - "/home/bklein3:/home/bklein3" + mem_limit: 16G diff --git a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py index aefa907..edc781d 100644 --- a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py +++ b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py @@ -5,14 +5,18 @@ import argparse import subprocess from typing import List, Optional -from pathlib import Path +from pathlib import Path, PurePath, PurePosixPath +from collections import Counter, namedtuple import networkx as nx from networkx.readwrite import json_graph import orjson +import pebble from wsyntree import log -from wsyntree.wrap_tree_sitter import TreeSitterAutoBuiltLanguage, TreeSitterCursorIterator +from wsyntree.wrap_tree_sitter import ( + TreeSitterAutoBuiltLanguage, TreeSitterCursorIterator, get_TSABL_for_file, +) from wsyntree.hashtypes import WSTNodeHashV1 from wsyntree_collector.file.parse_file_treesitter import build_networkx_graph @@ -21,23 +25,69 @@ #import oscar +getValuesScript = Path("~/lookup/getValues").expanduser() +assert getValuesScript.exists(), f"Expected getValues at {getValuesScript}" output_root_dir = Path("/da7_data/WorldSyntaxTree/nhv1/blobfuncs_v0") assert output_root_dir.is_dir(), f"Output directory does not exist." 
-def get_filenames_for_blob(b: str) -> List[Path]: - """Inefficient version: calls lookup b2f because I am lazy to rewrite it in python""" - c = subprocess.run( - ["bash", getValuesScript, "b2f"], - stdin=f"{b}\n", - text=True, - capture_output=True, - ) +#def get_filenames_for_blob(b: str) -> List[PurePosixPath]: +# """Inefficient version: calls lookup b2f because I am lazy to rewrite it in python""" +# c = subprocess.run( +# ["bash", getValuesScript, "b2f"], +# input=f"{b}\n", +# text=True, +# capture_output=True, +# ) +# if c.returncode != 0: +# log.trace(log.warn, c.stdout) +# log.trace(log.warn, c.stderr) +# c.check_returncode() +# lines = c.stdout.strip().split("\n") +# if len(lines) != 1: +# log.warn(f"getValues gave {len(lines)} lines for {b}") +# filenames = [] +# for line in lines: +# for fname in line.split(";")[1:]: +# filenames.append(PurePosixPath(fname)) +# return filenames +def wst_supports_fnames(fnames): + filenames = fnames + if not filenames: + return False + exts = Counter([x.suffix for x in filenames if x.suffix]) + primary_ext = exts.most_common(1) + if not primary_ext: + return False + primary_ext = primary_ext[0][0] + if get_TSABL_for_file(str(primary_ext)) is not None: + return True + log.debug(f"WST {primary_ext} not supported: {filenames[0]}") + return False + +def run_blob(blob_pair): + """""" + blob, content, filenames = blob_pair + if not filenames: + return (None, "NO_FILENAMES") + exts = Counter([x.suffix for x in filenames if x.suffix]) + primary_ext = exts.most_common(1) + if not primary_ext: + return (None, "NO_FILENAME_SUFFIX") + primary_ext = primary_ext[0][0] + tsabl = get_TSABL_for_file(str(primary_ext)) # pebble synchronized cache + log.debug(f"Start parsing {blob} with {tsabl}") if __name__ == "__main__": parser = argparse.ArgumentParser("WST Collector NHV1 BlobFuncs v0") + parser.add_argument("-w", "--workers", type=int, default=4) + parser.add_argument("-v", "--verbose", action="store_true") + args = parser.parse_args() + if args.verbose: + log.setLevel(log.DEBUG) - args = parser.parser_args() + blobs = all_blobs_iterator(filefilter=lambda f: wst_supports_fnames(f)) - print(get_filenames_for_blob(0194e44034e001afd2bdb3b54e6c11a288e25806)) + with pebble.ThreadPool(max_workers=args.workers) as pool: + pool.map(run_blob, blobs) diff --git a/wsyntree_collector/wociterators.py b/wsyntree_collector/wociterators.py index b670c09..972ebe4 100644 --- a/wsyntree_collector/wociterators.py +++ b/wsyntree_collector/wociterators.py @@ -1,6 +1,8 @@ import sys import os -from pathlib import Path +import gzip +from pathlib import Path, PurePosixPath +from collections import namedtuple import oscar @@ -9,25 +11,36 @@ blob_fbase = "/da5_data/All.blobs/blob_{section}.{ext}" -def iter_blobs(section): - p_idx = Path(blob_fbase.format(section=section, ext="idx")) +blobresult = namedtuple("BlobResult", ["hash", "content", "filenames"]) + +def iter_blobs(section, blobfilter=lambda b: True, filefilter=lambda fnames: True): + """ + blobfilter(str) -> bool + filefilter(List[PurePosixPath]) -> bool + + all provided filters must pass + """ + p_idx = Path(blob_fbase.format(section=section, ext="idxf")) p_bin = Path(blob_fbase.format(section=section, ext="bin")) - with p_idx.open("rt") as idx_f, p_bin.open("rb") as bin_f: + with gzip.open(p_idx, "rt") as idx_f, p_bin.open("rb") as bin_f: for idx_line in idx_f: fields = idx_line.rstrip().split(";") _hash = fields[3] - if len(fields) > 4: - _hash = fields[4] + filenames = tuple(PurePosixPath(x) for x in fields[4:]) offset = 
int(fields[1]) length = int(fields[2]) + if not blobfilter(_hash): + continue + if not filefilter(filenames): + continue bin_f.seek(offset, os.SEEK_SET) val = oscar.decomp(bin_f.read(length)) - yield (_hash, val) + yield blobresult(_hash, val, filenames) -def all_blobs(): +def all_blobs(**kwargs): for sec in all_blob_sections: - for i in iter_blobs(sec): + for i in iter_blobs(sec, **kwargs): yield i if __name__ == "__main__": From 40fc8dc37e1cd7a9d0e973af811e98c3e7aea6f4 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Mon, 10 Apr 2023 15:44:41 -0400 Subject: [PATCH 13/29] write to stdout for file subcommand --- tests/stuff.rb | 7 ++++++- wsyntree_collector/commands/file.py | 10 ++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/tests/stuff.rb b/tests/stuff.rb index b85a042..07d461b 100644 --- a/tests/stuff.rb +++ b/tests/stuff.rb @@ -1 +1,6 @@ -puts "Hello World" + +def hello_world + puts "Hello World" +end + +hello_world diff --git a/wsyntree_collector/commands/file.py b/wsyntree_collector/commands/file.py index f1f90e9..86d0361 100644 --- a/wsyntree_collector/commands/file.py +++ b/wsyntree_collector/commands/file.py @@ -1,4 +1,5 @@ +import sys from pathlib import Path import networkx as nx @@ -45,13 +46,14 @@ def run(args): if args.output: if str(args.output) == "-": - raise NotImplementedError(f"output to stdout not yet supported") - - log.info(f"Writing to {args.output} ...") - if str(args.output).endswith(".graphml"): + #raise NotImplementedError(f"output to stdout not yet supported") + sys.stdout.buffer.write(orjson.dumps(tree_data, option=orjson.OPT_APPEND_NEWLINE)) + elif str(args.output).endswith(".graphml"): + log.info(f"Writing to {args.output} ...") log.info("Writing GraphML") nx.write_graphml(graph, args.output) else: + log.info(f"Writing to {args.output} ...") with args.output.open('wb') as f: f.write(orjson.dumps( tree_data, option=orjson.OPT_APPEND_NEWLINE From 47954b443a12de2920337652787c599b4a86004b Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Mon, 10 Apr 2023 15:45:20 -0400 Subject: [PATCH 14/29] py3.8 --- wsyntree/hashtypes/__init__.py | 2 +- .../commands/woc/nhv1_blobfuncs_v0.py | 67 +++++++++++++++++-- 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/wsyntree/hashtypes/__init__.py b/wsyntree/hashtypes/__init__.py index f39fff1..74c8ef1 100644 --- a/wsyntree/hashtypes/__init__.py +++ b/wsyntree/hashtypes/__init__.py @@ -26,7 +26,7 @@ def __init__(self, G, node): for node in nodes: self._nodes.append(node) - @functools.cache + @functools.lru_cache(maxsize=None) # functools.cache added in 3.9 def _get_hashable_repr(self): s = bytearray(b"WSTNodeHashV1<") nodedata = list(map( diff --git a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py index edc781d..3ebf6c3 100644 --- a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py +++ b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py @@ -4,6 +4,9 @@ import argparse import subprocess +import traceback +import time +import tempfile from typing import List, Optional from pathlib import Path, PurePath, PurePosixPath from collections import Counter, namedtuple @@ -30,6 +33,13 @@ output_root_dir = Path("/da7_data/WorldSyntaxTree/nhv1/blobfuncs_v0") assert output_root_dir.is_dir(), f"Output directory does not exist." 
+FUNCDEF_TYPES = ( + "function_definition", + "function_declaration", + "method_declaration", + "method", # ruby + "function_item", # rust +) #def get_filenames_for_blob(b: str) -> List[PurePosixPath]: # """Inefficient version: calls lookup b2f because I am lazy to rewrite it in python""" @@ -63,9 +73,20 @@ def wst_supports_fnames(fnames): primary_ext = primary_ext[0][0] if get_TSABL_for_file(str(primary_ext)) is not None: return True - log.debug(f"WST {primary_ext} not supported: {filenames[0]}") + #log.debug(f"WST {primary_ext} not supported: {filenames[0]}") return False +def output_path_for_blob(b): + outdir = output_root_dir / b[0:2] / b[2:4] + outdir.mkdir(parents=True, exist_ok=True) + return outdir / f"{b}.jsonl" + +def blob_not_yet_processed(b): + p = output_path_for_blob(b) + if p.is_file(): + return False + return True + def run_blob(blob_pair): """""" blob, content, filenames = blob_pair @@ -77,7 +98,23 @@ def run_blob(blob_pair): return (None, "NO_FILENAME_SUFFIX") primary_ext = primary_ext[0][0] tsabl = get_TSABL_for_file(str(primary_ext)) # pebble synchronized cache - log.debug(f"Start parsing {blob} with {tsabl}") + #log.debug(f"Start parsing {blob} with {tsabl}") + + with tempfile.TemporaryDirectory(prefix="wst_") as tempdir: + log.debug(f"{blob}: working in {tempdir}") + outfile = output_path_for_blob(blob) + tempdir = Path(tempdir) + codepath = (tempdir / f"file{primary_ext}") + with codepath.open("wb") as f: + f.write(content) + graph = build_networkx_graph(tsabl, codepath, include_text=False) + hashed_nodes = [] + for node in nx.dfs_preorder_nodes(graph): + if graph.nodes[node]['type'] in FUNCDEF_TYPES: + hashed_nodes.append(WSTNodeHashV1(graph, node)) + + log.debug(f"{blob}: finished") + return None if __name__ == "__main__": parser = argparse.ArgumentParser("WST Collector NHV1 BlobFuncs v0") @@ -87,7 +124,27 @@ def run_blob(blob_pair): if args.verbose: log.setLevel(log.DEBUG) - blobs = all_blobs_iterator(filefilter=lambda f: wst_supports_fnames(f)) - with pebble.ThreadPool(max_workers=args.workers) as pool: - pool.map(run_blob, blobs) + log.info(f"pool workers {args.workers}") + try: + blobs = all_blobs_iterator(blobfilter=blob_not_yet_processed, filefilter=lambda f: wst_supports_fnames(f)) + def _rb(*a, **kwa): + try: + run_blob(*a, **kwa) + except Exception as e: + log.trace(log.error, traceback.format_exc()) + log.error(f"{e}") + results = pool.map(_rb, blobs) + pool.join() + except KeyboardInterrupt as e: + log.warn(f"Caught KeyboardInterrupt, stopping pool...") + pool.close() + pool.stop() + pool.join() + + try: + for res in results.result(): + pass + except Exception as e: + log.trace(log.error, traceback.format_exc()) + log.error(f"{e}") From 574772e42a4c245b0eeffc6ae5f11365b13e2c07 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Mon, 10 Apr 2023 16:29:23 -0400 Subject: [PATCH 15/29] now recording data outputs --- wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py | 12 ++++++++++++ wsyntree_collector/file/parse_file_treesitter.py | 4 ++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py index 3ebf6c3..f168beb 100644 --- a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py +++ b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py @@ -113,6 +113,18 @@ def run_blob(blob_pair): if graph.nodes[node]['type'] in FUNCDEF_TYPES: hashed_nodes.append(WSTNodeHashV1(graph, node)) + with outfile.open('wb') as f: + for nh in hashed_nodes: + 
f.write(orjson.dumps({ + "sha512": nh._get_sha512_hex(), + "blob": blob, + "x1": nh._node_props['x1'], + "x2": nh._node_props['x2'], + "y1": nh._node_props['y1'], + "type": nh._node_props['type'], + "lang": tsabl.lang, + }, option=orjson.OPT_APPEND_NEWLINE)) + log.debug(f"{blob}: finished") return None diff --git a/wsyntree_collector/file/parse_file_treesitter.py b/wsyntree_collector/file/parse_file_treesitter.py index cf489d9..6bb8bae 100644 --- a/wsyntree_collector/file/parse_file_treesitter.py +++ b/wsyntree_collector/file/parse_file_treesitter.py @@ -81,14 +81,14 @@ def build_networkx_graph( except: log.warn(f"Cannot decode text.") - log.debug(f"adding node {preorder}: {nn}") + #log.debug(f"adding node {preorder}: {nn}") # insert node and it's data G.add_node(preorder, **nn) # add the edge if cur_node.parent is not None: parent_preorder = ts_id_to_preorder[cur_node.parent.id] - log.debug(f"connecting node {preorder}, to {parent_preorder}") + #log.debug(f"connecting node {preorder}, to {parent_preorder}") G.add_edge( parent_preorder, preorder From 98ac961f4aa38e7c4470926404caf4aaa196f982 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Mon, 10 Apr 2023 23:42:55 -0400 Subject: [PATCH 16/29] manual pool approach --- .../commands/woc/nhv1_blobfuncs_v0.py | 71 ++++++++++++------- 1 file changed, 45 insertions(+), 26 deletions(-) diff --git a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py index f168beb..32c2e29 100644 --- a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py +++ b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py @@ -9,12 +9,17 @@ import tempfile from typing import List, Optional from pathlib import Path, PurePath, PurePosixPath -from collections import Counter, namedtuple +from collections import Counter, namedtuple, deque +from concurrent import futures import networkx as nx from networkx.readwrite import json_graph import orjson import pebble +from pebble import concurrent +from tqdm import tqdm +from tqdm.contrib.logging import logging_redirect_tqdm +from pympler import tracker from wsyntree import log from wsyntree.wrap_tree_sitter import ( @@ -126,7 +131,17 @@ def run_blob(blob_pair): }, option=orjson.OPT_APPEND_NEWLINE)) log.debug(f"{blob}: finished") - return None + return (blob, len(hashed_nodes), outfile) + +@concurrent.process(name="wst-nhv1-blobfuncs") +#@concurrent.thread(name="wst-nhv1-blobfuncs") +def _rb(*a, **kwa): + try: + return run_blob(*a, **kwa) + #return None + except Exception as e: + log.trace(log.error, traceback.format_exc()) + log.error(f"{e}") if __name__ == "__main__": parser = argparse.ArgumentParser("WST Collector NHV1 BlobFuncs v0") @@ -136,27 +151,31 @@ def run_blob(blob_pair): if args.verbose: log.setLevel(log.DEBUG) - with pebble.ThreadPool(max_workers=args.workers) as pool: - log.info(f"pool workers {args.workers}") - try: - blobs = all_blobs_iterator(blobfilter=blob_not_yet_processed, filefilter=lambda f: wst_supports_fnames(f)) - def _rb(*a, **kwa): - try: - run_blob(*a, **kwa) - except Exception as e: - log.trace(log.error, traceback.format_exc()) - log.error(f"{e}") - results = pool.map(_rb, blobs) - pool.join() - except KeyboardInterrupt as e: - log.warn(f"Caught KeyboardInterrupt, stopping pool...") - pool.close() - pool.stop() - pool.join() - - try: - for res in results.result(): - pass - except Exception as e: - log.trace(log.error, traceback.format_exc()) - log.error(f"{e}") + log.info(f"pool workers {args.workers}") + q = deque(maxlen=args.workers) + try: + blobs = 
tqdm(all_blobs_iterator(blobfilter=blob_not_yet_processed, filefilter=lambda f: wst_supports_fnames(f))) + tr = tracker.SummaryTracker() + with logging_redirect_tqdm(): + #log.logger.addHandler(log.OutputHandler(tqdm.write)) + for blob_pair in blobs: + for t in list(q): + if t.done(): + t.result() + q.remove(t) + if len(q) >= args.workers: + # wait for workers to finish + #log.debug(f"waiting on {len(q)} workers...") + done, not_done = futures.wait(q, return_when=futures.FIRST_COMPLETED) + #log.debug(f"{len(done)} finished") + for t in done: + t.result() + q.remove(t) + + q.append(_rb(blob_pair)) + except KeyboardInterrupt as e: + log.warn(f"Caught KeyboardInterrupt, stopping ...") + tr.print_diff() + for t in q: + t.cancel() + tr.print_diff() From 93c7e563ec35c5e3b3e75651890626d7faa5802a Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Mon, 10 Apr 2023 23:49:10 -0400 Subject: [PATCH 17/29] binary_line_iterator --- wsyntree_collector/wociterators.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/wsyntree_collector/wociterators.py b/wsyntree_collector/wociterators.py index 972ebe4..d1fccbd 100644 --- a/wsyntree_collector/wociterators.py +++ b/wsyntree_collector/wociterators.py @@ -13,6 +13,27 @@ blobresult = namedtuple("BlobResult", ["hash", "content", "filenames"]) +def binary_line_iterator(open_file): + """only yields lines that are decodable""" + chunksize = 4096 + lineno = 0 + buffer = bytearray() + for block in iter(lambda: open_file.read(chunksize), b""): + buffer += block + while b"\n" in buffer: + line, buffer = buffer.split(b"\n", 1) + lineno += 1 + try: + yield line.decode() + except UnicodeDecodeError: + log.warn(f"cannot decode line {lineno} of {open_file.name}") + if buffer: + for line in buffer.split(b"\n"): + try: + yield line.decode() + except UnicodeDecodeError: + log.warn(f"cannot decode line {lineno} of {open_file.name}") + def iter_blobs(section, blobfilter=lambda b: True, filefilter=lambda fnames: True): """ blobfilter(str) -> bool @@ -23,8 +44,9 @@ def iter_blobs(section, blobfilter=lambda b: True, filefilter=lambda fnames: Tru p_idx = Path(blob_fbase.format(section=section, ext="idxf")) p_bin = Path(blob_fbase.format(section=section, ext="bin")) - with gzip.open(p_idx, "rt") as idx_f, p_bin.open("rb") as bin_f: - for idx_line in idx_f: + # open the index in binary as well, encoding not specified + with gzip.open(p_idx, "rb") as idx_f, p_bin.open("rb") as bin_f: + for idx_line in binary_line_iterator(idx_f): fields = idx_line.rstrip().split(";") _hash = fields[3] filenames = tuple(PurePosixPath(x) for x in fields[4:]) From f517e21e43c4eedf4db946927a32cdb1b1877135 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Mon, 10 Apr 2023 23:57:38 -0400 Subject: [PATCH 18/29] forgot log import --- wsyntree_collector/wociterators.py | 1 + 1 file changed, 1 insertion(+) diff --git a/wsyntree_collector/wociterators.py b/wsyntree_collector/wociterators.py index d1fccbd..bfbf673 100644 --- a/wsyntree_collector/wociterators.py +++ b/wsyntree_collector/wociterators.py @@ -4,6 +4,7 @@ from pathlib import Path, PurePosixPath from collections import namedtuple +from wsyntree import log import oscar blob_sections = 128 From 864091924bb03bd2fc1e83b06dee38d26d423096 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Tue, 11 Apr 2023 00:34:42 -0400 Subject: [PATCH 19/29] pass specific errors only --- wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git 
a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py index 32c2e29..0c49229 100644 --- a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py +++ b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py @@ -139,9 +139,13 @@ def _rb(*a, **kwa): try: return run_blob(*a, **kwa) #return None + except wsyntree.exceptions.RootTreeSitterNodeIsError as e: + log.error(f"{e}") + return (e, traceback.format_exc(), a, kwa) except Exception as e: log.trace(log.error, traceback.format_exc()) log.error(f"{e}") + raise e if __name__ == "__main__": parser = argparse.ArgumentParser("WST Collector NHV1 BlobFuncs v0") From 3d107dda2be7c85840314cb34d72232ae05f700d Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Tue, 11 Apr 2023 00:47:35 -0400 Subject: [PATCH 20/29] can't pickle the BlobResult... --- wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py index 0c49229..8426109 100644 --- a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py +++ b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py @@ -21,6 +21,7 @@ from tqdm.contrib.logging import logging_redirect_tqdm from pympler import tracker +import wsyntree.exceptions from wsyntree import log from wsyntree.wrap_tree_sitter import ( TreeSitterAutoBuiltLanguage, TreeSitterCursorIterator, get_TSABL_for_file, @@ -141,7 +142,7 @@ def _rb(*a, **kwa): #return None except wsyntree.exceptions.RootTreeSitterNodeIsError as e: log.error(f"{e}") - return (e, traceback.format_exc(), a, kwa) + return (str(e), traceback.format_exc()) except Exception as e: log.trace(log.error, traceback.format_exc()) log.error(f"{e}") From 7123a41aa0de52ca436f584316321fcf44e39ce4 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Tue, 11 Apr 2023 15:28:43 -0400 Subject: [PATCH 21/29] retry for the munmap call error --- wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py | 6 ++++++ wsyntree_collector/file/parse_file_treesitter.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py index 8426109..babcdb3 100644 --- a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py +++ b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py @@ -16,6 +16,7 @@ from networkx.readwrite import json_graph import orjson import pebble +import tenacity from pebble import concurrent from tqdm import tqdm from tqdm.contrib.logging import logging_redirect_tqdm @@ -136,6 +137,11 @@ def run_blob(blob_pair): @concurrent.process(name="wst-nhv1-blobfuncs") #@concurrent.thread(name="wst-nhv1-blobfuncs") +@tenacity.retry( + retry=tenacity.retry_if_exception_type(pebble.common.ProcessExpired), + stop=tenacity.stop_after_attempt(5), + reraise=True, +) def _rb(*a, **kwa): try: return run_blob(*a, **kwa) diff --git a/wsyntree_collector/file/parse_file_treesitter.py b/wsyntree_collector/file/parse_file_treesitter.py index 6bb8bae..7ed69ae 100644 --- a/wsyntree_collector/file/parse_file_treesitter.py +++ b/wsyntree_collector/file/parse_file_treesitter.py @@ -57,7 +57,7 @@ def build_networkx_graph( root = cursor.peek() if root.type == "ERROR": - raise RootTreeSitterNodeIsError(f"the file content or language is likely wrong for this parser") + raise RootTreeSitterNodeIsError(f"the file content or language is likely wrong for this '{lang.lang}' parser") # ts_id_to_preorder[root.id] = 0 
for cur_node in chain([root], cursor): From f4fcc45eb244f3654412be73f7bcf18738f3f043 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Tue, 11 Apr 2023 16:34:34 -0400 Subject: [PATCH 22/29] refactor to blobjob also su**pp**ress --- wsyntree/log.py | 4 +- .../commands/woc/nhv1_blobfuncs_v0.py | 63 ++++++++++++------- 2 files changed, 43 insertions(+), 24 deletions(-) diff --git a/wsyntree/log.py b/wsyntree/log.py index 2513b9d..af2bad2 100644 --- a/wsyntree/log.py +++ b/wsyntree/log.py @@ -69,7 +69,7 @@ def __exit__(self, type, value, traceback): logger.setLevel(self._prev_state) -class supress_stdout(): +class suppress_stdout(): """ Stops the logger's StreamHandlers temporarily. @@ -77,7 +77,7 @@ class supress_stdout(): or other terminal full-windows views. Use a 'with' statement: - with supress_stdout(): + with suppress_stdout(): # your code """ diff --git a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py index babcdb3..1011a9d 100644 --- a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py +++ b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py @@ -94,9 +94,9 @@ def blob_not_yet_processed(b): return False return True -def run_blob(blob_pair): +def run_blob(blob, content, filenames): """""" - blob, content, filenames = blob_pair + #blob, content, filenames = blob_pair if not filenames: return (None, "NO_FILENAMES") exts = Counter([x.suffix for x in filenames if x.suffix]) @@ -137,11 +137,11 @@ def run_blob(blob_pair): @concurrent.process(name="wst-nhv1-blobfuncs") #@concurrent.thread(name="wst-nhv1-blobfuncs") -@tenacity.retry( - retry=tenacity.retry_if_exception_type(pebble.common.ProcessExpired), - stop=tenacity.stop_after_attempt(5), - reraise=True, -) +# @tenacity.retry( # retry here is not working: the C-level 'munmap' error exits the whole python process +# retry=tenacity.retry_if_exception_type(pebble.common.ProcessExpired), +# stop=tenacity.stop_after_attempt(5), +# reraise=True, +# ) def _rb(*a, **kwa): try: return run_blob(*a, **kwa) @@ -154,6 +154,8 @@ def _rb(*a, **kwa): log.error(f"{e}") raise e +blobjob = namedtuple("BlobJob", ["result", "args", "kwargs", "retry"]) + if __name__ == "__main__": parser = argparse.ArgumentParser("WST Collector NHV1 BlobFuncs v0") parser.add_argument("-w", "--workers", type=int, default=4) @@ -165,25 +167,42 @@ def _rb(*a, **kwa): log.info(f"pool workers {args.workers}") q = deque(maxlen=args.workers) try: - blobs = tqdm(all_blobs_iterator(blobfilter=blob_not_yet_processed, filefilter=lambda f: wst_supports_fnames(f))) + blobs = tqdm(all_blobs_iterator( + blobfilter=blob_not_yet_processed, + filefilter=lambda f: wst_supports_fnames(f) + )) tr = tracker.SummaryTracker() - with logging_redirect_tqdm(): + def collect_or_retry(job: blobjob): + q.remove(job) + try: + job.result.result() + except pebble.common.ProcessExpired as e: + log.error(f"{job.args[0]} attempt {job.retry} failed: {e}") + q.append(blobjob( + _rb(*job.args), + job.args, + {}, + job.retry + 1, + )) + + with logging_redirect_tqdm(), log.suppress_stdout(): #log.logger.addHandler(log.OutputHandler(tqdm.write)) for blob_pair in blobs: - for t in list(q): - if t.done(): - t.result() - q.remove(t) if len(q) >= args.workers: - # wait for workers to finish - #log.debug(f"waiting on {len(q)} workers...") - done, not_done = futures.wait(q, return_when=futures.FIRST_COMPLETED) - #log.debug(f"{len(done)} finished") - for t in done: - t.result() - q.remove(t) - - q.append(_rb(blob_pair)) + # block until slots available + done, 
not_done = futures.wait([j.result for j in q], return_when=futures.FIRST_COMPLETED) + done = list(done) + for job in list(q): # done jobs + if job.result in done: + collect_or_retry(job) + done.remove(job.result) + assert len(done) == 0 + + jargs = [*blob_pair] + q.append(blobjob(_rb(*jargs), jargs, {}, 0)) + # finish up: + for job in list(q): + collect_or_retry(job) except KeyboardInterrupt as e: log.warn(f"Caught KeyboardInterrupt, stopping ...") tr.print_diff() From c87e11ff84d95957ec3bea7da75c89417d39f81f Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Thu, 13 Apr 2023 20:11:07 -0400 Subject: [PATCH 23/29] handling lock failures --- wsyntree/wrap_tree_sitter.py | 26 +++++++++---------- .../commands/woc/nhv1_blobfuncs_v0.py | 22 +++++++++++++--- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/wsyntree/wrap_tree_sitter.py b/wsyntree/wrap_tree_sitter.py index 66d3153..06e4a8b 100644 --- a/wsyntree/wrap_tree_sitter.py +++ b/wsyntree/wrap_tree_sitter.py @@ -59,25 +59,25 @@ def _get_language_repo(self): def _get_language_library(self): try: - self.ts_lang_cache_lock.acquire(timeout=300) lib = self._get_language_cache_dir() / "language.so" - repo = self._get_language_repo() - repodir = self._get_language_repo_path() if not lib.exists(): - log.warn(f"building library for {self}, this could take a while...") - start = time.time() - Language.build_library( - str(lib.resolve()), - [repodir] - ) - log.debug(f"library build of {self} completed after {round(time.time() - start)} seconds") + repo = self._get_language_repo() + repodir = self._get_language_repo_path() + with self.ts_lang_cache_lock.acquire(timeout=600): + log.warn(f"building library for {self}, this could take a while...") + start = time.time() + Language.build_library( + str(lib.resolve()), + [repodir] + ) + log.debug(f"library build of {self} completed after {round(time.time() - start)} seconds") return lib except filelock.Timeout as e: - log.error(f"Failed to acquire lock on TSABL {self}") + log.error(f"Failed to acquire lock on TSABL {self} (needed to build language lib)") log.debug(f"lock object is {self.ts_lang_cache_lock}") raise e - finally: - self.ts_lang_cache_lock.release() + #finally: + # self.ts_lang_cache_lock.release() def _get_ts_language(self): if self.ts_language is not None: diff --git a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py index 1011a9d..cc0de5f 100644 --- a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py +++ b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py @@ -39,6 +39,8 @@ assert getValuesScript.exists(), f"Expected getValues at {getValuesScript}" output_root_dir = Path("/da7_data/WorldSyntaxTree/nhv1/blobfuncs_v0") assert output_root_dir.is_dir(), f"Output directory does not exist." 
+errors_dir = output_root_dir / "errors" +errors_dir.mkdir(exist_ok=True) FUNCDEF_TYPES = ( "function_definition", @@ -147,7 +149,7 @@ def _rb(*a, **kwa): return run_blob(*a, **kwa) #return None except wsyntree.exceptions.RootTreeSitterNodeIsError as e: - log.error(f"{e}") + log.debug(f"skip {a[0]}: {e}") return (str(e), traceback.format_exc()) except Exception as e: log.trace(log.error, traceback.format_exc()) @@ -156,6 +158,17 @@ def _rb(*a, **kwa): blobjob = namedtuple("BlobJob", ["result", "args", "kwargs", "retry"]) +def record_failure(job, e): + fail_file = (errors_dir / f"{job.args[0]}.txt") + with fail_file.open("at") as f: + f.write(f"JOB FAILURE REPORT {job.args[0]}:\n") + f.write(f"Current traceback:\n") + f.write(traceback.format_exc()) + f.write("\n") + f.write(str(e)) + log.warn(f"Wrote failure report to {fail_file}") + return fail_file + if __name__ == "__main__": parser = argparse.ArgumentParser("WST Collector NHV1 BlobFuncs v0") parser.add_argument("-w", "--workers", type=int, default=4) @@ -177,6 +190,9 @@ def collect_or_retry(job: blobjob): try: job.result.result() except pebble.common.ProcessExpired as e: + if job.retry > 5: + record_failure(job, e) + return log.error(f"{job.args[0]} attempt {job.retry} failed: {e}") q.append(blobjob( _rb(*job.args), @@ -206,6 +222,6 @@ def collect_or_retry(job: blobjob): except KeyboardInterrupt as e: log.warn(f"Caught KeyboardInterrupt, stopping ...") tr.print_diff() - for t in q: - t.cancel() + for j in q: + j.result.cancel() tr.print_diff() From 7ae9621431ac306d4bcb58f0398e5bac2a8fd751 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Thu, 13 Apr 2023 22:01:30 -0400 Subject: [PATCH 24/29] fix order of lock and lib check --- wsyntree/wrap_tree_sitter.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/wsyntree/wrap_tree_sitter.py b/wsyntree/wrap_tree_sitter.py index 06e4a8b..55f902b 100644 --- a/wsyntree/wrap_tree_sitter.py +++ b/wsyntree/wrap_tree_sitter.py @@ -60,10 +60,10 @@ def _get_language_repo(self): def _get_language_library(self): try: lib = self._get_language_cache_dir() / "language.so" - if not lib.exists(): - repo = self._get_language_repo() - repodir = self._get_language_repo_path() - with self.ts_lang_cache_lock.acquire(timeout=600): + repo = self._get_language_repo() + repodir = self._get_language_repo_path() + with self.ts_lang_cache_lock.acquire(timeout=600): + if not lib.exists(): log.warn(f"building library for {self}, this could take a while...") start = time.time() Language.build_library( From a13b0d6faedbf631f2587689d0bd8153cca4cd3c Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Fri, 14 Apr 2023 01:51:15 -0400 Subject: [PATCH 25/29] increase to 64 processes --- .gitignore | 3 +++ woc-support/docker-compose.yml | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index b6e4761..b10b0bd 100644 --- a/.gitignore +++ b/.gitignore @@ -127,3 +127,6 @@ dmypy.json # Pyre type checker .pyre/ + +# +coredumps/ diff --git a/woc-support/docker-compose.yml b/woc-support/docker-compose.yml index e604f89..db28d43 100644 --- a/woc-support/docker-compose.yml +++ b/woc-support/docker-compose.yml @@ -26,4 +26,4 @@ services: - "/da7_data:/da7_data:ro" - "/da7_data/WorldSyntaxTree:/da7_data/WorldSyntaxTree" # RW: output in here - "/home/bklein3:/home/bklein3" - mem_limit: 16G + mem_limit: 64G From bcd8d9cdffc90e114f2b7f3aca7a5ce771e41869 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Fri, 14 Apr 2023 03:30:57 -0400 Subject: [PATCH 26/29] save more errors --- 
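Not part of the patch (this note sits below the --- cut, which git am ignores): this commit shards failure reports the same way output_path_for_blob() already shards result files, using the first two byte-pairs of the blob hash as nested subdirectories. A minimal standalone sketch of that layout follows; the errors_dir root is the one introduced earlier in the series, and the example hash is hypothetical.

    from pathlib import Path

    errors_dir = Path("/da7_data/WorldSyntaxTree/nhv1/blobfuncs_v0/errors")

    def error_path_for_blob(b: str) -> Path:
        # e.g. "05fe63..." -> errors/05/fe/05fe63....txt
        return errors_dir / b[0:2] / b[2:4] / f"{b}.txt"

    blob = "05fe634ca4c8386349ac519f899145c75fff4290"  # hypothetical blob hash
    report = error_path_for_blob(blob)
    # blob_not_yet_processed() below treats an existing report as "already handled"
    already_failed = report.is_file()

With hex hashes the fan-out gives at most 256 subdirectories per level, which keeps directory listings manageable once millions of blobs have been attempted.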
.../commands/woc/nhv1_blobfuncs_v0.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py index cc0de5f..70b74ba 100644 --- a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py +++ b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py @@ -90,10 +90,17 @@ def output_path_for_blob(b): outdir.mkdir(parents=True, exist_ok=True) return outdir / f"{b}.jsonl" +def error_path_for_blob(b): + d = errors_dir / b[0:2] / b[2:4] + return d / f"{b}.txt" + def blob_not_yet_processed(b): p = output_path_for_blob(b) if p.is_file(): return False + fail_file = error_path_for_blob(b) + if fail_file.is_file(): + return False return True def run_blob(blob, content, filenames): @@ -150,7 +157,7 @@ def _rb(*a, **kwa): #return None except wsyntree.exceptions.RootTreeSitterNodeIsError as e: log.debug(f"skip {a[0]}: {e}") - return (str(e), traceback.format_exc()) + raise e except Exception as e: log.trace(log.error, traceback.format_exc()) log.error(f"{e}") @@ -159,13 +166,15 @@ def _rb(*a, **kwa): blobjob = namedtuple("BlobJob", ["result", "args", "kwargs", "retry"]) def record_failure(job, e): - fail_file = (errors_dir / f"{job.args[0]}.txt") + fail_file = error_path_for_blob(job.args[0]) + fail_file.parent.mkdir(parents=True, exist_ok=True) with fail_file.open("at") as f: f.write(f"JOB FAILURE REPORT {job.args[0]}:\n") f.write(f"Current traceback:\n") f.write(traceback.format_exc()) f.write("\n") f.write(str(e)) + f.write("\n") log.warn(f"Wrote failure report to {fail_file}") return fail_file @@ -189,6 +198,10 @@ def collect_or_retry(job: blobjob): q.remove(job) try: job.result.result() + except wsyntree.exceptions.RootTreeSitterNodeIsError as e: + # no retry + log.warn(f"{job.args[0]}: unable to parse: {e}") + record_failure(job, e) except pebble.common.ProcessExpired as e: if job.retry > 5: record_failure(job, e) From 6ca75a85cd227337a1bd1967e707c7e1f6db65e4 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Sun, 23 Apr 2023 09:14:00 -0400 Subject: [PATCH 27/29] now using redis to cache BlobStatus NFS is stupidly slow, like 16k+/s on ishia, ~200/s max on da5 --- requirements.txt | 1 + woc-support/Dockerfile | 7 ++ woc-support/docker-compose.yml | 11 +++ .../commands/woc/nhv1_blobfuncs_v0.py | 83 ++++++++++++------- wsyntree_collector/wociterators.py | 80 ++++++++++++++++-- 5 files changed, 147 insertions(+), 35 deletions(-) diff --git a/requirements.txt b/requirements.txt index 683864f..420362b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,3 +13,4 @@ psutil bpython orjson>=3.0.0 networkx[default] +redis[hiredis] diff --git a/woc-support/Dockerfile b/woc-support/Dockerfile index 67846cd..d071433 100644 --- a/woc-support/Dockerfile +++ b/woc-support/Dockerfile @@ -1,11 +1,18 @@ FROM robobenklein/home:latest USER root +# enable sshd +RUN rm -f /etc/service/sshd/down +RUN /etc/my_init.d/00_regen_ssh_host_keys.sh RUN install_clean python3-dev libghc-bzlib-dev ARG UNAME=testuser ARG UID=1000 ARG GID=1000 RUN groupadd -g $GID -o $UNAME RUN useradd -m -u $UID -g $GID -G sudo -o -s /bin/zsh $UNAME +RUN usermod -aG docker_env $UNAME +RUN chown -R :docker_env /etc/container_environment /etc/container_environment.sh /etc/container_environment.json +RUN chmod -R g+rwX /etc/container_environment /etc/container_environment.sh /etc/container_environment.json USER $UNAME +CMD sudo /sbin/my_init WORKDIR /home/$UNAME diff --git a/woc-support/docker-compose.yml 
b/woc-support/docker-compose.yml index db28d43..cf3bf9d 100644 --- a/woc-support/docker-compose.yml +++ b/woc-support/docker-compose.yml @@ -12,6 +12,7 @@ services: UID: ${UID} #image: robobenklein/home:latest restart: "no" + command: "sudo /sbin/my_init" environment: - OSCAR_TEST=1 volumes: @@ -26,4 +27,14 @@ services: - "/da7_data:/da7_data:ro" - "/da7_data/WorldSyntaxTree:/da7_data/WorldSyntaxTree" # RW: output in here - "/home/bklein3:/home/bklein3" + mem_limit: 512G + + wst-redis: + image: redis:7-alpine mem_limit: 64G + + wst-telegraf: + image: telegraf:latest + volumes: + - "./telegraf.conf:/etc/telegraf/telegraf.conf:ro" + hostname: ${HOSTNAME:-nohost}-wst-telegraf diff --git a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py index 70b74ba..af9e453 100644 --- a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py +++ b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py @@ -7,6 +7,8 @@ import traceback import time import tempfile +import enum +import os from typing import List, Optional from pathlib import Path, PurePath, PurePosixPath from collections import Counter, namedtuple, deque @@ -21,6 +23,7 @@ from tqdm import tqdm from tqdm.contrib.logging import logging_redirect_tqdm from pympler import tracker +import redis import wsyntree.exceptions from wsyntree import log @@ -30,11 +33,13 @@ from wsyntree.hashtypes import WSTNodeHashV1 from wsyntree_collector.file.parse_file_treesitter import build_networkx_graph -from wsyntree_collector.wociterators import all_blobs as all_blobs_iterator +from wsyntree_collector.wociterators import all_blobs as all_blobs_iterator, BlobStatus #import oscar +tqdm_smoothing_factor = 0.01 + getValuesScript = Path("~/lookup/getValues").expanduser() assert getValuesScript.exists(), f"Expected getValues at {getValuesScript}" output_root_dir = Path("/da7_data/WorldSyntaxTree/nhv1/blobfuncs_v0") @@ -50,26 +55,12 @@ "function_item", # rust ) -#def get_filenames_for_blob(b: str) -> List[PurePosixPath]: -# """Inefficient version: calls lookup b2f because I am lazy to rewrite it in python""" -# c = subprocess.run( -# ["bash", getValuesScript, "b2f"], -# input=f"{b}\n", -# text=True, -# capture_output=True, -# ) -# if c.returncode != 0: -# log.trace(log.warn, c.stdout) -# log.trace(log.warn, c.stderr) -# c.check_returncode() -# lines = c.stdout.strip().split("\n") -# if len(lines) != 1: -# log.warn(f"getValues gave {len(lines)} lines for {b}") -# filenames = [] -# for line in lines: -# for fname in line.split(";")[1:]: -# filenames.append(PurePosixPath(fname)) -# return filenames +redis_pool = redis.ConnectionPool( + host=os.environ.get("WST_REDIS_HOST", "wst-redis"), + port=6379, db=0, +) +redis_client = redis.Redis(connection_pool=redis_pool) +redis_decoded = redis.Redis(connection_pool=redis_pool, decode_responses=True) def wst_supports_fnames(fnames): filenames = fnames @@ -95,14 +86,26 @@ def error_path_for_blob(b): return d / f"{b}.txt" def blob_not_yet_processed(b): - p = output_path_for_blob(b) - if p.is_file(): - return False - fail_file = error_path_for_blob(b) - if fail_file.is_file(): + if redis_decoded.exists(b): return False + # these should be set in redis by the --prescan argument: + # no longer need to stat every single file over NFS + #p = output_path_for_blob(b) + #if p.is_file(): + # return False + #fail_file = error_path_for_blob(b) + #if fail_file.is_file(): + # return False return True +@tenacity.retry( + # OSError: [Errno 14] Bad address + # not sure what is causing this, but 
likely related to poor/outdated NFS + # or unreliable UTK network + retry=tenacity.retry_if_exception_type(OSError), + stop=tenacity.stop_after_attempt(3), + reraise=True, +) def run_blob(blob, content, filenames): """""" #blob, content, filenames = blob_pair @@ -142,6 +145,7 @@ def run_blob(blob, content, filenames): }, option=orjson.OPT_APPEND_NEWLINE)) log.debug(f"{blob}: finished") + redis_decoded.set(blob, BlobStatus.done) return (blob, len(hashed_nodes), outfile) @concurrent.process(name="wst-nhv1-blobfuncs") @@ -166,33 +170,52 @@ def _rb(*a, **kwa): blobjob = namedtuple("BlobJob", ["result", "args", "kwargs", "retry"]) def record_failure(job, e): - fail_file = error_path_for_blob(job.args[0]) + blob = job.args[0] + fail_file = error_path_for_blob(blob) fail_file.parent.mkdir(parents=True, exist_ok=True) with fail_file.open("at") as f: - f.write(f"JOB FAILURE REPORT {job.args[0]}:\n") + f.write(f"JOB FAILURE REPORT {blob}:\n") f.write(f"Current traceback:\n") f.write(traceback.format_exc()) f.write("\n") f.write(str(e)) f.write("\n") log.warn(f"Wrote failure report to {fail_file}") + redis_decoded.set(blob, BlobStatus.errored) return fail_file if __name__ == "__main__": parser = argparse.ArgumentParser("WST Collector NHV1 BlobFuncs v0") parser.add_argument("-w", "--workers", type=int, default=4) parser.add_argument("-v", "--verbose", action="store_true") + parser.add_argument("--prescan", help="Scan output dir before attempting new blobs", action="store_true") args = parser.parse_args() if args.verbose: log.setLevel(log.DEBUG) + if args.prescan: + log.info(f"running prescan ...") + for blobfile in tqdm( + output_root_dir.glob("*/*/*.jsonl"), + desc="scanning existing outputs", unit="blobs", + smoothing=tqdm_smoothing_factor, + ): + redis_decoded.set(blobfile.stem, BlobStatus.done) + for errfile in tqdm( + errors_dir.glob("*/*/*.txt"), desc="scanning errors", + smoothing=tqdm_smoothing_factor, + ): + redis_decoded.set(errfile.stem, BlobStatus.error) + log.info(f"pool workers {args.workers}") q = deque(maxlen=args.workers) try: blobs = tqdm(all_blobs_iterator( blobfilter=blob_not_yet_processed, - filefilter=lambda f: wst_supports_fnames(f) - )) + filefilter=lambda f: wst_supports_fnames(f), + tqdm_position=1, + redis_cl=redis_decoded, + ), desc="blobs processed", unit="blobs", smoothing=tqdm_smoothing_factor, unit_scale=True) tr = tracker.SummaryTracker() def collect_or_retry(job: blobjob): q.remove(job) diff --git a/wsyntree_collector/wociterators.py b/wsyntree_collector/wociterators.py index bfbf673..e12c9bf 100644 --- a/wsyntree_collector/wociterators.py +++ b/wsyntree_collector/wociterators.py @@ -1,9 +1,12 @@ import sys import os import gzip +import enum from pathlib import Path, PurePosixPath from collections import namedtuple +from tqdm import tqdm + from wsyntree import log import oscar @@ -14,9 +17,21 @@ blobresult = namedtuple("BlobResult", ["hash", "content", "filenames"]) -def binary_line_iterator(open_file): +KIBIBYTE = 2**10 +MEBIBYTE = 2**20 +GIBIBYTE = 2**30 + +class BlobStatus(str, enum.Enum): # enum.StrEnum added in 3.11 + done = "done" + not_supported = "not_supported" + too_large = "too_large" + errored = "errored" + skipped = "skipped" + +def binary_line_iterator(open_file, max_buffer_size=GIBIBYTE): """only yields lines that are decodable""" - chunksize = 4096 + #chunksize = 4096 + chunksize = 4*MEBIBYTE lineno = 0 buffer = bytearray() for block in iter(lambda: open_file.read(chunksize), b""): @@ -28,14 +43,36 @@ def binary_line_iterator(open_file): yield 
line.decode() except UnicodeDecodeError: log.warn(f"cannot decode line {lineno} of {open_file.name}") + if len(buffer) >= max_buffer_size: + # skip this line, it's too large + log.warn(f"line {lineno} greater than {max_buffer_size} bytes long, skipping to next line...") + buffer = bytearray() + for block in iter(lambda: open_file.read(chunksize), b""): + if b"\n" in block: + # found next line + _, new_line_start = block.split(b"\n", 1) + lineno += 1 + buffer.extend(new_line_start) + break + else: # no newline before end of file: + log.warn(f"line {lineno} was last in file") + # loop will exit naturally if buffer: for line in buffer.split(b"\n"): try: yield line.decode() except UnicodeDecodeError: log.warn(f"cannot decode line {lineno} of {open_file.name}") + lineno += 1 # does this go before or after? not too important? -def iter_blobs(section, blobfilter=lambda b: True, filefilter=lambda fnames: True): +def iter_blobs( + section, + blobfilter=lambda b: True, + filefilter=lambda fnames: True, + max_blob_size=(MEBIBYTE * 32), + tqdm_position=None, + redis_cl=None, + ): """ blobfilter(str) -> bool filefilter(List[PurePosixPath]) -> bool @@ -47,22 +84,55 @@ def iter_blobs(section, blobfilter=lambda b: True, filefilter=lambda fnames: Tru # open the index in binary as well, encoding not specified with gzip.open(p_idx, "rb") as idx_f, p_bin.open("rb") as bin_f: - for idx_line in binary_line_iterator(idx_f): + if tqdm_position is None: + bin_line_it = binary_line_iterator(idx_f) + else: + bin_line_it = tqdm( + binary_line_iterator(idx_f), + position=tqdm_position, + desc="index lines", + unit="lines", + unit_scale=True, + smoothing=0.01, + ) + for idx_line in bin_line_it: fields = idx_line.rstrip().split(";") _hash = fields[3] filenames = tuple(PurePosixPath(x) for x in fields[4:]) offset = int(fields[1]) length = int(fields[2]) + if length >= max_blob_size: + log.warn(f"compressed blob too large: skip {_hash} of lzf size {length}") + if redis_cl: + redis_cl.set(_hash, BlobStatus.too_large) + continue if not blobfilter(_hash): continue if not filefilter(filenames): + if redis_cl: + redis_cl.set(_hash, BlobStatus.not_supported) continue bin_f.seek(offset, os.SEEK_SET) - val = oscar.decomp(bin_f.read(length)) + lzf_data = bin_f.read(length) + if not lzf_data: + log.warn(f"no data in blob: skip {_hash}") + continue + if lzf_data[0] == 0: + # data not compressed, will be handled by oscar.decomp + pass + else: + header_size, uncompressed_content_length = oscar.lzf_length(lzf_data) + if uncompressed_content_length >= max_blob_size: + log.warn(f"uncompressed blob too large: skip {_hash} of size {uncompressed_content_length}") + if redis_cl: + redis_cl.set(_hash, BlobStatus.too_large) + continue + val = oscar.decomp(lzf_data) yield blobresult(_hash, val, filenames) def all_blobs(**kwargs): for sec in all_blob_sections: + log.info(f"iter all blob sections: start section {sec}") for i in iter_blobs(sec, **kwargs): yield i From 642c5da13052ef85f197b009225d552010743227 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Sun, 23 Apr 2023 23:33:50 -0400 Subject: [PATCH 28/29] counter for BlobStatus from redis --- .../commands/woc/count_redis_stats.py | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 wsyntree_collector/commands/woc/count_redis_stats.py diff --git a/wsyntree_collector/commands/woc/count_redis_stats.py b/wsyntree_collector/commands/woc/count_redis_stats.py new file mode 100644 index 0000000..54ae8f7 --- /dev/null +++ b/wsyntree_collector/commands/woc/count_redis_stats.py @@ 
-0,0 +1,81 @@ +""" +This "command" is only to be run as a job on the DA cluster, not used as a subcommand. +""" + +import argparse +import subprocess +import traceback +import time +import tempfile +import enum +import os +from typing import List, Optional +from pathlib import Path, PurePath, PurePosixPath +from collections import Counter, namedtuple, deque +from concurrent import futures +from itertools import islice + +import networkx as nx +from networkx.readwrite import json_graph +import orjson +import pebble +import tenacity +from pebble import concurrent +from tqdm import tqdm +from tqdm.contrib.logging import logging_redirect_tqdm +from pympler import tracker +import redis + +import wsyntree.exceptions +from wsyntree import log +from wsyntree.wrap_tree_sitter import ( + TreeSitterAutoBuiltLanguage, TreeSitterCursorIterator, get_TSABL_for_file, +) +from wsyntree.hashtypes import WSTNodeHashV1 + +from wsyntree_collector.file.parse_file_treesitter import build_networkx_graph +from wsyntree_collector.wociterators import all_blobs as all_blobs_iterator, BlobStatus + +#import oscar + + +tqdm_smoothing_factor = 0.01 + +redis_pool = redis.ConnectionPool( + host=os.environ.get("WST_REDIS_HOST", "wst-redis"), + port=6379, db=0, +) +redis_client = redis.Redis(connection_pool=redis_pool) +redis_decoded = redis.Redis(connection_pool=redis_pool, decode_responses=True) + +def batched(iterable, n): + "Batch data into lists of length n. The last batch may be shorter." + # batched('ABCDEFG', 3) --> ABC DEF G + it = iter(iterable) + while True: + batch = list(islice(it, n)) + if not batch: + return + yield batch + +if __name__ == "__main__": + parser = argparse.ArgumentParser("WST Collector NHV1 BlobFuncs v0") + # parser.add_argument("-w", "--workers", type=int, default=4) + parser.add_argument("-v", "--verbose", action="store_true") + # parser.add_argument("--prescan", help="Scan output dir before attempting new blobs", action="store_true") + args = parser.parse_args() + if args.verbose: + log.setLevel(log.DEBUG) + + counter = Counter() + try: + with logging_redirect_tqdm(), log.suppress_stdout(): + key_it = tqdm(redis_decoded.scan_iter(count=5000), unit_scale=True) + for batch in batched(key_it, 5000): + #log.debug(f"batch size {len(batch)}") + vals = redis_decoded.mget(batch) + counter.update(vals) + except KeyboardInterrupt as e: + log.warn(f"Caught KeyboardInterrupt, stopping ...") + for m in counter.most_common(): + print(m) From 4132d2c6547a05296f26034581592e85e5821bf7 Mon Sep 17 00:00:00 2001 From: Ben Klein Date: Mon, 24 Apr 2023 04:34:12 -0400 Subject: [PATCH 29/29] 1hr timeout, optimize initial line iter --- wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py | 5 ++++- wsyntree_collector/wociterators.py | 11 ++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py index af9e453..cfd22b2 100644 --- a/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py +++ b/wsyntree_collector/commands/woc/nhv1_blobfuncs_v0.py @@ -148,7 +148,7 @@ def run_blob(blob, content, filenames): redis_decoded.set(blob, BlobStatus.done) return (blob, len(hashed_nodes), outfile) -@concurrent.process(name="wst-nhv1-blobfuncs") +@concurrent.process(name="wst-nhv1-blobfuncs", timeout=60*60) #@concurrent.thread(name="wst-nhv1-blobfuncs") # @tenacity.retry( # retry here is not working: the C-level 'munmap' error exits the whole python process # 
retry=tenacity.retry_if_exception_type(pebble.common.ProcessExpired), @@ -236,6 +236,9 @@ def collect_or_retry(job: blobjob): {}, job.retry + 1, )) + except concurrent.futures.TimeoutError as e: + log.error(f"{job.args[0]} process timed out") + record_failure(job, e) with logging_redirect_tqdm(), log.suppress_stdout(): #log.logger.addHandler(log.OutputHandler(tqdm.write)) diff --git a/wsyntree_collector/wociterators.py b/wsyntree_collector/wociterators.py index e12c9bf..3a43eb8 100644 --- a/wsyntree_collector/wociterators.py +++ b/wsyntree_collector/wociterators.py @@ -30,8 +30,9 @@ class BlobStatus(str, enum.Enum): # enum.StrEnum added in 3.11 def binary_line_iterator(open_file, max_buffer_size=GIBIBYTE): """only yields lines that are decodable""" - #chunksize = 4096 - chunksize = 4*MEBIBYTE + # magic performance number: wanting the average buffer.split() size + # idxf_1: 10849129529/97563749 = 111.2 avg bytes per line + chunksize = 256 lineno = 0 buffer = bytearray() for block in iter(lambda: open_file.read(chunksize), b""): @@ -98,7 +99,8 @@ def iter_blobs( for idx_line in bin_line_it: fields = idx_line.rstrip().split(";") _hash = fields[3] - filenames = tuple(PurePosixPath(x) for x in fields[4:]) + if not blobfilter(_hash): + continue offset = int(fields[1]) length = int(fields[2]) if length >= max_blob_size: @@ -106,8 +108,7 @@ def iter_blobs( if redis_cl: redis_cl.set(_hash, BlobStatus.too_large) continue - if not blobfilter(_hash): - continue + filenames = tuple(PurePosixPath(x) for x in fields[4:]) if not filefilter(filenames): if redis_cl: redis_cl.set(_hash, BlobStatus.not_supported)
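A closing note on the status-cache protocol the last three patches converge on, not itself part of any patch in the series: every blob hash becomes a redis key whose value is a single BlobStatus string, written by whichever code path decides the blob's fate (the worker on success, record_failure on error, the iterator for oversized or unsupported blobs, the --prescan pass for existing outputs). The sketch below shows that protocol in isolation; it assumes a redis reachable on localhost rather than the wst-redis compose service used above, and the hash is a placeholder.

    import enum
    import redis

    class BlobStatus(str, enum.Enum):
        done = "done"
        not_supported = "not_supported"
        too_large = "too_large"
        errored = "errored"
        skipped = "skipped"

    r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

    def blob_not_yet_processed(blob_hash: str) -> bool:
        # any recorded status means some component already dealt with this blob
        return not r.exists(blob_hash)

    def mark(blob_hash: str, status: BlobStatus) -> None:
        # store the plain string value so a later scan can tally it with Counter()
        r.set(blob_hash, status.value)

    blob = "0" * 40  # placeholder blob hash
    if blob_not_yet_processed(blob):
        mark(blob, BlobStatus.done)

Keeping the status in redis instead of stat()ing output and error files over NFS is what removes the per-blob round trips the patch 27 message complains about, and it is also what lets count_redis_stats.py produce its tally without walking the output tree at all.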