Pro Notes 🚀

2025-05-01 20:39:20

Wi-Fi ( ESP32-S3 как точка доступа ) Код отправляет GPS-данные через UDP на широковещательный адрес, сохраняя обработку GPS и вывод на TM1637. Приёмники (ESP32-C3) будут принимать данные через Wi-Fi.

import board
import busio
import TM1637  # https://github.com/bablokb/circuitpython-tm1637
import wifi
import socketpool

# Глобальная переменная для режима продакшн
IS_PRODUCTION = True  # True для продакшн, False для отладки

# Инициализация UART для GPS
uart = busio.UART(board.RX, board.TX, baudrate=9600, timeout=10)

# Настройка TM1637
display = TM1637.TM1637(board.D7, board.D8)  # clk, dio
display.brightness(1)
display.show("    ")  # Очищаем экран при старте

# Настройка Wi-Fi (точка доступа)
try:
    wifi.radio.start_ap(ssid="GPS_Transmitter", password="12345678")
    if not IS_PRODUCTION:
        print(f"Wi-Fi AP started: SSID=GPS_Transmitter, IP={wifi.radio.ipv4_address}")
except Exception as e:
    if not IS_PRODUCTION:
        print(f"Wi-Fi setup error: {e}")

# Настройка UDP
pool = socketpool.SocketPool(wifi.radio)
udp = pool.socket(pool.AF_INET, pool.SOCK_DGRAM)
BROADCAST_ADDR = ("255.255.255.255", 12345)

# Буфер для данных
raw_buffer = bytearray()

# Переменные для хранения данных
latitude = None
longitude = None
altitude = None
date = None
time = None
speed = None
strong_satellites = 0  # Для подсчёта спутников с SNR > 10
gpgsv_block = []  # Для хранения строк блока GPGSV

def parse_gpgga(line):
    """Парсинг GPGGA для широты, долготы, высоты и времени."""
    try:
        fields = line.split(',')
        if fields[0] != '$GPGGA' or fields[6] == '0':
            return False
        
        # Широта: fields[2] (DDMM.MMMM), fields[3] (N/S)
        lat = float(fields[2]) / 100
        lat_deg = int(lat)
        lat_min = (lat - lat_deg) * 100 / 60
        lat = lat_deg + lat_min
        if fields[3] == 'S':
            lat = -lat

        # Долгота: fields[4] (DDDMM.MMMM), fields[5] (E/W)
        lon = float(fields[4]) / 100
        lon_deg = int(lon)
        lon_min = (lon - lon_deg) * 100 / 60
        lon = lon_deg + lon_min
        if fields[5] == 'W':
            lon = -lon

        # Высота: fields[9] (метры)
        alt = float(fields[9]) if fields[9] else None

        # Время: fields[1] (HHMMSS.SS)
        t = fields[1]
        time_str = f"{t[:2]}:{t[2:4]}:{t[4:6]}" if t else None

        return {'latitude': lat, 'longitude': lon, 'altitude': alt, 'time': time_str}
    except:
        return False

def parse_gprmc(line):
    """Парсинг GPRMC для даты, времени и скорости."""
    try:
        fields = line.split(',')
        if fields[0] != '$GPRMC':
            return False
        
        # Время: fields[1] (HHMMSS.SS)
        t = fields[1]
        time_str = f"{t[:2]}:{t[2:4]}:{t[4:6]}" if t else None

        # Дата: fields[9] (DDMMYY)
        d = fields[9]
        date_str = f"20{d[4:6]}-{d[2:4]}-{d[:2]}" if d else None

        # Скорость: fields[7] (узлы), конвертация в км/ч (1 узел = 1.852 км/ч)
        spd = None
        if fields[2] == 'A' and fields[7]:
            spd = float(fields[7]) * 1.852

        return {'date': date_str, 'time': time_str, 'speed': spd}
    except:
        return False

def parse_gpgsv_block(block):
    """Парсинг полного блока GPGSV для подсчёта спутников с SNR > 10."""
    try:
        satellite_count = 0
        for line in block:
            fields = line.split(',')
            if fields[0] != '$GPGSV':
                continue
            for i in range(4, len(fields) - 1, 4):  # SNR в каждом 4-м поле
                snr = fields[i + 3] if i + 3 < len(fields) else ''
                if snr and snr.isdigit() and int(snr) > 10:
                    satellite_count += 1
        return satellite_count
    except:
        return 0

