#!/usr/bin/env python """ OSC Sender - send chord frequencies via OSC for real-time playback. """ import json import sys from pathlib import Path try: from pythonosc import osc_message_builder from pythonosc.udp_client import SimpleUDPClient except ImportError: print("Error: python-osc not installed. Run: pip install python-osc") sys.exit(1) class OSCSender: """Send chord frequencies via OSC.""" def __init__(self, ip="192.168.4.200", port=54001, fundamental=100): self.ip = ip self.port = port self.fundamental = fundamental self.client = SimpleUDPClient(ip, port) self.chords = [] self.current_index = 0 self.preview_mode = True def load_chords(self, chords_file): """Load chords from output_chords.json.""" with open(chords_file) as f: self.chords = json.load(f) print(f"Loaded {len(self.chords)} chords") def send_chord(self, index): """Send frequencies for a specific chord index.""" if index < 0 or index >= len(self.chords): return chord = self.chords[index] for voice_idx, pitch in enumerate(chord): from fractions import Fraction frac = Fraction(pitch["fraction"]) freq = self.fundamental * float(frac) addr = "/freq" msg = osc_message_builder.OscMessageBuilder(address=addr) msg.add_arg(voice_idx + 1) # Voice number (1-indexed) msg.add_arg(freq) # Frequency self.client.send(msg.build()) print(f" [Sent chord {index + 1}/{len(self.chords)}]") def send_current(self): """Send the current chord.""" self.send_chord(self.current_index) def _get_chord_preview(self, index): """Get a one-line preview of a chord.""" if index < 0 or index >= len(self.chords): return None chord = self.chords[index] from fractions import Fraction freqs = [ f"{self.fundamental * float(Fraction(p['fraction'])):.2f}" for p in chord ] return f"Chord {index + 1}: {', '.join(freqs)} Hz" def _get_chord_full(self, index): """Get full details of a chord.""" if index < 0 or index >= len(self.chords): return [] return self.chords[index] def display_chords(self): """Display previous, current, and next chords.""" total = len(self.chords) prev_idx = self.current_index - 1 next_idx = self.current_index + 1 print("\n" + "=" * 60) print( f"PREVIOUS: Chord {prev_idx + 1}/{total}" if prev_idx >= 0 else "PREVIOUS: None" ) if prev_idx >= 0: print(f" {self._get_chord_preview(prev_idx)}") print("-" * 60) mode_str = "[PREVIEW]" if self.preview_mode else "[SEND]" print(f">>> CURRENT: Chord {self.current_index + 1}/{total} {mode_str} <<<") chord = self._get_chord_full(self.current_index) print("Frequencies (Hz):") from fractions import Fraction for voice_idx, pitch in enumerate(chord): frac = Fraction(pitch["fraction"]) freq = self.fundamental * float(frac) print(f" Voice {voice_idx + 1}: {freq:.2f}") print("\nHS Arrays:") for voice_idx, pitch in enumerate(chord): print(f" Voice {voice_idx + 1}: {pitch['hs_array']}") print("-" * 60) print( f"NEXT: Chord {next_idx + 1}/{total}" if next_idx < total else "NEXT: None" ) if next_idx < total: print(f" {self._get_chord_preview(next_idx)}") print("=" * 60) print( f"Fundamental: {self.fundamental} Hz | Destination: {self.ip}:{self.port}" ) print( "Controls: <-|-> navigate | p=preview | k/K=kill | Enter=send | [n]=jump | Esc=quit" ) def play(self): """Interactive playback with keyboard control.""" if not self.chords: print("No chords loaded!") return try: import tty import termios self.preview_mode = True self.current_index = 0 self.display_chords() def get_key(): """Get a keypress. Returns full escape sequences for arrow keys.""" fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) try: tty.setraw(sys.stdin.fileno()) ch = sys.stdin.read(1) if ch == "\x1b": next1 = sys.stdin.read(1) if next1 == "[": next2 = sys.stdin.read(1) return "\x1b[" + next2 else: return ch finally: termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) return ch num_buffer = "" while True: key = get_key() if key == "\x1b": break elif key == "\x1b[D": if self.current_index > 0: self.current_index -= 1 self.display_chords() if not self.preview_mode: self.send_current() elif key == "\x1b[C": if self.current_index < len(self.chords) - 1: self.current_index += 1 self.display_chords() if not self.preview_mode: self.send_current() elif key == "p": self.preview_mode = not self.preview_mode self.display_chords() elif key == "k": num_voices = len(self.chords[self.current_index]) for voice_idx in range(num_voices): msg = osc_message_builder.OscMessageBuilder(address="/freq") msg.add_arg(voice_idx + 1) msg.add_arg(15.0) self.client.send(msg.build()) print(f" [Sent kill soft (15.0) to {num_voices} voices]") elif key == "K": num_voices = len(self.chords[self.current_index]) for voice_idx in range(num_voices): msg = osc_message_builder.OscMessageBuilder(address="/freq") msg.add_arg(voice_idx + 1) msg.add_arg(0.0) self.client.send(msg.build()) print(f" [Sent kill hard (0.0) to {num_voices} voices]") elif key.isdigit(): num_buffer += key print(f"\rJump to: {num_buffer}", end="", flush=True) elif key == "\r": if num_buffer: try: idx = int(num_buffer) - 1 if 0 <= idx < len(self.chords): self.current_index = idx self.display_chords() else: print(f"\nIndex must be 1-{len(self.chords)}") except ValueError: print(f"\nInvalid number: {num_buffer}") num_buffer = "" elif not self.preview_mode: self.send_current() elif key == "\x7f" or key == "\x08": if num_buffer: num_buffer = num_buffer[:-1] if num_buffer: print(f"\rJump to: {num_buffer}", end="", flush=True) else: print("\r ", end="", flush=True) print("\r", end="", flush=True) except KeyboardInterrupt: pass print("\n\nPlayback stopped.") def send_all(self): """Send all chords in sequence (for testing).""" for i in range(len(self.chords)): self.send_chord(i) print("All chords sent!") def load_chords_from_output(output_dir="output"): """Load chords from the standard output directory.""" chords_file = Path(output_dir) / "output_chords.json" if not chords_file.exists(): raise FileNotFoundError(f"Chords file not found: {chords_file}") with open(chords_file) as f: return json.load(f) if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="OSC Sender for Compact Sets") parser.add_argument("--ip", default="192.168.4.200", help="Destination IP") parser.add_argument("--port", type=int, default=54001, help="Destination port") parser.add_argument( "--fundamental", type=float, default=100, help="Fundamental frequency in Hz" ) parser.add_argument("--output-dir", default="output", help="Output directory") parser.add_argument( "--send-all", action="store_true", help="Send all chords in sequence" ) args = parser.parse_args() # Load chords chords_file = Path(args.output_dir) / "output_chords.json" if not chords_file.exists(): print(f"Error: {chords_file} not found!") print("Run compact_sets.py first to generate chords.") sys.exit(1) # Create sender sender = OSCSender(ip=args.ip, port=args.port, fundamental=args.fundamental) sender.load_chords(chords_file) if args.send_all: sender.send_all() else: sender.play()