#!/usr/bin/env python
"""
I/O functions and CLI main entry point.
"""

import json
from fractions import Fraction
from pathlib import Path
from random import seed


def write_chord_sequence(seq: list["Chord"], path: str) -> None:
    """Write a chord sequence to a JSON file.

    The file holds a JSON array of chords. Each chord is an array of pitch
    objects with the keys "hs_array", "fraction" (string form of
    ``pitch.to_fraction()``) and "cents", written one pitch per line.

    Args:
        seq: Chords to write; each must expose ``_pitches``.
        path: Destination file path (overwritten if it exists).
    """
    lines = ["["]
    last_chord_idx = len(seq) - 1
    for chord_idx, chord in enumerate(seq):
        lines.append(" [")
        pitches = chord._pitches
        last_pitch_idx = len(pitches) - 1
        for pitch_idx, pitch in enumerate(pitches):
            pitch_json = json.dumps(
                {
                    "hs_array": list(pitch.hs_array),
                    "fraction": str(pitch.to_fraction()),
                    "cents": pitch.to_cents(),
                }
            )
            comma = "," if pitch_idx < last_pitch_idx else ""
            lines.append(f" {pitch_json}{comma}")
        # No trailing comma after the final chord, to keep the JSON valid.
        lines.append(" ]" if chord_idx == last_chord_idx else " ],")
    lines.append("]")
    with open(path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))


def write_chord_sequence_readable(seq: list["Chord"], path: str) -> None:
    """Write chord sequence as tuple of hs_arrays - one line per chord.

    Args:
        seq: Chords to write; each must expose ``_pitches``.
        path: Destination file path (overwritten if it exists).
    """
    with open(path, "w", encoding="utf-8") as f:
        f.write("(\n")
        for chord in seq:
            arrays = tuple(p.hs_array for p in chord._pitches)
            f.write(f" {arrays},\n")
        f.write(")\n")


def write_chord_sequence_frequencies(
    seq: list["Chord"], path: str, fundamental: float = 100.0
) -> None:
    """Write chord sequence as frequencies in Hz - one line per chord.

    Each frequency is ``fundamental * float(pitch.to_fraction())``.

    Args:
        seq: Chords to write; each must expose ``_pitches``.
        path: Destination file path (overwritten if it exists).
        fundamental: Frequency that the ratio 1/1 maps to.
    """
    with open(path, "w", encoding="utf-8") as f:
        f.write("(\n")
        for chord in seq:
            freqs = tuple(
                fundamental * float(p.to_fraction()) for p in chord._pitches
            )
            f.write(f" {freqs},\n")
        f.write(")\n")


def _serialize_edge_data(edge_data: dict) -> dict:
    """Serialize edge data for JSON, converting Pitch objects to lists.

    Values exposing ``hs_array`` become ``list(value.hs_array)``; non-empty
    lists whose first element exposes ``hs_array`` are converted element by
    element; everything else is passed through untouched.

    Args:
        edge_data: Mapping to convert; ``None`` or empty yields ``{}``.

    Returns:
        A new dict with the converted values.
    """
    if not edge_data:
        return {}
    result = {}
    for key, value in edge_data.items():
        if hasattr(value, "hs_array"):
            result[key] = list(value.hs_array)
        elif isinstance(value, list) and value and hasattr(value[0], "hs_array"):
            result[key] = [list(p.hs_array) for p in value]
        else:
            result[key] = value
    return result


def _format_chord_line(pitches: list) -> str:
    """Format a chord (list of pitch hs_arrays) as a single line.

    Items exposing ``hs_array`` are written as that array; any other item
    (e.g. an already-plain list) is handed to ``json.dumps`` unchanged.

    Args:
        pitches: Pitch-like objects and/or plain sequences.

    Returns:
        A one-line JSON array string.
    """
    return json.dumps(
        [list(p.hs_array) if hasattr(p, "hs_array") else p for p in pitches]
    )


def _format_edge_data_compact(edge_data: dict) -> dict:
    """Format edge data for compact JSON output.

    Delegates to ``_serialize_edge_data`` so both helpers share one
    conversion rule and both tolerate ``None``/empty edge data.
    """
    return _serialize_edge_data(edge_data)


