Search code examples
pythonpython-requestsesp8266micropython

[Python][Micropython] Im trying to port over a python script to micropython


I am trying to port over this script to micropython but im not having any luck. The python code works as intended but i want an esp8266 to monitor the json for the relay status. Im getting OSerror 2 and now Exception once i print it out . Looking for ways to help find out what exception .

Python Code:

import requests
import binascii
from requests.auth import HTTPBasicAuth

url = "http://apex.local/cgi-bin/status.json"
username = "admin"
password = "1234"
previous_status = None

def login():
    credentials = '{}:{}'.format(username, password)
    encoded_credentials = binascii.b2a_base64(credentials.encode())[:-1].decode()
    headers = {'Authorization': 'Basic {}'.format(encoded_credentials)}

    response = requests.get(url, headers=headers)
    return response.json()

def get_relay_status(json_data):
    for output in json_data['istat']['outputs']:
        if output['name'] == 'Relay_1':
            return output['status'][0]

def check_status():
    global previous_status
    
    try:
        json_data = login()
        current_status = get_relay_status(json_data)

        if previous_status is not None and previous_status != current_status:
            print(f"Relay status changed to {current_status}")

        previous_status = current_status
    except requests.exceptions.RequestException as e:
        print(f"Error fetching JSON data: {e}")
while True:
    check_status()

Micropython Code:

import machine
import time
import urequests
import ubinascii

# Wi-Fi credentials
WIFI_SSID = "CREDS"
WIFI_PASSWORD = "CREDS"

# Apex controller credentials
#url = "http://ip.jsontest.com/"
url = "http://apex.local/cgi-bin/status.json"
username = "admin"
password = "1234"
previous_status = None

# LED pin
LED_PIN = 2

# Connect to Wi-Fi network
import network

station = network.WLAN(network.STA_IF)
station.active(True)
station.connect(WIFI_SSID, WIFI_PASSWORD)
while not station.isconnected():
    time.sleep(1)
print('Wi-Fi connection successful')

# Define LED pin as output
led = machine.Pin(LED_PIN, machine.Pin.OUT)

def login():
    credentials = '{}:{}'.format(username, password)
    encoded_credentials = ubinascii.b2a_base64(credentials.encode())[:-1].decode()
    headers = {'Authorization': 'Basic {}'.format(encoded_credentials)}

    response = urequests.get(url, headers=headers)
    return response.json(),response
    
# Fetch relay status and toggle LED accordingly
def get_relay_status(json_data):
    for output in json_data['istat']['outputs']:
        if output['name'] == 'Relay_1':
            status = output['status'][0]
            if status == 'ON':
                led.on()
            else:
                led.off()

def check_status():
    global previous_status
    
    try:
        json_data = login()
        current_status = get_relay_status(json_data)
        if previous_status is not None and previous_status != current_status:
          print(current_status)
          previous_status = current_status
    except:
        print("RequestException")
      
while True:
    check_status()

Solution

  • What i ended up using . Having some memory issues and watchdog . But this code works for the most part on a wemos d1 mini.

    import machine
    import time
    import urequests
    import ubinascii
    import gc
    
    MIN_HEAP_MEMORY = 10240
    
    # Wi-Fi credentials
    WIFI_SSID = "cred"
    WIFI_PASSWORD = "cred"
    
    # Apex controller credentials
    url = "http://192.168.1.128/cgi-bin/status.json"
    username = "admin"
    password = "1234"
    previous_status = None
    
    # LED pin
    LED_PIN = 2
    RELAY_PIN = 5
    
    # Connect to Wi-Fi network
    import network
    
    station = network.WLAN(network.STA_IF)
    station.active(True)
    station.connect(WIFI_SSID, WIFI_PASSWORD)
    while not station.isconnected():
        time.sleep(1)
        machine.idle()
    print('Wi-Fi connection successful')
    ip_address = station.ifconfig()[0]
    print(ip_address)
    
    # Define LED pin as output
    led_pin = machine.Pin(LED_PIN, machine.Pin.OUT)
    relay_pin = machine.Pin(RELAY_PIN, machine.Pin.OUT)
    
    # Precompute headers for requests
    credentials = '{}:{}'.format(username, password)
    encoded_credentials = ubinascii.b2a_base64(credentials.encode())[:-1].decode()
    headers = {'Authorization': 'Basic {}'.format(encoded_credentials)}
    
    def login():
        response = urequests.get(url, headers=headers)
        json_data = response.json()
        return json_data
    
    # Fetch relay status and toggle LED accordingly
    def get_relay_status(json_data):
        for output in json_data['istat']['outputs']:
            if output['name'] == 'Relay_1':
                status = output['status'][0]
                if status == 'ON':
                    
                    led_pin.off()
                    relay_pin.on()
                    control_status = "2"
                else:
                    led_pin.on()
                    relay_pin.off()
                    control_status = "1"
        return status, control_status
    
    def check_status():
        global previous_status
        json_data = login()
        current_status, control_status = get_relay_status(json_data)
    
        if previous_status is not None and previous_status != current_status:
            control_inject(control_status)
            previous_status = current_status
        else:
            previous_status = current_status
    
    def control_inject(control_status):
        data = {
            "Control_1_state": control_status,
            "noResponse": 1
        }
        data_str = '&'.join(f"{k}={str(v)}" for k, v in data.items())
        print(data_str)
    
        try:
            # Send the request without checking the response
            urequests.post("http://192.168.1.128/cgi-bin/status.cgi", data=data_str, headers=headers)
        except Exception as e:
            print("If zero all good {e}")
    
    while True:
        try:
            check_status()
            time.sleep(1)
        except Exception as e:
            print("Exception occurred:", e)
    
        # Force garbage collection and check available heap memory
        gc.collect()
        free_memory = gc.mem_free()
        print("Free heap memory: {} bytes".format(free_memory))
    
        if free_memory < MIN_HEAP_MEMORY:
            print("Low heap memory, rebooting...")
            machine.reset()
    
        machine.idle()