diff --git a/base_tef.py b/base_tef.py index 8a555c8..f9e9d91 100644 --- a/base_tef.py +++ b/base_tef.py @@ -1,3 +1,4 @@ +import struct from protocol import I2CPCClient, time from patches import * @@ -40,22 +41,10 @@ class BaseTEF668X: time.sleep(0.01) return data[-1] - @staticmethod - def _base_command_wrapper(func: Callable[Concatenate["BaseTEF668X", P], tuple[bytes, int | None, ABCCallable[[bytes], T] | None]]) -> Callable[Concatenate["BaseTEF668X", P], bytes | T]: - def inner(self: "BaseTEF668X", *args: P.args, **kwargs: P.kwargs ) -> bytes | T: - data, read_bytes, out_parser = func(self, *args, **kwargs) - if read_bytes: data = self.p.write_read_i2c(self.address, data, read_bytes) - else: data = self.p.write_i2c(self.address, data) - if out_parser: return out_parser(data) - return data - return inner - - @_base_command_wrapper def APPL_Set_ReferenceClock(self, clock: int, type_clock: bool): - return b"\x40\x04\x01" + bytes(((clock >> 16) & 0xffff).to_bytes(2) + (clock & 0xffff).to_bytes(2) + bytes([0, int(type_clock)])), None, None - @_base_command_wrapper + return self.p.write_i2c(self.address, b"\x40\x04\x01" + struct.pack(">IH", clock, type_clock)) def APPL_Activate(self): - return b"\x40\x05\x01\x00\x01", None, None + return self.p.write_i2c(self.address, b"\x40\x05\x01\x00\x01") def init(self, patch = tef_102_patch, patch_lut = tef_102_patch_lut, clock: int = 9216000): self._reset() diff --git a/protocol.py b/protocol.py index bd7e16f..52e83d3 100644 --- a/protocol.py +++ b/protocol.py @@ -70,10 +70,10 @@ class I2CPCClient: return self._send_packet(payload, False) def write_eeprom(self, addr: int, data: bytes): - payload = bytes([7, (addr >> 8) & 0xff, addr & 0xff]) + data + payload = bytes([7]) + struct.pack(">I", addr) + data return self._send_packet(payload, False) def read_eeprom(self, addr: int, len: int): - payload = bytes([8, (addr >> 8) & 0xff, addr & 0xff, len]) + payload = bytes([8]) + struct.pack(">I", addr) + bytes([len]) return self._send_packet(payload, False) def version(self): return self._send_packet(bytes([4])) diff --git a/tef.py b/tef.py index 7f3555f..50f54f4 100644 --- a/tef.py +++ b/tef.py @@ -2,6 +2,7 @@ from __future__ import annotations from base_tef import BaseTEF668X, Concatenate, Callable, ParamSpec from typing import overload, TypeVar from functools import wraps +import struct P = ParamSpec("P") T = TypeVar("T") @@ -22,98 +23,260 @@ def _command_wrapper(func: Callable[Concatenate[TEF6686, P], tuple[bytes, int | return data return inner +def pack(*values: int, signed: bool = False): return struct.pack(">" + (("h" if signed else "H")*len(values)), *values) + class TEF6686(BaseTEF668X): @_command_wrapper def FM_Tune_To(self, mode: int, frequency: int | None): - return b"\x20\x01\x01" + mode.to_bytes(2, "big") + (frequency.to_bytes(2, "big") if frequency else b""), None, None + return b"\x20\x01\x01" + (pack(mode, frequency) if frequency is not None else pack(mode)), None, None @_command_wrapper def AM_Tune_To(self, mode: int, frequency: int | None): - return b"\x21\x01\x01" + mode.to_bytes(2, "big") + (frequency.to_bytes(2, "big") if frequency else b""), None, None + return b"\x21\x01\x01" + (pack(mode, frequency) if frequency is not None else pack(mode)), None, None + @_command_wrapper def FM_Set_Tune_Options(self, afu_bw_mode: bool = False, afu_bandwidth: int = 2360, afu_mute_time: int = 1000, afu_sample_time: int = 2000): - return b"\x20\x02\x01" + afu_bw_mode.to_bytes(2, "big") + afu_bandwidth.to_bytes(2, "big") + afu_mute_time.to_bytes(2, "big") + afu_sample_time.to_bytes(2, "big"), None, None + return b"\x20\x02\x01" + pack(afu_bw_mode, afu_bandwidth, afu_mute_time, afu_sample_time), None, None + @_command_wrapper + @overload def FM_Set_Bandwidth(self, mode: bool = True, bandwidth: int = 2360, control_sensitivity: int = 1000, low_level_sensitivity: int = 1000): - return b"\x20\x0a\x01" + mode.to_bytes(2, "big") + bandwidth.to_bytes(2, "big") + control_sensitivity.to_bytes(2, "big") + low_level_sensitivity.to_bytes(2, "big"), None, None + return b"\x20\x0a\x01" + pack(mode, bandwidth, control_sensitivity, low_level_sensitivity), None, None + @_command_wrapper + @overload + def FM_Set_Bandwidth(self, mode: bool = True, bandwidth: int = 2360, control_sensitivity: int = 1000, low_level_sensitivity: int = 1000, min_bandwidth: int = 560, nominal_bandwidth: int = 2360, control_attack: int = 300): + """ + Minimal bandwidth requires p2.13, while nominal_bandwidth and control_attack require p2.17 (we have p2.24) + """ + return b"\x20\x0a\x01" + pack(mode, bandwidth, control_sensitivity, low_level_sensitivity, min_bandwidth, nominal_bandwidth, control_attack), None, None @_command_wrapper def AM_Set_Bandwidth(self, bandwidth: int = 40): - return b"\x21\x0a\x01\x00\x00" + bandwidth.to_bytes(2, "big"), None, None + return b"\x21\x0a\x01\x00\x00" + pack(bandwidth), None, None + @_command_wrapper def FM_Set_RFAGC(self, start: int = 920, extension: bool = False): - return b"\x20\x0b\x01" + start.to_bytes(2, "big") + extension.to_bytes(2, "big"), None, None + return b"\x20\x0b\x01" + pack(start, extension), None, None @_command_wrapper def AM_Set_RFAGC(self, start: int = 1000): - return b"\x21\x0b\x01" + start.to_bytes(2, "big"), None, None + return b"\x21\x0b\x01" + pack(start), None, None + @_command_wrapper def AM_Set_Antenna(self, attenuation: int = 0): - return b"\x21\x0c\x01" + attenuation.to_bytes(2, "big"), None, None + return b"\x21\x0c\x01" + pack(attenuation), None, None + @_command_wrapper def AM_Set_CoChannelDet(self, mode: bool = True, restart: int = 2, sensitivity: int = 1000, count: int = 3): - return b"\x21\x0e\x01" + mode.to_bytes(2, "big") + restart.to_bytes(2, "big") + sensitivity.to_bytes(2, "big") + count.to_bytes(2, "big"), None, None + return b"\x21\x0e\x01" + pack(mode, restart, sensitivity, count), None, None + @_command_wrapper def FM_Set_MphSuppression(self, mode: bool = False): - return b"\x20\x14\x01" + mode.to_bytes(2, "big"), None, None + return b"\x20\x14\x01" + pack(mode), None, None + @_command_wrapper def FM_Set_ChannelEqualizer(self, mode: bool = False): - return b"\x20\x16\x01" + mode.to_bytes(2, "big"), None, None + return b"\x20\x16\x01" + pack(mode), None, None + @_command_wrapper + @overload def FM_Set_NoiseBlanker(self, mode: bool = True, sensitivity: int = 1000): - return b"\x20\x17\x01" + mode.to_bytes(2, "big") + sensitivity.to_bytes(2, "big"), None, None + return b"\x20\x17\x01" + pack(mode, sensitivity), None, None @_command_wrapper + @overload + def FM_Set_NoiseBlanker(self, mode: bool = True, sensitivity: int = 1000, modulation: int = 900, offset: int = 1, attack: int = 140, decay: int = 2800): + return b"\x20\x17\x01" + pack(mode, sensitivity, 0, modulation, offset, attack, decay), None, None + + @_command_wrapper + def FM_Set_NoiseBlanker_Options(self, blank_time: int = 210, blank_time2: int = 210, blank_modulation: int = 250): + return b"\x20\x18\x01" + pack(blank_time, blank_time2, blank_modulation), None, None + + @_command_wrapper + @overload def AM_Set_NoiseBlanker(self, mode: bool = True, sensitivity: int = 1000): - return b"\x21\x17\x01" + mode.to_bytes(2, "big") + sensitivity.to_bytes(2, "big"), None, None + return b"\x21\x17\x01" + pack(mode, sensitivity), None, None @_command_wrapper + @overload + def AM_Set_NoiseBlanker(self, mode: bool = True, sensitivity: int = 1000, gain: int = 1000, blank_time: int = 56): + return b"\x21\x17\x01" + pack(mode, sensitivity, gain, blank_time), None, None + + @_command_wrapper + @overload def AM_Set_NoiseBlanker_Audio(self, mode: bool = True, sensitivity: int = 1000): - return b"\x21\x18\x01" + mode.to_bytes(2, "big") + sensitivity.to_bytes(2, "big"), None, None + return b"\x21\x18\x01" + pack(mode, sensitivity), None, None + @_command_wrapper + @overload + def AM_Set_NoiseBlanker_Audio(self, mode: bool = True, sensitivity: int = 1000, blank_time: int = 800): + return b"\x21\x18\x01" + pack(mode, sensitivity, 0, blank_time), None, None + @_command_wrapper def FM_Set_Deemphasis(self, timeconstant: int = 500): - return b"\x20\x1f\x01" + timeconstant.to_bytes(2, "big"), None, None + return b"\x20\x1f\x01" + pack(timeconstant), None, None + @_command_wrapper def FM_Set_LevelStep(self, step1: int = -20, step2: int = -30, step3: int = -40, step4: int = -50, step5: int = -60, step6: int = -60, step7: int = -60): - return b"\x20\x26\x01" + step1.to_bytes(2, "big", signed=True) + step2.to_bytes(2, "big", signed=True) \ - + step3.to_bytes(2, "big", signed=True) + step4.to_bytes(2, "big", signed=True) + step5.to_bytes(2, "big", signed=True) + \ - step6.to_bytes(2, "big", signed=True) + step7.to_bytes(2, "big", signed=True), None, None + return b"\x20\x26\x01" + pack(step1, step2, step3, step4, step5, step6, step7), None, None @_command_wrapper def AM_Set_LevelStep(self, step1: int = -10, step2: int = -20, step3: int = -30, step4: int = -40, step5: int = -50, step6: int = -60, step7: int = -60): - return b"\x21\x26\x01" + step1.to_bytes(2, "big", signed=True) + step2.to_bytes(2, "big", signed=True) \ - + step3.to_bytes(2, "big", signed=True) + step4.to_bytes(2, "big", signed=True) + step5.to_bytes(2, "big", signed=True) + \ - step6.to_bytes(2, "big", signed=True) + step7.to_bytes(2, "big", signed=True), None, None + return b"\x21\x26\x01" + pack(step1, step2, step3, step4, step5, step6, step7), None, None + @_command_wrapper def FM_Set_LevelOffset(self, offset: int = 0): - return b"\x20\x27\x01" + offset.to_bytes(2, "big", signed=True), None, None + return b"\x20\x27\x01" + pack(offset, signed=True), None, None @_command_wrapper def AM_Set_LevelOffset(self, offset: int = 0): - return b"\x21\x27\x01" + offset.to_bytes(2, "big", signed=True), None, None + return b"\x21\x27\x01" + pack(offset, signed=True), None, None + @_command_wrapper - def FM_Set_RDS(self, mode: int = 0, restart: int = 2, interface: int = 0): - return b"\x20\x51\x01" + mode.to_bytes(2, "big") + restart.to_bytes(2, "big") + interface.to_bytes(2, "big"), None, None + def FM_Set_Softmute_Time(self, slow_attack: int = 120, slow_decay: int = 500, fast_attack: int = 20, fast_decay: int = 20): + return b"\x20\x28\x01" + slow_attack.to_bytes(2, "big") + slow_decay.to_bytes(2, "big") + fast_attack.to_bytes(2, "big") + fast_decay.to_bytes(2, "big"), None, None @_command_wrapper - def FM_Set_Specials(self, mode: int = 0): - return b"\x20\x55\x01" + mode.to_bytes(2, "big"), None, None + def AM_Set_Softmute_Time(self, slow_attack: int = 120, slow_decay: int = 500, fast_attack: int = 120, fast_decay: int = 500): + return b"\x21\x28\x01" + slow_attack.to_bytes(2, "big") + slow_decay.to_bytes(2, "big") + fast_attack.to_bytes(2, "big") + fast_decay.to_bytes(2, "big"), None, None @_command_wrapper - def AUDIO_Set_Volume(self, volume: int = 0): - return b"\x30\x0a\x01" + volume.to_bytes(2, "big", signed=True), None, None + def AM_Set_Softmute_Mod(self, mode: bool = False, start: int = 210, slope: int = 120, shift: int = 260): + return b"\x21\x29\x01" + pack(mode, start, slope, shift), None, None @_command_wrapper - def AUDIO_Set_Mute(self, mute: bool = True): - return b"\x30\x0b\x01" + int(mute).to_bytes(2, "big"), None, None + def FM_Set_Softmute_Level(self, mode: int = 0, start: int = 150, slope: int = 220): + return b"\x20\x2A\x01" + mode.to_bytes(2, "big") + start.to_bytes(2, "big") + slope.to_bytes(2, "big"), None, None @_command_wrapper - def AUDIO_Set_Input(self, source: int = 0): - return b"\x30\x0c\x01" + source.to_bytes(2, "big"), None, None + def AM_Set_Softmute_Level(self, mode: int = 0, start: int = 280, slope: int = 250): + return b"\x21\x2A\x01" + mode.to_bytes(2, "big") + start.to_bytes(2, "big") + slope.to_bytes(2, "big"), None, None @_command_wrapper - def AUDIO_Set_Output_Source(self, signal: int, source: int = 224): - return b"\x30\x0d\x01" + signal.to_bytes(2, "big") + source.to_bytes(2, "big"), None, None + def FM_Set_Softmute_Noise(self, mode: int = 0, start: int = 500, slope: int = 1000): + return b"\x20\x2b\x01" + mode.to_bytes(2, "big") + start.to_bytes(2, "big") + slope.to_bytes(2, "big"), None, None @_command_wrapper - def AUDIO_Set_Ana_Out(self, signal: int, mode: bool = True): - return b"\x30\x15\x01" + signal.to_bytes(2, "big") + mode.to_bytes(2, "big"), None, None + def FM_Set_Softmute_Mph(self, mode: int = 0, start: int = 500, slope: int = 1000): + return b"\x20\x2c\x01" + mode.to_bytes(2, "big") + start.to_bytes(2, "big") + slope.to_bytes(2, "big"), None, None @_command_wrapper - def AUDIO_Set_WaveGen(self, mode: int = 0, offset: int = 0, amplitude1: int = -200, frequency1: int = 400, amplitude2: int = -200, frequency2: int = 1000): - return b"\x30\x18\x01" + mode.to_bytes(2, "big") + offset.to_bytes(2, "big") + amplitude1.to_bytes(2, "big", signed=True) + frequency1.to_bytes(2, "big") + amplitude2.to_bytes(2, "big", signed=True) + frequency2.to_bytes(2, "big"), None, None + def FM_Set_Softmute_Max(self, mode: bool = True, start: int = 200): + return b"\x20\x2c\x01" + mode.to_bytes(2, "big") + start.to_bytes(2, "big"), None, None @_command_wrapper - def APPL_Set_OperationMode(self, mode: bool = True): - return b"\x40\x01\x01" + mode.to_bytes(2, "big"), None, None + def AM_Set_Softmute_Max(self, mode: bool = True, start: int = 250): + return b"\x21\x2c\x01" + mode.to_bytes(2, "big") + start.to_bytes(2, "big"), None, None + + @_command_wrapper + def FM_Set_Highcut_Time(self, slow_attack: int = 500, slow_decay: int = 2000, fast_attack: int = 20, fast_decay: int = 20): + return b"\x20\x32\x01" + pack(slow_attack, slow_decay, fast_attack, fast_decay), None, None + @_command_wrapper + def AM_Set_Highcut_Time(self, slow_attack: int = 500, slow_decay: int = 2000, fast_attack: int = 12, fast_decay: int = 50): + return b"\x21\x32\x01" + pack(slow_attack, slow_decay, fast_attack, fast_decay), None, None + @_command_wrapper + def FM_Set_Highcut_Mod(self, mode: bool = False, start: int = 250, slope: int = 130, shift: int = 500): + return b"\x20\x33\x01" + pack(mode, start, slope, shift), None, None + @_command_wrapper + def AM_Set_Highcut_Mod(self, mode: bool = False, start: int = 250, slope: int = 130, shift: int = 500): + return b"\x21\x33\x01" + pack(mode, start, slope, shift), None, None + @_command_wrapper + def FM_Set_Highcut_Level(self, mode: int = 3, start: int = 360, slope: int = 300): + return b"\x20\x34\x01" + pack(mode, start, slope), None, None + @_command_wrapper + def AM_Set_Highcut_Level(self, mode: int = 2, start: int = 400, slope: int = 200): + return b"\x21\x34\x01" + pack(mode, start, slope), None, None + @_command_wrapper + def FM_Set_Highcut_Noise(self, mode: int = 2, start: int = 360, slope: int = 300): + return b"\x20\x35\x01" + pack(mode, start, slope), None, None + @_command_wrapper + def FM_Set_Highcut_Mph(self, mode: int = 2, start: int = 360, slope: int = 300): + return b"\x20\x36\x01" + pack(mode, start, slope), None, None + @_command_wrapper + def FM_Set_Highcut_Max(self, mode: bool = True, limit: int = 4000): + return b"\x20\x37\x01" + pack(mode, limit), None, None + @_command_wrapper + def AM_Set_Highcut_Max(self, mode: bool = True, limit: int = 1800): + return b"\x21\x37\x01" + pack(mode, limit), None, None + @_command_wrapper + def FM_Set_Highcut_Min(self, mode: bool = False, limit: int = 10000): + return b"\x20\x38\x01" + pack(mode, limit), None, None + @_command_wrapper + def AM_Set_Highcut_Min(self, mode: bool = False, limit: int = 6000): + return b"\x21\x38\x01" + pack(mode, limit), None, None + @_command_wrapper + def FM_Set_Lowcut_Max(self, mode: bool = True, limit: int = 120): + return b"\x20\x39\x01" + pack(mode, limit), None, None + @_command_wrapper + def AM_Set_Lowcut_Max(self, mode: bool = True, limit: int = 120): + return b"\x21\x39\x01" + pack(mode, limit), None, None + @_command_wrapper + def FM_Set_Lowcut_Min(self, mode: bool = False, limit: int = 20): + return b"\x20\x3A\x01" + pack(mode, limit), None, None + @_command_wrapper + def AM_Set_Lowcut_Min(self, mode: bool = True, limit: int = 20): + return b"\x21\x3A\x01" + pack(mode, limit), None, None + @_command_wrapper + def FM_Set_Highcut_Options(self, mode: int = 1): + return b"\x20\x3b\x01" + pack(mode), None, None + + @_command_wrapper + def FM_Set_Stereo_Time(self, slow_attack: int = 1000, slow_decay: int = 4000, fast_attack: int = 80, fast_decay: int = 80): + return b"\x20\x3c\x01" + pack(slow_attack, slow_decay, fast_attack, fast_decay), None, None + @_command_wrapper + def FM_Set_Stereo_Mod(self, mode: bool = False, start: int = 210, slope: int = 90, shift: int = 500): + return b"\x20\x3d\x01" + pack(mode, start, slope, shift), None, None + @_command_wrapper + def FM_Set_Stereo_Level(self, mode: int = 3, start: int = 460, slope: int = 240): + return b"\x20\x3e\x01" + pack(mode, start, slope), None, None + @_command_wrapper + def FM_Set_Stereo_Noise(self, mode: int = 3, start: int = 240, slope: int = 200): + return b"\x20\x3f\x01" + pack(mode, start, slope), None, None + @_command_wrapper + def FM_Set_Stereo_Mph(self, mode: int = 3, start: int = 240, slope: int = 200): + return b"\x20\x40\x01" + pack(mode, start, slope), None, None + @_command_wrapper + def FM_Set_Stereo_Max(self, mode: bool = True): + return b"\x20\x41\x01" + pack(mode), None, None @_command_wrapper def FM_Set_Stereo_Min(self, mode: int = 0, limit: int = 400): - return b"\x20\x42\x01" + mode.to_bytes(2, "big") + limit.to_bytes(2, "big"), None, None + return b"\x20\x42\x01" + pack(mode, limit), None, None + + @_command_wrapper + def FM_Set_RDS(self, mode: int = 0, restart: int = 2, interface: int = 0): + return b"\x20\x51\x01" + pack(mode, restart, interface), None, None + + @_command_wrapper + def FM_Set_Specials(self, mode: int = 0): + return b"\x20\x55\x01" + pack(mode), None, None + + @_command_wrapper + def FM_Set_Bandwidth_Options(self, modulation: int = 950): + return b"\x20\x56\x01" + pack(modulation), None, None + + @_command_wrapper + def AUDIO_Set_Volume(self, volume: int = 0): + return b"\x30\x0a\x01" + pack(volume, signed=True), None, None + + @_command_wrapper + def AUDIO_Set_Mute(self, mute: bool = True): + return b"\x30\x0b\x01" + pack(mute), None, None + + @_command_wrapper + def AUDIO_Set_Input(self, source: int = 0): + return b"\x30\x0c\x01" + pack(source), None, None + + @_command_wrapper + def AUDIO_Set_Output_Source(self, signal: int, source: int = 224): + return b"\x30\x0d\x01" + pack(signal, source), None, None + + @_command_wrapper + def AUDIO_Set_Ana_Out(self, signal: int, mode: bool = True): + return b"\x30\x15\x01" + pack(signal, mode), None, None + + @_command_wrapper + def AUDIO_Set_Dig_IO(self, signal: int, mode: int = 0, format: int = 32, operation: int = 0, sample_rate: int = 4410): + return b"\x30\x16\x01" + pack(signal, mode, format, operation, sample_rate), None, None + + @_command_wrapper + def AUDIO_Set_Input_Scaler(self, source: int, gain: int = 0): + return b"\x30\x17\x01" + pack(source) + pack(gain, signed=True), None, None + + @_command_wrapper + def AUDIO_Set_WaveGen(self, mode: int = 0, offset: int = 0, amplitude1: int = -200, frequency1: int = 400, amplitude2: int = -200, frequency2: int = 1000): + return b"\x30\x18\x01" + pack(mode) + offset.to_bytes(2, "big") + pack(amplitude1, signed=True) + pack(frequency1) + pack(amplitude2, signed=True) + pack(frequency2), None, None + + @_command_wrapper + def APPL_Set_OperationMode(self, mode: bool = True): + return b"\x40\x01\x01" + pack(mode), None, None + + @_command_wrapper + def APPL_Set_GPIO(self, pin: int, module: int, feature: int): + return b"\x40\x03\x01" + pack(pin, module, feature), None, None @staticmethod def _get_quality_data(data) -> tuple[None, None, None, None, None, None, None] | tuple[int, int, int, int, int, int, int]: @@ -128,16 +291,16 @@ class TEF6686(BaseTEF668X): return status, level, usn, wam, offset, bandwidth, modulation @_command_wrapper def FM_Get_Quality_Status(self): - return b"\x20\x80\x01", 14, self._get_quality_data + return b"\x20\x80\x01", 7*2, self._get_quality_data @_command_wrapper def FM_Get_Quality_Data(self): - return b"\x20\x81\x01", 14, self._get_quality_data + return b"\x20\x81\x01", 7*2, self._get_quality_data @_command_wrapper def AM_Get_Quality_Status(self): - return b"\x21\x80\x01", 14, self._get_quality_data + return b"\x21\x80\x01", 7*2, self._get_quality_data @_command_wrapper def AM_Get_Quality_Data(self): - return b"\x21\x81\x01", 14, self._get_quality_data + return b"\x21\x81\x01", 7*2, self._get_quality_data @staticmethod def _get_rds_data_proc_decoder(data) -> tuple[None, None, None, None, None, None] | tuple[int, int, int, int, int, int]: @@ -158,16 +321,16 @@ class TEF6686(BaseTEF668X): return status, (raw_data_high << 16) | raw_data_low @_command_wrapper def FM_Get_RDS_Status__decoder(self): - return b"\x20\x82\x01", 12, self._get_rds_data_proc_decoder + return b"\x20\x82\x01", 2*6, self._get_rds_data_proc_decoder @_command_wrapper def FM_Get_RDS_Data__decoder(self): - return b"\x20\x83\x01", 12, self._get_rds_data_proc_decoder + return b"\x20\x83\x01", 2*6, self._get_rds_data_proc_decoder @_command_wrapper def FM_Get_RDS_Status__demodulator(self): - return b"\x20\x82\x01", 12, self._get_rds_data_proc_demodulator + return b"\x20\x82\x01", 2*3, self._get_rds_data_proc_demodulator @_command_wrapper def FM_Get_RDS_Data__demodulator(self): - return b"\x20\x83\x01", 12, self._get_rds_data_proc_demodulator + return b"\x20\x83\x01", 2*3, self._get_rds_data_proc_demodulator @_command_wrapper def FM_Get_AGC(self): @@ -179,7 +342,8 @@ class TEF6686(BaseTEF668X): input_att = (data[1] << 8) | data[2] feedback_att = (data[3] << 8) | data[4] return input_att, feedback_att - return b"\x20\x84\x01", 4, proc + return b"\x20\x84\x01", 2*2, proc + @_command_wrapper def FM_Get_Signal_Status(self): """ @@ -190,16 +354,25 @@ class TEF6686(BaseTEF668X): value = (data[-2] << 8) | data[-1] return (value & (1 << 15)) != 0, (value & (1 << 14)) != 0 return b"\x20\x85\x01", 2, proc + @_command_wrapper def FM_Get_Processing_Status(self): def proc(data): - if data[0] != 0: return None, None, None, None + if data[0] != 0: return None softmute = (data[1] << 8) | data[2] highcut = (data[3] << 8) | data[4] stereo = (data[5] << 8) | data[6] sthiblend = (data[7] << 8) | data[8] return softmute, highcut, stereo, sthiblend return b"\x20\x86\x01", 2*4, proc + + @_command_wrapper + def APPL_Get_GPIO_Status(self): + def proc(data): + if data[0] != 0: return None + return bool(data[1] & 1), bool((data[1] >> 1) & 1), bool((data[1] >> 2) & 1) + return b"\x40\x81\x01", 2, proc + @_command_wrapper def APPL_Get_Identification(self): def proc(data): @@ -208,4 +381,4 @@ class TEF6686(BaseTEF668X): hw_version = (data[3] << 8) | data[4] sw_version = (data[5] << 8) | data[6] return device, hw_version, sw_version - return b"\x40\x82\x01", 6, proc \ No newline at end of file + return b"\x40\x82\x01", 2*3, proc \ No newline at end of file diff --git a/xrd.py b/xrd.py index f32cf73..cffb008 100644 --- a/xrd.py +++ b/xrd.py @@ -119,8 +119,7 @@ def process_command(tef: TEF6686, data: bytes, state: dict, conn: socket.socket) return out def send_signal_status(tef: TEF6686, conn: socket.socket, state: dict): - _, _, stereo, _ = tef.FM_Get_Processing_Status() - stereo = stereo if stereo is not None else 1000 # Default to Mono + _, _, stereo, _ = d if (d := tef.FM_Get_Processing_Status()) else (None, None, 1000, None) status, level, usn, wam, _, bandwidth, _ = tef.FM_Get_Quality_Data() if status is None or level is None or wam is None or usn is None or bandwidth is None: return