"""Decode OKI "Audi" ADP audio files.

The file is parsed with ``construct``; the payload is decoded to PCM and then
either played back through PyAudio or written out as a RIFF/WAVE file.
"""

import struct
import time
from dataclasses import dataclass
from typing import IO, Any, Self

from construct import *  # construct's DSL names (Int16ul, this, ...) are used unqualified
from construct_typed import DataclassMixin, DataclassStruct, csfield
from tap import Tap
from typing_extensions import override


def _require(condition: bool, message: str) -> None:
    """Raise ``AssertionError(message)`` unless *condition* holds.

    Used instead of a bare ``assert`` so the check is not stripped by
    ``python -O``; the exception type stays ``AssertionError`` so existing
    callers observe the same failure as before.
    """
    if not condition:
        raise AssertionError(message)


def _pack_s16le(samples: list[int]) -> bytes:
    """Pack *samples* as little-endian signed 16-bit integers."""
    return struct.pack(f"<{len(samples)}h", *samples)


class _EnhancedDataclassMixin(DataclassMixin):
    """Mixin adding build/parse shortcuts that go through ``DataclassStruct(cls)``."""

    @classmethod
    def format(cls):
        """Return the ``DataclassStruct`` wrapping this dataclass."""
        return DataclassStruct(cls)

    @classmethod
    def build(cls, obj: Self, **kw: Any):  # pyright: ignore[reportExplicitAny, reportAny]
        """Serialize *obj*; extra keywords are forwarded to ``build``."""
        return cls.format().build(obj, **kw)

    @classmethod
    def parse(cls, data: bytes | bytearray, **kw: Any):  # pyright: ignore[reportExplicitAny, reportAny]
        """Parse an instance from an in-memory buffer."""
        return cls.format().parse(data, **kw)

    @classmethod
    def parse_file(cls, file: str, **kw: Any):  # pyright: ignore[reportExplicitAny, reportAny]
        """Parse an instance from the file at path *file*."""
        return cls.format().parse_file(file, **kw)

    @classmethod
    def parse_stream(cls, stream: IO[bytes], **kw: Any):  # pyright: ignore[reportExplicitAny, reportAny]
        """Parse an instance from a binary stream."""
        return cls.format().parse_stream(stream, **kw)


