diff --git a/docs/how-to-guides.md b/docs/how-to-guides.md index 8fa457e..bf441f0 100644 --- a/docs/how-to-guides.md +++ b/docs/how-to-guides.md @@ -12,7 +12,7 @@ scheme = Scheme.from_file("./primer.bed") scheme = Scheme.from_str('# header!!!\nMN908947.3\t47\t78\tSARS-CoV-2_1_LEFT_1\t1\t+\tCTCTTGTAGATCTGTTCTCTAAACGAACTTT\nMN908947.3\t419\t447\tSARS-CoV-2_1_RIGHT_1\t1\t-\tAAAACGCCTTTTTCAACTTCTACTAAGC\n') # Headers -print(scheme.header) +print(scheme.headers) ['# header!!!'] # Bedlines @@ -58,10 +58,14 @@ print(bl.primername) # change the strand bl.strand = "-" +# See Error +ValueError: The new stand (-) is incompatible with current primer_class (LEFT). Please use method 'force_change' to update both. +# Use correct +bl.force_change("RIGHT", "-") print(bl.primername) 'new-amp-name_1_RIGHT_1' -# this also works in reverse +# this also works in reverse (without force_change) bl.primername = "final_10_LEFT_alt1" print(bl.amplicon_prefix, bl.amplicon_number, bl.strand, bl.primer_suffix, sep= " | " ) "final | 10 | + | alt1" @@ -218,4 +222,36 @@ group_by_class(bedlines) # For each amplicon. It will return ([f bedlines], [r bedlines]) group_primer_pairs(bedlines) [([], [, ]), ([], [])] -``` \ No newline at end of file +``` + +### Export as CSV + +While not a typical use case, there will be times when primer.bed files need to be parsed by systems outside of our control, for example LIMS or liquid handlers. Therefore, all data contained in the bedfile can be exported to a CSV. + +A header row is written by default, containing all bedline information + parsed and separated Attributes. 
+ +``` +# gc=fractiongc +MN908947.3 100 131 example_1_LEFT_1 1 + CTCTTGTAGATCTGTTCTCTAAACGAACTTT pw=1.4;gc=0.35 +MN908947.3 419 447 example_1_RIGHT_1 1 - AAAACGCCTTTTTCAACTTCTACTAAGC pw=1.4;gc=0.36 +MN908947.3 344 366 example_2_LEFT_1 2 + TCGTACGTGGCTTTGGAGACTC pw=1.0;gc=0.55 +MN908947.3 707 732 example_2_RIGHT_1 2 - TCTTCATAAGGATCAGTGCCAAGCT gc=0.44 +``` + +```python +print(scheme.to_delim_str(use_header_aliases=True)) +``` +| chrom | start | end | primername | pool | strand | sequence | amplicon_prefix | amplicon_number | primer_class_str | primer_suffix | pw | fractiongc | +|-------|-------|-----|------------|------|--------|----------|-----------------|-----------------|------------------|---------------|----|----| +| MN908947.3 | 100 | 131 | example_1_LEFT_1 | 1 | + | CTCTTGTAGATCTGTTCTCTAAACGAACTTT | example | 1 | LEFT | 1 | 1.4 | 0.35 | +| MN908947.3 | 419 | 447 | example_1_RIGHT_1 | 1 | - | AAAACGCCTTTTTCAACTTCTACTAAGC | example | 1 | RIGHT | 1 | 1.4 | 0.36 | +| MN908947.3 | 344 | 366 | example_2_LEFT_1 | 2 | + | TCGTACGTGGCTTTGGAGACTC | example | 2 | LEFT | 1 | 1.0 | 0.55 | +| MN908947.3 | 707 | 732 | example_2_RIGHT_1 | 2 | - | TCTTCATAAGGATCAGTGCCAAGCT | example | 2 | RIGHT | 1 | | 0.44 | + +Key points: + +- The table has consistent shape (an empty `pw` was added for `example_2_RIGHT_1`) + +- Header line has been written with all the primer attributes parsed into columns + +- Using `use_header_aliases=True` means that `gc` has been parsed into `fractiongc` due to the `# gc=fractiongc` comment line \ No newline at end of file diff --git a/primalbedtools/amplicons.py b/primalbedtools/amplicons.py index 15812f0..c538101 100644 --- a/primalbedtools/amplicons.py +++ b/primalbedtools/amplicons.py @@ -1,6 +1,6 @@ from typing import Optional -from primalbedtools.bedfiles import BedLine, PrimerClass, group_amplicons +from primalbedtools.bedfiles import BedLine, BedLineParser, PrimerClass, group_amplicons class Amplicon: @@ -87,32 +87,39 @@ def __init__( chroms = 
set([bedline.chrom for bedline in all_lines]) if len(chroms) != 1: raise ValueError( - f"All bedlines must be on the same chromosome ({','.join(chroms)})" + f"Failed to create amplicon as provided bedlines are on different chromosomes ({','.join(map(str, chroms))}):\n\n" + + BedLineParser.to_str(bedlines=all_lines, headers=None) ) self.chrom = chroms.pop() - # Check all pools are the same - pools = set([bedline.pool for bedline in all_lines]) - if len(pools) != 1: - raise ValueError( - f"All bedlines must be in the same pool ({','.join(map(str, pools))})" - ) - self.pool = pools.pop() + # Check all amplicon numbers are the same amplicon_numbers = set([bedline.amplicon_number for bedline in all_lines]) if len(amplicon_numbers) != 1: raise ValueError( - f"All bedlines must be the same amplicon ({','.join(map(str, amplicon_numbers))})" + f"Failed to create amplicon as provided bedlines have different amplicon numbers ({','.join(map(str, amplicon_numbers))}):\n\n" + + BedLineParser.to_str(bedlines=all_lines, headers=None) ) self.amplicon_number = amplicon_numbers.pop() + # Check all pools are the same + pools = set([bedline.pool for bedline in all_lines]) + if len(pools) != 1: + raise ValueError( + f"Failed to create amplicon ({self.prefix}_{self.amplicon_number}) as provided bedlines have different pools ({','.join(map(str, pools))}):\n\n" + + BedLineParser.to_str(bedlines=all_lines, headers=None) + ) + self.pool = pools.pop() + # Check both forward and reverse primers are present if not self.left: raise ValueError( - f"No forward primers found for {self.prefix}_{self.amplicon_number}" + f"Failed to create amplicon ({self.prefix}_{self.amplicon_number}) as no forward primers found:\n\n" + + BedLineParser.to_str(bedlines=all_lines, headers=None) ) if not self.right: raise ValueError( - f"No reverse primers found for {self.prefix}_{self.amplicon_number}" + f"Failed to create amplicon ({self.prefix}_{self.amplicon_number}) as no reverse primers found:\n\n" + + 
BedLineParser.to_str(bedlines=all_lines, headers=None) ) def __lt__(self, other): diff --git a/primalbedtools/bedfiles.py b/primalbedtools/bedfiles.py index d9226ff..dc72738 100644 --- a/primalbedtools/bedfiles.py +++ b/primalbedtools/bedfiles.py @@ -2,6 +2,7 @@ import pathlib import re import typing +from functools import total_ordering from typing import Optional, Union from primalbedtools.utils import expand_ambiguous_bases, rc_seq, strip_all_white_space @@ -31,6 +32,14 @@ class PrimerClass(enum.Enum): PROBE = "PROBE" +# Primer class order: LEFT, PROBE, RIGHT +PRIMER_CLASS_ORDER = { + PrimerClass.LEFT: 0, + PrimerClass.PROBE: 1, + PrimerClass.RIGHT: 2, +} + + class Strand(enum.Enum): FORWARD = "+" REVERSE = "-" @@ -291,6 +300,7 @@ def validate_primer_name(primername: str) -> tuple[str, str, str, Union[str, Non return (parts[0], parts[1], parts[2], parts[3]) +@total_ordering class BedLine: """A class representing a single line in a primer.bed file. @@ -364,7 +374,7 @@ class BedLine: _sequence: str # primerAttributes - _attributes: Optional[dict[str, Union[str, float]]] + _attributes: dict[str, Union[str, float]] # primernames components _amplicon_prefix: str @@ -407,6 +417,37 @@ def __init__( f"primername ({self.primername}) implies direction ({self.primer_class_str}), which is incompatible with ({strand})" ) + def __eq__(self, other): + if not isinstance(other, BedLine): + return NotImplemented + return self.to_bed() == other.to_bed() + + def __lt__(self, other): + if not isinstance(other, BedLine): + return NotImplemented + return self._sort_key() < other._sort_key() + + def _sort_key(self): + """Return a tuple for sorting.""" + + # Primer suffix order: int, str, None + suffix = self.primer_suffix + if isinstance(suffix, int): + suffix_key = (0, suffix) + elif isinstance(suffix, str): + suffix_key = (1, suffix) + else: + suffix_key = (2, "") # None + + return ( + self.chrom, + self.amplicon_number, + PRIMER_CLASS_ORDER.get(self.primer_class, 3), + suffix_key, + 
def to_bed(self, ignore_attr: bool = False) -> str:
    """Convert the BedLine object to a BED formatted string.

    The seven core columns are always followed by a tab and the attribute
    column, so a line with no attributes (or with ``ignore_attr=True``)
    ends in a trailing tab before the newline.

    Args:
        ignore_attr: If True, leave the attribute column empty even when
            the primer has attributes.

    Returns:
        A single tab-separated, newline-terminated BED line.
    """
    # Only build the attribute string when it is actually going to be written.
    attribute_str = None
    if not ignore_attr:
        attribute_str = create_primer_attributes_str(self.attributes)
    if attribute_str is None:
        attribute_str = ""

    return f"{self.chrom}\t{self.start}\t{self.end}\t{self.primername}\t{self.pool}\t{self.strand}\t{self.sequence}\t{attribute_str}\n"
