Fix asymmetric connectivity in voice leading graph

- Compute both c1→c2 and c2→c1 edges independently using _find_valid_edges()
- Fix _is_directly_tunable to properly reorder c2_transposed using movement map
- Clean up unused valid_pairings code in _build_movement_maps
- Add edge_data field to PathStep for debugging
This commit is contained in:
Michael Winter 2026-03-23 18:46:14 +01:00
parent dd637e64e2
commit d0bd15574d
4 changed files with 217 additions and 56 deletions

View file

@ -150,6 +150,7 @@ class HarmonicSpace:
graph.add_node(chord) graph.add_node(chord)
for c1, c2 in combinations(chords, 2): for c1, c2 in combinations(chords, 2):
# Compute c1 -> c2
edges = self._find_valid_edges(c1, c2, symdiff_range) edges = self._find_valid_edges(c1, c2, symdiff_range)
for edge_data in edges: for edge_data in edges:
( (
@ -170,13 +171,25 @@ class HarmonicSpace:
voice_crossing=voice_crossing, voice_crossing=voice_crossing,
is_directly_tunable=is_directly_tunable, is_directly_tunable=is_directly_tunable,
) )
# Compute c2 -> c1 independently
edges = self._find_valid_edges(c2, c1, symdiff_range)
for edge_data in edges:
(
trans,
weight,
movements,
cent_diffs,
voice_crossing,
is_directly_tunable,
) = edge_data
graph.add_edge( graph.add_edge(
c2, c2,
c1, c1,
transposition=self._invert_transposition(trans), transposition=trans,
weight=weight, weight=weight,
movements=self._reverse_movements(movements), movements=movements,
cent_diffs=list(reversed(cent_diffs)), cent_diffs=cent_diffs,
voice_crossing=voice_crossing, voice_crossing=voice_crossing,
is_directly_tunable=is_directly_tunable, is_directly_tunable=is_directly_tunable,
) )
@ -196,12 +209,29 @@ class HarmonicSpace:
c2_transposed_pitches: tuple[Pitch, ...], c2_transposed_pitches: tuple[Pitch, ...],
movements: dict, movements: dict,
) -> bool: ) -> bool:
"""Check if all changing pitches are adjacent (directly tunable) to a staying pitch.""" """Check if all changing pitches are adjacent (directly tunable) to a staying pitch.
staying_indices = [i for i in range(len(c1_pitches)) if movements.get(i) == i]
1. Reorder c2_transposed using movement map to get destination pitches per voice
2. Compare c1[i] with reordered[i]: if equal, pitch stays; if different, pitch changes
3. For each changed position, check if new pitch is adjacent to any staying pitch
"""
# Reorder c2_transposed using movement map
# movements maps: source_index -> destination_index
# After reordering, reordered[i] = c2_transposed at position movements[i]
reordered = [None] * len(c1_pitches)
for i in range(len(c1_pitches)):
dest_idx = movements.get(i, i)
reordered[i] = c2_transposed_pitches[dest_idx]
# Find staying positions (where pitch doesn't change)
staying_indices = [
i for i in range(len(c1_pitches)) if c1_pitches[i] == reordered[i]
]
if not staying_indices: if not staying_indices:
return False return False
# Find changing positions
changing_indices = [ changing_indices = [
i for i in range(len(c1_pitches)) if i not in staying_indices i for i in range(len(c1_pitches)) if i not in staying_indices
] ]
@ -209,8 +239,9 @@ class HarmonicSpace:
if not changing_indices: if not changing_indices:
return True return True
# For each changing position, check if new pitch is adjacent to any staying pitch
for ch_idx in changing_indices: for ch_idx in changing_indices:
ch_pitch = c2_transposed_pitches[ch_idx] ch_pitch = reordered[ch_idx]
is_adjacent_to_staying = False is_adjacent_to_staying = False
for st_idx in staying_indices: for st_idx in staying_indices:
@ -328,27 +359,14 @@ class HarmonicSpace:
c1_changing = [c1_pitches[i] for i in changing_indices_c1] c1_changing = [c1_pitches[i] for i in changing_indices_c1]
c2_changing = [c2_transposed_pitches[i] for i in changing_indices_c2] c2_changing = [c2_transposed_pitches[i] for i in changing_indices_c2]
valid_pairings = []
for p1 in c1_changing:
pairings = []
for p2 in c2_changing:
if self._is_adjacent_pitches(p1, p2):
cents = abs(p1.to_cents() - p2.to_cents())
pairings.append((p1, p2, cents))
valid_pairings.append(pairings)
all_maps = [] all_maps = []
num_changing = len(c2_changing) num_changing = len(c2_changing)
for perm in permutations(range(num_changing)): for perm in permutations(range(num_changing)):
new_map = dict(base_map) new_map = dict(base_map)
valid = True
for i, c1_idx in enumerate(changing_indices_c1): for i, c1_idx in enumerate(changing_indices_c1):
dest_idx = changing_indices_c2[perm[i]] dest_idx = changing_indices_c2[perm[i]]
new_map[c1_idx] = dest_idx new_map[c1_idx] = dest_idx
if valid:
all_maps.append(new_map) all_maps.append(new_map)
return all_maps return all_maps

