diff --git a/base_tef.py b/base_tef.py index 31ef581..48bc606 100644 --- a/base_tef.py +++ b/base_tef.py @@ -24,7 +24,7 @@ class BaseTEF668X: def send_patch(_patch: bytes): for i in range(0, len(_patch), 24): data = _patch[i:i+24] - if self.p.write_i2c(ADDRESS, b"\x1b" + data)[1] != 0: raise Exception + if self.p.write_i2c(ADDRESS, b"\x1b" + data)[0] != 0: raise Exception send_patch(bytes(patch)) self.p.write_i2c(ADDRESS, b"\x1c\x00\x00") self.p.write_i2c(ADDRESS, b"\x1c\x00\x75") @@ -33,7 +33,7 @@ class BaseTEF668X: def APPL_Get_Operation_Status(self): data = self.p.write_read_i2c(ADDRESS, b"\x40\x80\x01", 2) - while data[1] != 0: + while data[0] != 0: data = self.p.write_read_i2c(ADDRESS, b"\x40\x80\x01", 2) time.sleep(0.01) return data[-1] diff --git a/protocol.py b/protocol.py index 2fdb9f0..1e6240e 100644 --- a/protocol.py +++ b/protocol.py @@ -49,8 +49,8 @@ class I2CPCClient: body = resp_len_raw + response[:-1] if crc8(body) != received_crc: raise ValueError("CRC mismatch") response = response[:-1] - - return response + if response[0] == 0xff and resp_len == 1: raise Exception(f"Error from device: {response[1]}") + return response[1:] def set_clock(self, clock_hz: int): @@ -59,18 +59,18 @@ class I2CPCClient: def write_i2c(self, addr: int, data: bytes): payload = bytes([1, addr]) + data - return self._send_packet(payload) + return self._send_packet(payload, False) def write_read_i2c(self, addr: int, write_data: bytes, read_len: int): payload = bytes([2, addr, len(write_data)]) + write_data + bytes([read_len]) - return self._send_packet(payload) + return self._send_packet(payload, False) def write_eeprom(self, addr: int, data: bytes): payload = bytes([7, (addr >> 8) & 0xff, addr & 0xff]) + data - return self._send_packet(payload) + return self._send_packet(payload, False) def read_eeprom(self, addr: int, len: int): payload = bytes([8, (addr >> 8) & 0xff, addr & 0xff, len]) - return self._send_packet(payload) + return self._send_packet(payload, False) def version(self): return self._send_packet(bytes([4])) diff --git a/tef.py b/tef.py index 75128b0..1b57b31 100644 --- a/tef.py +++ b/tef.py @@ -110,17 +110,20 @@ class TEF6686(BaseTEF668X): @_command_wrapper def APPL_Set_OperationMode(self, mode: bool = True): return b"\x40\x01\x01" + mode.to_bytes(2, "big"), None, None + @_command_wrapper + def FM_Set_Stereo_Min(self, mode: int = 0, limit: int = 400): + return b"\x20\x42\x01" + mode.to_bytes(2, "big") + limit.to_bytes(2, "big"), None, None @staticmethod def _get_quality_data(data) -> tuple[None, None, None, None, None, None, None] | tuple[int, int, int, int, int, int, int]: - if data[1] != 0: return None, None, None, None, None, None, None - status = (data[2] << 8) | data[3] - level = int.from_bytes(bytes([data[4], data[5]]), "big", signed=True) - usn = (data[6] << 8) | data[7] # "noise" on AM - wam = (data[8] << 8) | data[9] # "co-channel" on AM - offset = int.from_bytes(bytes([data[10], data[11]]), "big", signed=True) - bandwidth = (data[12] << 8) | data[13] - modulation = (data[14] << 8) | data[15] + if data[0] != 0: return None, None, None, None, None, None, None + status = (data[1] << 8) | data[2] + level = int.from_bytes(bytes([data[3], data[4]]), "big", signed=True) + usn = (data[5] << 8) | data[6] # "noise" on AM + wam = (data[7] << 8) | data[8] # "co-channel" on AM + offset = int.from_bytes(bytes([data[9], data[10]]), "big", signed=True) + bandwidth = (data[11] << 8) | data[12] + modulation = (data[13] << 8) | data[14] return status, level, usn, wam, offset, bandwidth, modulation @_command_wrapper def FM_Get_Quality_Status(self): @@ -136,21 +139,21 @@ class TEF6686(BaseTEF668X): return b"\x21\x81\x01", 14, self._get_quality_data @staticmethod - def _get_rds_data_proc_decoder(data): - if data[1] != 0: return None, None, None, None, None, None - status = (data[2] << 8) | data[3] - A_block = (data[4] << 8) | data[5] - B_block = (data[6] << 8) | data[7] - C_block = (data[8] << 8) | data[9] - D_block = (data[10] << 8) | data[11] - dec_error = (data[12] << 8) | data[13] + def _get_rds_data_proc_decoder(data) -> tuple[None, None, None, None, None, None] | tuple[int, int, int, int, int, int]: + if data[0] != 0: return None, None, None, None, None, None + status = (data[1] << 8) | data[2] + A_block = (data[3] << 8) | data[4] + B_block = (data[5] << 8) | data[6] + C_block = (data[7] << 8) | data[8] + D_block = (data[9] << 8) | data[10] + dec_error = (data[11] << 8) | data[12] return status, A_block, B_block, C_block, D_block, dec_error @staticmethod def _get_rds_data_proc_demodulator(data): - if data[1] != 0: return None, None, None - status = (data[2] << 8) | data[3] - raw_data_high = (data[4] << 8) | data[5] - raw_data_low = (data[6] << 8) | data[7] + if data[0] != 0: return None, None, None + status = (data[1] << 8) | data[2] + raw_data_high = (data[3] << 8) | data[4] + raw_data_low = (data[5] << 8) | data[6] return status, (raw_data_high << 16) | raw_data_low @_command_wrapper def FM_Get_RDS_Status__decoder(self): @@ -171,9 +174,9 @@ class TEF6686(BaseTEF668X): Returns None if there was a I2C error, or a tuple of booleans (FM Stereo/Mono and Analog/Digital radio [DR models]) """ def proc(data): - if data[1] != 0: return None - input_att = (data[2] << 8) | data[3] - feedback_att = (data[4] << 8) | data[5] + if data[0] != 0: return None + input_att = (data[1] << 8) | data[2] + feedback_att = (data[3] << 8) | data[4] return input_att, feedback_att return b"\x20\x84\x01", 4, proc @_command_wrapper @@ -182,16 +185,26 @@ class TEF6686(BaseTEF668X): Returns None if there was a I2C error, or a tuple of booleans (FM Stereo/Mono and Analog/Digital radio [DR models]) """ def proc(data): - if data[1] != 0: return None + if data[0] != 0: return None value = (data[-2] << 8) | data[-1] return (value & (1 << 15)) != 0, (value & (1 << 14)) != 0 return b"\x20\x85\x01", 2, proc @_command_wrapper + def FM_Get_Processing_Status(self): + def proc(data): + if data[0] != 0: return None, None, None, None + softmute = (data[1] << 8) | data[2] + highcut = (data[3] << 8) | data[4] + stereo = (data[5] << 8) | data[6] + sthiblend = (data[7] << 8) | data[8] + return softmute, highcut, stereo, sthiblend + return b"\x20\x86\x01", 2*4, proc + @_command_wrapper def APPL_Get_Identification(self): def proc(data): - if data[1] != 0: return None - device = (data[2] << 8) | data[3] - hw_version = (data[4] << 8) | data[5] - sw_version = (data[6] << 8) | data[7] + if data[0] != 0: return None + device = (data[1] << 8) | data[2] + hw_version = (data[3] << 8) | data[4] + sw_version = (data[5] << 8) | data[6] return device, hw_version, sw_version return b"\x40\x82\x01", 6, proc \ No newline at end of file diff --git a/test.py b/test.py index a45195a..95e209a 100644 --- a/test.py +++ b/test.py @@ -5,12 +5,12 @@ p = I2CPCClient("COM17") print(p._send_packet(bytes([4]))) tef = TEF6686(p) tef.init() -tef.FM_Tune_To(1, 9500) +tef.AM_Tune_To(1, 225) tef.AUDIO_Set_Mute(False) tef.AUDIO_Set_Volume(70) tef.FM_Set_MphSuppression(True) tef.FM_Set_ChannelEqualizer(True) -tef.FM_Set_Specials(1) +# tef.FM_Set_Specials(1) tef.FM_Set_Bandwidth(True) import time time.sleep(0.032) diff --git a/xrd.py b/xrd.py new file mode 100644 index 0000000..ae123df --- /dev/null +++ b/xrd.py @@ -0,0 +1,171 @@ +# This is a compatible server for the TEF6686 firmware, but instead using the i2c control mode +# This fully works with FM-DX-Webserver (no plugins) + +import socket +import hashlib +import secrets +import string +import libtimer +import time +from tef import TEF6686 +from protocol import I2CPCClient + +HOST = '0.0.0.0' +PORT = 7373 +PASSWORD = "test" + +SALT_LENGTH = 16 +SS_UPDATE_INTERVAL = 0.125 +RDS_UPDATE_INTERVAL = 0.086 + +def generate_salt(length=SALT_LENGTH): + return "".join(secrets.choice(string.ascii_lowercase) for _ in range(length)) + +def compute_hash(salt, password): + return hashlib.sha1((salt + password).encode()).hexdigest().encode() + +def init_tef(): + p = I2CPCClient("COM17") + tef = TEF6686(p) + tef.init() + tef.AUDIO_Set_Mute(False) + tef.AUDIO_Set_Volume(60) + tef.FM_Tune_To(1, 9500) + tef.FM_Set_RDS(1) + tef.FM_Set_ChannelEqualizer(True) + tef.FM_Set_MphSuppression(True) + return tef + +def authenticate(conn): + salt = generate_salt() + conn.sendall(salt.encode() + b"\n") + expected_hash = compute_hash(salt, PASSWORD) + + while True: + data = conn.recv(1024) + if not data: return False + if data.strip() == expected_hash.strip(): return True + +def process_command(tef: TEF6686, data: bytes, state): + if data.startswith(b"T"): + freq = int(data.decode().removeprefix("T").strip()) // 10 + state['last_tune'] = freq + tef.FM_Tune_To(1, freq) + return f"T{freq*10}\n".encode() + elif data.startswith(b"G"): + eqims = int(data.decode().removeprefix("G").strip(), 2) + state['last_eqims'] = eqims + tef.FM_Set_ChannelEqualizer((eqims & 1) == 1) + tef.FM_Set_MphSuppression((eqims & 2) == 2) + return f"G{bin(eqims).removeprefix('0b').zfill(2)}\n".encode() + elif data.startswith(b"B"): + mono = bool(int(data.decode().removeprefix("B").strip(), 2)) + state['forced_mono'] = mono + tef.FM_Set_Stereo_Min(2 if mono else 0) + return f"B{int(mono)}\n".encode() + elif data.startswith(b"D"): + deemp = int(data.decode().removeprefix("D").strip()) + time = 500 if deemp == 0 else (750 if deemp == 1 else 0) + state['deemp'] = deemp + tef.FM_Set_Deemphasis(time) + return f"D{deemp}\n".encode() + + if data.startswith(b"x"): + return b"OK" + + return None + +def send_signal_status(tef: TEF6686, conn, state): + _, _, stereo, _ = tef.FM_Get_Processing_Status() + stereo = stereo if stereo is not None else 1000 + + status, level, usn, wam, _, bandwidth, _ = tef.FM_Get_Quality_Data() + if status is None or level is None or wam is None or usn is None or bandwidth is None: return + + level = level / 10 + + data = b"S" + if state['forced_mono']: data += b"M" + elif stereo < 500: data += b"s" + else: data += b"m" + + data += f"{level},{wam//10},{usn//10},{bandwidth}\n\n".encode() + conn.sendall(data) + +def send_rds_data(tef, conn): + status, A, B, C, D, dec_error = tef.FM_Get_RDS_Data__decoder() + if None in (status, A, B, C, D, dec_error): return + + if (status & (1 << 9) == 0) or (status & (1 << 15) == 0): return + + err = 0 + err |= ((dec_error >> 8) & 0x30) >> 4 + err |= ((dec_error >> 8) & 0xC) + err |= ((dec_error >> 8) & 3) << 4 + + data = b"R" + data += f"{B:04X}{C:04X}{D:04X}{err:02X}".encode() + + pi_error = dec_error >> 14 + if pi_error < 3: + data += b"\nP" + f"{A:04X}".encode() + data += b"?" * pi_error + b"\n" + elif status & (1 << 12): + pi_error = (dec_error >> 10) & 0b11 + if pi_error < 3: + data += b"\nP" + f"{C:04X}".encode() + data += b"?" * pi_error + b"\n" + + conn.sendall(data) + +def run_server(): + tef = init_tef() + state = { + 'last_tune': 9500, + 'last_eqims': 0b11, + 'forced_mono': False, + 'deemp': 0 + } + + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind((HOST, PORT)) + s.listen() + print(f"Server listening on {HOST}:{PORT}") + + while True: + conn, addr = s.accept() + with conn: + print(f"Connected by {addr}") + if not authenticate(conn): + print("Authentication failed.") + continue + + # Send initial state + conn.sendall(f"T{state['last_tune']*10}\n".encode()) + conn.sendall(f"G{bin(state['last_eqims']).removeprefix('0b').zfill(2)}\n".encode()) + conn.sendall(f"B{int(state['forced_mono'])}\n".encode()) + conn.sendall(f"D{state['deemp']}\n".encode()) + conn.setblocking(False) + + ss_timer = libtimer.Timer() + rds_timer = libtimer.Timer() + + while True: + try: + data = conn.recv(1024) + if not data: break + resp = process_command(tef, data, state) + if resp: conn.sendall(resp) + except ConnectionResetError: break + except BlockingIOError: + if ss_timer.get_time() > SS_UPDATE_INTERVAL: + send_signal_status(tef, conn, state) + ss_timer.reset() + if rds_timer.get_time() > RDS_UPDATE_INTERVAL: + send_rds_data(tef, conn) + rds_timer.reset() + time.sleep(0.01) + finally: tef.close() + +if __name__ == "__main__": run_server() \ No newline at end of file