def write_path_steps(path: "Path", output_path: str) -> None:
    """Write path with all step data to JSON.

    One JSON object is written per entry of ``path.steps``. The keys
    "scores", "normalized_scores" and "weight" are only written when the
    step carries a value for them.

    Args:
        path: Object exposing ``steps``.
            NOTE(review): the "Path" annotation presumably names a project
            path type rather than ``pathlib.Path`` - confirm.
        output_path: Destination file path (overwritten if it exists).
    """
    chord_fields = (
        "source_node",
        "source_chord",
        "destination_node",
        "destination_chord",
    )
    lines = ["["]
    last_step_idx = len(path.steps) - 1
    for step_idx, step in enumerate(path.steps):
        lines.append(" {")
        lines.append(f' "step": {step_idx},')
        for field in chord_fields:
            chord = getattr(step, field)
            lines.append(f' "{field}": {_format_chord_line(chord.pitches)},')
        if step.transposition:
            trans = list(step.transposition.hs_array)
            lines.append(f' "transposition": {json.dumps(trans)},')
        else:
            lines.append(' "transposition": null,')
        # JSON object keys must be strings, so movement keys are stringified.
        movements = {str(k): v for k, v in step.movements.items()}
        lines.append(f' "movements": {json.dumps(movements)},')
        if step.scores:
            lines.append(f' "scores": {json.dumps(step.scores)},')
        if step.normalized_scores:
            lines.append(
                f' "normalized_scores": {json.dumps(step.normalized_scores)},'
            )
        if step.weight is not None:
            lines.append(f' "weight": {step.weight},')
        edge_data = _format_edge_data_compact(step.edge_data)
        # "edge_data" is always last, so it is the one key without a comma.
        lines.append(f' "edge_data": {json.dumps(edge_data)}')
        lines.append(" }" + ("," if step_idx < last_step_idx else ""))
    lines.append("]")
    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))


def graph_to_dict(graph: "nx.MultiDiGraph") -> dict:
    """Serialize graph to a dict for JSON.

    Nodes are stored as a list; edges refer to their endpoints by index
    into that list.

    Args:
        graph: Graph whose nodes expose ``pitches`` and ``dims``.

    Returns:
        ``{"nodes": [...], "edges": [...]}`` built from plain lists, dicts
        and whatever values the edge attributes hold.
    """
    from .pitch import Pitch

    nodes = []
    node_to_idx = {}
    for idx, chord in enumerate(graph.nodes()):
        nodes.append(
            {
                "pitches": [list(p.hs_array) for p in chord.pitches],
                "dims": list(chord.dims),
            }
        )
        # Keyed by identity: the edge loop below looks nodes up the same way.
        node_to_idx[id(chord)] = idx

    edges = []
    for u, v, data in graph.edges(data=True):
        transposition = data.get("transposition")
        if transposition is None:
            # Missing transposition defaults to the all-zero pitch in u's dims.
            transposition = Pitch(tuple([0] * len(u.dims)), u.dims)
        edges.append(
            {
                "src_idx": node_to_idx[id(u)],
                "dst_idx": node_to_idx[id(v)],
                "transposition": list(transposition.hs_array),
                "weight": data.get("weight", 1.0),
                "movements": {
                    str(k): moved for k, moved in data.get("movements", {}).items()
                },
                "cent_diffs": data.get("cent_diffs", []),
                "voice_crossing": data.get("voice_crossing", False),
                "is_directly_tunable": data.get("is_directly_tunable", False),
            }
        )

    return {
        "nodes": nodes,
        "edges": edges,
    }


