This commit is contained in:
wrapper 2026-02-22 00:52:27 +07:00
commit a0e2494108
2 changed files with 585 additions and 0 deletions

338
mcdf_notmido.py Normal file
View file

@ -0,0 +1,338 @@
from io import BytesIO
from dataclasses import dataclass
from typing_extensions import override
from construct_typed import DataclassMixin, DataclassStruct, csfield
from typing import IO, Any, Self
from construct import *
from tap import Tap
class _EnhancedDataclassMixin(DataclassMixin):
@classmethod
def format(cls):
return DataclassStruct(cls)
@classmethod
def build(cls, obj: Self, **kw: Any): # pyright: ignore[reportExplicitAny, reportAny]
return cls.format().build(obj, **kw)
@classmethod
def parse(cls, data: bytes|bytearray, **kw: Any):# pyright: ignore[reportExplicitAny, reportAny]
return cls.format().parse(data, **kw)
@classmethod
def parse_file(cls, file: str, **kw: Any):# pyright: ignore[reportExplicitAny, reportAny]
return cls.format().parse_file(file, **kw)
@classmethod
def parse_stream(cls, stream: IO[bytes], **kw: Any):# pyright: ignore[reportExplicitAny, reportAny]
return cls.format().parse_stream(stream, **kw)
@dataclass
class CBData(_EnhancedDataclassMixin):
@dataclass
class CMD(_EnhancedDataclassMixin):
cmd: int = csfield(Hex(Int8ub))
data: bytes = csfield(Prefixed(Int8ub, GreedyBytes))
magic: int = csfield(Const(0x45, Int8ub))
cmds: list[CMD] = csfield(PrefixedArray(Int8ub, CMD.format()))
# 0x30 - Sample ID, Flags
@dataclass
class Audio(_EnhancedDataclassMixin):
audio_id: int = csfield(Hex(Int8ub))
type: int = csfield(Hex(Int8ub))
codec: int = csfield(Hex(Int8ub))
channels: int = csfield(Hex(Int8ub))
bits_per_samples: int = csfield(Hex(Int8ub))
sample_rate: int = csfield(Int16ub)
_pad: None = csfield(Padding(2))
sample_offset: int = csfield(Int32ub)
sample_size: int = csfield(Int32ub)
@dataclass
class MCDF(_EnhancedDataclassMixin):
@dataclass
class CMD(_EnhancedDataclassMixin):
cmd: int = csfield(Hex(Int16ub))
_end: None = csfield(StopIf(this.cmd == 0xffff))
_offset: int = csfield(Hex(Int32ub))
_size: int = csfield(Int32ub)
data: bytes = csfield(Pointer(this._offset, Bytes(this._size)))
magic: bytes = csfield(Const(b"MCDF"))
cmds: list[CMD] = csfield(GreedyRange(CMD.format()))
@dataclass
class MCDFSequence(_EnhancedDataclassMixin):
@dataclass
class Track(_EnhancedDataclassMixin):
_magic: bytes = csfield(Const(b"CTrk"))
data: bytes = csfield(Prefixed(Int32ub, GreedyBytes))
_magic: bytes = csfield(Const(b"CThd"))
_header_size: int = csfield(Const(6, Int32ub))
_type: int = csfield(Const(0, Int16ub))
_no_tracks: int = csfield(Const(1, Int16ub))
pqn: int = csfield(Int16ub)
track: Track = csfield(Track.format())
def convert(self):
def vlq_parse(data: bytes):
temp = 0
i = 0
for d in data:
temp |= d & 0x7f
i += 1
if not (d & 0x80):
break
temp <<= 7
return temp, i
def ctrk2mtrk():
out_data = bytearray()
last = -1
offset = 0
while offset < len(self.track.data):
dur_bytes: list[int] = []
while True:
dur_bytes.append(self.track.data[offset])
check = self.track.data[offset] & 0x80
offset += 1
if not check:
break
cmd = self.track.data[offset]
if cmd >= 0x80:
offset += 1
if cmd < 0xf0:
last = cmd
else:
cmd = last
match cmd & 0xf0:
case 0x80: # Note off is 1-byte
out_data += bytes(dur_bytes) + bytes([cmd]) + self.track.data[offset:offset + 1] + b"\0"
offset += 1
case 0x90 | 0xa0 | 0xb0: # Play PCM sample: channel must be 10 (drum) and keynote must be 127
out_data += bytes(dur_bytes) + bytes([cmd]) + self.track.data[offset:offset + 2]
offset += 2
case 0xc0 | 0xd0:
out_data += bytes(dur_bytes) + bytes([cmd]) + self.track.data[offset:offset + 1]
offset += 1
case 0xe0: # Pitch bend is 1-byte (MSB only)
out_data += bytes(dur_bytes) + bytes([cmd]) + b"\0" + self.track.data[offset:offset + 1]
offset += 1
case 0xf0:
match cmd:
case 0xff:
vlq_val, vlq_len = vlq_parse(self.track.data[offset + 1:offset + 3])
out_data += bytes(dur_bytes) + bytes([cmd]) + self.track.data[offset:offset + 1 + vlq_len + vlq_val]
offset += 1 + vlq_len + vlq_val
case 0xf0:
vlq_val, vlq_len = vlq_parse(self.track.data[offset:offset + 3])
out_data += bytes(dur_bytes) + bytes([cmd]) + self.track.data[offset:offset + vlq_len + vlq_val]
offset += vlq_len + vlq_val
case _:
raise Exception(cmd)
case _:
raise Exception(cmd)
return bytes(out_data)
return MidiSequence.build(MidiSequence(pqn=self.pqn, track=MidiSequence.Track(data=ctrk2mtrk())))
@dataclass
class MidiSequence(_EnhancedDataclassMixin):
@dataclass
class Track(_EnhancedDataclassMixin):
_magic: bytes = csfield(Const(b"MTrk"))
data: bytes = csfield(Prefixed(Int32ub, GreedyBytes))
_magic: bytes = csfield(Const(b"MThd"))
_header_size: int = csfield(Const(6, Int32ub))
_type: int = csfield(Const(0, Int16ub))
_no_tracks: int = csfield(Const(1, Int16ub))
pqn: int = csfield(Int16ub)
track: Track = csfield(Track.format())
class Args(Tap):
in_file: str # Input file
sample_rate: int = 48000 # Synth sample rate
midi_soundfont: str = "./ml2870.sf2" # Soundfont used in MIDI portion of DXM
loop: bool = False # Loop endlessly
@override
def configure(self) -> None:
self.add_argument("in_file")
if __name__ == "__main__":
import tinysoundfont
from tinysoundfont.midi import load_memory, NoteOn
from numpy.typing import NDArray
from numpy import float32
import array
from oki import OkiADP
import soxr
import time
import threading
class MCDF_MidiSynth(tinysoundfont.Synth):
def __init__(self, gain: float = 0, samplerate: int = 44100):
super().__init__(gain, samplerate)
self.__pcm_blocks = bytearray()
self.__pcm_block_align = 1024
self.__pcm_lock = threading.RLock()
def put_sample(self, data: bytes):
assert (len(data) % 4) == 0
with self.__pcm_lock:
self.__pcm_blocks += data
def replace_sample(self, data: bytes):
with self.__pcm_lock:
self.__pcm_blocks = data
@override
def start(self, buffer_size: int = 1024, **kwargs):
# Import pyaudio here so if this function is not used there is no dependency
import pyaudio
import audioop
self.__pcm_block_align = buffer_size
def callback(in_data: None, frame_count: int, time_info: None, status: None):
with self.__pcm_lock:
buf_float = self.generate(samples=frame_count)
sf2_sound_buffer = b"".join([max(-32768, min(int(x * 32767), 32767)).to_bytes(2, "little", signed=True) for x in array.array("f", bytes(buf_float))])
pcm_buf_size = buffer_size * 2 * 2
pcm_cur_buf_size = len(self.__pcm_blocks)
pcm_data = self.__pcm_blocks[:pcm_buf_size] + (b"\0" * (max(0, pcm_buf_size - pcm_cur_buf_size)))
self.__pcm_blocks = self.__pcm_blocks[pcm_buf_size:]
mix = audioop.add(audioop.mul(sf2_sound_buffer, 2, 0.5), audioop.mul(pcm_data, 2, 0.5), 2)
# PyAudio needs actual bytes, not just memoryview
return (bytes(mix), pyaudio.paContinue)
self.p = pyaudio.PyAudio()
self.stream = self.p.open(
format=pyaudio.paInt16,
channels=2,
rate=self.samplerate,
output=True,
stream_callback=callback,
frames_per_buffer=buffer_size,
**kwargs
)
class MCDF_Sequencer(tinysoundfont.Sequencer):
def __init__(self, synth: MCDF_MidiSynth, cb: IO[bytes] | None, samples: dict[int, bytes]):
super().__init__(synth)
self.__io_callback = cb
self.__samples = samples
def load_memory(self, data: bytes, **kwargs):
events = load_memory(data, **kwargs)
self.add(events)
@override
def send(self, event: tinysoundfont.sequencer.Event):
synth: MCDF_MidiSynth = self.synth # pyright: ignore[reportAssignmentType]
channel: int = event.channel
match event.action: # pyright: ignore[reportMatchNotExhaustive]
case NoteOn(key, _):
if channel == tinysoundfont.sequencer.DRUM_CHANNEL and key == 127 and self.__io_callback is not None:
cb = CBData.parse_stream(self.__io_callback)
for cdata in cb.cmds:
if cdata.cmd == 0x41: # Play audio
if cdata.data[0] == 0x30: # Play
sample = cdata.data[1]
synth.replace_sample(self.__samples[sample])
# print("Play audio:", cdata.data[1], cdata.data[2])
elif cdata.data[0] == 0x32: # Stop
synth.replace_sample(b"")
return
super().send(event)
args = Args().parse_args()
in_mcdf = open(args.in_file, "rb")
dxm = MCDF.parse_stream(in_mcdf)
# 0x0140 = Callback data (type=NOTE_ON and channel=10 and key=127)
# 0x0240 = Sequence
# 0x0501 = Audio header
# 0x0540 = Audio sample
mcdf_map: dict[int, bytes] = {}
for i in dxm.cmds:
assert i.cmd not in mcdf_map
mcdf_map[i.cmd] = i.data
# 01 - Sequence
callback_stream = BytesIO(mcdf_map[0x140]) if 0x140 in mcdf_map else None
samples: list[Audio] = GreedyRange(Audio.format()).parse(mcdf_map[0x501]) if 0x501 in mcdf_map else []
samples_data: dict[int, bytes] = {}
# pre-decode samples
for s in samples:
smpl = mcdf_map[0x540][s.sample_offset:(s.sample_offset + s.sample_size)]
audio_buf = OkiADP(type = s.type, codec = s.codec, channels = s.channels, bits_per_samples = s.bits_per_samples, sample_rate = s.sample_rate, _audio_data = smpl)
res: "array.array[int]" = array.array("h", audio_buf.decode())
resamp: NDArray[float32] = soxr.resample(res, s.sample_rate, args.sample_rate)
outp = b"".join(max(-32768, min(int(smp), 32767)).to_bytes(2, "little", signed=True) * 2 for smp in resamp)
samples_data[s.audio_id] = outp
sf = MCDF_MidiSynth(samplerate=args.sample_rate)
sf.sfload(args.midi_soundfont)
sf.program_change(tinysoundfont.sequencer.DRUM_CHANNEL, 0, True)
sf.start(1024)
seq = MCDF_Sequencer(sf, callback_stream, samples_data)
seq.pause(True)
time.sleep(0.25)
seq.load_memory(MCDFSequence.parse(mcdf_map[0x240]).convert(), persistent=False)
seq.pause(False)
if args.loop:
while True:
time.sleep(0.01)
if seq.is_empty():
callback_stream.seek(0)
seq.set_time(0)
seq.load_memory(MCDFSequence.parse(mcdf_map[0x240]).convert(), persistent=False)
time.sleep(0.1)
else:
while not seq.is_empty():
time.sleep(1)
time.sleep(1)
seq.pause(True)
time.sleep(0.25)
sf.stop()