def sort_bedlines(bedlines: list[BedLine], by_pos: bool = True) -> list[BedLine]:
    """Return the bedlines ordered amplicon by amplicon.

    The bedlines are grouped into amplicons with ``group_amplicons``. The
    amplicons are ordered by the chromosome of their first LEFT primer and
    then by either that primer's end coordinate (``by_pos=True``) or its
    amplicon number (``by_pos=False``). Inside each amplicon the LEFT, PROBE
    and RIGHT bedlines are pooled and sorted using BedLine's own ordering.

    Args:
        bedlines: A list of BedLine objects to sort.
        by_pos: If True, order amplicons by the end position of their first
            LEFT primer. If False, order them by amplicon number.

    Returns:
        list[BedLine]: A new flat list containing the original BedLine
            objects in sorted order.

    Examples:
        >>> from primalbedtools.bedfiles import BedLine, BedFileModifier
        >>> bedlines = [BedLine(...)]  # List of BedLine objects
        >>> sorted_lines = BedFileModifier.sort_bedlines(bedlines)
    """
    left_key = PrimerClass.LEFT.value

    # Both orderings anchor on the first LEFT primer of the amplicon.
    if by_pos:

        def amplicon_key(amplicon):
            first_left = amplicon[left_key][0]
            return (first_left.chrom, first_left.end)

    else:

        def amplicon_key(amplicon):
            first_left = amplicon[left_key][0]
            return (first_left.chrom, first_left.amplicon_number)

    amplicons = group_amplicons(bedlines)
    amplicons.sort(key=amplicon_key)

    class_keys = (
        PrimerClass.LEFT.value,
        PrimerClass.PROBE.value,
        PrimerClass.RIGHT.value,
    )

    ordered: list[BedLine] = []
    for amplicon in amplicons:
        # Pool every primer of the amplicon, then let BedLine ordering decide.
        members = [bl for key in class_keys for bl in amplicon.get(key, [])]
        members.sort()
        ordered.extend(members)

    return ordered