def graph_from_dict(data: dict) -> "nx.MultiDiGraph":
    """Deserialize graph from dict.

    Args:
        data: Mapping with "nodes" and "edges" in the layout produced by
            ``graph_to_dict``.

    Returns:
        A new ``networkx.MultiDiGraph`` with Chord nodes and one edge per
        entry of ``data["edges"]``.
    """
    import networkx as nx

    from .pitch import Pitch
    from .chord import Chord

    nodes = []
    for node_data in data["nodes"]:
        node_dims = tuple(node_data["dims"])
        pitches = tuple(Pitch(tuple(arr), node_dims) for arr in node_data["pitches"])
        nodes.append(Chord(pitches, node_dims))

    graph = nx.MultiDiGraph()
    for node in nodes:
        graph.add_node(node)

    for edge_data in data["edges"]:
        u = nodes[edge_data["src_idx"]]
        v = nodes[edge_data["dst_idx"]]
        trans = Pitch(tuple(edge_data["transposition"]), u.dims)
        # Keys were stringified for JSON; convert them back to ints.
        movements = {int(k): moved for k, moved in edge_data["movements"].items()}
        graph.add_edge(
            u,
            v,
            transposition=trans,
            weight=edge_data.get("weight", 1.0),
            movements=movements,
            cent_diffs=edge_data.get("cent_diffs", []),
            voice_crossing=edge_data.get("voice_crossing", False),
            is_directly_tunable=edge_data.get("is_directly_tunable", False),
        )

    return graph


def save_graph_pickle(graph: "nx.MultiDiGraph", path: str) -> None:
    """Save graph to pickle file."""
    import pickle

    with open(path, "wb") as f:
        pickle.dump(graph, f)


def load_graph_pickle(path: str) -> "nx.MultiDiGraph":
    """Load graph from pickle file.

    SECURITY: unpickling can execute arbitrary code. Only load files that
    this program (or another trusted source) wrote.
    """
    import pickle

    with open(path, "rb") as f:
        return pickle.load(f)


def save_graph_json(graph: "nx.MultiDiGraph", path: str) -> None:
    """Save graph to JSON file."""
    data = graph_to_dict(graph)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)


def load_graph_json(path: str) -> "nx.MultiDiGraph":
    """Load graph from JSON file."""
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    return graph_from_dict(data)


def get_cache_key(
    dims: int, chord_size: int, symdiff_min: int, symdiff_max: int
) -> str:
    """Generate cache key from parameters.

    Returns:
        A string of the form ``d{dims}_n{chord_size}_s{min}-{max}``, e.g.
        ``"d7_n3_s2-2"``.
    """
    return f"d{dims}_n{chord_size}_s{symdiff_min}-{symdiff_max}"


def load_graph_from_cache(
    cache_dir: str,
    dims: int,
    chord_size: int,
    symdiff_min: int,
    symdiff_max: int,
) -> tuple["nx.MultiDiGraph | None", bool]:
    """
    Try to load graph from cache.

    The pickle file is tried first, then the JSON file. A file that exists
    but fails to load only produces a printed warning.

    Returns:
        (graph, was_cached): ``(graph, True)`` if a cache file was loaded,
        ``(None, False)`` otherwise.
    """
    cache_key = get_cache_key(dims, chord_size, symdiff_min, symdiff_max)
    pkl_path = Path(cache_dir) / f"{cache_key}.pkl"
    json_path = Path(cache_dir) / f"{cache_key}.json"

    # Try pickle first (faster)
    if pkl_path.exists():
        try:
            graph = load_graph_pickle(str(pkl_path))
            return graph, True
        except Exception as e:
            # Best effort: a broken cache must never stop a run.
            print(f"Warning: Failed to load pickle cache: {e}")

    # Try JSON
    if json_path.exists():
        try:
            graph = load_graph_json(str(json_path))
            return graph, True
        except Exception as e:
            print(f"Warning: Failed to load JSON cache: {e}")

    return None, False


