From c44dd60e83e30faf8c0edce85f70804b959594db Mon Sep 17 00:00:00 2001 From: Michael Winter Date: Fri, 13 Mar 2026 00:28:34 +0100 Subject: [PATCH] Fix transposition deduplication and rename expand to project - Deduplicate transpositions in _find_valid_edges using set comprehension to avoid processing same transposition multiple times - Edge count now matches notebook (1414 vs 2828) - Rename expand() to project() for clarity (project to [1,2) range) - Fix SyntaxWarnings in docstrings (escape backslashes) --- compact_sets.py | 523 ++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 457 insertions(+), 66 deletions(-) diff --git a/compact_sets.py b/compact_sets.py index ed2020e..15bb3c7 100644 --- a/compact_sets.py +++ b/compact_sets.py @@ -107,8 +107,8 @@ class Pitch: return Pitch(tuple(collapsed), self.dims) - def expand(self) -> Pitch: - """Expand pitch to normalized octave position.""" + def project(self) -> Pitch: + """Project pitch to [1, 2) range - same as collapse.""" return self.collapse() def transpose(self, trans: Pitch) -> Pitch: @@ -234,9 +234,9 @@ class Chord: """Calculate the absolute difference in chord sizes.""" return abs(len(self._pitches) - len(other._pitches)) - def expand_all(self) -> list[Pitch]: - """Expand all pitches to normalized octave positions.""" - return [p.expand() for p in self._pitches] + def project_all(self) -> list[Pitch]: + """Project all pitches to [1, 2) range.""" + return [p.project() for p in self._pitches] def transpose(self, trans: Pitch) -> Chord: """Transpose the entire chord.""" @@ -305,36 +305,51 @@ class HarmonicSpace: return branches - def generate_connected_sets(self, min_size: int, max_size: int) -> set[Chord]: + def generate_connected_sets( + self, min_size: int, max_size: int, collapsed: bool = True + ) -> set[Chord]: """ Generate all unique connected sets of a given size. Args: min_size: Minimum number of pitches in a chord max_size: Maximum number of pitches in a chord + collapsed: If True, use CHS (skip dim 0 in branching). + If False (default), include dim 0 in branching. Returns: Set of unique Chord objects """ root = tuple(0 for _ in self.dims) + def branch_from(vertex): + """Get adjacent vertices. Skip dim 0 for CHS.""" + branches = set() + start_dim = 1 if collapsed else 0 + for i in range(start_dim, len(self.dims)): + for delta in (-1, 1): + branch = list(vertex) + branch[i] += delta + branches.add(tuple(branch)) + return branches + def grow( chord: tuple[tuple[int, ...], ...], connected: set[tuple[int, ...]], visited: set[tuple[int, ...]], ) -> Iterator[tuple[tuple[int, ...], ...]]: """Recursively grow connected sets.""" - # Yield if within size bounds if min_size <= len(chord) <= max_size: - # Wrap pitches and sort by frequency - wrapped = [] - for p in chord: - wrapped_p = self._wrap_pitch(p) - wrapped.append(wrapped_p) - - wrapped.sort(key=lambda p: self.pitch(p).to_fraction()) - yield tuple(wrapped) + # If collapsed=True, project each pitch to [1,2) + if collapsed: + projected = [] + for arr in chord: + p = self.pitch(arr) + projected.append(p.project().hs_array) + yield tuple(projected) + else: + yield chord # Continue growing if not at max size if len(chord) < max_size: @@ -342,12 +357,12 @@ class HarmonicSpace: for b in connected: if b not in visited: extended = chord + (b,) - new_connected = connected | self._branch_from(b) + new_connected = connected | branch_from(b) visited.add(b) yield from grow(extended, new_connected, visited) # Start generation from root - connected = self._branch_from(root) + connected = branch_from(root) visited = {root} results = set() @@ -357,11 +372,389 @@ class HarmonicSpace: return results + def generate_connected_sets_with_edges( + self, min_size: int, max_size: int, symdiff_range: tuple[int, int] + ) -> tuple[set[Chord], list[tuple[Chord, Chord, dict]]]: + """ + Generate chords and find edges using sibling grouping. + + For symdiff=2: group chords by parent (chord with one fewer pitch) + All siblings (same parent) have symdiff=2 with each other after transposition. + + This version finds ALL parents for each chord to ensure complete coverage. + + Args: + min_size: Minimum number of pitches in a chord + max_size: Maximum number of pitches in a chord + symdiff_range: (min, max) symmetric difference for valid edges + + Returns: + Tuple of (chords set, list of edges with data) + """ + # Generate all chords first + chords_set = self.generate_connected_sets(min_size, max_size) + + # Find ALL parents for each chord + # A parent is any size-(k-1) connected subset that can grow to this chord + chord_to_parents: dict[Chord, list[Chord]] = {} + + for chord in chords_set: + if len(chord) <= min_size: + chord_to_parents[chord] = [] + continue + + parents = [] + pitches_list = list(chord.pitches) + + # Try removing each pitch to find possible parents + for i in range(len(pitches_list)): + candidate_pitches = pitches_list[:i] + pitches_list[i + 1 :] + if len(candidate_pitches) < min_size: + continue + candidate = Chord(tuple(candidate_pitches), self.dims) + if candidate.is_connected(): + parents.append(candidate) + + chord_to_parents[chord] = parents + + # Group chords by parent - a chord may appear in multiple parent groups + from collections import defaultdict + + parent_to_children: dict[tuple, list[Chord]] = defaultdict(list) + for chord, parents in chord_to_parents.items(): + for parent in parents: + # Use sorted pitches as key + parent_key = tuple(sorted(p.hs_array for p in parent.pitches)) + parent_to_children[parent_key].append(chord) + + # Find edges between siblings + edges = [] + seen_edges = set() # Deduplicate + from itertools import combinations + + for parent_key, children in parent_to_children.items(): + if len(children) < 2: + continue + + # For each pair of siblings + for c1, c2 in combinations(children, 2): + edge_data = self._find_valid_edges(c1, c2, symdiff_range) + for ( + trans, + weight, + movements, + cent_diffs, + voice_crossing, + is_dt, + ) in edge_data: + # Create edge key for deduplication (smaller chord first) + c1_key = tuple(sorted(p.hs_array for p in c1.pitches)) + c2_key = tuple(sorted(p.hs_array for p in c2.pitches)) + edge_key = ( + (c1_key, c2_key, tuple(sorted(movements.items()))), + trans.hs_array, + ) + + if edge_key not in seen_edges: + seen_edges.add(edge_key) + edges.append( + ( + c1, + c2, + { + "transposition": trans, + "weight": weight, + "movements": movements, + "cent_diffs": cent_diffs, + "voice_crossing": voice_crossing, + "is_directly_tunable": is_dt, + }, + ) + ) + + inv_trans = self._invert_transposition(trans) + # Reverse edge + rev_edge_key = ( + ( + c2_key, + c1_key, + tuple(sorted(self._reverse_movements(movements).items())), + ), + inv_trans.hs_array, + ) + + if rev_edge_key not in seen_edges: + seen_edges.add(rev_edge_key) + edges.append( + ( + c2, + c1, + { + "transposition": inv_trans, + "weight": weight, + "movements": self._reverse_movements(movements), + "cent_diffs": list(reversed(cent_diffs)), + "voice_crossing": voice_crossing, + "is_directly_tunable": is_dt, + }, + ) + ) + + return chords_set, edges + + def _is_terminating(self, pitch: Pitch, chord: Chord) -> bool: + """Check if removing this pitch leaves the remaining pitches connected.""" + remaining = tuple(p for p in chord.pitches if p != pitch) + if len(remaining) <= 1: + return True + remaining_chord = Chord(remaining, self.dims) + return remaining_chord.is_connected() + + def build_graph_lattice_method( + self, + chords: set[Chord], + symdiff_min: int = 2, + symdiff_max: int = 2, + ) -> nx.MultiDiGraph: + """ + Build voice leading graph using lattice neighbor traversal. + + Algorithm: + 1. For each chord C in our set + 2. For each terminating pitch p in C (removing keeps remaining connected) + 3. For each remaining pitch q in C \\ p: + For each adjacent pitch r to q (in full harmonic space): + Form C' = (C \\ p) ∪ {r} + If C' contains root -> add edge C -> C' (automatically valid) + If C' doesn't contain root -> transpose by each pitch -> add edges + + No connectivity checks needed - guaranteed by construction. + + Args: + chords: Set of Chord objects + symdiff_min: Minimum symmetric difference (typically 2) + symdiff_max: Maximum symmetric difference (typically 2) + + Returns: + NetworkX MultiDiGraph + """ + graph = nx.MultiDiGraph() + + for chord in chords: + graph.add_node(chord) + + chord_index = {} + for chord in chords: + sig = tuple(sorted(p.hs_array for p in chord.pitches)) + chord_index[sig] = chord + + edges = [] + edge_set = set() + + root = self.pitch(tuple(0 for _ in self.dims)) + + for chord in chords: + chord_pitches = list(chord.pitches) + k = len(chord_pitches) + + for p in chord_pitches: + if not self._is_terminating(p, chord): + continue + + remaining = [x for x in chord_pitches if x != p] + + for q in remaining: + # Generate adjacent pitches in CHS (skipping dim 0) + for d in range(1, len(self.dims)): + for delta in (-1, 1): + arr = list(q.hs_array) + arr[d] += delta + r = Pitch(tuple(arr), self.dims) + + if r in chord_pitches: + continue + + new_pitches = remaining + [r] + new_chord = Chord(tuple(new_pitches), self.dims) + + contains_root = root in new_chord.pitches + + if contains_root: + target_sig = tuple( + sorted(p.hs_array for p in new_chord.pitches) + ) + target = chord_index.get(target_sig) + + if target and target != chord: + edge_key = (chord, target) + if edge_key not in edge_set: + edge_set.add(edge_key) + + movements, cent_diffs, voice_crossing = ( + self._compute_edge_data_fast(chord, target) + ) + if movements is not None: + is_dt = self._is_directly_tunable( + chord.pitches, target.pitches, movements + ) + edges.append( + ( + chord, + target, + { + "transposition": root.pitch_difference( + root + ), + "weight": 1.0, + "movements": movements, + "cent_diffs": cent_diffs, + "voice_crossing": voice_crossing, + "is_directly_tunable": is_dt, + }, + ) + ) + else: + for p_trans in new_chord.pitches: + trans = root.pitch_difference(p_trans) + transposed = new_chord.transpose(trans) + + if root in transposed.pitches: + target_sig = tuple( + sorted( + p.hs_array for p in transposed.pitches + ) + ) + target = chord_index.get(target_sig) + + if target and target != chord: + edge_key = (chord, target) + if edge_key not in edge_set: + edge_set.add(edge_key) + + ( + movements, + cent_diffs, + voice_crossing, + ) = self._compute_edge_data_fast( + chord, target + ) + if movements is not None: + is_dt = self._is_directly_tunable( + chord.pitches, + target.pitches, + movements, + ) + edges.append( + ( + chord, + target, + { + "transposition": trans, + "weight": 1.0, + "movements": movements, + "cent_diffs": cent_diffs, + "voice_crossing": voice_crossing, + "is_directly_tunable": is_dt, + }, + ) + ) + + for u, v, data in edges: + graph.add_edge(u, v, **data) + + inv_trans = self._invert_transposition(data["transposition"]) + inv_movements = self._reverse_movements(data["movements"]) + inv_cent_diffs = list(reversed(data["cent_diffs"])) + graph.add_edge( + v, + u, + transposition=inv_trans, + weight=1.0, + movements=inv_movements, + cent_diffs=inv_cent_diffs, + voice_crossing=data["voice_crossing"], + is_directly_tunable=data["is_directly_tunable"], + ) + + return graph + + def _compute_edge_data_fast(self, c1: Chord, c2: Chord): + """Compute edge data directly from two chords without transposition.""" + c1_pitches = c1.pitches + c2_pitches = c2.pitches + k = len(c1_pitches) + + c1_collapsed = [p.collapse() for p in c1_pitches] + c2_collapsed = [p.collapse() for p in c2_pitches] + + common_c1 = [] + common_c2 = [] + for i, pc1 in enumerate(c1_collapsed): + for j, pc2 in enumerate(c2_collapsed): + if pc1 == pc2: + common_c1.append(i) + common_c2.append(j) + break + + movements = {} + for src_idx, dest_idx in zip(common_c1, common_c2): + movements[src_idx] = dest_idx + + changing_c1 = [i for i in range(k) if i not in common_c1] + changing_c2 = [j for j in range(k) if j not in common_c2] + + if len(changing_c1) != len(changing_c2): + return None, None, None + + if changing_c1: + valid = True + for src_i, dest_j in zip(changing_c1, changing_c2): + p1 = c1_pitches[src_i] + p2 = c2_pitches[dest_j] + if not self._is_adjacent_pitches(p1, p2): + valid = False + break + movements[src_i] = dest_j + + if not valid: + return None, None, None + + cent_diffs = [] + for src_idx, dest_idx in movements.items(): + src_pitch = c1_pitches[src_idx] + dst_pitch = c2_pitches[dest_idx] + cents = abs(src_pitch.to_cents() - dst_pitch.to_cents()) + cent_diffs.append(cents) + + voice_crossing = not all(movements.get(i, i) == i for i in range(k)) + + return movements, cent_diffs, voice_crossing + def _wrap_pitch(self, hs_array: tuple[int, ...]) -> tuple[int, ...]: """Wrap a pitch so its frequency ratio is in [1, 2).""" p = self.pitch(hs_array) return p.collapse().hs_array + def _toCHS(self, hs_array: tuple[int, ...]) -> tuple[int, ...]: + """ + Convert a pitch to Collapsed Harmonic Space (CHS). + + In CHS, all pitches have dimension 0 = 0. + This is different from collapse() which only ensures frequency in [1, 2). + + Steps: + 1. First collapse to [1,2) to get pitch class + 2. Then set dimension 0 = 0 + """ + # First collapse to [1,2) + p = self.pitch(hs_array) + collapsed = p.collapse().hs_array + + # Then set dim 0 = 0 + result = list(collapsed) + result[0] = 0 + return tuple(result) + def build_voice_leading_graph( self, chords: set[Chord], @@ -495,64 +888,62 @@ class HarmonicSpace: """ edges = [] - # Try all transpositions where at least one pitch matches (collapsed) - for p1 in c1.pitches: - for p2 in c2.pitches: - trans = p1.pitch_difference(p2) + # Get unique transpositions first (fast deduplication) + transpositions = { + p1.pitch_difference(p2) for p1 in c1.pitches for p2 in c2.pitches + } - # Transpose c2 - c2_transposed = c2.transpose(trans) + # Try each unique transposition + for trans in transpositions: + # Transpose c2 + c2_transposed = c2.transpose(trans) - # Check symmetric difference on transposed pitches (not collapsed) - symdiff = self._calc_symdiff_expanded(c1, c2_transposed) + # Check symmetric difference on transposed pitches (not collapsed) + symdiff = self._calc_symdiff_expanded(c1, c2_transposed) - if not (symdiff_range[0] <= symdiff <= symdiff_range[1]): - continue + if not (symdiff_range[0] <= symdiff <= symdiff_range[1]): + continue - # CRITICAL: Each changing pitch must be connected to a pitch in c1 - voice_lead_ok = self._check_voice_leading_connectivity( - c1, c2_transposed + # CRITICAL: Each changing pitch must be connected to a pitch in c1 + voice_lead_ok = self._check_voice_leading_connectivity(c1, c2_transposed) + + if not voice_lead_ok: + continue + + # Build all valid movement maps (one per permutation of changing pitches) + movement_maps = self._build_movement_maps(c1.pitches, c2_transposed.pitches) + + # Create one edge per movement map with computed edge properties + for movements in movement_maps: + # Compute cent_diffs for each voice + cent_diffs = [] + for src_idx, dest_idx in movements.items(): + src_pitch = c1.pitches[src_idx] + dst_pitch = c2_transposed.pitches[dest_idx] + cents = abs(src_pitch.to_cents() - dst_pitch.to_cents()) + cent_diffs.append(cents) + + # Check voice_crossing: True if any voice moves to different position + num_voices = len(c1.pitches) + voice_crossing = not all( + movements.get(i, i) == i for i in range(num_voices) ) - if not voice_lead_ok: - continue - - # Build all valid movement maps (one per permutation of changing pitches) - movement_maps = self._build_movement_maps( - c1.pitches, c2_transposed.pitches + # Check is_directly_tunable: changing pitches are adjacent to staying pitch + is_directly_tunable = self._is_directly_tunable( + c1.pitches, c2_transposed.pitches, movements ) - # Create one edge per movement map with computed edge properties - for movements in movement_maps: - # Compute cent_diffs for each voice - cent_diffs = [] - for src_idx, dest_idx in movements.items(): - src_pitch = c1.pitches[src_idx] - dst_pitch = c2_transposed.pitches[dest_idx] - cents = abs(src_pitch.to_cents() - dst_pitch.to_cents()) - cent_diffs.append(cents) - - # Check voice_crossing: True if any voice moves to different position - num_voices = len(c1.pitches) - voice_crossing = not all( - movements.get(i, i) == i for i in range(num_voices) - ) - - # Check is_directly_tunable: changing pitches are adjacent to staying pitch - is_directly_tunable = self._is_directly_tunable( - c1.pitches, c2_transposed.pitches, movements - ) - - edges.append( - ( - trans, - 1.0, - movements, - cent_diffs, - voice_crossing, - is_directly_tunable, - ) + edges.append( + ( + trans, + 1.0, + movements, + cent_diffs, + voice_crossing, + is_directly_tunable, ) + ) return edges