Чот накодил
This commit is contained in:
@@ -0,0 +1,79 @@
|
||||
import serial
|
||||
|
||||
MAX_PAYLOAD = 31
|
||||
|
||||
TYPE_FIRST = 0b00
|
||||
TYPE_MIDDLE = 0b01
|
||||
TYPE_LAST = 0b10
|
||||
CRC4_POLY = 0x13 # x^4 + x + 1
|
||||
|
||||
def crc4_bytes(data: bytes) -> int:
|
||||
reg = 0
|
||||
for byte in data:
|
||||
for i in range(7, -1, -1):
|
||||
bit = (byte >> i) & 1
|
||||
reg = ((reg << 1) | bit) & 0x1F
|
||||
if reg & 0x10:
|
||||
reg ^= CRC4_POLY
|
||||
for _ in range(4):
|
||||
reg = (reg << 1) & 0x1F
|
||||
if reg & 0x10:
|
||||
reg ^= CRC4_POLY
|
||||
return reg & 0x0F
|
||||
|
||||
def build_header(pkt_type, session, nibble):
|
||||
return ((nibble & 0x0F) << 4) | ((session & 3) << 2) | (pkt_type & 3)
|
||||
|
||||
def create_packets(msg: bytes, session: int):
|
||||
# Разбиваем сообщение
|
||||
chunks = [msg[i:i+MAX_PAYLOAD] for i in range(0, len(msg), MAX_PAYLOAD)]
|
||||
data_packets = max(1, len(chunks))
|
||||
|
||||
# CRC
|
||||
crc = crc4_bytes(msg)
|
||||
|
||||
packets = []
|
||||
|
||||
# FIRST
|
||||
header = build_header(TYPE_FIRST, session, data_packets)
|
||||
packets.append(bytes([header]) + chunks[0])
|
||||
|
||||
# MIDDLE
|
||||
for i in range(1, data_packets):
|
||||
header = build_header(TYPE_MIDDLE, session, i)
|
||||
packets.append(bytes([header]) + chunks[i])
|
||||
|
||||
# LAST (payload пустой)
|
||||
header = build_header(TYPE_LAST, session, crc)
|
||||
packets.append(bytes([header]))
|
||||
|
||||
return packets
|
||||
|
||||
|
||||
def main():
|
||||
#port = input("COM-порт (пример: COM5 или /dev/ttyUSB0): ")
|
||||
port = "/dev/ttyACM0"
|
||||
#text = input("Введите строку для отправки: ")
|
||||
text = "Some long string to send probably it shouldn't fit into one packet so several of them should arrive"
|
||||
|
||||
msg = text.encode("utf-8")
|
||||
session = 1
|
||||
|
||||
packets = create_packets(msg, session)
|
||||
|
||||
print("\nГотовые пакеты:")
|
||||
for i, pkt in enumerate(packets):
|
||||
print(f"{i}: " + " ".join(f"{b:02X}" for b in pkt))
|
||||
|
||||
# отправка
|
||||
with serial.Serial(port, 115200, timeout=1) as ser:
|
||||
for pkt in packets:
|
||||
ser.write(pkt)
|
||||
ser.flush()
|
||||
print(f"Отправлено: {len(pkt)} bytes")
|
||||
|
||||
print("Готово.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user