View file

@ -55,6 +55,50 @@ def write_chord_sequence_frequencies(
f.write(")\n") f.write(")\n")
def _serialize_edge_data(edge_data: dict) -> dict:
"""Serialize edge data for JSON, converting Pitch objects to lists."""
if not edge_data:
return {}
result = {}
for key, value in edge_data.items():
if hasattr(value, "hs_array"):
result[key] = list(value.hs_array)
elif isinstance(value, list) and value and hasattr(value[0], "hs_array"):
result[key] = [list(p.hs_array) for p in value]
else:
result[key] = value
return result
def write_path_steps(path: "Path", output_path: str) -> None:
"""Write path with all step data to JSON."""
steps_data = []
for i, step in enumerate(path.steps):
step_data = {
"step": i,
"source_node": [list(p.hs_array) for p in step.source_node.pitches],
"destination_node": [
list(p.hs_array) for p in step.destination_node.pitches
],
"source_chord": [list(p.hs_array) for p in step.source_chord.pitches],
"destination_chord": [
list(p.hs_array) for p in step.destination_chord.pitches
],
"transposition": list(step.transposition.hs_array)
if step.transposition
else None,
"movements": {str(k): v for k, v in step.movements.items()},
"scores": step.scores,
"normalized_scores": step.normalized_scores,
"weight": step.weight,
"edge_data": _serialize_edge_data(step.edge_data),
}
steps_data.append(step_data)
with open(output_path, "w") as f:
json.dump(steps_data, f, indent=2)
def graph_to_dict(graph: "nx.MultiDiGraph") -> dict: def graph_to_dict(graph: "nx.MultiDiGraph") -> dict:
"""Serialize graph to a dict for JSON.""" """Serialize graph to a dict for JSON."""
from .pitch import Pitch from .pitch import Pitch
@ -354,6 +398,11 @@ def main():
action="store_true", action="store_true",
help="Show analysis statistics after generation", help="Show analysis statistics after generation",
) )
parser.add_argument(
"--output-path-steps",
action="store_true",
help="Export path steps with full edge data to path_steps.json",
)
parser.add_argument( parser.add_argument(
"--osc-play", "--osc-play",
nargs="?", nargs="?",
@ -521,6 +570,10 @@ def main():
) )
print(f"Written to {args.output_dir}/output_frequencies.txt") print(f"Written to {args.output_dir}/output_frequencies.txt")
if args.output_path_steps:
write_path_steps(path_obj, os.path.join(args.output_dir, "path_steps.json"))
print(f"Written to {args.output_dir}/path_steps.json")
# Show stats if requested # Show stats if requested
if args.stats: if args.stats:
from .analyze import analyze_chords, format_analysis from .analyze import analyze_chords, format_analysis
@ -550,7 +603,7 @@ def main():
print(format_analysis(metrics)) print(format_analysis(metrics))
# OSC playback if enabled # OSC playback if enabled
if args.osc_enable: if args.osc_play:
from .osc_sender import OSCSender from .osc_sender import OSCSender
chords_file = os.path.join(args.output_dir, "output_chords.json") chords_file = os.path.join(args.output_dir, "output_chords.json")

