193 lines
6.5 KiB
Python
193 lines
6.5 KiB
Python
|
|
#!/usr/bin/env python
|
||
|
|
"""
|
||
|
|
OSC Sender - send chord frequencies via OSC for real-time playback.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import json
|
||
|
|
import sys
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
try:
|
||
|
|
from pythonosc import osc_message_builder
|
||
|
|
from pythonosc.udp_client import SimpleUDPClient
|
||
|
|
except ImportError:
|
||
|
|
print("Error: python-osc not installed. Run: pip install python-osc")
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
|
||
|
|
class OSCSender:
|
||
|
|
"""Send chord frequencies via OSC."""
|
||
|
|
|
||
|
|
def __init__(self, ip="192.168.4.200", port=54001, fundamental=100):
|
||
|
|
self.ip = ip
|
||
|
|
self.port = port
|
||
|
|
self.fundamental = fundamental
|
||
|
|
self.client = SimpleUDPClient(ip, port)
|
||
|
|
self.chords = []
|
||
|
|
self.current_index = 0
|
||
|
|
|
||
|
|
def load_chords(self, chords_file):
|
||
|
|
"""Load chords from output_chords.json."""
|
||
|
|
with open(chords_file) as f:
|
||
|
|
self.chords = json.load(f)
|
||
|
|
print(f"Loaded {len(self.chords)} chords")
|
||
|
|
|
||
|
|
def send_chord(self, index):
|
||
|
|
"""Send frequencies for a specific chord index."""
|
||
|
|
if index < 0 or index >= len(self.chords):
|
||
|
|
return
|
||
|
|
|
||
|
|
chord = self.chords[index]
|
||
|
|
|
||
|
|
for voice_idx, pitch in enumerate(chord):
|
||
|
|
from fractions import Fraction
|
||
|
|
|
||
|
|
frac = Fraction(pitch["fraction"])
|
||
|
|
freq = self.fundamental * float(frac)
|
||
|
|
addr = f"/freq"
|
||
|
|
msg = osc_message_builder.OscMessageBuilder(addr=addr)
|
||
|
|
msg.add_arg(voice_idx + 1) # Voice number (1-indexed)
|
||
|
|
msg.add_arg(freq) # Frequency
|
||
|
|
self.client.send(msg.build())
|
||
|
|
|
||
|
|
def display_chord(self, index):
|
||
|
|
"""Display current chord info."""
|
||
|
|
if index < 0 or index >= len(self.chords):
|
||
|
|
return
|
||
|
|
|
||
|
|
chord = self.chords[index]
|
||
|
|
|
||
|
|
print(f"\r{'=' * 50}")
|
||
|
|
print(f"Chord {index + 1}/{len(self.chords)}")
|
||
|
|
print(f"{'=' * 50}")
|
||
|
|
|
||
|
|
print("Frequencies (Hz):")
|
||
|
|
from fractions import Fraction
|
||
|
|
|
||
|
|
for voice_idx, pitch in enumerate(chord):
|
||
|
|
frac = Fraction(pitch["fraction"])
|
||
|
|
freq = self.fundamental * float(frac)
|
||
|
|
print(f" Voice {voice_idx + 1}: {freq:.1f}")
|
||
|
|
|
||
|
|
print("\nHS Arrays:")
|
||
|
|
for voice_idx, pitch in enumerate(chord):
|
||
|
|
print(f" Voice {voice_idx + 1}: {pitch['hs_array']}")
|
||
|
|
|
||
|
|
print(f"\nFundamental: {self.fundamental} Hz")
|
||
|
|
print(f"Destination: {self.ip}:{self.port}")
|
||
|
|
print(f"{'=' * 50}")
|
||
|
|
print("← Previous | Next → | Ctrl+C to quit")
|
||
|
|
|
||
|
|
def play(self):
|
||
|
|
"""Interactive playback with keyboard control."""
|
||
|
|
if not self.chords:
|
||
|
|
print("No chords loaded!")
|
||
|
|
return
|
||
|
|
|
||
|
|
try:
|
||
|
|
import threading
|
||
|
|
import tty
|
||
|
|
import termios
|
||
|
|
import os
|
||
|
|
|
||
|
|
# Send and display first chord
|
||
|
|
self.send_chord(self.current_index)
|
||
|
|
self.display_chord(self.current_index)
|
||
|
|
|
||
|
|
def get_key():
|
||
|
|
"""Get a single keypress."""
|
||
|
|
fd = sys.stdin.fileno()
|
||
|
|
old_settings = termios.tcgetattr(fd)
|
||
|
|
try:
|
||
|
|
tty.setraw(sys.stdin.fileno())
|
||
|
|
ch = sys.stdin.read(1)
|
||
|
|
finally:
|
||
|
|
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
|
||
|
|
return ch
|
||
|
|
|
||
|
|
print("\nUse arrow keys to navigate. Press Ctrl+C to quit.")
|
||
|
|
|
||
|
|
while True:
|
||
|
|
key = get_key()
|
||
|
|
|
||
|
|
# Left arrow
|
||
|
|
if key == "\x1b": # Escape sequence start
|
||
|
|
next1 = get_key()
|
||
|
|
if next1 == "[":
|
||
|
|
next2 = get_key()
|
||
|
|
if next2 == "D": # Left arrow
|
||
|
|
if self.current_index > 0:
|
||
|
|
self.current_index -= 1
|
||
|
|
self.send_chord(self.current_index)
|
||
|
|
self.display_chord(self.current_index)
|
||
|
|
elif next2 == "C": # Right arrow
|
||
|
|
if self.current_index < len(self.chords) - 1:
|
||
|
|
self.current_index += 1
|
||
|
|
self.send_chord(self.current_index)
|
||
|
|
self.display_chord(self.current_index)
|
||
|
|
|
||
|
|
# Alternative: use < and > keys
|
||
|
|
elif key == "," or key == "<":
|
||
|
|
if self.current_index > 0:
|
||
|
|
self.current_index -= 1
|
||
|
|
self.send_chord(self.current_index)
|
||
|
|
self.display_chord(self.current_index)
|
||
|
|
elif key == "." or key == ">":
|
||
|
|
if self.current_index < len(self.chords) - 1:
|
||
|
|
self.current_index += 1
|
||
|
|
self.send_chord(self.current_index)
|
||
|
|
self.display_chord(self.current_index)
|
||
|
|
|
||
|
|
except KeyboardInterrupt:
|
||
|
|
print("\n\nPlayback stopped.")
|
||
|
|
|
||
|
|
def send_all(self):
|
||
|
|
"""Send all chords in sequence (for testing)."""
|
||
|
|
for i in range(len(self.chords)):
|
||
|
|
self.send_chord(i)
|
||
|
|
print(f"Sent chord {i + 1}/{len(self.chords)}")
|
||
|
|
print("All chords sent!")
|
||
|
|
|
||
|
|
|
||
|
|
def load_chords_from_output(output_dir="output"):
|
||
|
|
"""Load chords from the standard output directory."""
|
||
|
|
chords_file = Path(output_dir) / "output_chords.json"
|
||
|
|
if not chords_file.exists():
|
||
|
|
raise FileNotFoundError(f"Chords file not found: {chords_file}")
|
||
|
|
|
||
|
|
with open(chords_file) as f:
|
||
|
|
return json.load(f)
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
import argparse
|
||
|
|
|
||
|
|
parser = argparse.ArgumentParser(description="OSC Sender for Compact Sets")
|
||
|
|
parser.add_argument("--ip", default="192.168.4.200", help="Destination IP")
|
||
|
|
parser.add_argument("--port", type=int, default=54001, help="Destination port")
|
||
|
|
parser.add_argument(
|
||
|
|
"--fundamental", type=float, default=100, help="Fundamental frequency in Hz"
|
||
|
|
)
|
||
|
|
parser.add_argument("--output-dir", default="output", help="Output directory")
|
||
|
|
parser.add_argument(
|
||
|
|
"--send-all", action="store_true", help="Send all chords in sequence"
|
||
|
|
)
|
||
|
|
|
||
|
|
args = parser.parse_args()
|
||
|
|
|
||
|
|
# Load chords
|
||
|
|
chords_file = Path(args.output_dir) / "output_chords.json"
|
||
|
|
if not chords_file.exists():
|
||
|
|
print(f"Error: {chords_file} not found!")
|
||
|
|
print("Run compact_sets.py first to generate chords.")
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
# Create sender
|
||
|
|
sender = OSCSender(ip=args.ip, port=args.port, fundamental=args.fundamental)
|
||
|
|
sender.load_chords(chords_file)
|
||
|
|
|
||
|
|
if args.send_all:
|
||
|
|
sender.send_all()
|
||
|
|
else:
|
||
|
|
sender.play()
|