# Source repository: TEF6686_Driver (Python; 211 lines, 12 KiB as listed by the hosting page).
from __future__ import annotations
|
|
from base_tef import BaseTEF668X, Concatenate, Callable, ParamSpec
|
|
from typing import overload, TypeVar
|
|
from functools import wraps
|
|
|
|
P = ParamSpec("P")
|
|
T = TypeVar("T")
|
|
|
|
@overload
|
|
def _command_wrapper(func: Callable[Concatenate[TEF6686, P],tuple[bytes, int | None, None],],) -> Callable[Concatenate[TEF6686, P], bytes]: ...
|
|
@overload
|
|
def _command_wrapper(func: Callable[Concatenate[TEF6686, P],tuple[bytes, int | None, Callable[[bytes], T]],],) -> Callable[Concatenate[TEF6686, P], T]: ...
|
|
|
|
def _command_wrapper(func: Callable[Concatenate[TEF6686, P], tuple[bytes, int | None, Callable[[bytes], T] | None], ]) -> Callable[Concatenate[TEF6686, P], bytes | T]:
|
|
@wraps(func)
|
|
def inner(self: TEF6686, *args: P.args, **kwargs: P.kwargs) -> bytes | T:
|
|
data, read_bytes, out_parser = func(self, *args, **kwargs)
|
|
|
|
if read_bytes: data = self.p.write_read_i2c(self.address, data, read_bytes)
|
|
else: data = self.p.write_i2c(self.address, data)
|
|
if out_parser: return out_parser(data)
|
|
return data
|
|
return inner
|
|
|
|
class TEF6686(BaseTEF668X):
    """Command set for the TEF6686, layered on ``BaseTEF668X``.

    Every public method is wrapped by ``_command_wrapper``: the method body
    only *builds* ``(payload, read_bytes, parser)`` and the wrapper performs
    the bus transfer through ``self.p`` / ``self.address``.

    Payload layout used throughout this class: a three-byte header followed
    by 16-bit big-endian parameter words. The header's first byte is ``0x20``
    for ``FM_*``, ``0x21`` for ``AM_*``, ``0x30`` for ``AUDIO_*`` and ``0x40``
    for ``APPL_*`` methods; the second byte differs per command and the third
    is always ``0x01`` (presumably module / command / index -- confirm
    against the device manual).

    Response parsers treat ``data[0]`` as a status byte: a non-zero value is
    reported as ``None`` (or a tuple of ``None``) instead of decoded values.
    """

    # ------------------------------------------------------------------
    # Write-only commands (no response is read back)
    # ------------------------------------------------------------------

    @_command_wrapper
    def FM_Tune_To(self, mode: int, frequency: int | None):
        """Send header ``20 01 01`` with ``mode`` and an optional frequency word.

        The frequency word is omitted when ``frequency`` is falsy, i.e. for
        both ``None`` and ``0``.
        """
        payload = b"\x20\x01\x01" + mode.to_bytes(2, "big")
        if frequency:
            payload += frequency.to_bytes(2, "big")
        return payload, None, None

    @_command_wrapper
    def AM_Tune_To(self, mode: int, frequency: int | None):
        """Send header ``21 01 01`` with ``mode`` and an optional frequency word.

        The frequency word is omitted when ``frequency`` is falsy, i.e. for
        both ``None`` and ``0``.
        """
        payload = b"\x21\x01\x01" + mode.to_bytes(2, "big")
        if frequency:
            payload += frequency.to_bytes(2, "big")
        return payload, None, None

    @_command_wrapper
    def FM_Set_Tune_Options(
        self,
        afu_bw_mode: bool = False,
        afu_bandwidth: int = 2360,
        afu_mute_time: int = 1000,
        afu_sample_time: int = 2000,
    ):
        """Send header ``20 02 01`` followed by the four arguments as unsigned words."""
        payload = (
            b"\x20\x02\x01"
            + afu_bw_mode.to_bytes(2, "big")
            + afu_bandwidth.to_bytes(2, "big")
            + afu_mute_time.to_bytes(2, "big")
            + afu_sample_time.to_bytes(2, "big")
        )
        return payload, None, None

    @_command_wrapper
    def FM_Set_Bandwidth(
        self,
        mode: bool = True,
        bandwidth: int = 2360,
        control_sensitivity: int = 1000,
        low_level_sensitivity: int = 1000,
    ):
        """Send header ``20 0A 01`` followed by the four arguments as unsigned words."""
        payload = (
            b"\x20\x0a\x01"
            + mode.to_bytes(2, "big")
            + bandwidth.to_bytes(2, "big")
            + control_sensitivity.to_bytes(2, "big")
            + low_level_sensitivity.to_bytes(2, "big")
        )
        return payload, None, None

    @_command_wrapper
    def AM_Set_Bandwidth(self, bandwidth: int = 40):
        """Send header ``21 0A 01``, a fixed zero word, then ``bandwidth`` (unsigned)."""
        return b"\x21\x0a\x01\x00\x00" + bandwidth.to_bytes(2, "big"), None, None

    @_command_wrapper
    def FM_Set_RFAGC(self, start: int = 920, extension: bool = False):
        """Send header ``20 0B 01`` followed by ``start`` and ``extension`` as unsigned words."""
        payload = (
            b"\x20\x0b\x01"
            + start.to_bytes(2, "big")
            + extension.to_bytes(2, "big")
        )
        return payload, None, None

    @_command_wrapper
    def AM_Set_RFAGC(self, start: int = 1000):
        """Send header ``21 0B 01`` followed by ``start`` as an unsigned word."""
        return b"\x21\x0b\x01" + start.to_bytes(2, "big"), None, None

    @_command_wrapper
    def AM_Set_Antenna(self, attenuation: int = 0):
        """Send header ``21 0C 01`` followed by ``attenuation`` as an unsigned word."""
        return b"\x21\x0c\x01" + attenuation.to_bytes(2, "big"), None, None

    @_command_wrapper
    def AM_Set_CoChannelDet(
        self,
        mode: bool = True,
        restart: int = 2,
        sensitivity: int = 1000,
        count: int = 3,
    ):
        """Send header ``21 0E 01`` followed by the four arguments as unsigned words."""
        payload = (
            b"\x21\x0e\x01"
            + mode.to_bytes(2, "big")
            + restart.to_bytes(2, "big")
            + sensitivity.to_bytes(2, "big")
            + count.to_bytes(2, "big")
        )
        return payload, None, None

    @_command_wrapper
    def FM_Set_MphSuppression(self, mode: bool = False):
        """Send header ``20 14 01`` followed by ``mode`` as an unsigned word."""
        return b"\x20\x14\x01" + mode.to_bytes(2, "big"), None, None

    @_command_wrapper
    def FM_Set_ChannelEqualizer(self, mode: bool = False):
        """Send header ``20 16 01`` followed by ``mode`` as an unsigned word."""
        return b"\x20\x16\x01" + mode.to_bytes(2, "big"), None, None

    @_command_wrapper
    def FM_Set_NoiseBlanker(self, mode: bool = True, sensitivity: int = 1000):
        """Send header ``20 17 01`` followed by ``mode`` and ``sensitivity`` (unsigned)."""
        payload = (
            b"\x20\x17\x01"
            + mode.to_bytes(2, "big")
            + sensitivity.to_bytes(2, "big")
        )
        return payload, None, None

    @_command_wrapper
    def AM_Set_NoiseBlanker(self, mode: bool = True, sensitivity: int = 1000):
        """Send header ``21 17 01`` followed by ``mode`` and ``sensitivity`` (unsigned)."""
        payload = (
            b"\x21\x17\x01"
            + mode.to_bytes(2, "big")
            + sensitivity.to_bytes(2, "big")
        )
        return payload, None, None

    @_command_wrapper
    def AM_Set_NoiseBlanker_Audio(self, mode: bool = True, sensitivity: int = 1000):
        """Send header ``21 18 01`` followed by ``mode`` and ``sensitivity`` (unsigned)."""
        payload = (
            b"\x21\x18\x01"
            + mode.to_bytes(2, "big")
            + sensitivity.to_bytes(2, "big")
        )
        return payload, None, None

    @_command_wrapper
    def FM_Set_Deemphasis(self, timeconstant: int = 500):
        """Send header ``20 1F 01`` followed by ``timeconstant`` as an unsigned word."""
        return b"\x20\x1f\x01" + timeconstant.to_bytes(2, "big"), None, None

    @_command_wrapper
    def FM_Set_LevelStep(
        self,
        step1: int = -20,
        step2: int = -30,
        step3: int = -40,
        step4: int = -50,
        step5: int = -60,
        step6: int = -60,
        step7: int = -60,
    ):
        """Send header ``20 26 01`` followed by the seven steps as signed words."""
        steps = (step1, step2, step3, step4, step5, step6, step7)
        payload = b"\x20\x26\x01" + b"".join(
            step.to_bytes(2, "big", signed=True) for step in steps
        )
        return payload, None, None

    @_command_wrapper
    def AM_Set_LevelStep(
        self,
        step1: int = -10,
        step2: int = -20,
        step3: int = -30,
        step4: int = -40,
        step5: int = -50,
        step6: int = -60,
        step7: int = -60,
    ):
        """Send header ``21 26 01`` followed by the seven steps as signed words."""
        steps = (step1, step2, step3, step4, step5, step6, step7)
        payload = b"\x21\x26\x01" + b"".join(
            step.to_bytes(2, "big", signed=True) for step in steps
        )
        return payload, None, None

    @_command_wrapper
    def FM_Set_LevelOffset(self, offset: int = 0):
        """Send header ``20 27 01`` followed by ``offset`` as a signed word."""
        return b"\x20\x27\x01" + offset.to_bytes(2, "big", signed=True), None, None

    @_command_wrapper
    def AM_Set_LevelOffset(self, offset: int = 0):
        """Send header ``21 27 01`` followed by ``offset`` as a signed word."""
        return b"\x21\x27\x01" + offset.to_bytes(2, "big", signed=True), None, None

    @_command_wrapper
    def FM_Set_RDS(self, mode: int = 0, restart: int = 2, interface: int = 0):
        """Send header ``20 51 01`` followed by the three arguments as unsigned words."""
        payload = (
            b"\x20\x51\x01"
            + mode.to_bytes(2, "big")
            + restart.to_bytes(2, "big")
            + interface.to_bytes(2, "big")
        )
        return payload, None, None

    @_command_wrapper
    def FM_Set_Specials(self, mode: int = 0):
        """Send header ``20 55 01`` followed by ``mode`` as an unsigned word."""
        return b"\x20\x55\x01" + mode.to_bytes(2, "big"), None, None

    @_command_wrapper
    def AUDIO_Set_Volume(self, volume: int = 0):
        """Send header ``30 0A 01`` followed by ``volume`` as a signed word."""
        return b"\x30\x0a\x01" + volume.to_bytes(2, "big", signed=True), None, None

    @_command_wrapper
    def AUDIO_Set_Mute(self, mute: bool = True):
        """Send header ``30 0B 01`` followed by ``mute`` (0 or 1) as an unsigned word."""
        return b"\x30\x0b\x01" + int(mute).to_bytes(2, "big"), None, None

    @_command_wrapper
    def AUDIO_Set_Input(self, source: int = 0):
        """Send header ``30 0C 01`` followed by ``source`` as an unsigned word."""
        return b"\x30\x0c\x01" + source.to_bytes(2, "big"), None, None

    @_command_wrapper
    def AUDIO_Set_Output_Source(self, signal: int, source: int = 224):
        """Send header ``30 0D 01`` followed by ``signal`` and ``source`` (unsigned)."""
        payload = (
            b"\x30\x0d\x01"
            + signal.to_bytes(2, "big")
            + source.to_bytes(2, "big")
        )
        return payload, None, None

    @_command_wrapper
    def AUDIO_Set_Ana_Out(self, signal: int, mode: bool = True):
        """Send header ``30 15 01`` followed by ``signal`` and ``mode`` (unsigned)."""
        payload = (
            b"\x30\x15\x01"
            + signal.to_bytes(2, "big")
            + mode.to_bytes(2, "big")
        )
        return payload, None, None

    @_command_wrapper
    def AUDIO_Set_WaveGen(
        self,
        mode: int = 0,
        offset: int = 0,
        amplitude1: int = -200,
        frequency1: int = 400,
        amplitude2: int = -200,
        frequency2: int = 1000,
    ):
        """Send header ``30 18 01`` followed by the six wave-generator words.

        ``mode`` and both frequencies are unsigned; both amplitudes are
        signed. ``offset`` is encoded as two's complement when negative
        (previously a negative offset raised ``OverflowError``); non-negative
        offsets are encoded exactly as before.
        NOTE(review): the device presumably treats the offset word as signed
        -- confirm against the manual.
        """
        payload = (
            b"\x30\x18\x01"
            + mode.to_bytes(2, "big")
            + offset.to_bytes(2, "big", signed=offset < 0)
            + amplitude1.to_bytes(2, "big", signed=True)
            + frequency1.to_bytes(2, "big")
            + amplitude2.to_bytes(2, "big", signed=True)
            + frequency2.to_bytes(2, "big")
        )
        return payload, None, None

    @_command_wrapper
    def APPL_Set_OperationMode(self, mode: bool = True):
        """Send header ``40 01 01`` followed by ``mode`` as an unsigned word."""
        return b"\x40\x01\x01" + mode.to_bytes(2, "big"), None, None

    @_command_wrapper
    def FM_Set_Stereo_Min(self, mode: int = 0, limit: int = 400):
        """Send header ``20 42 01`` followed by ``mode`` and ``limit`` (unsigned)."""
        payload = (
            b"\x20\x42\x01"
            + mode.to_bytes(2, "big")
            + limit.to_bytes(2, "big")
        )
        return payload, None, None

    # ------------------------------------------------------------------
    # Quality read-out
    # ------------------------------------------------------------------

    @staticmethod
    def _get_quality_data(
        data: bytes,
    ) -> tuple[None, None, None, None, None, None, None] | tuple[int, int, int, int, int, int, int]:
        """Decode a quality response (status byte followed by seven words).

        Returns ``(status, level, usn, wam, offset, bandwidth, modulation)``;
        ``level`` and ``offset`` are signed, the rest unsigned. When the
        status byte ``data[0]`` is non-zero, seven ``None`` values are
        returned instead.
        """
        if data[0] != 0:
            return None, None, None, None, None, None, None

        status = (data[1] << 8) | data[2]
        level = int.from_bytes(bytes([data[3], data[4]]), "big", signed=True)
        usn = (data[5] << 8) | data[6]  # "noise" on AM
        wam = (data[7] << 8) | data[8]  # "co-channel" on AM
        offset = int.from_bytes(bytes([data[9], data[10]]), "big", signed=True)
        bandwidth = (data[11] << 8) | data[12]
        modulation = (data[13] << 8) | data[14]
        return status, level, usn, wam, offset, bandwidth, modulation

    @_command_wrapper
    def FM_Get_Quality_Status(self):
        """Send header ``20 80 01``, read 14 bytes, decode with ``_get_quality_data``."""
        return b"\x20\x80\x01", 14, self._get_quality_data

    @_command_wrapper
    def FM_Get_Quality_Data(self):
        """Send header ``20 81 01``, read 14 bytes, decode with ``_get_quality_data``."""
        return b"\x20\x81\x01", 14, self._get_quality_data

    @_command_wrapper
    def AM_Get_Quality_Status(self):
        """Send header ``21 80 01``, read 14 bytes, decode with ``_get_quality_data``."""
        return b"\x21\x80\x01", 14, self._get_quality_data

    @_command_wrapper
    def AM_Get_Quality_Data(self):
        """Send header ``21 81 01``, read 14 bytes, decode with ``_get_quality_data``."""
        return b"\x21\x81\x01", 14, self._get_quality_data

    # ------------------------------------------------------------------
    # RDS read-out
    # ------------------------------------------------------------------

    @staticmethod
    def _get_rds_data_proc_decoder(
        data: bytes,
    ) -> tuple[None, None, None, None, None, None] | tuple[int, int, int, int, int, int]:
        """Decode an RDS response as status, blocks A-D and a decoder-error word.

        Returns ``(status, A_block, B_block, C_block, D_block, dec_error)``,
        all unsigned, or six ``None`` values when ``data[0]`` is non-zero.
        """
        if data[0] != 0:
            return None, None, None, None, None, None

        status = (data[1] << 8) | data[2]
        A_block = (data[3] << 8) | data[4]
        B_block = (data[5] << 8) | data[6]
        C_block = (data[7] << 8) | data[8]
        D_block = (data[9] << 8) | data[10]
        dec_error = (data[11] << 8) | data[12]
        return status, A_block, B_block, C_block, D_block, dec_error

    @staticmethod
    def _get_rds_data_proc_demodulator(
        data: bytes,
    ) -> tuple[None, None] | tuple[int, int]:
        """Decode an RDS response as status plus one 32-bit raw data value.

        Returns ``(status, raw_data)`` where ``raw_data`` combines the two
        words after the status word (high word first). When ``data[0]`` is
        non-zero, ``(None, None)`` is returned so the error result has the
        same arity as the success result.
        """
        if data[0] != 0:
            return None, None

        status = (data[1] << 8) | data[2]
        raw_data_high = (data[3] << 8) | data[4]
        raw_data_low = (data[5] << 8) | data[6]
        return status, (raw_data_high << 16) | raw_data_low

    @_command_wrapper
    def FM_Get_RDS_Status__decoder(self):
        """Send header ``20 82 01``, read 12 bytes, decode as decoder-format RDS."""
        return b"\x20\x82\x01", 12, self._get_rds_data_proc_decoder

    @_command_wrapper
    def FM_Get_RDS_Data__decoder(self):
        """Send header ``20 83 01``, read 12 bytes, decode as decoder-format RDS."""
        return b"\x20\x83\x01", 12, self._get_rds_data_proc_decoder

    @_command_wrapper
    def FM_Get_RDS_Status__demodulator(self):
        """Send header ``20 82 01``, read 12 bytes, decode as demodulator-format RDS."""
        return b"\x20\x82\x01", 12, self._get_rds_data_proc_demodulator

    @_command_wrapper
    def FM_Get_RDS_Data__demodulator(self):
        """Send header ``20 83 01``, read 12 bytes, decode as demodulator-format RDS."""
        return b"\x20\x83\x01", 12, self._get_rds_data_proc_demodulator

    # ------------------------------------------------------------------
    # Miscellaneous read-out
    # ------------------------------------------------------------------

    @_command_wrapper
    def FM_Get_AGC(self):
        """
        Returns None if there was a I2C error, or a tuple of two unsigned words
        (input attenuation, feedback attenuation)
        """
        def proc(data):
            if data[0] != 0:
                return None
            input_att = (data[1] << 8) | data[2]
            feedback_att = (data[3] << 8) | data[4]
            return input_att, feedback_att

        return b"\x20\x84\x01", 4, proc

    @_command_wrapper
    def FM_Get_Signal_Status(self):
        """
        Returns None if there was a I2C error, or a tuple of booleans (FM Stereo/Mono and Analog/Digital radio [DR models])
        """
        def proc(data):
            if data[0] != 0:
                return None
            # The status word is the last two bytes; bit 15 and bit 14 carry
            # the two flags returned, in that order.
            value = (data[-2] << 8) | data[-1]
            return (value & (1 << 15)) != 0, (value & (1 << 14)) != 0

        return b"\x20\x85\x01", 2, proc

    @_command_wrapper
    def FM_Get_Processing_Status(self):
        """Send header ``20 86 01``, read 8 bytes, return four unsigned words.

        Returns ``(softmute, highcut, stereo, sthiblend)``, or four ``None``
        values when the status byte ``data[0]`` is non-zero.
        """
        def proc(data):
            if data[0] != 0:
                return None, None, None, None
            softmute = (data[1] << 8) | data[2]
            highcut = (data[3] << 8) | data[4]
            stereo = (data[5] << 8) | data[6]
            sthiblend = (data[7] << 8) | data[8]
            return softmute, highcut, stereo, sthiblend

        return b"\x20\x86\x01", 2 * 4, proc

    @_command_wrapper
    def APPL_Get_Identification(self):
        """Send header ``40 82 01``, read 6 bytes, return three unsigned words.

        Returns ``(device, hw_version, sw_version)``, or ``None`` when the
        status byte ``data[0]`` is non-zero.
        """
        def proc(data):
            if data[0] != 0:
                return None
            device = (data[1] << 8) | data[2]
            hw_version = (data[3] << 8) | data[4]
            sw_version = (data[5] << 8) | data[6]
            return device, hw_version, sw_version

        return b"\x40\x82\x01", 6, proc