# Wave file
@dataclass
class WAV(_EnhancedDataclassMixin):
    """RIFF container: ``b"RIFF"``, a 32-bit little-endian size, then the WAVE body."""

    @dataclass
    class WAVE(_EnhancedDataclassMixin):
        """WAVE body: ``b"WAVE"`` followed by chunks until the data runs out."""

        @dataclass
        class FMT(_EnhancedDataclassMixin):
            """Payload of the ``"fmt "`` chunk."""

            audio_fmt: int = csfield(Hex(Int16ul))
            channels: int = csfield(Int16ul)
            sample_rate: int = csfield(Int32ul)
            # The next two fields are recomputed from the others when building.
            bytes_rate: int = csfield(
                Rebuild(
                    Int32ul,
                    this.sample_rate * this.channels * (this.bits_per_samples // 8),
                )
            )
            block_align: int = csfield(
                Rebuild(Int16ul, this.channels * (this.bits_per_samples // 8))
            )
            bits_per_samples: int = csfield(Int16ul)

        @dataclass
        class Chunk(_EnhancedDataclassMixin):
            """One chunk: 4-character ASCII tag plus length-prefixed payload."""

            type: str = csfield(PaddedString(4, "ascii"))
            # NOTE(review): no pad byte is emitted after an odd-length payload;
            # confirm whether consumers of the output need RIFF word alignment.
            data: bytes = csfield(Prefixed(Int32ul, GreedyBytes))

        magic: bytes = csfield(Const(b"WAVE"))
        chunks: list[Chunk] = csfield(GreedyRange(Chunk.format()))

    magic: bytes = csfield(Const(b"RIFF"))
    wave: WAVE = csfield(Prefixed(Int32ul, WAVE.format()))


# ADPCM2 parameters
STEP_TABLES = [
    16, 17, 18, 20, 21, 23, 25, 27, 29, 31, 34, 37, 40, 43, 46, 50, 54, 59,
    63, 69, 74, 80, 86, 93, 101, 109, 118, 127, 138, 149, 161, 173, 187, 202,
    219, 236, 255, 275, 298, 321, 347, 375, 405, 437, 472, 510, 551, 595, 643,
    694, 750, 810, 875, 945, 1020, 1102, 1190, 1286, 1388, 1500, 1620, 1749,
    1889, 2040, 2204, 2380, 2570, 2776, 2998, 3238, 3497, 3777, 4079, 4406,
    4758, 5139, 5550, 5994, 6474, 6991, 7551, 8155, 8807, 9512, 10273, 11095,
    11982, 12941, 13976, 15095, 16302,
]
STEP_INDEX = [-2, -2, -2, -2, 2, 6, 9, 11]
STEP_INDEX_2BIT = [-2, 3]


# ADPCM2 decoder (used in ML2871)
class OKIAdpcm2Decode():
    """Stateful ADPCM2 decoder for 2- or 4-bit codes.

    ``delta`` holds the current output sample and ``step_index`` the current
    position in ``STEP_TABLES``, one entry per channel. Successive codes are
    applied to the channels in round-robin order.
    """

    def __init__(self, bits: int = 4, channels: int = 1):
        _require(bits in (2, 4), "bits must be 2 or 4")
        _require(channels in (1, 2), "channels must be 1 or 2")
        self.__decode_bits = bits
        self.delta: list[int] = [0] * channels
        self.step_index: list[int] = [0] * channels
        self.__channels = channels
        self.__decoder_idx = 0

    def __expand_sample(self, nibble: int):
        """Apply one code to the current channel and return its new sample."""
        bits = self.__decode_bits
        ch = self.__decoder_idx
        sign_mask = 1 << (bits - 1)  # top bit of the code is the sign
        value_mask = sign_mask - 1  # remaining bits select the magnitude
        step = STEP_TABLES[self.step_index[ch]]

        if bits == 4:
            magnitude = step >> 3
            if nibble & 1:
                magnitude += step >> 2
            if nibble & 2:
                magnitude += step >> 1
            if nibble & 4:
                magnitude += step
            index_table = STEP_INDEX
        else:
            magnitude = step >> 1
            if nibble & 1:
                magnitude += step
            index_table = STEP_INDEX_2BIT

        sample = self.delta[ch] + (-magnitude if nibble & sign_mask else magnitude)
        # Keep the step index inside the table and the sample inside int16.
        self.step_index[ch] = max(
            0,
            min(self.step_index[ch] + index_table[nibble & value_mask], len(STEP_TABLES) - 1),
        )
        self.delta[ch] = max(-32768, min(sample, 32767))
        self.__decoder_idx = (ch + 1) % self.__channels
        return self.delta[ch]

    def decode(self, data: bytes):
        """Decode *data* (codes packed MSB-first) to little-endian signed 16-bit PCM."""
        expand = self.__expand_sample
        if self.__decode_bits == 2:
            shifts, mask = (6, 4, 2, 0), 0x3
        else:
            shifts, mask = (4, 0), 0xF
        return _pack_s16le([expand((p >> s) & mask) for p in data for s in shifts])


# ADPCM1 parameters
STEP_TABLES_VOX = [
    16, 17, 19, 21, 23, 25, 28, 31, 34, 37, 41, 45, 50, 55, 60, 66, 73, 80,
    88, 97, 107, 118, 130, 143, 157, 173, 190, 209, 230, 253, 279, 307, 337,
    371, 408, 449, 494, 544, 598, 658, 724, 796, 876, 963, 1060, 1166, 1282,
    1411, 1552,
]
STEP_INDEX_VOX = [-1, -1, -1, -1, 2, 4, 6, 8]


# ADPCM1 decoder (VOX)
class OKIAdpcm1Decode():
    """Stateful ADPCM1 (VOX) decoder for 4-bit codes.

    The running sample is kept in the 12-bit range [-2048, 2047] and shifted
    left by 4 on output. Successive codes are applied to the channels in
    round-robin order.
    """

    def __init__(self, bits: int = 4, channels: int = 1):
        _require(bits in (4,), "bits must be 4")
        _require(channels in (1, 2), "channels must be 1 or 2")
        self.__decode_bits = bits
        self.delta: list[int] = [0] * channels
        self.step_index: list[int] = [0] * channels
        self.__channels = channels
        self.__decoder_idx = 0

    def __expand_sample(self, nibble: int):
        """Apply one code to the current channel and return its new 16-bit sample."""
        ch = self.__decoder_idx
        sign_mask = 1 << (self.__decode_bits - 1)
        value_mask = sign_mask - 1
        step = STEP_TABLES_VOX[self.step_index[ch]]

        # sox, libsndfile and FFmpeg VOX uses IMA mult instead of shift, while
        # official implementation uses IMA shift. The mult form would be:
        #   ((((nibble & 7) * 2) + 1) * step) >> 3
        magnitude = step >> 3
        if nibble & 1:
            magnitude += step >> 2
        if nibble & 2:
            magnitude += step >> 1
        if nibble & 4:
            magnitude += step

        sample = self.delta[ch] + (-magnitude if nibble & sign_mask else magnitude)
        self.step_index[ch] = max(
            0,
            min(self.step_index[ch] + STEP_INDEX_VOX[nibble & value_mask], len(STEP_TABLES_VOX) - 1),
        )
        self.delta[ch] = max(-2048, min(sample, 2047))
        self.__decoder_idx = (ch + 1) % self.__channels
        return self.delta[ch] << 4

    def decode(self, data: bytes):
        """Decode *data* (high nibble first) to little-endian signed 16-bit PCM."""
        expand = self.__expand_sample
        return _pack_s16le([expand((p >> s) & 0xF) for p in data for s in (4, 0)])


# OKI ADP (Audi) format
@dataclass
class OkiADP(_EnhancedDataclassMixin):
    """Big-endian "Audi" header followed by a length-prefixed audio payload."""

    magic: bytes = csfield(Const(b"Audi"))
    type: int = csfield(Hex(Int8ub))
    codec: int = csfield(Hex(Int8ub))
    channels: int = csfield(Hex(Int8ub))
    bits_per_samples: int = csfield(Hex(Int8ub))
    sample_rate: int = csfield(Int16ub)
    _pad: None = csfield(Padding(2))
    _audio_data: bytes = csfield(Prefixed(Int32ub, GreedyBytes))

    @staticmethod
    def __byswap(data: bytes):
        """Swap each pair of bytes; an odd trailing byte is dropped."""
        even_len = len(data) & ~1
        swapped = bytearray(even_len)
        swapped[0::2] = data[1:even_len:2]
        swapped[1::2] = data[0:even_len:2]
        return bytes(swapped)

    def decode(self):
        """Return the payload as PCM bytes.

        Output is unsigned 8-bit when ``type``/``codec`` are both 0 and
        ``bits_per_samples`` is 8; otherwise little-endian signed 16-bit.

        Raises:
            AssertionError: if ``type`` or the codec/bits combination is
                not one handled here.
        """
        _require(self.type in (0x00, 0x80), "Only PCM and ML28xx ADP files were supported")
        bits = self.bits_per_samples

        if self.type == 0x00 and self.codec == 0x00:
            # LPCM (Signed)
            _require(bits in (4, 8, 16), "Bad codec and bits per sample combinations")
            if bits == 4:
                # 4-bit payloads in this branch go through the ADPCM1 decoder.
                return OKIAdpcm1Decode(bits, self.channels).decode(self._audio_data)
            if bits == 16:
                # Stored big-endian; output is little-endian.
                return self.__byswap(self._audio_data)
            # Flip the top bit: signed 8-bit -> unsigned 8-bit.
            return bytes(x ^ 0x80 for x in self._audio_data)

        _require(
            (self.codec == 0 and bits == 2) or (self.codec == 1 and bits == 4),
            "Bad codec and bits per sample combinations",
        )
        return OKIAdpcm2Decode(bits, self.channels).decode(self._audio_data)


# Command-line arguments. The trailing comments on the fields are kept as-is
# because Tap reads them from the class source as help text.
class Args(Tap):
    in_file: str  # Input file
    out_file: str = ""  # Output file (leave blank for playback)

    @override
    def configure(self) -> None:
        self.add_argument("in_file")
        self.add_argument("out_file", nargs="?")


def _build_wav(adpc: OkiADP) -> bytes:
    """Return *adpc* decoded and wrapped as a RIFF/WAVE file (format tag 1)."""
    wav = WAV(wave=WAV.WAVE(chunks=[]))
    fmt = WAV.WAVE.FMT(
        1,
        adpc.channels,
        adpc.sample_rate,
        bits_per_samples=8 if adpc.bits_per_samples == 8 else 16,
    )
    wav.wave.chunks.append(WAV.WAVE.Chunk("fmt ", WAV.WAVE.FMT.build(fmt)))
    wav.wave.chunks.append(WAV.WAVE.Chunk("data", adpc.decode()))
    return WAV.build(wav)


def _play(adpc: OkiADP) -> None:
    """Decode *adpc* and play it on the default PyAudio output device."""
    # Imported here so that converting to a file does not require PyAudio.
    import pyaudio

    snd = pyaudio.PyAudio()
    try:
        stream = snd.open(
            format=pyaudio.paUInt8 if adpc.bits_per_samples == 8 else pyaudio.paInt16,
            channels=adpc.channels,
            rate=adpc.sample_rate,
            output=True,
        )
        try:
            buffer = adpc.decode()
            buf_size = (1024 if adpc.bits_per_samples == 8 else 2048) * adpc.channels
            # Step through the buffer instead of re-slicing the remainder,
            # which would copy the whole tail on every iteration.
            for start in range(0, len(buffer), buf_size):
                stream.write(buffer[start:start + buf_size])
            time.sleep(0.5)  # pause before closing the stream, as before
        finally:
            stream.close()
    finally:
        snd.terminate()


def main() -> None:
    """Parse the command line, then play the input or convert it to WAV."""
    args = Args().parse_args()
    adpc = OkiADP.parse_file(args.in_file)
    if not args.out_file:
        _play(adpc)
    else:
        # Build first so a decode failure does not leave a truncated file.
        wav_bytes = _build_wav(adpc)
        with open(args.out_file, "wb") as out:
            out.write(wav_bytes)


if __name__ == "__main__":
    main()