+ header1 (Optional[list[str]] , optional): Headers for the first bedfile. Defaults to None. + header2 (Optional[list[str]] , optional): Headers for the second bedfile. Defaults to None. + ignore_order (bool, optional): If True, sorts bedlines before string generation. Defaults to True. + ignore_attr (bool, optional): If True, excludes attributes from the string representation. Defaults to False. + ignore_header (bool, optional): If True, excludes headers from the string representation. Defaults to False. + + Returns: + tuple[str, str]: A tuple containing the two normalised bedfile strings. + """ + if ignore_header: + header1 = None + header2 = None + + if ignore_order: + bed_str1 = BedLineParser.to_str(header1, sorted(bedlines1), ignore_attr) + bed_str2 = BedLineParser.to_str(header2, sorted(bedlines2), ignore_attr) + else: + bed_str1 = BedLineParser.to_str(header1, bedlines1, ignore_attr) + bed_str2 = BedLineParser.to_str(header2, bedlines2, ignore_attr) + + return (bed_str1, bed_str2) + + +def diff_primernames( + bedlines1: list[BedLine], bedlines2: list[BedLine] +) -> tuple[set[str], set[str]]: + """Returns difference set operations on the primernames. + + Calculates the set difference of primer names between two lists of BedLines. + + Args: + bedlines1 (list[BedLine]): The first list of BedLine objects. + bedlines2 (list[BedLine]): The second list of BedLine objects. + + Returns: + tuple[set[str], set[str]]: A tuple containing two sets: + - Primer names present in bedlines1 but not in bedlines2. + - Primer names present in bedlines2 but not in bedlines1. + """ + # Get primernames present in each. + primernames1 = set([bl.primername for bl in bedlines1]) + primernames2 = set([bl.primername for bl in bedlines2]) + + # Returns primernames present in X but not Y. 
def diff_sequence(
    bedlines1: list[BedLine], bedlines2: list[BedLine]
) -> tuple[set[str], set[str]]:
    """Returns difference set operations on the sequences.

    Calculates the set difference of sequences between two lists of BedLines.
    Duplicate sequences within one list are collapsed, so only presence or
    absence of a sequence is reported.

    Args:
        bedlines1 (list[BedLine]): The first list of BedLine objects.
        bedlines2 (list[BedLine]): The second list of BedLine objects.

    Returns:
        tuple[set[str], set[str]]: A tuple containing two sets:
            - Sequences present in bedlines1 but not in bedlines2.
            - Sequences present in bedlines2 but not in bedlines1.
    """
    # Get the sequences present in each.
    sequences1 = {bl.sequence for bl in bedlines1}
    sequences2 = {bl.sequence for bl in bedlines2}

    # Returns sequences present in X but not Y.
    return sequences1 - sequences2, sequences2 - sequences1
def unified_diff_bedlines(
    bedlines1: list[BedLine],
    bedlines2: list[BedLine],
    header1: Optional[list[str]] = None,
    header2: Optional[list[str]] = None,
    ignore_order: bool = True,
    ignore_attr: bool = False,
    ignore_header: bool = False,
):
    """Produce a unified diff between two sets of bedlines.

    Both inputs are first rendered to normalised bedfile strings by
    ``create_normalised_bedfile_str`` (which applies the ``ignore_*``
    options), then compared line by line with ``difflib.unified_diff``.
    No context lines are emitted (``n=0``) and the two sides are labelled
    "bedlines1" and "bedlines2".

    Args:
        bedlines1 (list[BedLine]): The first list of BedLine objects.
        bedlines2 (list[BedLine]): The second list of BedLine objects.
        header1 (Optional[list[str]], optional): Headers for the first bedfile. Defaults to None.
        header2 (Optional[list[str]], optional): Headers for the second bedfile. Defaults to None.
        ignore_order (bool, optional): If True, sorts bedlines before comparison. Defaults to True.
        ignore_attr (bool, optional): If True, excludes attributes from comparison. Defaults to False.
        ignore_header (bool, optional): If True, excludes headers from comparison. Defaults to False.

    Returns:
        Iterator[str]: A generator yielding unified diff lines (strings).
    """
    normalised = create_normalised_bedfile_str(
        bedlines1=bedlines1,
        bedlines2=bedlines2,
        header1=header1,
        header2=header2,
        ignore_attr=ignore_attr,
        ignore_order=ignore_order,
        ignore_header=ignore_header,
    )

    # Keep line endings so difflib compares the exact text that would be written.
    from_lines, to_lines = (text.splitlines(keepends=True) for text in normalised)

    return unified_diff(
        from_lines,
        to_lines,
        n=0,
        fromfile="bedlines1",
        tofile="bedlines2",
    )
+ ) + args = parser.parse_args() - # Read in the bed file - _headers, bedlines = BedLineParser.from_file(args.bed) + # Read in the scheme + scheme = Scheme.from_file(args.bed) if args.subparser_name == "remap": msa = read_fasta(args.msa) - bedlines = remap(args.from_id, args.to_id, bedlines, msa) + scheme.bedlines = remap(args.from_id, args.to_id, scheme.bedlines, msa) + print(scheme.to_str(), end="") + exit(0) elif args.subparser_name == "sort": - bedlines = BedFileModifier.sort_bedlines(bedlines) + scheme.bedlines = BedFileModifier.sort_bedlines(scheme.bedlines, args.by_pos) + print(scheme.to_str(), end="") + exit(0) elif args.subparser_name == "update": - bedlines = BedFileModifier.update_primernames(bedlines) + scheme.bedlines = BedFileModifier.update_primernames(scheme.bedlines) + print(scheme.to_str(), end="") + exit(0) elif args.subparser_name == "amplicon": - amplicons = create_amplicons(bedlines) + amplicons = create_amplicons(scheme.bedlines) # Print the amplicons for amplicon in amplicons: @@ -108,14 +132,14 @@ def main(): print(amplicon.to_amplicon_str()) exit(0) # Exit early elif args.subparser_name == "merge": - bedlines = BedFileModifier.merge_primers(bedlines) + scheme.bedlines = BedFileModifier.merge_primers(scheme.bedlines) elif args.subparser_name == "fasta": - for line in bedlines: + for line in scheme.bedlines: print(line.to_fasta(), end="") exit(0) # Exit early elif args.subparser_name == "validate_bedfile": - validate_primerbed(bedlines) + validate_primerbed(scheme.bedlines) exit(0) # early exit elif args.subparser_name == "validate": @@ -124,19 +148,27 @@ def main(): elif args.subparser_name == "downgrade": # merge primers if asked - bedlines = BedFileModifier.downgrade_primernames( - bedlines=bedlines, merge_alts=args.merge_alts + scheme.bedlines = BedFileModifier.downgrade_primernames( + bedlines=scheme.bedlines, merge_alts=args.merge_alts ) - _headers = [] # remove headers + scheme.headers = [] # remove headers + print(scheme.to_str(), 
end="") + exit(0) elif args.subparser_name == "format": - pass + print(scheme.to_str(), end="") + exit(0) + elif args.subparser_name == "csv": + print( + scheme.to_delim_str( + include_headers=not args.no_headers, + use_header_aliases=args.use_header_aliases, + ) + ) + exit(0) else: parser.print_help() - bedfile_str = BedLineParser.to_str(_headers, bedlines) - print(bedfile_str, end="") - if __name__ == "__main__": main() diff --git a/primalbedtools/remap.py b/primalbedtools/remap.py index e05eee4..91ad55b 100644 --- a/primalbedtools/remap.py +++ b/primalbedtools/remap.py @@ -67,7 +67,9 @@ def remap( continue msa_start = from_index_to_msa_index[bedline.start] - msa_end = from_index_to_msa_index[bedline.end] + msa_end = from_index_to_msa_index.get( + bedline.end, max(from_index_to_msa_index.keys()) + ) # Check for perfect mapping if ( diff --git a/primalbedtools/scheme.py b/primalbedtools/scheme.py index 21035a9..d28ac00 100644 --- a/primalbedtools/scheme.py +++ b/primalbedtools/scheme.py @@ -13,6 +13,20 @@ write_bedfile, ) +DEFAULT_CSV_HEADERS = [ + "chrom", + "start", + "end", + "primername", + "pool", + "strand", + "sequence", + "amplicon_prefix", + "amplicon_number", + "primer_class_str", + "primer_suffix", +] + class Scheme: """A class representing a primer scheme with headers and primer bed lines. @@ -91,13 +105,13 @@ def to_file(self, path: str): return write_bedfile(path, self.headers, self.bedlines) # modifiers - def sort_bedlines(self): + def sort_bedlines(self, by_pos: bool = False): """Sort the bedlines in canonical order in place. - Sorts bedlines by chromosome, amplicon number, direction, and primer suffix + Sorts bedlines by chromosome, amplicon number (or position), direction, and primer suffix to ensure consistent ordering across the scheme. 
def to_delim_str(
    scheme: Scheme, include_headers: bool = True, use_header_aliases: bool = False
) -> str:
    """Render a scheme as comma-separated text with one column per field.

    The columns start with ``DEFAULT_CSV_HEADERS``. One extra column is added
    for every attribute key found on any bedline, in order of first
    appearance. Bedlines lacking a given attribute get an empty cell, so every
    row has the same number of columns.

    Args:
        scheme: The scheme whose ``bedlines`` and ``header_dict`` are read.
        include_headers: If True, the first line is the column-name row.
        use_header_aliases: If True, attribute keys are renamed through
            ``scheme.header_dict`` (key -> alias) when naming the columns.

    Returns:
        The rows joined with newlines, without a trailing newline.
    """
    # Copy the defaults: appending to DEFAULT_CSV_HEADERS itself would leak
    # this scheme's attribute columns into every later call.
    headers = list(DEFAULT_CSV_HEADERS)

    header_aliases = scheme.header_dict
    # Reverse map (alias -> attribute key) to find the value behind an aliased column.
    aliases_to_attr = {alias: attr for attr, alias in header_aliases.items()}

    # Discover the attribute columns.
    for bl in scheme.bedlines:
        for key in bl.attributes:
            if use_header_aliases:
                key = header_aliases.get(key, key)
            if key not in headers:
                headers.append(key)

    lines_to_write: list[str] = []
    if include_headers:
        lines_to_write.append(",".join(headers))

    # NOTE(review): cells are joined without CSV quoting, so a value containing
    # a comma would shift columns -- confirm inputs can never contain one.
    for bl in scheme.bedlines:
        row: list[str] = []
        for header in headers:
            try:
                # Core columns are plain attributes/properties of the bedline.
                value = getattr(bl, header)
            except AttributeError:
                # Otherwise the column came from the attribute dict, either
                # under its own key or under the key its alias maps back to.
                if header in bl.attributes:
                    value = bl.attributes[header]
                elif header in aliases_to_attr:
                    value = bl.attributes.get(aliases_to_attr[header])
                else:
                    value = None
            row.append("" if value is None else str(value))

        lines_to_write.append(",".join(row))

    return "\n".join(lines_to_write)
