programs, fixes

This commit is contained in:
aprilnightk 2025-08-10 16:16:16 +03:00
parent 680093eee9
commit 35e8a2c448
7 changed files with 345 additions and 227 deletions

43
core/actions.py Normal file
View file

@ -0,0 +1,43 @@
class TimedAction:
def __init__(self, start_t, duration_t, nodes, program):
self.program = program
self.program.actions.append(self)
if not isinstance(nodes, list):
nodes = [nodes]
self.nodes = nodes # list of nodes
self.start_t = start_t
self.duration_t = duration_t
self.end_t = start_t + duration_t
def tick(self, t):
if t >= self.start_t and t <= self.end_t:
for node in self.nodes:
if t == self.start_t:
self.start(node)
elif t == self.end_t:
self.end(node)
else:
self.progress(t - self.start_t, node)
# REIMPLEMENT
def start(self, node):
pass
def end(self, node):
pass
def progress(self, rel_t, node):
pass

View file

@ -1,6 +1,7 @@
import math import math
from ..soundnode import SoundNode from ..soundnode import SoundNode
from ..actions import TimedAction
class SineNode(SoundNode): class SineNode(SoundNode):
@ -8,21 +9,52 @@ class SineNode(SoundNode):
super().__init__("sine", room) super().__init__("sine", room)
self.freqs = freqs self.freqs = freqs
self.active = False
self.volume = 1
def calc_freqs_volumes(self, t): def calc_r_amps(self, t):
# This function returns volumes of each relevant freq
# at tick t
res = dict()
for freq in self.freqs:
res[freq] = 1 / float(len(self.freqs))
return res
def fill_amp_cache(self, t):
if not self.active:
self.r_amps[t] = dict()
return
tdct = dict() tdct = dict()
for freq, vol in self.calc_freqs_volumes(t).items():
tdct[freq] = vol * math.sin(self.room.sine_multiplier * freq * t)
self.amp_cache[t] = tdct for freq in self.freqs:
tdct[freq] = self.volume * math.sin(self.room.sine_multiplier * freq * t)
self.r_amps[t] = tdct
###
class PlayAction(TimedAction):
def __init__(self, start_t, duration_t, nodes, program):
super().__init__(start_t, duration_t, nodes, program)
def start(self, node):
node.active = True
def end(self, node):
node.active = False
class NoteAction(TimedAction):
def __init__(self, note, start_t, duration_t, nodes, program):
super().__init__(start_t, duration_t, nodes, program)
self.note = note
def start(self, node):
note_freq = self.program.note_to_freq(self.note)
node.freqs = [note_freq]
node.active = True
def end(self, node):
node.active = False

29
core/nodes/sink.py Normal file
View file

@ -0,0 +1,29 @@
from ..soundnode import *
class SinkNode(SoundNode):
def __init__(self, name, room):
super().__init__(name, room)
def calc_r_amps(self, t):
# This function returns volumes of each relevant freq
# at tick t
res = dict()
for source_node in self.wire_in:
for freq, vol in self.sample_r_amps_by_wire(source_node, t).items():
if not freq in res:
res[freq] = vol
else:
res[freq] += vol
for source_node in self.air_in:
for freq, vol in self.sample_r_amps_by_air(source_node, t).items():
if not freq in res:
res[freq] = vol
else:
res[freq] += vol
self.r_amps[t] = res

View file