def save_graph_to_cache(
    graph: "nx.MultiDiGraph",
    cache_dir: str,
    dims: int,
    chord_size: int,
    symdiff_min: int,
    symdiff_max: int,
) -> None:
    """Save graph to cache in both pickle and JSON formats.

    The cache directory is created if needed. A failure to write either
    format is reported as a printed warning and does not raise.
    """
    cache_key = get_cache_key(dims, chord_size, symdiff_min, symdiff_max)
    pkl_path = Path(cache_dir) / f"{cache_key}.pkl"
    json_path = Path(cache_dir) / f"{cache_key}.json"

    Path(cache_dir).mkdir(parents=True, exist_ok=True)

    # Save both formats
    try:
        save_graph_pickle(graph, str(pkl_path))
        print(f"Cached to {pkl_path}")
    except Exception as e:
        print(f"Warning: Failed to save pickle: {e}")

    try:
        save_graph_json(graph, str(json_path))
        print(f"Cached to {json_path}")
    except Exception as e:
        print(f"Warning: Failed to save JSON: {e}")


def _build_arg_parser():
    """Build the command-line parser used by ``main``."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Generate chord paths in harmonic space"
    )
    parser.add_argument(
        "--symdiff-min",
        type=int,
        default=2,
        help="Minimum symmetric difference between chords",
    )
    parser.add_argument(
        "--symdiff-max",
        type=int,
        default=2,
        help="Maximum symmetric difference between chords",
    )
    parser.add_argument(
        "--melodic-min",
        type=int,
        default=0,
        help="Minimum cents for any pitch movement (0 = no minimum)",
    )
    parser.add_argument(
        "--melodic-max",
        type=int,
        default=500,
        help="Maximum cents for any pitch movement (0 = no maximum)",
    )
    parser.add_argument(
        "--target-register",
        type=float,
        default=0,
        help="Target register in octaves (default: disabled, 2 = two octaves)",
    )
    parser.add_argument(
        "--allow-voice-crossing",
        action="store_true",
        help="Allow edges where voices cross (default: reject)",
    )
    parser.add_argument(
        "--disable-direct-tuning",
        action="store_true",
        help="Disable direct tuning requirement (default: require)",
    )
    parser.add_argument(
        "--weight-melodic",
        type=float,
        default=1,
        help="Weight for melodic threshold factor (0=disabled, default: 1)",
    )
    parser.add_argument(
        "--weight-contrary-motion",
        type=float,
        default=0,
        help="Weight for contrary motion factor (0=disabled, default: 0)",
    )
    parser.add_argument(
        "--weight-dca-hamiltonian",
        type=float,
        default=1,
        help="Weight for DCA Hamiltonian factor - favors long-unvisited nodes (0=disabled, default: 1)",
    )
    parser.add_argument(
        "--weight-dca-voice-movement",
        type=float,
        default=1,
        help="Weight for DCA voice movement factor - favors voices that stay long to change (0=disabled, default: 1)",
    )
    parser.add_argument(
        "--weight-target-register",
        type=float,
        default=1,
        help="Weight for target register factor (0=disabled, default: 1)",
    )
    parser.add_argument(
        "--dims", type=int, default=7, help="Number of prime dimensions (4, 5, 7, or 8)"
    )
    parser.add_argument("--chord-size", type=int, default=3, help="Size of chords")
    parser.add_argument("--max-path", type=int, default=50, help="Maximum path length")
    parser.add_argument(
        "--seed", type=int, default=None, help="Random seed (default: random)"
    )
    parser.add_argument(
        "--cache-dir",
        type=str,
        default="./cache",
        help="Cache directory for graphs",
    )
    parser.add_argument(
        "--rebuild-cache",
        action="store_true",
        help="Force rebuild graph (ignore cache)",
    )
    parser.add_argument(
        "--no-cache",
        action="store_true",
        help="Disable caching",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default="output",
        help="Output directory for generated files",
    )
    parser.add_argument(
        "--stats",
        action="store_true",
        help="Show analysis statistics after generation",
    )
    parser.add_argument(
        "--output-path-steps",
        action="store_true",
        help="Export path steps with full edge data to path_steps.json",
    )
    parser.add_argument(
        "--osc-play",
        nargs="?",
        const="output/output_chords.json",
        default=None,
        help="Enable OSC playback (optionally specify chord file, default: output/output_chords.json)",
    )
    parser.add_argument(
        "--transcribe",
        nargs="*",
        metavar=("INPUT", "OUTPUT"),
        default=None,
        help="Generate LilyPond transcription (optionally: input_file output_name)",
    )
    parser.add_argument(
        "--osc-ip",
        type=str,
        default="192.168.4.200",
        help="OSC destination IP (default: 192.168.4.200)",
    )
    parser.add_argument(
        "--osc-port",
        type=int,
        default=54001,
        help="OSC destination port (default: 54001)",
    )
    parser.add_argument(
        "--fundamental",
        type=float,
        default=100,
        help="Fundamental frequency in Hz for OSC output (default: 100)",
    )
    return parser


def _run_transcription(args) -> None:
    """Transcribe an existing chord file (the ``--transcribe`` mode)."""
    from .transcriber import transcribe

    # Both positional values are optional; fall back to the default names.
    input_file = (
        args.transcribe[0] if len(args.transcribe) > 0 else "output/output_chords.json"
    )
    output_name = (
        args.transcribe[1] if len(args.transcribe) > 1 else "compact_sets_transcription"
    )

    with open(input_file, encoding="utf-8") as f:
        chords = json.load(f)

    print(f"Transcribing {len(chords)} chords from {input_file}")
    transcribe(
        chords,
        output_name,
        fundamental=args.fundamental,
    )


def _run_osc_playback(args) -> None:
    """Play an existing chord file over OSC (the ``--osc-play`` mode)."""
    from .osc_sender import OSCSender

    chords_file = args.osc_play
    sender = OSCSender(
        ip=args.osc_ip, port=args.osc_port, fundamental=args.fundamental
    )
    sender.load_chords(chords_file)
    print(f"OSC playback from: {chords_file}")
    print(f"Destination: {args.osc_ip}:{args.osc_port}")
    print(f"Fundamental: {args.fundamental} Hz")
    sender.play()


def _load_or_build_graph(args, space):
    """Return the voice leading graph, from cache when allowed, else freshly built."""
    graph = None
    if not args.no_cache and not args.rebuild_cache:
        graph, was_cached = load_graph_from_cache(
            args.cache_dir,
            args.dims,
            args.chord_size,
            args.symdiff_min,
            args.symdiff_max,
        )
        if was_cached:
            print(f"Loaded graph from cache")
            print(
                f"Graph: {graph.number_of_nodes()} nodes, {graph.number_of_edges()} edges"
            )

    # Build graph if not loaded from cache
    if graph is None:
        print("Generating connected sets...")
        chords = space.generate_connected_sets(
            min_size=args.chord_size, max_size=args.chord_size
        )
        print(f"Found {len(chords)} unique chords")

        print("Building voice leading graph...")
        graph = space.build_voice_leading_graph(
            chords,
            symdiff_min=args.symdiff_min,
            symdiff_max=args.symdiff_max,
        )
        print(
            f"Graph: {graph.number_of_nodes()} nodes, {graph.number_of_edges()} edges"
        )

        # Save to cache (also after --rebuild-cache, which only skips loading)
        if not args.no_cache:
            save_graph_to_cache(
                graph,
                args.cache_dir,
                args.dims,
                args.chord_size,
                args.symdiff_min,
                args.symdiff_max,
            )

    return graph


def _build_weights_config(args, path_finder) -> dict:
    """Overlay the CLI options onto the path finder's default weights config."""
    weights_config = path_finder._default_weights_config()
    weights_config["melodic_threshold_min"] = args.melodic_min
    weights_config["melodic_threshold_max"] = args.melodic_max
    weights_config["voice_crossing_allowed"] = args.allow_voice_crossing
    weights_config["direct_tuning"] = not args.disable_direct_tuning

    # Soft factor weights
    weights_config["weight_melodic"] = args.weight_melodic
    weights_config["weight_contrary_motion"] = args.weight_contrary_motion
    weights_config["weight_dca_hamiltonian"] = args.weight_dca_hamiltonian
    weights_config["weight_dca_voice_movement"] = args.weight_dca_voice_movement

    # Target register
    if args.target_register > 0:
        weights_config["target_register"] = True
        weights_config["target_register_octaves"] = args.target_register
        weights_config["weight_target_register"] = args.weight_target_register
    else:
        weights_config["weight_target_register"] = 0  # disabled

    weights_config["max_path"] = args.max_path
    return weights_config