View file

@ -30,6 +30,7 @@ class PathStep:
sustain_counts_after: tuple[int, ...] | None = None sustain_counts_after: tuple[int, ...] | None = None
new_cumulative_trans: Pitch | None = None new_cumulative_trans: Pitch | None = None
new_voice_map: list[int] = field(default_factory=list) new_voice_map: list[int] = field(default_factory=list)
edge_data: dict = field(default_factory=dict)
class Path: class Path:
@ -167,6 +168,7 @@ class Path:
sustain_counts_after=tuple(sustain_after), sustain_counts_after=tuple(sustain_after),
new_cumulative_trans=new_cumulative_trans, new_cumulative_trans=new_cumulative_trans,
new_voice_map=new_voice_map, new_voice_map=new_voice_map,
edge_data=edge_data,
) )
candidates.append(step) candidates.append(step)

View file

@ -47,15 +47,27 @@ NOTE_NAMES_FLATS = [
"b", "b",
] ]
OCTAVE_STRINGS = [",,,", ",,", ",", "", "'", "''", "'''", "''''", "'''''", "''''''"] OCTAVE_STRINGS = [
",,,,",
",,,",
",,",
",",
"",
"'",
"''",
"'''",
"''''",
"'''''",
"''''''",
]
DURATION_MAP = { DURATION_MAP = {
1: "8", 1: "8",
2: "4", 2: "4",
3: "4.", 3: "4.",
4: "2", 4: "1",
6: "2.", 6: "2.",
8: "1", 8: "2",
} }
@ -80,6 +92,19 @@ def midi_to_octave(midi):
return (round(midi) // 12) - 1 return (round(midi) // 12) - 1
def get_clef_for_midi(midi):
"""Determine clef based on MIDI note number.
Two-threshold system at middle C (C4 = MIDI 60):
- MIDI >= 60: treble clef
- MIDI < 60: bass clef
"""
if midi >= 60:
return "treble"
else:
return "bass"
def freq_to_lilypond(freq, spelling="sharps", prev_pitch=None): def freq_to_lilypond(freq, spelling="sharps", prev_pitch=None):
"""Convert frequency to LilyPond note name. """Convert frequency to LilyPond note name.
@ -103,7 +128,7 @@ def freq_to_lilypond(freq, spelling="sharps", prev_pitch=None):
else: else:
note_name = NOTE_NAMES_SHARPS[pc] note_name = NOTE_NAMES_SHARPS[pc]
oct_str = OCTAVE_STRINGS[octave + 4] if octave >= -4 else ",," * (-octave - 4) oct_str = OCTAVE_STRINGS[octave + 1] if octave >= -4 else ",," * (-octave - 4)
return note_name + oct_str return note_name + oct_str
@ -122,10 +147,10 @@ def format_cents_deviation(freq):
deviation = (midi - round(midi)) * 100 deviation = (midi - round(midi)) * 100
deviation = round(deviation) deviation = round(deviation)
if deviation != 0: if deviation > 0:
sign = "+" if deviation > 0 else "" return f"+{deviation}"
return f"{sign}{deviation}" else:
return None return str(deviation)
def format_dim_diff(dim_diff, ref): def format_dim_diff(dim_diff, ref):
@ -144,58 +169,117 @@ def format_dim_diff(dim_diff, ref):
return f'_\\markup {{ \\lower #3 \\pad-markup #0.2 \\concat{{ "{ref_name}"\\normal-size-super " {diff_str}" }} }}' return f'_\\markup {{ \\lower #3 \\pad-markup #0.2 \\concat{{ "{ref_name}"\\normal-size-super " {diff_str}" }} }}'
def generate_part(voice_data, voice_name, voice_idx, beats_per_measure=4): def generate_part(voice_data, voice_name, voice_idx, clef=None, beats_per_measure=4):
"""Generate LilyPond music string for a single voice. """Generate LilyPond music string for a single voice.
Args: Args:
voice_data: List of [freq, duration_beats, ref, dim_diff] events voice_data: List of [freq, duration_beats, ref, dim_diff] events
voice_name: Voice name (e.g., "I", "II", "III") voice_name: Voice name (e.g., "I", "II", "III")
voice_idx: Voice index (0=I, 1=II, 2=III) voice_idx: Voice index (0=I, 1=II, 2=III)
clef: LilyPond clef name (e.g., "treble", "alto", "bass") - optional, determined from first note if not provided
beats_per_measure: Beats per measure (default 4 for 4/4) beats_per_measure: Beats per measure (default 4 for 4/4)
Returns: Returns:
LilyPond music string LilyPond music string with clef and time signature
""" """
measures = [] if not voice_data:
current_measure_notes = [] return "\\numericTimeSignature \\time 4/4\n"
beat_in_measure = 0
last_note = None first_freq = voice_data[0][0]
if clef is None:
initial_clef = get_clef_for_midi(cps_to_midi(first_freq))
else:
initial_clef = clef
prefix = f"\\clef {initial_clef}\n"
prefix += "\\numericTimeSignature \\time 4/4\n"
spelling = "sharps" spelling = "sharps"
notes = []
for event in voice_data: for event in voice_data:
freq = event[0] freq = event[0]
dur_beats = event[1] if len(event) > 1 else 1 dur_beats = event[1] if len(event) > 1 else 1
ref = event[2] if len(event) > 2 else None ref = event[2] if len(event) > 2 else None
dim_diff = event[3] if len(event) > 3 else None dim_diff = event[3] if len(event) > 3 else None
is_rest = freq <= 0
note_str = freq_to_lilypond(freq, spelling) note_str = freq_to_lilypond(freq, spelling)
dur_str = duration_to_lilypond(dur_beats) dur_str = duration_to_lilypond(dur_beats)
is_rest = freq <= 0 notes.append(
is_tied = (note_str == last_note) and not is_rest {
"freq": freq,
"is_rest": is_rest,
"note_str": note_str,
"dur_str": dur_str,
"dur_beats": dur_beats,
"ref": ref,
"dim_diff": dim_diff,
}
)
cents_dev = format_cents_deviation(freq) if not is_rest else None measures = []
dim_markup = format_dim_diff(dim_diff, ref) if not is_rest else "" current_measure_notes = []
beat_in_measure = 0
current_clef = initial_clef
note_str_full = note_str + dur_str for i, note_data in enumerate(notes):
freq = note_data["freq"]
is_rest = note_data["is_rest"]
midi = cps_to_midi(freq)
required_clef = get_clef_for_midi(midi)
clef_change = required_clef != current_clef
if clef_change:
current_clef = required_clef
has_prev = i > 0
prev_freq = notes[i - 1]["freq"] if has_prev else None
prev_is_rest = notes[i - 1]["is_rest"] if has_prev else True
is_tied_from_prev = (
has_prev and not is_rest and not prev_is_rest and freq == prev_freq
)
cents_dev = (
format_cents_deviation(freq)
if not is_rest and not is_tied_from_prev
else None
)
dim_markup = (
format_dim_diff(note_data["dim_diff"], note_data["ref"])
if not is_rest
else ""
)
note_str_full = note_data["note_str"] + note_data["dur_str"]
if cents_dev or dim_markup:
markup = "" markup = ""
if cents_dev or dim_markup:
if cents_dev: if cents_dev:
markup += f'^\\markup {{ \\pad-markup #0.2 "{cents_dev}" }}' markup += f'^\\markup {{ \\pad-markup #0.2 "{cents_dev}" }}'
if dim_markup: if dim_markup:
markup += dim_markup markup += dim_markup
note_str_full += markup note_str_full += markup
if is_tied: has_next = i < len(notes) - 1
note_str_full = " ~ " + note_str_full next_freq = notes[i + 1]["freq"] if has_next else None
next_is_rest = notes[i + 1]["is_rest"] if has_next else True
is_tied_to_next = (
has_next and not is_rest and not next_is_rest and freq == next_freq
)
if is_tied_to_next:
note_str_full += " ~"
else: else:
note_str_full = " " + note_str_full note_str_full = " " + note_str_full
current_measure_notes.append(note_str_full) if clef_change:
last_note = note_str note_str_full = f"\\clef {current_clef} {note_str_full}"
beats_this_event = int(round(dur_beats)) current_measure_notes.append(note_str_full)
beats_this_event = int(round(note_data["dur_beats"]))
beat_in_measure += beats_this_event beat_in_measure += beats_this_event
while beat_in_measure >= beats_per_measure: while beat_in_measure >= beats_per_measure:
@ -214,7 +298,7 @@ def generate_part(voice_data, voice_name, voice_idx, beats_per_measure=4):
music_str += '\n\\bar "|."' music_str += '\n\\bar "|."'
return music_str return prefix + music_str
def generate_parts(music_data, name, output_dir="lilypond"): def generate_parts(music_data, name, output_dir="lilypond"):
@ -228,13 +312,17 @@ def generate_parts(music_data, name, output_dir="lilypond"):
includes_dir = Path(output_dir) / name / "includes" includes_dir = Path(output_dir) / name / "includes"
includes_dir.mkdir(parents=True, exist_ok=True) includes_dir.mkdir(parents=True, exist_ok=True)
voice_names = ["I", "II", "III"] voice_order = [
(3, "I"),
(2, "II"),
(1, "III"),
(0, "IV"),
]
for voice_idx, voice_data in enumerate(music_data): for voice_idx, voice_name in voice_order:
if voice_idx >= 3: if voice_idx >= len(music_data):
break continue
voice_data = music_data[voice_idx]
voice_name = voice_names[voice_idx]
part_str = generate_part(voice_data, voice_name, voice_idx) part_str = generate_part(voice_data, voice_name, voice_idx)
part_file = includes_dir / f"part_{voice_name}.ly" part_file = includes_dir / f"part_{voice_name}.ly"
@ -244,7 +332,7 @@ def generate_parts(music_data, name, output_dir="lilypond"):
print(f"Generated: {part_file}") print(f"Generated: {part_file}")
def output_chords_to_music_data(chords, fundamental=100, chord_duration=1): def output_chords_to_music_data(chords, fundamental=55, chord_duration=4):
"""Convert output_chords.json format to generic music data. """Convert output_chords.json format to generic music data.
Args: Args:
@ -359,7 +447,7 @@ def generate_pdf(name, lilypond_dir="lilypond", output_dir="."):
def transcribe( def transcribe(
chords, chords,
name, name,
fundamental=100, fundamental=55,
template_path="lilypond/score_template.ly", template_path="lilypond/score_template.ly",
output_dir="lilypond", output_dir="lilypond",
generate_pdf_flag=True, generate_pdf_flag=True,
@ -412,7 +500,7 @@ def main():
) )
parser.add_argument("--name", required=True, help="Name for output files") parser.add_argument("--name", required=True, help="Name for output files")
parser.add_argument( parser.add_argument(
"--fundamental", type=float, default=100, help="Fundamental frequency in Hz" "--fundamental", type=float, default=55, help="Fundamental frequency in Hz"
) )
parser.add_argument( parser.add_argument(
"--template", "--template",