@ -1,15 +1,133 @@
import os
import wave
from .room import Room from .room import Room
class Program: class Program:
def __init__(self): NOTES_OF_OCTAVE = {
0: ['C', 'B#'],
1: ['C#', 'Db'],
2: ['D'],
3: ['D#', 'Eb'],
4: ['E', 'Fb'],
5: ['F', 'E#'],
6: ['F#', 'Gb'],
7: ['G'],
8: ['G#', 'Ab'],
9: ['A'],
10: ['A#', 'Bb'],
11: ['B', 'Cb']
}
def __init__(self, name):
self.name = name
self.room = Room()
self.volume = 0.1
self.actions = []
def reset(self):
self.room = Room() self.room = Room()
def note_to_freq(self, note):
note_no = self.note_to_note_no(note)
return self.note_no_to_freq(note_no)
def note_to_note_no(self, note):
octave = int(note[-1])
note = note[:-1]
note_no_oct = 1
for note_no_oct, notelst in self.NOTES_OF_OCTAVE.items():
if note in notelst:
break
corr_note_no_oct = note_no_oct - 8
return corr_note_no_oct + (octave*12)
def note_no_to_freq(self, note_no):
return 2**((note_no-49) / 12.0) * 440
def st(self, t_sec):
# seconds to ticks
return self.room.sample_rate * t_sec
def generate_frames(self, start_t_sec, end_t_sec):
frames = []
start_t = int(start_t_sec * self.room.sample_rate)
end_t = int(end_t_sec * self.room.sample_rate)
for t in range(start_t, end_t):
self.tick(t)
left_frame, right_frame = self.room.generate_frame(t)
frames.append(left_frame)
frames.append(right_frame)
return frames
def write_frames_to_wavefile(self, frames, fn = 'export.wav'):
with wave.open(fn, 'wb') as wav:
wav.setnchannels(2)
wav.setsampwidth(self.room.sample_width)
wav.setframerate(self.room.sample_rate)
wav.writeframes(b''.join(frames))
def playback(self, start_t_sec, end_t_sec, fn = 'export.wav'):
cmd = f'play -q --volume {self.volume} {fn} trim {start_t_sec} {end_t_sec-start_t_sec}'
os.system(cmd)
def export(self, start_t_sec, end_t_sec, fn = "export.wav"):
frames = self.generate_frames(start_t_sec, end_t_sec)
self.write_frames_to_wavefile(frames, fn=fn)
def interface(self):
print(f'\nRunning program: {self.name}')
print(f'[g 10] to generate 10 seconds')
print(f'[vol 10] to set playback volume to 10%')
print(f'[p 1 5] to play seconds 1 to 5')
cmd = ''
while cmd != 'q':
cmd = input('\n[vol {self.volume*100}%]>> ')
if cmd.startswith('g '):
end_sec = float(cmd[2:])
self.export(0, end_sec)
if cmd.startswith('vol '):
vol = float(cmd[4:])
self.volume = vol / 100.0
if cmd.startswith('p'):
pp = cmd.split(' ')
start_sec = float(pp[1])
end_sec = float(pp[2])
self.playback(start_sec, end_sec)
def tick(self, t):
for action in self.actions:
action.tick(t)
# REIMPLEMENT THESE
def setup(self): def setup(self):
pass
def tick(t):
pass pass

View file