def main():
    """Demo: Generate compact sets and build graph.

    Parses the command line and then runs exactly one mode:

    * ``--transcribe``: transcribe an existing chord file and exit.
    * ``--osc-play``: play an existing chord file over OSC and exit.
    * otherwise: load or build the graph, find a stochastic path, write the
      output files and optionally print statistics.
    """
    import os

    from .dims import DIMS_4, DIMS_5, DIMS_7, DIMS_8
    from .harmonic_space import HarmonicSpace
    from .pathfinder import PathFinder

    args = _build_arg_parser().parse_args()

    # Handle transcription mode (separate from generation)
    if args.transcribe is not None:
        _run_transcription(args)
        return  # Exit after transcribing

    # Handle OSC playback mode (separate from generation)
    if args.osc_play:
        _run_osc_playback(args)
        return  # Exit after OSC playback

    # Select dims
    dims_by_count = {4: DIMS_4, 5: DIMS_5, 7: DIMS_7, 8: DIMS_8}
    if args.dims not in dims_by_count:
        # Normalize the number too, so the cache key matches the dims used.
        print(f"Warning: unsupported --dims {args.dims}; falling back to 7")
        args.dims = 7
    dims = dims_by_count[args.dims]

    space = HarmonicSpace(dims, collapsed=True)
    print(f"Space: {space}")
    print(f"Symdiff: {args.symdiff_min} to {args.symdiff_max}")

    graph = _load_or_build_graph(args, space)

    # Find stochastic path
    print("Finding stochastic path...")
    path_finder = PathFinder(graph)
    if args.seed is not None:
        seed(args.seed)

    weights_config = _build_weights_config(args, path_finder)

    path_obj = path_finder.find_stochastic_path(
        max_length=args.max_path - 1, weights_config=weights_config
    )
    print(f"Path length: {len(path_obj)}")

    # Create output directory and write files
    os.makedirs(args.output_dir, exist_ok=True)

    # Save graph_path for Hamiltonian analysis
    graph_path_data = [hash(node) for node in path_obj.graph_chords]
    graph_path_file = os.path.join(args.output_dir, "graph_path.json")
    with open(graph_path_file, "w", encoding="utf-8") as f:
        json.dump(graph_path_data, f)
    print(f"Written to {graph_path_file}")

    write_chord_sequence(
        path_obj.output_chords, os.path.join(args.output_dir, "output_chords.json")
    )
    print(f"Written to {args.output_dir}/output_chords.json")

    write_chord_sequence_readable(
        path_obj.output_chords, os.path.join(args.output_dir, "output_chords.txt")
    )
    print(f"Written to {args.output_dir}/output_chords.txt")

    write_chord_sequence_frequencies(
        path_obj.output_chords, os.path.join(args.output_dir, "output_frequencies.txt")
    )
    print(f"Written to {args.output_dir}/output_frequencies.txt")

    if args.output_path_steps:
        write_path_steps(path_obj, os.path.join(args.output_dir, "path_steps.json"))
        print(f"Written to {args.output_dir}/path_steps.json")

    # Show stats if requested
    if args.stats:
        from .analyze import analyze_chords, format_analysis

        config = {
            "melodic_threshold_max": args.melodic_max,
            "target_register_octaves": args.target_register,
            "max_path": args.max_path,
            "graph_nodes": graph.number_of_nodes() if graph else None,
        }

        # Load the chords from the output file
        chords_file = os.path.join(args.output_dir, "output_chords.json")
        with open(chords_file, encoding="utf-8") as f:
            chords = json.load(f)

        # Load graph_path for Hamiltonian analysis
        graph_path = None
        if os.path.exists(graph_path_file):
            with open(graph_path_file, encoding="utf-8") as f:
                graph_path = json.load(f)

        metrics = analyze_chords(chords, config, graph_path)
        print()
        print(format_analysis(metrics))


if __name__ == "__main__":
    main()