247
oki.py Normal file
View file

@ -0,0 +1,247 @@
from dataclasses import dataclass
from typing_extensions import override
from construct_typed import DataclassMixin, DataclassStruct, csfield
from typing import IO, Any, Self
from construct import *
from tap import Tap
class _EnhancedDataclassMixin(DataclassMixin):
@classmethod
def format(cls):
return DataclassStruct(cls)
@classmethod
def build(cls, obj: Self, **kw: Any): # pyright: ignore[reportExplicitAny, reportAny]
return cls.format().build(obj, **kw)
@classmethod
def parse(cls, data: bytes|bytearray, **kw: Any):# pyright: ignore[reportExplicitAny, reportAny]
return cls.format().parse(data, **kw)
@classmethod
def parse_file(cls, file: str, **kw: Any):# pyright: ignore[reportExplicitAny, reportAny]
return cls.format().parse_file(file, **kw)
@classmethod
def parse_stream(cls, stream: IO[bytes], **kw: Any):# pyright: ignore[reportExplicitAny, reportAny]
return cls.format().parse_stream(stream, **kw)
# Wave file
@dataclass
class WAV(_EnhancedDataclassMixin):
@dataclass
class WAVE(_EnhancedDataclassMixin):
@dataclass
class FMT(_EnhancedDataclassMixin):
audio_fmt: int = csfield(Hex(Int16ul))
channels: int = csfield(Int16ul)
sample_rate: int = csfield(Int32ul)
bytes_rate: int = csfield(Rebuild(Int32ul, this.sample_rate * this.channels * (this.bits_per_samples // 8)))
block_align: int = csfield(Rebuild(Int16ul, this.channels * (this.bits_per_samples // 8)))
bits_per_samples: int = csfield(Int16ul)
@dataclass
class Chunk(_EnhancedDataclassMixin):
type: str = csfield(PaddedString(4, "ascii"))
data: bytes = csfield(Prefixed(Int32ul, GreedyBytes) )
magic: bytes = csfield(Const(b"WAVE"))
chunks: list[Chunk] = csfield(GreedyRange(Chunk.format()))
magic: bytes = csfield(Const(b"RIFF"))
wave: WAVE = csfield(Prefixed(Int32ul, WAVE.format()))
# ADPCM2 parameters
STEP_TABLES = [16, 17, 18, 20, 21, 23, 25, 27, 29, 31, 34, 37, 40, 43, 46, 50, 54, 59, 63, 69, 74, 80, 86, 93, 101, 109, 118, 127, 138, 149, 161, 173, 187, 202, 219, 236, 255, 275, 298, 321, 347, 375, 405, 437, 472, 510, 551, 595, 643, 694, 750, 810, 875, 945, 1020, 1102, 1190, 1286, 1388, 1500, 1620, 1749, 1889, 2040, 2204, 2380, 2570, 2776, 2998, 3238, 3497, 3777, 4079, 4406, 4758, 5139, 5550, 5994, 6474, 6991, 7551, 8155, 8807, 9512, 10273, 11095, 11982, 12941, 13976, 15095, 16302]
STEP_INDEX = [-2, -2, -2, -2, 2, 6, 9, 11]
STEP_INDEX_2BIT = [-2, 3]
# ADPCM2 decoder (used in ML2871)
class OKIAdpcm2Decode():
def __init__(self, bits: int=4, channels: int=1):
assert bits in [2, 4]
assert channels in [1, 2]
self.__decode_bits = bits
self.delta: list[int] = [0] * channels
self.step_index: list[int] = [0] * channels
self.__channels = channels
self.__decoder_idx = 0
def __expand_sample(self, nibble: int):
SIGN_MASK = 1 << (self.__decode_bits - 1)
VALU_MASK = SIGN_MASK - 1
cur_step = STEP_TABLES[self.step_index[self.__decoder_idx]]
sign = (-1 if nibble & SIGN_MASK else 1)
if self.__decode_bits == 4:
cur_delta = cur_step >> 3
if nibble & 1:
cur_delta += cur_step >> 2
if nibble & 2:
cur_delta += cur_step >> 1
if nibble & 4:
cur_delta += cur_step
else:
cur_delta = cur_step >> 1
if nibble & 1:
cur_delta += cur_step
self.delta[self.__decoder_idx] += sign * cur_delta
self.step_index[self.__decoder_idx] = max(0, min(self.step_index[self.__decoder_idx] + (STEP_INDEX if self.__decode_bits == 4 else STEP_INDEX_2BIT)[nibble & VALU_MASK], len(STEP_TABLES) - 1))
self.delta[self.__decoder_idx] = max(-32768, min(self.delta[self.__decoder_idx], 32767))
ret = self.delta[self.__decoder_idx]
self.__decoder_idx = (self.__decoder_idx + 1) % self.__channels
return ret
def decode(self, data: bytes):
outp: list[int] = []
for p in data:
if self.__decode_bits == 2:
outp.append(self.__expand_sample(p >> 6))
outp.append(self.__expand_sample((p >> 4) & 3))
outp.append(self.__expand_sample((p >> 2) & 3))
outp.append(self.__expand_sample(p & 3))
else:
outp.append(self.__expand_sample(p >> 4))
outp.append(self.__expand_sample(p & 0xf))
return b"".join(x.to_bytes(2, "little", signed=True) for x in outp)
# ADPCM1 parameters
STEP_TABLES_VOX = [16, 17, 19, 21, 23, 25, 28, 31, 34, 37, 41, 45, 50, 55, 60, 66, 73, 80, 88, 97, 107, 118, 130, 143, 157, 173, 190, 209, 230, 253, 279, 307, 337, 371, 408, 449, 494, 544, 598, 658, 724, 796, 876, 963, 1060, 1166, 1282, 1411, 1552]
STEP_INDEX_VOX = [-1, -1, -1, -1, 2, 4, 6, 8]
# ADPCM1 decoder (VOX)
class OKIAdpcm1Decode():
def __init__(self, bits: int=4, channels: int=1):
assert bits in [4]
assert channels in [1, 2]
self.__decode_bits = bits
self.delta: list[int] = [0] * channels
self.step_index: list[int] = [0] * channels
self.__channels = channels
self.__decoder_idx = 0
def __expand_sample(self, nibble: int):
SIGN_MASK = 1 << (self.__decode_bits - 1)
VALU_MASK = SIGN_MASK - 1
cur_step = STEP_TABLES_VOX[self.step_index[self.__decoder_idx]]
sign = (-1 if nibble & SIGN_MASK else 1)
cur_delta = cur_step >> 3
if nibble & 1:
cur_delta += cur_step >> 2
if nibble & 2:
cur_delta += cur_step >> 1
if nibble & 4:
cur_delta += cur_step
# cur_delta = ((((nibble & 7) * 2) + 1) * cur_step) >> 3 # sox, libsndfile and FFmpeg VOX uses IMA mult instead of shift, while official implementation uses IMA shift.
self.delta[self.__decoder_idx] += sign * cur_delta
self.step_index[self.__decoder_idx] = max(0, min(self.step_index[self.__decoder_idx] + STEP_INDEX_VOX[nibble & VALU_MASK], len(STEP_TABLES_VOX) - 1))
self.delta[self.__decoder_idx] = max(-2048, min(self.delta[self.__decoder_idx], 2047))
ret = self.delta[self.__decoder_idx]
self.__decoder_idx = (self.__decoder_idx + 1) % self.__channels
return ret << 4
def decode(self, data: bytes):
outp: list[int] = []
for p in data:
outp.append(self.__expand_sample(p >> 4))
outp.append(self.__expand_sample(p & 0xf))
return b"".join(x.to_bytes(2, "little", signed=True) for x in outp)
# OKI ADP (Audi) format
@dataclass
class OkiADP(_EnhancedDataclassMixin):
magic: bytes = csfield(Const(b"Audi"))
type: int = csfield(Hex(Int8ub))
codec: int = csfield(Hex(Int8ub))
channels: int = csfield(Hex(Int8ub))
bits_per_samples: int = csfield(Hex(Int8ub))
sample_rate: int = csfield(Int16ub)
_pad: None = csfield(Padding(2))
_audio_data: bytes = csfield(Prefixed(Int32ub, GreedyBytes))
@staticmethod
def __byswap(data: bytes):
temp = bytearray()
for i in range(len(data) >> 1):
temp.append(data[(i << 1) + 1])
temp.append(data[i << 1])
return bytes(temp)
def decode(self):
assert self.type in [0x0, 0x80], "Only PCM and ML28xx ADP files were supported"
if self.type == 0x00 and self.codec == 0x00: # LPCM (Signed)
assert (self.codec == 0 and self.bits_per_samples in [4, 8, 16]), "Bad codec and bits per sample combinations"
if self.bits_per_samples == 4:
decoder = OKIAdpcm1Decode(self.bits_per_samples, self.channels)
return decoder.decode(self._audio_data)
return self.__byswap(self._audio_data) if self.bits_per_samples == 16 else bytes(x ^ 0x80 for x in self._audio_data)
else:
assert (self.codec == 0 and self.bits_per_samples == 2) or (self.codec == 1 and self.bits_per_samples == 4), "Bad codec and bits per sample combinations"
decoder = OKIAdpcm2Decode(self.bits_per_samples, self.channels)
return decoder.decode(self._audio_data)
class Args(Tap):
in_file: str # Input file
out_file: str = "" # Output file (leave blank for playback)
@override
def configure(self) -> None:
self.add_argument("in_file")
self.add_argument("out_file", nargs="?")
if __name__ == "__main__":
import pyaudio
import time
args = Args().parse_args()
adpc = OkiADP.parse_file(args.in_file)
if not args.out_file:
snd = pyaudio.PyAudio()
stream = snd.open(format=pyaudio.paUInt8 if adpc.bits_per_samples == 8 else pyaudio.paInt16, channels=adpc.channels, rate=adpc.sample_rate, output=True)
buffer = adpc.decode()
buf_size = (1024 if adpc.bits_per_samples == 8 else 2048) * adpc.channels
while len(buffer) > 0:
stream.write(buffer[:buf_size])
buffer = buffer[buf_size:]
time.sleep(0.5)
stream.close()
else:
WaveOut = WAV(wave=WAV.WAVE(chunks=[]))
WaveOut.wave.chunks.append(WAV.WAVE.Chunk("fmt ", WAV.WAVE.FMT.build(WAV.WAVE.FMT(1, adpc.channels, adpc.sample_rate, bits_per_samples=8 if adpc.bits_per_samples == 8 else 16))))
WaveOut.wave.chunks.append(WAV.WAVE.Chunk("data", adpc.decode()))
open(args.out_file, "wb").write(WAV.build(WaveOut))
#open("audi.bin", "wb").write(adpTest.decode())