@ -2,39 +2,13 @@ import wave
import math import math
from .soundnode import * from .soundnode import *
from .nodes.sink import *
TAU = 2 * math.pi TAU = 2 * math.pi
def stb(i: int, bytelen: int) -> bytes: def stb(i: int, bytelen: int) -> bytes:
return i.to_bytes(bytelen, byteorder='little', signed=True) return i.to_bytes(bytelen, byteorder='little', signed=True)
class SinkNode(SoundNode):
def __init__(self, name, room):
super().__init__(name, room)
def fill_amp_cache(self, t):
# This function returns volumes of each relevant freq
# at tick t
res = dict()
for source_node in self.wire_in:
for freq, vol in self.sample_freqs_by_wire(source_node, t).items():
if not freq in res:
res[freq] = vol
else:
res[freq] += vol
for source_node in self.air_in:
for freq, vol in self.sample_freqs_by_air(source_node, t).items():
if not freq in res:
res[freq] = vol
else:
res[freq] += vol
self.amp_cache[t] = res
class Room: class Room:
@ -42,13 +16,15 @@ class Room:
self.nodes = [] self.nodes = []
self.dissipation_quotient = 0.9993
self.sample_rate = 44100 self.sample_rate = 44100
self.speed_of_sound = 343 / float(self.sample_rate) # m/tick self.speed_of_sound = 343 / float(self.sample_rate) # m/tick
self.set_bit_depth(24) self.set_bit_depth(24)
self.left_sink = SinkNode('LEFT', self) self.left_sink = SinkNode('left', self)
self.right_sink = SinkNode('RIGHT', self) self.left_sink.start_location = (-0.3, 0, 0)
self.right_sink = SinkNode('right', self)
self.right_sink.start_location = (0.3, 0, 0)
self.sine_multiplier = TAU / self.sample_rate self.sine_multiplier = TAU / self.sample_rate
@ -61,151 +37,57 @@ class Room:
self.min_amp = -int((2**bit_depth) / 2.0) self.min_amp = -int((2**bit_depth) / 2.0)
self.sample_width = int(self.bit_depth / 8.0) self.sample_width = int(self.bit_depth / 8.0)
def generate_frame(self, t):
def generate_frames(self, start_t_sec, end_t_sec): for node in self.nodes:
node.tick_done = False
frames = []
if t%1000 == 0:
start_t = start_t_sec * self.sample_rate print(t, end='\r', flush=True)
end_t = end_t_sec * self.sample_rate
for t in range(start_t, end_t): updates = True
if self.running_program:
self.running_program.tick(t)
while updates:
updates = False
for node in self.nodes: for node in self.nodes:
node.tick_done = False
if not (t in node.r_amps):
if t%1000 == 0: has_unprocessed_inputs = False
print(t)
updates = True
while updates:
updates = False
for node in self.nodes:
if not (t in node.amp_cache): for in_node in node.air_in:
if not in_node.tick_done:
has_unprocessed_inputs = False has_unprocessed_inputs = True
break
for in_node in node.air_in:
if not has_unprocessed_inputs:
for in_node in node.wire_in:
if not in_node.tick_done: if not in_node.tick_done:
has_unprocessed_inputs = True has_unprocessed_inputs = True
break break
if not has_unprocessed_inputs:
if not has_unprocessed_inputs: node.calc_r_amps(t)
for in_node in node.wire_in: node.tick_done = True
if not in_node.tick_done: updates = True
has_unprocessed_inputs = True
break
if not has_unprocessed_inputs:
node.fill_amp_cache(t)
node.tick_done = True
updates = True
left_total_amp = self.max_amp * sum([vol for freq, vol in self.left_sink.amp_cache[t].items()])
right_total_amp = self.max_amp * sum([vol for freq, vol in self.right_sink.amp_cache[t].items()])
if left_total_amp > self.max_amp:
left_total_amp = self.max_amp
if left_total_amp < self.min_amp:
left_total_amp = self.min_amp
if right_total_amp > self.max_amp:
right_total_amp = self.max_amp
if right_total_amp < self.min_amp:
right_total_amp = self.min_amp
frames.append(stb(int(left_total_amp), self.sample_width))
frames.append(stb(int(right_total_amp), self.sample_width))
return frames left_total_amp = self.max_amp * sum([vol for freq, vol in self.left_sink.r_amps[t].items()])
right_total_amp = self.max_amp * sum([vol for freq, vol in self.right_sink.r_amps[t].items()])
def write_frames_to_wavefile(self, fn, frames):
with wave.open(fn, 'wb') as wav: if left_total_amp > self.max_amp:
left_total_amp = self.max_amp
wav.setnchannels(2) if left_total_amp < self.min_amp:
left_total_amp = self.min_amp
wav.setsampwidth(self.sample_width)
wav.setframerate(self.sample_rate)
wav.writeframes(b''.join(frames))
def record(self, fn, start_t_sec, end_t_sec):
frames = self.generate_frames(start_t_sec, end_t_sec) if right_total_amp > self.max_amp:
self.write_frames_to_wavefile(fn, frames) right_total_amp = self.max_amp
if right_total_amp < self.min_amp:
right_total_amp = self.min_amp
return stb(int(left_total_amp), self.sample_width), stb(int(right_total_amp), self.sample_width)
"""
def record(self, fn, start_t_sec, end_t_sec):
with wave.open(fn, 'wb') as wav:
wav.setnchannels(2)
self.sample_width = int(self.bit_depth / 8.0)
wav.setsampwidth(self.sample_width)
wav.setframerate(self.sample_rate)
frames = []
start_t = start_t_sec * self.sample_rate
end_t = end_t_sec * self.sample_rate
for t in range(start_t, end_t):
#№if t%1000 == 0:
# print(t)
f = self.lowest_freq
left_total_amp = 0
right_total_amp = 0
left_amps = []
right_amps = []
while f < self.highest_freq:
for in_node in self.left_sink.air_in:
left_amps.append(in_node.amp_at_tick_by_air(f, t, self.left_sink))
for in_node in self.left_sink.wire_in:
left_amps.append(in_node.amp_at_tick(f, t))
for in_node in self.right_sink.air_in:
right_amps.append(in_node.amp_at_tick_by_air(f, t, self.right_sink))
for in_node in self.right_sink.wire_in:
right_amps.append(in_node.amp_at_tick(f, t))
f += self.freq_sample_step
left_total_amp = self.max_amp * sum(left_amps)
right_total_amp = self.max_amp * sum(right_amps)
if left_total_amp > self.max_amp:
left_total_amp = self.max_amp
if left_total_amp < self.min_amp:
left_total_amp = self.min_amp
if right_total_amp > self.max_amp:
right_total_amp = self.max_amp
if right_total_amp < self.min_amp:
right_total_amp = self.min_amp
frames.append(stb(int(left_total_amp), self.sample_width))
frames.append(stb(int(right_total_amp), self.sample_width))
wav.writeframes(b''.join(frames))
"""

View file

