You've already forked TEF6686_Driver
xrd protocol
This commit is contained in:
@@ -24,7 +24,7 @@ class BaseTEF668X:
|
|||||||
def send_patch(_patch: bytes):
|
def send_patch(_patch: bytes):
|
||||||
for i in range(0, len(_patch), 24):
|
for i in range(0, len(_patch), 24):
|
||||||
data = _patch[i:i+24]
|
data = _patch[i:i+24]
|
||||||
if self.p.write_i2c(ADDRESS, b"\x1b" + data)[1] != 0: raise Exception
|
if self.p.write_i2c(ADDRESS, b"\x1b" + data)[0] != 0: raise Exception
|
||||||
send_patch(bytes(patch))
|
send_patch(bytes(patch))
|
||||||
self.p.write_i2c(ADDRESS, b"\x1c\x00\x00")
|
self.p.write_i2c(ADDRESS, b"\x1c\x00\x00")
|
||||||
self.p.write_i2c(ADDRESS, b"\x1c\x00\x75")
|
self.p.write_i2c(ADDRESS, b"\x1c\x00\x75")
|
||||||
@@ -33,7 +33,7 @@ class BaseTEF668X:
|
|||||||
|
|
||||||
def APPL_Get_Operation_Status(self):
|
def APPL_Get_Operation_Status(self):
|
||||||
data = self.p.write_read_i2c(ADDRESS, b"\x40\x80\x01", 2)
|
data = self.p.write_read_i2c(ADDRESS, b"\x40\x80\x01", 2)
|
||||||
while data[1] != 0:
|
while data[0] != 0:
|
||||||
data = self.p.write_read_i2c(ADDRESS, b"\x40\x80\x01", 2)
|
data = self.p.write_read_i2c(ADDRESS, b"\x40\x80\x01", 2)
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
return data[-1]
|
return data[-1]
|
||||||
|
|||||||
12
protocol.py
12
protocol.py
@@ -49,8 +49,8 @@ class I2CPCClient:
|
|||||||
body = resp_len_raw + response[:-1]
|
body = resp_len_raw + response[:-1]
|
||||||
if crc8(body) != received_crc: raise ValueError("CRC mismatch")
|
if crc8(body) != received_crc: raise ValueError("CRC mismatch")
|
||||||
response = response[:-1]
|
response = response[:-1]
|
||||||
|
if response[0] == 0xff and resp_len == 1: raise Exception(f"Error from device: {response[1]}")
|
||||||
return response
|
return response[1:]
|
||||||
|
|
||||||
|
|
||||||
def set_clock(self, clock_hz: int):
|
def set_clock(self, clock_hz: int):
|
||||||
@@ -59,18 +59,18 @@ class I2CPCClient:
|
|||||||
|
|
||||||
def write_i2c(self, addr: int, data: bytes):
|
def write_i2c(self, addr: int, data: bytes):
|
||||||
payload = bytes([1, addr]) + data
|
payload = bytes([1, addr]) + data
|
||||||
return self._send_packet(payload)
|
return self._send_packet(payload, False)
|
||||||
|
|
||||||
def write_read_i2c(self, addr: int, write_data: bytes, read_len: int):
|
def write_read_i2c(self, addr: int, write_data: bytes, read_len: int):
|
||||||
payload = bytes([2, addr, len(write_data)]) + write_data + bytes([read_len])
|
payload = bytes([2, addr, len(write_data)]) + write_data + bytes([read_len])
|
||||||
return self._send_packet(payload)
|
return self._send_packet(payload, False)
|
||||||
|
|
||||||
def write_eeprom(self, addr: int, data: bytes):
|
def write_eeprom(self, addr: int, data: bytes):
|
||||||
payload = bytes([7, (addr >> 8) & 0xff, addr & 0xff]) + data
|
payload = bytes([7, (addr >> 8) & 0xff, addr & 0xff]) + data
|
||||||
return self._send_packet(payload)
|
return self._send_packet(payload, False)
|
||||||
def read_eeprom(self, addr: int, len: int):
|
def read_eeprom(self, addr: int, len: int):
|
||||||
payload = bytes([8, (addr >> 8) & 0xff, addr & 0xff, len])
|
payload = bytes([8, (addr >> 8) & 0xff, addr & 0xff, len])
|
||||||
return self._send_packet(payload)
|
return self._send_packet(payload, False)
|
||||||
|
|
||||||
def version(self): return self._send_packet(bytes([4]))
|
def version(self): return self._send_packet(bytes([4]))
|
||||||
|
|
||||||
|
|||||||
69
tef.py
69
tef.py
@@ -110,17 +110,20 @@ class TEF6686(BaseTEF668X):
|
|||||||
@_command_wrapper
|
@_command_wrapper
|
||||||
def APPL_Set_OperationMode(self, mode: bool = True):
|
def APPL_Set_OperationMode(self, mode: bool = True):
|
||||||
return b"\x40\x01\x01" + mode.to_bytes(2, "big"), None, None
|
return b"\x40\x01\x01" + mode.to_bytes(2, "big"), None, None
|
||||||
|
@_command_wrapper
|
||||||
|
def FM_Set_Stereo_Min(self, mode: int = 0, limit: int = 400):
|
||||||
|
return b"\x20\x42\x01" + mode.to_bytes(2, "big") + limit.to_bytes(2, "big"), None, None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_quality_data(data) -> tuple[None, None, None, None, None, None, None] | tuple[int, int, int, int, int, int, int]:
|
def _get_quality_data(data) -> tuple[None, None, None, None, None, None, None] | tuple[int, int, int, int, int, int, int]:
|
||||||
if data[1] != 0: return None, None, None, None, None, None, None
|
if data[0] != 0: return None, None, None, None, None, None, None
|
||||||
status = (data[2] << 8) | data[3]
|
status = (data[1] << 8) | data[2]
|
||||||
level = int.from_bytes(bytes([data[4], data[5]]), "big", signed=True)
|
level = int.from_bytes(bytes([data[3], data[4]]), "big", signed=True)
|
||||||
usn = (data[6] << 8) | data[7] # "noise" on AM
|
usn = (data[5] << 8) | data[6] # "noise" on AM
|
||||||
wam = (data[8] << 8) | data[9] # "co-channel" on AM
|
wam = (data[7] << 8) | data[8] # "co-channel" on AM
|
||||||
offset = int.from_bytes(bytes([data[10], data[11]]), "big", signed=True)
|
offset = int.from_bytes(bytes([data[9], data[10]]), "big", signed=True)
|
||||||
bandwidth = (data[12] << 8) | data[13]
|
bandwidth = (data[11] << 8) | data[12]
|
||||||
modulation = (data[14] << 8) | data[15]
|
modulation = (data[13] << 8) | data[14]
|
||||||
return status, level, usn, wam, offset, bandwidth, modulation
|
return status, level, usn, wam, offset, bandwidth, modulation
|
||||||
@_command_wrapper
|
@_command_wrapper
|
||||||
def FM_Get_Quality_Status(self):
|
def FM_Get_Quality_Status(self):
|
||||||
@@ -136,21 +139,21 @@ class TEF6686(BaseTEF668X):
|
|||||||
return b"\x21\x81\x01", 14, self._get_quality_data
|
return b"\x21\x81\x01", 14, self._get_quality_data
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_rds_data_proc_decoder(data):
|
def _get_rds_data_proc_decoder(data) -> tuple[None, None, None, None, None, None] | tuple[int, int, int, int, int, int]:
|
||||||
if data[1] != 0: return None, None, None, None, None, None
|
if data[0] != 0: return None, None, None, None, None, None
|
||||||
status = (data[2] << 8) | data[3]
|
status = (data[1] << 8) | data[2]
|
||||||
A_block = (data[4] << 8) | data[5]
|
A_block = (data[3] << 8) | data[4]
|
||||||
B_block = (data[6] << 8) | data[7]
|
B_block = (data[5] << 8) | data[6]
|
||||||
C_block = (data[8] << 8) | data[9]
|
C_block = (data[7] << 8) | data[8]
|
||||||
D_block = (data[10] << 8) | data[11]
|
D_block = (data[9] << 8) | data[10]
|
||||||
dec_error = (data[12] << 8) | data[13]
|
dec_error = (data[11] << 8) | data[12]
|
||||||
return status, A_block, B_block, C_block, D_block, dec_error
|
return status, A_block, B_block, C_block, D_block, dec_error
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_rds_data_proc_demodulator(data):
|
def _get_rds_data_proc_demodulator(data):
|
||||||
if data[1] != 0: return None, None, None
|
if data[0] != 0: return None, None, None
|
||||||
status = (data[2] << 8) | data[3]
|
status = (data[1] << 8) | data[2]
|
||||||
raw_data_high = (data[4] << 8) | data[5]
|
raw_data_high = (data[3] << 8) | data[4]
|
||||||
raw_data_low = (data[6] << 8) | data[7]
|
raw_data_low = (data[5] << 8) | data[6]
|
||||||
return status, (raw_data_high << 16) | raw_data_low
|
return status, (raw_data_high << 16) | raw_data_low
|
||||||
@_command_wrapper
|
@_command_wrapper
|
||||||
def FM_Get_RDS_Status__decoder(self):
|
def FM_Get_RDS_Status__decoder(self):
|
||||||
@@ -171,9 +174,9 @@ class TEF6686(BaseTEF668X):
|
|||||||
Returns None if there was a I2C error, or a tuple of booleans (FM Stereo/Mono and Analog/Digital radio [DR models])
|
Returns None if there was a I2C error, or a tuple of booleans (FM Stereo/Mono and Analog/Digital radio [DR models])
|
||||||
"""
|
"""
|
||||||
def proc(data):
|
def proc(data):
|
||||||
if data[1] != 0: return None
|
if data[0] != 0: return None
|
||||||
input_att = (data[2] << 8) | data[3]
|
input_att = (data[1] << 8) | data[2]
|
||||||
feedback_att = (data[4] << 8) | data[5]
|
feedback_att = (data[3] << 8) | data[4]
|
||||||
return input_att, feedback_att
|
return input_att, feedback_att
|
||||||
return b"\x20\x84\x01", 4, proc
|
return b"\x20\x84\x01", 4, proc
|
||||||
@_command_wrapper
|
@_command_wrapper
|
||||||
@@ -182,16 +185,26 @@ class TEF6686(BaseTEF668X):
|
|||||||
Returns None if there was a I2C error, or a tuple of booleans (FM Stereo/Mono and Analog/Digital radio [DR models])
|
Returns None if there was a I2C error, or a tuple of booleans (FM Stereo/Mono and Analog/Digital radio [DR models])
|
||||||
"""
|
"""
|
||||||
def proc(data):
|
def proc(data):
|
||||||
if data[1] != 0: return None
|
if data[0] != 0: return None
|
||||||
value = (data[-2] << 8) | data[-1]
|
value = (data[-2] << 8) | data[-1]
|
||||||
return (value & (1 << 15)) != 0, (value & (1 << 14)) != 0
|
return (value & (1 << 15)) != 0, (value & (1 << 14)) != 0
|
||||||
return b"\x20\x85\x01", 2, proc
|
return b"\x20\x85\x01", 2, proc
|
||||||
@_command_wrapper
|
@_command_wrapper
|
||||||
|
def FM_Get_Processing_Status(self):
|
||||||
|
def proc(data):
|
||||||
|
if data[0] != 0: return None, None, None, None
|
||||||
|
softmute = (data[1] << 8) | data[2]
|
||||||
|
highcut = (data[3] << 8) | data[4]
|
||||||
|
stereo = (data[5] << 8) | data[6]
|
||||||
|
sthiblend = (data[7] << 8) | data[8]
|
||||||
|
return softmute, highcut, stereo, sthiblend
|
||||||
|
return b"\x20\x86\x01", 2*4, proc
|
||||||
|
@_command_wrapper
|
||||||
def APPL_Get_Identification(self):
|
def APPL_Get_Identification(self):
|
||||||
def proc(data):
|
def proc(data):
|
||||||
if data[1] != 0: return None
|
if data[0] != 0: return None
|
||||||
device = (data[2] << 8) | data[3]
|
device = (data[1] << 8) | data[2]
|
||||||
hw_version = (data[4] << 8) | data[5]
|
hw_version = (data[3] << 8) | data[4]
|
||||||
sw_version = (data[6] << 8) | data[7]
|
sw_version = (data[5] << 8) | data[6]
|
||||||
return device, hw_version, sw_version
|
return device, hw_version, sw_version
|
||||||
return b"\x40\x82\x01", 6, proc
|
return b"\x40\x82\x01", 6, proc
|
||||||
4
test.py
4
test.py
@@ -5,12 +5,12 @@ p = I2CPCClient("COM17")
|
|||||||
print(p._send_packet(bytes([4])))
|
print(p._send_packet(bytes([4])))
|
||||||
tef = TEF6686(p)
|
tef = TEF6686(p)
|
||||||
tef.init()
|
tef.init()
|
||||||
tef.FM_Tune_To(1, 9500)
|
tef.AM_Tune_To(1, 225)
|
||||||
tef.AUDIO_Set_Mute(False)
|
tef.AUDIO_Set_Mute(False)
|
||||||
tef.AUDIO_Set_Volume(70)
|
tef.AUDIO_Set_Volume(70)
|
||||||
tef.FM_Set_MphSuppression(True)
|
tef.FM_Set_MphSuppression(True)
|
||||||
tef.FM_Set_ChannelEqualizer(True)
|
tef.FM_Set_ChannelEqualizer(True)
|
||||||
tef.FM_Set_Specials(1)
|
# tef.FM_Set_Specials(1)
|
||||||
tef.FM_Set_Bandwidth(True)
|
tef.FM_Set_Bandwidth(True)
|
||||||
import time
|
import time
|
||||||
time.sleep(0.032)
|
time.sleep(0.032)
|
||||||
|
|||||||
171
xrd.py
Normal file
171
xrd.py
Normal file
@@ -0,0 +1,171 @@
|
|||||||
|
# This is a compatible server for the TEF6686 firmware, but instead using the i2c control mode
|
||||||
|
# This fully works with FM-DX-Webserver (no plugins)
|
||||||
|
|
||||||
|
import socket
|
||||||
|
import hashlib
|
||||||
|
import secrets
|
||||||
|
import string
|
||||||
|
import libtimer
|
||||||
|
import time
|
||||||
|
from tef import TEF6686
|
||||||
|
from protocol import I2CPCClient
|
||||||
|
|
||||||
|
HOST = '0.0.0.0'
|
||||||
|
PORT = 7373
|
||||||
|
PASSWORD = "test"
|
||||||
|
|
||||||
|
SALT_LENGTH = 16
|
||||||
|
SS_UPDATE_INTERVAL = 0.125
|
||||||
|
RDS_UPDATE_INTERVAL = 0.086
|
||||||
|
|
||||||
|
def generate_salt(length=SALT_LENGTH):
|
||||||
|
return "".join(secrets.choice(string.ascii_lowercase) for _ in range(length))
|
||||||
|
|
||||||
|
def compute_hash(salt, password):
|
||||||
|
return hashlib.sha1((salt + password).encode()).hexdigest().encode()
|
||||||
|
|
||||||
|
def init_tef():
|
||||||
|
p = I2CPCClient("COM17")
|
||||||
|
tef = TEF6686(p)
|
||||||
|
tef.init()
|
||||||
|
tef.AUDIO_Set_Mute(False)
|
||||||
|
tef.AUDIO_Set_Volume(60)
|
||||||
|
tef.FM_Tune_To(1, 9500)
|
||||||
|
tef.FM_Set_RDS(1)
|
||||||
|
tef.FM_Set_ChannelEqualizer(True)
|
||||||
|
tef.FM_Set_MphSuppression(True)
|
||||||
|
return tef
|
||||||
|
|
||||||
|
def authenticate(conn):
|
||||||
|
salt = generate_salt()
|
||||||
|
conn.sendall(salt.encode() + b"\n")
|
||||||
|
expected_hash = compute_hash(salt, PASSWORD)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
data = conn.recv(1024)
|
||||||
|
if not data: return False
|
||||||
|
if data.strip() == expected_hash.strip(): return True
|
||||||
|
|
||||||
|
def process_command(tef: TEF6686, data: bytes, state):
|
||||||
|
if data.startswith(b"T"):
|
||||||
|
freq = int(data.decode().removeprefix("T").strip()) // 10
|
||||||
|
state['last_tune'] = freq
|
||||||
|
tef.FM_Tune_To(1, freq)
|
||||||
|
return f"T{freq*10}\n".encode()
|
||||||
|
elif data.startswith(b"G"):
|
||||||
|
eqims = int(data.decode().removeprefix("G").strip(), 2)
|
||||||
|
state['last_eqims'] = eqims
|
||||||
|
tef.FM_Set_ChannelEqualizer((eqims & 1) == 1)
|
||||||
|
tef.FM_Set_MphSuppression((eqims & 2) == 2)
|
||||||
|
return f"G{bin(eqims).removeprefix('0b').zfill(2)}\n".encode()
|
||||||
|
elif data.startswith(b"B"):
|
||||||
|
mono = bool(int(data.decode().removeprefix("B").strip(), 2))
|
||||||
|
state['forced_mono'] = mono
|
||||||
|
tef.FM_Set_Stereo_Min(2 if mono else 0)
|
||||||
|
return f"B{int(mono)}\n".encode()
|
||||||
|
elif data.startswith(b"D"):
|
||||||
|
deemp = int(data.decode().removeprefix("D").strip())
|
||||||
|
time = 500 if deemp == 0 else (750 if deemp == 1 else 0)
|
||||||
|
state['deemp'] = deemp
|
||||||
|
tef.FM_Set_Deemphasis(time)
|
||||||
|
return f"D{deemp}\n".encode()
|
||||||
|
|
||||||
|
if data.startswith(b"x"):
|
||||||
|
return b"OK"
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def send_signal_status(tef: TEF6686, conn, state):
|
||||||
|
_, _, stereo, _ = tef.FM_Get_Processing_Status()
|
||||||
|
stereo = stereo if stereo is not None else 1000
|
||||||
|
|
||||||
|
status, level, usn, wam, _, bandwidth, _ = tef.FM_Get_Quality_Data()
|
||||||
|
if status is None or level is None or wam is None or usn is None or bandwidth is None: return
|
||||||
|
|
||||||
|
level = level / 10
|
||||||
|
|
||||||
|
data = b"S"
|
||||||
|
if state['forced_mono']: data += b"M"
|
||||||
|
elif stereo < 500: data += b"s"
|
||||||
|
else: data += b"m"
|
||||||
|
|
||||||
|
data += f"{level},{wam//10},{usn//10},{bandwidth}\n\n".encode()
|
||||||
|
conn.sendall(data)
|
||||||
|
|
||||||
|
def send_rds_data(tef, conn):
|
||||||
|
status, A, B, C, D, dec_error = tef.FM_Get_RDS_Data__decoder()
|
||||||
|
if None in (status, A, B, C, D, dec_error): return
|
||||||
|
|
||||||
|
if (status & (1 << 9) == 0) or (status & (1 << 15) == 0): return
|
||||||
|
|
||||||
|
err = 0
|
||||||
|
err |= ((dec_error >> 8) & 0x30) >> 4
|
||||||
|
err |= ((dec_error >> 8) & 0xC)
|
||||||
|
err |= ((dec_error >> 8) & 3) << 4
|
||||||
|
|
||||||
|
data = b"R"
|
||||||
|
data += f"{B:04X}{C:04X}{D:04X}{err:02X}".encode()
|
||||||
|
|
||||||
|
pi_error = dec_error >> 14
|
||||||
|
if pi_error < 3:
|
||||||
|
data += b"\nP" + f"{A:04X}".encode()
|
||||||
|
data += b"?" * pi_error + b"\n"
|
||||||
|
elif status & (1 << 12):
|
||||||
|
pi_error = (dec_error >> 10) & 0b11
|
||||||
|
if pi_error < 3:
|
||||||
|
data += b"\nP" + f"{C:04X}".encode()
|
||||||
|
data += b"?" * pi_error + b"\n"
|
||||||
|
|
||||||
|
conn.sendall(data)
|
||||||
|
|
||||||
|
def run_server():
|
||||||
|
tef = init_tef()
|
||||||
|
state = {
|
||||||
|
'last_tune': 9500,
|
||||||
|
'last_eqims': 0b11,
|
||||||
|
'forced_mono': False,
|
||||||
|
'deemp': 0
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
|
s.bind((HOST, PORT))
|
||||||
|
s.listen()
|
||||||
|
print(f"Server listening on {HOST}:{PORT}")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
conn, addr = s.accept()
|
||||||
|
with conn:
|
||||||
|
print(f"Connected by {addr}")
|
||||||
|
if not authenticate(conn):
|
||||||
|
print("Authentication failed.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Send initial state
|
||||||
|
conn.sendall(f"T{state['last_tune']*10}\n".encode())
|
||||||
|
conn.sendall(f"G{bin(state['last_eqims']).removeprefix('0b').zfill(2)}\n".encode())
|
||||||
|
conn.sendall(f"B{int(state['forced_mono'])}\n".encode())
|
||||||
|
conn.sendall(f"D{state['deemp']}\n".encode())
|
||||||
|
conn.setblocking(False)
|
||||||
|
|
||||||
|
ss_timer = libtimer.Timer()
|
||||||
|
rds_timer = libtimer.Timer()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
data = conn.recv(1024)
|
||||||
|
if not data: break
|
||||||
|
resp = process_command(tef, data, state)
|
||||||
|
if resp: conn.sendall(resp)
|
||||||
|
except ConnectionResetError: break
|
||||||
|
except BlockingIOError:
|
||||||
|
if ss_timer.get_time() > SS_UPDATE_INTERVAL:
|
||||||
|
send_signal_status(tef, conn, state)
|
||||||
|
ss_timer.reset()
|
||||||
|
if rds_timer.get_time() > RDS_UPDATE_INTERVAL:
|
||||||
|
send_rds_data(tef, conn)
|
||||||
|
rds_timer.reset()
|
||||||
|
time.sleep(0.01)
|
||||||
|
finally: tef.close()
|
||||||
|
|
||||||
|
if __name__ == "__main__": run_server()
|
||||||
Reference in New Issue
Block a user