import pathlib

# Directory holding the test fixtures, resolved relative to this module so the
# paths work regardless of the current working directory.
_INPUTS_DIR = pathlib.Path(__file__).parent / "inputs"

# Bedfiles
TEST_BEDFILE = _INPUTS_DIR / "test.bed"
TEST_V2_BEDFILE = _INPUTS_DIR / "test.v2.bed"
TEST_WEIGHTS_BEDFILE = _INPUTS_DIR / "test.weights.bed"
TEST_ATTRIBUTES_BEDFILE = _INPUTS_DIR / "test.attributes.bed"
TEST_PROBE_BEDFILE = _INPUTS_DIR / "test.probe.bed"
TEST_PANEL_BEDFILE = _INPUTS_DIR / "panel.input.bed"

# fasta
FASTA_PATH = _INPUTS_DIR / "msa.input.fasta"
1473811 1473837 7ff56b50_5_LEFT_1 2 + GCATCTGAATATATAGGGTGCGGGAG +NC_000962.3 1474251 1474273 7ff56b50_5_RIGHT_1 2 - TGTAGGCACACGGTTTCAGGTA +NC_000962.3 1474960 1474980 7ff56b50_6_LEFT_1 1 + TTGTGGTGGGTGTGGGTAGG +NC_000962.3 1475400 1475425 7ff56b50_6_RIGHT_1 1 - CTATGCGTCACCTCTCAGGATTAGT +NC_000962.3 5244 5266 7ff56b50_17_LEFT_1 1 + TGCCCAGAAAAAGAAGGCCCAA +NC_000962.3 5684 5708 7ff56b50_17_RIGHT_1 1 - CCAGGGGTTCCGACTTCTCATAAA +NC_000962.3 60087 60112 7ff56b50_284_LEFT_1 1 + CCGAAGAGATTCTTGTCCACACAAA +NC_000962.3 60527 60548 7ff56b50_284_RIGHT_1 1 - CCGTTCCAGTACATCGGCGAT \ No newline at end of file diff --git a/tests/test_bedfile.py b/tests/test_bedfile.py index a1e2e62..a67d041 100644 --- a/tests/test_bedfile.py +++ b/tests/test_bedfile.py @@ -32,13 +32,16 @@ version_primername, write_bedfile, ) +from tests.infiles import ( + TEST_ATTRIBUTES_BEDFILE, + TEST_BEDFILE, + TEST_PANEL_BEDFILE, + TEST_PROBE_BEDFILE, + TEST_V2_BEDFILE, + TEST_WEIGHTS_BEDFILE, +) -TEST_BEDFILE = pathlib.Path(__file__).parent / "inputs/test.bed" -TEST_V2_BEDFILE = pathlib.Path(__file__).parent / "inputs/test.v2.bed" -TEST_WEIGHTS_BEDFILE = pathlib.Path(__file__).parent / "inputs/test.weights.bed" -TEST_WEIGHTS_BEDFILE = pathlib.Path(__file__).parent / "inputs/test.weights.bed" -TEST_ATTRIBUTES_BEDFILE = pathlib.Path(__file__).parent / "inputs/test.attributes.bed" -TEST_PROBE_BEDFILE = pathlib.Path(__file__).parent / "inputs/test.probe.bed" +random.seed(100) class TestValidationFuncs(unittest.TestCase): @@ -229,7 +232,7 @@ def test_create_bedline_with_strand_diff(self): sequence="ACGT", ) - def test_bedline_create(self): + def test_bedline_create_left(self): bedline = BedLine( chrom="chr1", start=100, @@ -259,12 +262,51 @@ def test_bedline_create(self): ) self.assertEqual(bedline.primername, "scheme_1_LEFT") self.assertIsNone(bedline.primer_suffix) + self.assertEqual(Strand.FORWARD, bedline.strand_class) self.assertEqual(bedline.ipool, 0) self.assertEqual(bedline.primer_class, 
PrimerClass.LEFT) self.assertEqual( bedline.to_bed(), - "chr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\n", + "chr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\t\n", + ) + + def test_bedline_create_right(self): + bedline = BedLine( + chrom="chr1", + start=100, + end=200, + primername="scheme_1_RIGHT", + pool=1, + strand="-", + sequence="ACGT", + ) + # Provides values + self.assertEqual(bedline.chrom, "chr1") + self.assertEqual(bedline.start, 100) + self.assertEqual(bedline.end, 200) + self.assertEqual(bedline.primername, "scheme_1_RIGHT") + self.assertEqual(bedline.pool, 1) + self.assertEqual(bedline.strand, "-") + self.assertEqual(bedline.sequence, "ACGT") + self.assertIsNone(bedline.weight) + + # Derived values + self.assertEqual(bedline.length, 100) + self.assertEqual(bedline.amplicon_number, 1) + self.assertEqual(bedline.amplicon_prefix, "scheme") + self.assertEqual( + bedline.amplicon_name, + f"{bedline.amplicon_prefix}_{bedline.amplicon_number}", + ) + self.assertIsNone(bedline.primer_suffix) + self.assertEqual(Strand.REVERSE, bedline.strand_class) + + self.assertEqual(bedline.ipool, 0) + self.assertEqual(bedline.primer_class, PrimerClass.RIGHT) + self.assertEqual( + bedline.to_bed(), + "chr1\t100\t200\tscheme_1_RIGHT\t1\t-\tACGT\t\n", ) def test_bedline_create_empty_weight(self): @@ -296,7 +338,7 @@ def test_bedline_create_empty_weight(self): self.assertEqual(bedline.ipool, 0) self.assertEqual( bedline.to_bed(), - "chr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\n", + "chr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\t\n", ) def test_bedline_create_probe(self): @@ -562,7 +604,7 @@ def test_to_bed(self): ) self.assertEqual( bedline.to_bed(), - "chr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\n", + "chr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\t\n", ) # Provide weight bedline.weight = 1.0 @@ -583,7 +625,7 @@ def test_to_bed_probe(self): ) self.assertEqual( bedline.to_bed(), - "chr1\t100\t200\tscheme_1_PROBE\t1\t+\tACGT\n", + "chr1\t100\t200\tscheme_1_PROBE\t1\t+\tACGT\t\n", ) # Provide weight 
bedline.weight = 1.0 @@ -871,7 +913,7 @@ def test_attribute_setter(self): self.assertEqual(bedline.attributes, {}) # Ensure empty dict is not written to bed self.assertEqual( - bedline.to_bed(), "chr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\n" + bedline.to_bed(), "chr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\t\n" ) # Set string. Test pw is converted to float @@ -884,6 +926,13 @@ def test_attribute_setter(self): bedline.attributes = "pw=A;" self.assertIn("weight must be a float", str(context.exception)) + # Check attribute will be empty dict + bedline.attributes = None + self.assertEqual(bedline.attributes, {}) + + # Check the (if bedline.attributes:) pattern works + self.assertFalse(bedline.attributes) + class TestCreateBedline(unittest.TestCase): def test_create_bedline(self): @@ -959,17 +1008,17 @@ class TestCreateBedfileStr(unittest.TestCase): def test_create_bedfile_str(self): bedfile_str = create_bedfile_str(["#header1"], [self.bedline]) self.assertEqual( - bedfile_str, "#header1\nchr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\n" + bedfile_str, "#header1\nchr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\t\n" ) def test_create_bedfile_str_no_header(self): bedfile_str = create_bedfile_str([], [self.bedline]) - self.assertEqual(bedfile_str, "chr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\n") + self.assertEqual(bedfile_str, "chr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\t\n") def test_create_bedfile_str_malformed_header(self): bedfile_str = create_bedfile_str(["header1"], [self.bedline]) self.assertEqual( - bedfile_str, "#header1\nchr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\n" + bedfile_str, "#header1\nchr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\t\n" ) @@ -991,7 +1040,7 @@ def test_write_bedfile(self): with open(self.output_bed_path) as f: content = f.read() self.assertEqual( - content, "#header1\nchr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\n" + content, "#header1\nchr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\t\n" ) # Write weighted bedline bedline.weight = 1.0 @@ -1262,7 +1311,7 @@ def 
test_bedline_parser_to_str(self): ) bedfile_str = BedLineParser.to_str(["#header1"], [bedline]) self.assertEqual( - bedfile_str, "#header1\nchr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\n" + bedfile_str, "#header1\nchr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\t\n" ) def test_bedline_parser_to_file(self): @@ -1279,7 +1328,7 @@ def test_bedline_parser_to_file(self): with open(self.OUTFILE) as f: content = f.read() self.assertEqual( - content, "#header1\nchr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\n" + content, "#header1\nchr1\t100\t200\tscheme_1_LEFT\t1\t+\tACGT\t\n" ) def tearDown(self) -> None: @@ -1376,20 +1425,6 @@ def test_downgrade_primername(self): new_primername = {bl.primername for bl in new_bedlines} self.assertEqual(new_primername, {"test_1_LEFT", "test_1_LEFT_alt1"}) - def test_sort_bedlines(self): - # Read in a bedfile - headers, bedlines = BedLineParser.from_file(TEST_BEDFILE) - - # Randomly shuffle the bedlines - random.seed(100) - random_bedlines = random.sample(bedlines, len(bedlines)) - - # Sort the bedlines - sorted_bedlines = sort_bedlines(random_bedlines) - - # Check that the bedlines are sorted - self.assertEqual(sorted_bedlines, bedlines) - def test_merge_primers_single(self): bedlines = [ BedLine( @@ -1499,5 +1534,118 @@ def test_expand_bedlines(self): self.assertEqual(expected_names, primer_names) +class TestBedLineSortOrder(unittest.TestCase): + """ + Test the default sort order of bedlines + """ + + def test_sort_funcs_bedlines(self): + """ + Tests the custom sort function + """ + # Pass cases + for bl_path in [TEST_PROBE_BEDFILE, TEST_BEDFILE]: + # Read in a bedfile + _headers, bedlines = BedLineParser.from_file(bl_path) + + # Randomly shuffle the bedlines + random_bedlines = random.sample(bedlines, len(bedlines)) + self.assertNotEqual( + random_bedlines, + bedlines, + f"shuffled bedlines are in same order as original. 
{bl_path.name}", + ) + # Sort the bedlines + sorted_bedlines = sort_bedlines(random_bedlines, by_pos=True) + + # Check that the bedlines are sorted + self.assertEqual( + sorted_bedlines, + bedlines, + f"shuffled bedlines are in different order as original. {bl_path.name}", + ) + + def test_sort_methods_bedlines(self): + """ + Tests the default class order. ie sorted() / sort + """ + # Read in a bedfile + for bl_path in [TEST_PROBE_BEDFILE, TEST_BEDFILE, TEST_PANEL_BEDFILE]: + _headers, bedlines = BedLineParser.from_file(bl_path) + # Randomly shuffle the bedlines + random_bedlines = random.sample(bedlines, len(bedlines)) + # check bedlines are now different + self.assertNotEqual( + random_bedlines, + bedlines, + f"shuffled bedlines are in same order as original. {bl_path.name}", + ) + # Sort the bedlines + sorted_bedlines = sorted(random_bedlines) + # Check that the bedlines back in original order + self.assertEqual( + sorted_bedlines, + bedlines, + f"shuffled bedlines are in different order as original. 
{bl_path.name}", + ) + + def test_sort_primercloud(self): + """ + This ensures that the correct sort order is applied for primers in the same cloud + + """ + + bedlines = [ + BedLine( + chrom="chr1", + start=100, + end=120, + primername="test_1_LEFT_1", + pool=1, + strand="+", + sequence="ACGT", + ), + BedLine( + chrom="chr1", + start=110, + end=130, + primername="test_1_LEFT_3", + pool=1, + strand="+", + sequence="ACGT", + ), + BedLine( + chrom="chr1", + start=110, + end=130, + primername="test_1_LEFT_2", + pool=1, + strand="+", + sequence="ACGT", + ), + ] + + # Test that bedlines are sorted based on primersuffix + sorted_bls = sorted(bedlines) + self.assertEqual( + [bl.primername for bl in sorted_bls], + ["test_1_LEFT_1", "test_1_LEFT_2", "test_1_LEFT_3"], + ) + # replace suffix with alt1 + bedlines[2].primer_suffix = "alt1" + sorted_bls = sorted(bedlines) + self.assertEqual( + [bl.primername for bl in sorted_bls], + ["test_1_LEFT_1", "test_1_LEFT_3", "test_1_LEFT_alt1"], + ) + # replace suffix with None + bedlines[1].primer_suffix = None + sorted_bls = sorted(bedlines) + self.assertEqual( + [bl.primername for bl in sorted_bls], + ["test_1_LEFT_1", "test_1_LEFT_alt1", "test_1_LEFT"], + ) + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_diff.py b/tests/test_diff.py new file mode 100644 index 0000000..27bdbe1 --- /dev/null +++ b/tests/test_diff.py @@ -0,0 +1,357 @@ +import unittest + +from primalbedtools.diff import ( + diff_primernames, + diff_sequence, + ndiff_bedlines, + unified_diff_bedlines, +) +from primalbedtools.scheme import Scheme +from tests.infiles import ( + TEST_ATTRIBUTES_BEDFILE, +) + + +class TestNDiff(unittest.TestCase): + def setUp(self) -> None: + self.scheme1 = Scheme.from_file(str(TEST_ATTRIBUTES_BEDFILE)) + self.scheme2 = Scheme.from_file(str(TEST_ATTRIBUTES_BEDFILE)) + return super().setUp() + + def test_no_change(self): + # Ignore diff is empty + self.assertEqual( + list( + ndiff_bedlines( + self.scheme1.bedlines, 
self.scheme1.bedlines, ignore_no_diff=True + ) + ), + [], + ) + + def test_ignore_header(self): + """ + With ignore heading flag false detects a difference, with flag True no diff detected + """ + self.scheme1.headers = ["# header1"] + self.scheme2.headers = ["# header2"] + + # With ignore_header=False, should see differences + diffs = list( + ndiff_bedlines( + self.scheme1.bedlines, + self.scheme2.bedlines, + header1=self.scheme1.headers, + header2=self.scheme2.headers, + ignore_header=False, + ignore_no_diff=True, + ) + ) + self.assertNotEqual(diffs, []) + self.assertTrue(any(line.startswith("- # header1") for line in diffs)) + self.assertTrue(any(line.startswith("+ # header2") for line in diffs)) + + # With ignore_header=True, should see no differences + diffs = list( + ndiff_bedlines( + self.scheme1.bedlines, + self.scheme2.bedlines, + header1=self.scheme1.headers, + header2=self.scheme2.headers, + ignore_header=True, + ignore_no_diff=True, + ) + ) + self.assertEqual(diffs, []) + + def test_ignore_order(self): + """ + With ignore_order flag false detects a difference, with flag True no diff detected + """ + self.scheme2.bedlines.reverse() + + # With ignore_order=False, should see differences + diffs = list( + ndiff_bedlines( + self.scheme1.bedlines, + self.scheme2.bedlines, + ignore_order=False, + ignore_no_diff=True, + ) + ) + self.assertNotEqual(diffs, []) + + # With ignore_order=True, should see no differences (since content is same) + diffs = list( + ndiff_bedlines( + self.scheme1.bedlines, + self.scheme2.bedlines, + ignore_order=True, + ignore_no_diff=True, + ) + ) + self.assertEqual(diffs, []) + + def test_ignore_attr(self): + """ + With ignore_attr flag false detects a difference, with flag True no diff detected + """ + # add a new attribute of 'test=1' to scheme2 + self.scheme2.bedlines[0].attributes = {"test": 1} + + # With ignore_attr=False, should see differences + diffs = list( + ndiff_bedlines( + self.scheme1.bedlines, + self.scheme2.bedlines, + 
ignore_attr=False, + ignore_no_diff=True, + ) + ) + self.assertNotEqual(diffs, []) + + # With ignore_attr=True, should see no differences + diffs = list( + ndiff_bedlines( + self.scheme1.bedlines, + self.scheme2.bedlines, + ignore_attr=True, + ignore_no_diff=True, + ) + ) + self.assertEqual(diffs, []) + + def test_content_change(self): + """ + Test that changes in content (e.g. start position) are detected. + """ + # Change start position of first bedline + original_start = self.scheme2.bedlines[0].start + self.scheme2.bedlines[0].start = original_start + 10 + + diffs = list( + ndiff_bedlines( + self.scheme1.bedlines, + self.scheme2.bedlines, + ignore_no_diff=True, + ) + ) + self.assertNotEqual(diffs, []) + # Should see removal of old line and addition of new line + self.assertTrue(any(line.startswith("- ") for line in diffs)) + self.assertTrue(any(line.startswith("+ ") for line in diffs)) + + def test_empty_inputs(self): + """ + Test behaviour with empty inputs. + """ + diffs = list( + ndiff_bedlines( + [], + [], + ignore_no_diff=True, + ) + ) + self.assertEqual(diffs, []) + + # One empty + diffs = list( + ndiff_bedlines( + self.scheme1.bedlines, + [], + ignore_no_diff=True, + ) + ) + self.assertNotEqual(diffs, []) + # Should only have deletions + self.assertTrue(all(line.startswith("- ") for line in diffs)) + + def test_different_lengths(self): + """ + Test comparing schemes with different numbers of bedlines. 
+ """ + # Remove the last bedline from scheme2 + self.scheme2.bedlines.pop() + + diffs = list( + ndiff_bedlines( + self.scheme1.bedlines, + self.scheme2.bedlines, + ignore_no_diff=True, + ) + ) + self.assertNotEqual(diffs, []) + # Should see the removed line as a deletion + self.assertTrue(any(line.startswith("- ") for line in diffs)) + + +class TestUnifiedDiff(unittest.TestCase): + def setUp(self) -> None: + self.scheme1 = Scheme.from_file(str(TEST_ATTRIBUTES_BEDFILE)) + self.scheme2 = Scheme.from_file(str(TEST_ATTRIBUTES_BEDFILE)) + return super().setUp() + + def test_no_change(self): + self.assertEqual( + list(unified_diff_bedlines(self.scheme1.bedlines, self.scheme1.bedlines)), + [], + ) + + def test_ignore_header(self): + self.scheme1.headers = ["# header1"] + self.scheme2.headers = ["# header2"] + + # With ignore_header=False + diffs = list( + unified_diff_bedlines( + self.scheme1.bedlines, + self.scheme2.bedlines, + header1=self.scheme1.headers, + header2=self.scheme2.headers, + ignore_header=False, + ) + ) + self.assertNotEqual(diffs, []) + self.assertTrue(any(line.startswith("---") for line in diffs)) + self.assertTrue(any(line.startswith("+++") for line in diffs)) + + # With ignore_header=True + diffs = list( + unified_diff_bedlines( + self.scheme1.bedlines, + self.scheme2.bedlines, + header1=self.scheme1.headers, + header2=self.scheme2.headers, + ignore_header=True, + ) + ) + self.assertEqual(diffs, []) + + def test_ignore_order(self): + self.scheme2.bedlines.reverse() + + # With ignore_order=False + diffs = list( + unified_diff_bedlines( + self.scheme1.bedlines, + self.scheme2.bedlines, + ignore_order=False, + ) + ) + self.assertNotEqual(diffs, []) + + # With ignore_order=True + diffs = list( + unified_diff_bedlines( + self.scheme1.bedlines, + self.scheme2.bedlines, + ignore_order=True, + ) + ) + self.assertEqual(diffs, []) + + def test_ignore_attr(self): + self.scheme2.bedlines[0].attributes = {"test": 1} + + # With ignore_attr=False + diffs = list( + 
unified_diff_bedlines( + self.scheme1.bedlines, + self.scheme2.bedlines, + ignore_attr=False, + ) + ) + self.assertNotEqual(diffs, []) + + # With ignore_attr=True + diffs = list( + unified_diff_bedlines( + self.scheme1.bedlines, + self.scheme2.bedlines, + ignore_attr=True, + ) + ) + self.assertEqual(diffs, []) + + def test_content_change(self): + original_start = self.scheme2.bedlines[0].start + self.scheme2.bedlines[0].start = original_start + 10 + + diffs = list( + unified_diff_bedlines( + self.scheme1.bedlines, + self.scheme2.bedlines, + ) + ) + self.assertNotEqual(diffs, []) + self.assertTrue(any(line.startswith("-") for line in diffs)) + self.assertTrue(any(line.startswith("+") for line in diffs)) + + def test_empty_inputs(self): + diffs = list( + unified_diff_bedlines( + [], + [], + ) + ) + self.assertEqual(diffs, []) + + diffs = list( + unified_diff_bedlines( + self.scheme1.bedlines, + [], + ) + ) + self.assertNotEqual(diffs, []) + + def test_different_lengths(self): + self.scheme2.bedlines.pop() + + diffs = list( + unified_diff_bedlines( + self.scheme1.bedlines, + self.scheme2.bedlines, + ) + ) + self.assertNotEqual(diffs, []) + + +class TestDiffPrimerNames(unittest.TestCase): + def setUp(self) -> None: + self.scheme1 = Scheme.from_file(str(TEST_ATTRIBUTES_BEDFILE)) + self.scheme2 = Scheme.from_file(str(TEST_ATTRIBUTES_BEDFILE)) + return super().setUp() + + def test_no_change(self): + diff1, diff2 = diff_primernames(self.scheme1.bedlines, self.scheme2.bedlines) + self.assertEqual(diff1, set()) + self.assertEqual(diff2, set()) + + def test_diff(self): + removed_primer = self.scheme2.bedlines.pop(0) + diff1, diff2 = diff_primernames(self.scheme1.bedlines, self.scheme2.bedlines) + self.assertIn(removed_primer.primername, diff1) + self.assertEqual(diff2, set()) + + +class TestDiffSequence(unittest.TestCase): + def setUp(self) -> None: + self.scheme1 = Scheme.from_file(str(TEST_ATTRIBUTES_BEDFILE)) + self.scheme2 = Scheme.from_file(str(TEST_ATTRIBUTES_BEDFILE)) 
+ return super().setUp() + + def test_no_change(self): + diff1, diff2 = diff_sequence(self.scheme1.bedlines, self.scheme2.bedlines) + self.assertEqual(diff1, set()) + self.assertEqual(diff2, set()) + + def test_diff(self): + original_seq = self.scheme2.bedlines[0].sequence + self.scheme2.bedlines[0].sequence = "AAAA" + diff1, diff2 = diff_sequence(self.scheme1.bedlines, self.scheme2.bedlines) + self.assertIn(original_seq, diff1) + self.assertIn("AAAA", diff2) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_fasta.py b/tests/test_fasta.py index 72b413e..dc9bb4d 100644 --- a/tests/test_fasta.py +++ b/tests/test_fasta.py @@ -1,10 +1,8 @@ -import pathlib import unittest from io import StringIO from primalbedtools.fasta import read_fasta - -FASTA_PATH = pathlib.Path(__file__).parent / "inputs/msa.input.fasta" +from tests.infiles import FASTA_PATH class TestFasta(unittest.TestCase): diff --git a/tests/test_remap.py b/tests/test_remap.py index e470b23..e682974 100644 --- a/tests/test_remap.py +++ b/tests/test_remap.py @@ -3,10 +3,7 @@ from primalbedtools.bedfiles import BedLine from primalbedtools.fasta import read_fasta -from primalbedtools.remap import ( - create_mapping_list, - remap, -) +from primalbedtools.remap import create_mapping_list, remap class TestMappingList(unittest.TestCase): diff --git a/tests/test_scheme.py b/tests/test_scheme.py index 0ca94fa..fc97cde 100644 --- a/tests/test_scheme.py +++ b/tests/test_scheme.py @@ -1,15 +1,11 @@ -import pathlib import random import unittest -from primalbedtools.scheme import Scheme - -TEST_BEDFILE = pathlib.Path(__file__).parent / "inputs/test.bed" -TEST_V2_BEDFILE = pathlib.Path(__file__).parent / "inputs/test.v2.bed" -TEST_WEIGHTS_BEDFILE = pathlib.Path(__file__).parent / "inputs/test.weights.bed" -TEST_WEIGHTS_BEDFILE = pathlib.Path(__file__).parent / "inputs/test.weights.bed" -TEST_ATTRIBUTES_BEDFILE = pathlib.Path(__file__).parent / "inputs/test.attributes.bed" -TEST_PROBE_BEDFILE = 
pathlib.Path(__file__).parent / "inputs/test.probe.bed" +from primalbedtools.scheme import DEFAULT_CSV_HEADERS, Scheme +from tests.infiles import ( + TEST_ATTRIBUTES_BEDFILE, + TEST_PROBE_BEDFILE, +) class TestScheme(unittest.TestCase): @@ -108,3 +104,55 @@ def test_contains_probes(self): scheme = Scheme.from_file(str(TEST_ATTRIBUTES_BEDFILE)) self.assertFalse(scheme.contains_probes) + + def test_to_csv(self): + # Read the scheme in + scheme = Scheme.from_file(str(TEST_ATTRIBUTES_BEDFILE)) + + # Check include_headers=True, use_header_aliases=False + csv_str = scheme.to_delim_str(include_headers=True, use_header_aliases=False) + csv_line_list = csv_str.splitlines() + # Check default headers are present + test_headers = csv_line_list[0].split(",") + for exp_header in DEFAULT_CSV_HEADERS: + self.assertIn(exp_header, test_headers, f"{exp_header} not in first line") + + # Check attribute headers are there with no aliases + self.assertIn("pw", test_headers, "pw not in first line") + self.assertIn("gc", test_headers, "gc not in first line") + + # Check all bedlines are present + self.assertEqual(len(scheme.bedlines) + 1, len(csv_line_list)) + + def test_to_csv_aliases(self): + # Read the scheme in + scheme = Scheme.from_file(str(TEST_ATTRIBUTES_BEDFILE)) + # Check include_headers=True, use_header_aliases=True + csv_str = scheme.to_delim_str(include_headers=True, use_header_aliases=True) + csv_line_list = csv_str.splitlines() + # Check default headers are present + test_headers = csv_line_list[0].split(",") + for exp_header in DEFAULT_CSV_HEADERS: + self.assertIn(exp_header, test_headers, f"{exp_header} not in first line") + + # Check attribute headers are there with no aliases + self.assertIn("pw", test_headers, "pw not in first line") + self.assertIn("fractiongc", test_headers, "fractiongc not in first line") + + # Check all bedlines are present + self.assertEqual(len(scheme.bedlines) + 1, len(csv_line_list)) + + def test_to_csv_no_header(self): + # Read the scheme in + 
scheme = Scheme.from_file(str(TEST_ATTRIBUTES_BEDFILE)) + # Check include_headers=True, use_header_aliases=True + csv_str = scheme.to_delim_str(include_headers=False, use_header_aliases=True) + csv_line_list = csv_str.splitlines() + # Check default headers are present + test_headers = csv_line_list[0].split(",") + for exp_header in DEFAULT_CSV_HEADERS: + self.assertNotIn( + exp_header, test_headers, f"{exp_header} found in first line" + ) + # Check all bedlines are present + self.assertEqual(len(scheme.bedlines), len(csv_line_list)) diff --git a/tests/test_validate.py b/tests/test_validate.py index 80461bd..8c4fa13 100644 --- a/tests/test_validate.py +++ b/tests/test_validate.py @@ -1,4 +1,3 @@ -import pathlib import unittest from primalbedtools.amplicons import Amplicon @@ -9,8 +8,7 @@ validate_primerbed, validate_ref_and_bed, ) - -FASTA_PATH = pathlib.Path(__file__).parent / "inputs/msa.input.fasta" +from tests.infiles import FASTA_PATH class TestValidate(unittest.TestCase): diff --git a/uv.lock b/uv.lock index fec1eb4..2ef5ff3 100644 --- a/uv.lock +++ b/uv.lock @@ -596,7 +596,7 @@ wheels = [ [[package]] name = "primalbedtools" -version = "0.10.2" +version = "0.11.0" source = { editable = "." } [package.dev-dependencies]