@ -9,22 +9,17 @@ class SoundNode:
self.room = room self.room = room
self.room.nodes.append(self) self.room.nodes.append(self)
self.amp_cache = dict() # {tick: {freq: amp}} self.r_amps = dict() # {tick: {freq: r_amp}}
self.air_in = [] self.air_in = []
self.wire_in = [] self.wire_in = []
self.start_location = (0, 0, 0) self.start_location = (0, 0, 0)
# Used by the tick cycle to correctly order the nodes.
# Do not touch this.
self.tick_done = False self.tick_done = False
def location(self, t):
# Location of the soundnote (x,y,z) in meters
# at time t.
return self.start_location
def distance_to_node(self, other_node, t): def distance_to_node(self, other_node, t):
loc = self.location(t) loc = self.location(t)
@ -32,49 +27,47 @@ class SoundNode:
return (loc[0]-other_loc[0])**2 + (loc[1]-other_loc[1])**2 + (loc[2]-other_loc[2])**2 return (loc[0]-other_loc[0])**2 + (loc[1]-other_loc[1])**2 + (loc[2]-other_loc[2])**2
def sample_r_amps_by_wire(self, source_node, current_t):
def fill_amp_cache(self, t): if current_t in source_node.r_amps:
return source_node.r_amps[current_t]
self.amp_cache[t] = dict()
def sample_freqs_by_wire(self, source_node, current_t):
if current_t in source_node.amp_cache:
return source_node.amp_cache[current_t]
return dict() return dict()
def sample_freqs_by_air(self, source_node, current_t): def sample_r_amps_by_air(self, source_node, current_t):
dist = self.distance_to_node(source_node, current_t) dist = self.distance_to_node(source_node, current_t)
sample_t = current_t - int(dist / self.room.speed_of_sound) sample_t = current_t - int(dist / self.room.speed_of_sound)
if sample_t in source_node.amp_cache: if sample_t in source_node.r_amps:
return {f: a / float(dist) for f, a in source_node.amp_cache[sample_t].items()} return {f: a / float(dist) for f, a in source_node.r_amps[sample_t].items()}
return dict() return dict()
def add_air_output(self, out_node): def air_to(self, out_node):
if not self in out_node.air_in: if not self in out_node.air_in:
out_node.air_in.append(self) out_node.air_in.append(self)
def add_wire_output(self, out_node): def wire_to(self, out_node):
if not self in out_node.wire_in: if not self in out_node.wire_in:
out_node.wire_in.append(self) out_node.wire_in.append(self)
""" ## THE FOLLOWING FUNCTIONS NEED TO BE DEFINED BY CONCRETE NODES
def location(self, t):
def amp_at_tick(self, f, t): # Location of the soundnote (x,y,z) in meters
# at time t.
return self.frequency_max_rel_amp(f, t) * math.sin(self.room.sine_multiplier * f * t) return self.start_location
def calc_r_amps(self, t):
def amp_at_tick_by_air(self, f, t, node): # Fill the amp_cache for tick t
# {freq: relative_amp}
# relative_amp must be between 0 and 1
dist = self.distance_to_node(node, t) self.r_amps[t] = dict()
t = t - int(dist / self.room.speed_of_sound)
return self.frequency_max_rel_amp(f, t) * math.sin(self.room.sine_multiplier * f * t)
"""

51
test.py
View file

@ -1,21 +1,42 @@
from core.room import Room
from core.prorgam import Program
from core.soundnode import SoundNode
from core.nodes.sinenode import SineNode
import math import math
R = Room() from core.room import Room
R.left_sink.start_location = (-1, 0, 0) from core.program import Program
R.right_sink.start_location = (2, 0, 0) from core.soundnode import SoundNode
from core.nodes.sinenode import *
sn = SineNode([440, 440*2, 440*3, 440*4, 220], R) class TestProgram(Program):
sn2 = SineNode([523.25, 523.25*2, 523.25*3, 523.25*4, 220], R)
def __init__(self):
super().__init__("testprogram")
sn.add_air_output(R.left_sink) def setup(self):
sn.add_air_output(R.right_sink)
sn2.add_air_output(R.left_sink) self.reset()
sn2.add_air_output(R.right_sink)
sn2.start_location = (1, 0, 0) sn = SineNode([], self.room)
sn.air_to(self.room.left_sink)
#sn.air_to(self.room.right_sink)
NoteAction('A4', self.st(0), self.st(0.5), [sn], self)
NoteAction('G4', self.st(1), self.st(0.5), [sn], self)
NoteAction('F4', self.st(2), self.st(0.5), [sn], self)
NoteAction('E4', self.st(3), self.st(0.5), [sn], self)
sn2 = SineNode([], self.room)
#sn2.air_to(self.room.left_sink)
sn2.air_to(self.room.right_sink)
NoteAction('A3', self.st(0), self.st(0.5), [sn2], self)
NoteAction('G3', self.st(1), self.st(0.5), [sn2], self)
NoteAction('F3', self.st(2), self.st(0.5), [sn2], self)
NoteAction('E3', self.st(3), self.st(0.5), [sn2], self)
R.record('test6.wav', 0, 2)
TP = TestProgram()
TP.setup()
TP.interface()