202 lines
No EOL
4.6 KiB
Python
202 lines
No EOL
4.6 KiB
Python
import os
|
|
import wave
|
|
import math
|
|
|
|
from .room import Room
|
|
|
|
TAU = 2 * math.pi
|
|
|
|
def stb(i: int, bytelen: int) -> bytes:
|
|
return i.to_bytes(bytelen, byteorder='little', signed=True)
|
|
|
|
|
|
class Program:
|
|
|
|
NOTES_OF_OCTAVE = {
|
|
0: ['C', 'B#'],
|
|
1: ['C#', 'Db'],
|
|
2: ['D'],
|
|
3: ['D#', 'Eb'],
|
|
4: ['E', 'Fb'],
|
|
5: ['F', 'E#'],
|
|
6: ['F#', 'Gb'],
|
|
7: ['G'],
|
|
8: ['G#', 'Ab'],
|
|
9: ['A'],
|
|
10: ['A#', 'Bb'],
|
|
11: ['B', 'Cb']
|
|
}
|
|
|
|
def __init__(self, name):
|
|
|
|
self.name = name
|
|
self.room = Room()
|
|
self.volume = 0.1
|
|
self.actions = []
|
|
|
|
def reset(self):
|
|
|
|
self.room = Room()
|
|
|
|
def note_to_freq(self, note):
|
|
|
|
note_no = self.note_to_note_no(note)
|
|
return self.note_no_to_freq(note_no)
|
|
|
|
def note_to_note_no(self, note):
|
|
|
|
octave = int(note[-1])
|
|
note = note[:-1]
|
|
|
|
note_no_oct = 1
|
|
for note_no_oct, notelst in self.NOTES_OF_OCTAVE.items():
|
|
|
|
if note in notelst:
|
|
break
|
|
|
|
corr_note_no_oct = note_no_oct - 8
|
|
return corr_note_no_oct + (octave*12)
|
|
|
|
def note_no_to_freq(self, note_no):
|
|
|
|
return 2**((note_no-49) / 12.0) * 440
|
|
|
|
def st(self, t_sec):
|
|
# seconds to ticks
|
|
return self.room.sample_rate * t_sec
|
|
|
|
def generate_frames(self, start_t_sec, end_t_sec):
|
|
|
|
frames = []
|
|
|
|
start_t = int(start_t_sec * self.room.sample_rate)
|
|
end_t = int(end_t_sec * self.room.sample_rate)
|
|
|
|
order_of_passage = []
|
|
|
|
for t in range(start_t, end_t):
|
|
|
|
if t%1000 == 0:
|
|
print(f'{int(t / float(end_t - start_t) * 100)}%', end='\r')
|
|
|
|
self.tick(t)
|
|
|
|
for node in self.room.nodes:
|
|
node.tick_done = False
|
|
|
|
updates = True
|
|
|
|
if not order_of_passage:
|
|
|
|
while updates:
|
|
|
|
updates = False
|
|
|
|
for node in self.room.nodes:
|
|
|
|
if not (t in node.r_amps):
|
|
|
|
has_unprocessed_inputs = False
|
|
|
|
for in_node in node.air_in:
|
|
if not in_node.tick_done:
|
|
has_unprocessed_inputs = True
|
|
break
|
|
|
|
if not has_unprocessed_inputs:
|
|
for in_node in node.wire_in:
|
|
if not in_node.tick_done:
|
|
has_unprocessed_inputs = True
|
|
break
|
|
|
|
if not has_unprocessed_inputs:
|
|
|
|
node.calc_r_amps(t)
|
|
node.tick_done = True
|
|
updates = True
|
|
order_of_passage.append(node)
|
|
|
|
else:
|
|
for node in order_of_passage:
|
|
node.calc_r_amps(t)
|
|
|
|
left_total_amp = self.room.max_amp * sum([vol * math.sin(self.room.sine_multiplier * freq * t) for freq, vol in self.room.left_sink.r_amps[t].items()])
|
|
right_total_amp = self.room.max_amp * sum([vol * math.sin(self.room.sine_multiplier * freq * t) for freq, vol in self.room.right_sink.r_amps[t].items()])
|
|
|
|
if left_total_amp > self.room.max_amp:
|
|
left_total_amp = self.room.max_amp
|
|
|
|
if left_total_amp < self.room.min_amp:
|
|
left_total_amp = self.room.min_amp
|
|
|
|
if right_total_amp > self.room.max_amp:
|
|
right_total_amp = self.room.max_amp
|
|
|
|
if right_total_amp < self.room.min_amp:
|
|
right_total_amp = self.room.min_amp
|
|
|
|
|
|
left_frame, right_frame = stb(int(left_total_amp), self.room.sample_width), stb(int(right_total_amp), self.room.sample_width)
|
|
|
|
frames.append(left_frame)
|
|
frames.append(right_frame)
|
|
|
|
return frames
|
|
|
|
def write_frames_to_wavefile(self, frames, fn = 'export.wav'):
|
|
|
|
with wave.open(fn, 'wb') as wav:
|
|
|
|
wav.setnchannels(2)
|
|
|
|
wav.setsampwidth(self.room.sample_width)
|
|
wav.setframerate(self.room.sample_rate)
|
|
wav.writeframes(b''.join(frames))
|
|
|
|
def playback(self, start_t_sec, end_t_sec, fn = 'export.wav'):
|
|
|
|
cmd = f'play -q --volume {self.volume} {fn} trim {start_t_sec} {end_t_sec-start_t_sec}'
|
|
os.system(cmd)
|
|
|
|
def export(self, start_t_sec, end_t_sec, fn = "export.wav"):
|
|
|
|
frames = self.generate_frames(start_t_sec, end_t_sec)
|
|
self.write_frames_to_wavefile(frames, fn=fn)
|
|
|
|
def interface(self):
|
|
|
|
print(f'\nRunning program: {self.name}')
|
|
print(f'[g 10] to generate 10 seconds')
|
|
print(f'[vol 10] to set playback volume to 10%')
|
|
print(f'[p 1 5] to play seconds 1 to 5')
|
|
|
|
cmd = ''
|
|
while cmd != 'q':
|
|
cmd = input(f'\n[vol {self.volume*100}%]>> ')
|
|
|
|
if cmd.startswith('g '):
|
|
import cProfile
|
|
end_sec = float(cmd[2:])
|
|
cProfile.runctx("self.export(0, end_sec)", globals(), locals())
|
|
#self.export(0, end_sec)
|
|
|
|
if cmd.startswith('vol '):
|
|
vol = float(cmd[4:])
|
|
self.volume = vol / 100.0
|
|
|
|
if cmd.startswith('p'):
|
|
pp = cmd.split(' ')
|
|
start_sec = float(pp[1])
|
|
end_sec = float(pp[2])
|
|
self.playback(start_sec, end_sec)
|
|
|
|
def tick(self, t):
|
|
|
|
for action in self.actions:
|
|
action.tick(t)
|
|
|
|
# REIMPLEMENT THESE
|
|
|
|
def setup(self):
|
|
|
|
pass |