def print_output():
    """Формирование и вывод строки, отображение времени на TM1637, отправка по UDP."""
    if date and time:
        lon_str = f"{longitude:.6f}" if longitude is not None else "--"
        lat_str = f"{latitude:.6f}" if latitude is not None else "--"
        alt_str = f"{altitude:.1f}" if altitude is not None else "--"
        spd_str = f"{speed:.1f}" if speed is not None else "--"
        quality = str(strong_satellites)
        output = f"{date} {time} / {lon_str} {lat_str} {alt_str} {spd_str} {quality}\n"
        print(output, end="")  # Вывод в консоль
        try:
            udp.sendto(output.encode(), BROADCAST_ADDR)  # Отправка через UDP
        except Exception as e:
            if not IS_PRODUCTION:
                print(f"UDP send error: {e}")
        # Отображение времени на TM1637
        time_for_display = time.replace(":", "")[:4]
        display.show(time_for_display)

while True:
    if not IS_PRODUCTION:
        print('----')
    
    # Чтение данных из UART
    raw_data = uart.read(100)
    if raw_data:
        raw_buffer.extend(raw_data)

        # Обработка буфера
        while b'\n' in raw_buffer:
            index = raw_buffer.find(b'\n')
            line = raw_buffer[:index]
            raw_buffer = raw_buffer[index + 1:]

            try:
                # Фильтрация ASCII-символов
                ascii_line = ''
                for b in line:
                    if 32 <= b <= 126:  # Печатные ASCII
                        ascii_line += chr(b)
                
                if ascii_line:
                    if not IS_PRODUCTION:
                        print(f"Raw NMEA: {ascii_line}")

                    # Парсинг строк
                    if ascii_line.startswith('$GPGGA'):
                        gpgga_data = parse_gpgga(ascii_line)
                        if gpgga_data:
                            latitude = gpgga_data['latitude']
                            longitude = gpgga_data['longitude']
                            altitude = gpgga_data['altitude']
                            if gpgga_data['time']:
                                time = gpgga_data['time']
                        else:
                            latitude = None
                            longitude = None
                            altitude = None

                    elif ascii_line.startswith('$GPRMC'):
                        gprmc_data = parse_gprmc(ascii_line)
                        if gprmc_data:
                            if gprmc_data['date']:
                                date = gprmc_data['date']
                            if gprmc_data['time']:
                                time = gprmc_data['time']
                            speed = gprmc_data['speed']
                            # Вывод при обработке GPRMC
                            print_output()

                    elif ascii_line.startswith('$GPGSV'):
                        # Добавляем строку в блок
                        gpgsv_block.append(ascii_line)
                        if ascii_line.split(',')[1] == '1':  # Первая строка блока
                            gpgsv_block = [ascii_line]  # Сбрасываем блок
                        # Если это последняя строка блока, обрабатываем
                        total_msgs = int(ascii_line.split(',')[1])
                        current_msg = int(ascii_line.split(',')[2])
                        if current_msg == total_msgs:
                            strong_satellites = parse_gpgsv_block(gpgsv_block)
                            gpgsv_block = []  # Очищаем блок
                        if not IS_PRODUCTION:
                            print(f"Спутники с SNR > 10: {strong_satellites}")

            except Exception as e:
                if not IS_PRODUCTION:
                    print(f"Ошибка обработки: {e}")

============= Пример кода для приёмника на ESP32-C3 (CircuitPython): ===============

import wifi
import socketpool

# Подключение к Wi-Fi
try:
    wifi.radio.connect(ssid="GPS_Transmitter", password="12345678")
    print(f"Connected to Wi-Fi, IP={wifi.radio.ipv4_address}")
except Exception as e:
    print(f"Wi-Fi connect error: {e}")

# Настройка UDP
pool = socketpool.SocketPool(wifi.radio)
udp = pool.socket(pool.AF_INET, pool.SOCK_DGRAM)
udp.bind(("0.0.0.0", 12345))

while True:
    try:
        data, addr = udp.recvfrom(1024)
        print(data.decode(), end="")  # Вывод строки GPS-данных
    except Exception as e:
        print(f"UDP receive error: {e}")

==============================

Альтернатива: Используйте смартфон с приложением “UDP Terminal” (Android/iOS), настроенным на порт 12345, для теста.

        
← Previous Next →
Back to list