rewrite to mimic the behavior of regular rc remotes
This commit is contained in:
		
							parent
							
								
									83474b34c7
								
							
						
					
					
						commit
						023f67a749
					
				
					 2 changed files with 52 additions and 75 deletions
				
			
		
							
								
								
									
										80
									
								
								src/main.py
									
										
									
									
									
								
							
							
						
						
									
										80
									
								
								src/main.py
									
										
									
									
									
								
							| 
						 | 
				
			
			@ -2,8 +2,9 @@
 | 
			
		|||
import machine
 | 
			
		||||
import socket
 | 
			
		||||
import time
 | 
			
		||||
import ujson as json
 | 
			
		||||
from wifi import WifiSTA
 | 
			
		||||
 | 
			
		||||
from credentials import *
 | 
			
		||||
 | 
			
		||||
## Pins
 | 
			
		||||
PIN_UP = 14 # D5
 | 
			
		||||
| 
						 | 
				
			
			@ -15,76 +16,47 @@ PIN_B1 =  0 # D2
 | 
			
		|||
PIN_B2 =  4 # D3
 | 
			
		||||
## Delay
 | 
			
		||||
DELAY_CMD = 0.1
 | 
			
		||||
DELAY_IRQ = 0.1
 | 
			
		||||
## HOST
 | 
			
		||||
RX_ADDR = ("192.168.11.220", 80)
 | 
			
		||||
# RX_ADDR = ("192.168.0.1", 80)
 | 
			
		||||
# RX_ADDR = ("192.168.11.220", 80)
 | 
			
		||||
# RX_ADDR = ("192.168.11.217", 8000)
 | 
			
		||||
RX_ADDR = ("192.168.0.1", 8000)
 | 
			
		||||
 | 
			
		||||
ts_irq = 0
 | 
			
		||||
s = None
 | 
			
		||||
## Runtime
 | 
			
		||||
wifiSTA = None
 | 
			
		||||
 | 
			
		||||
def handle_btn_up(pin):
 | 
			
		||||
    global ts_irq
 | 
			
		||||
    t = time.time()
 | 
			
		||||
    if t > (ts_irq + DELAY_IRQ):
 | 
			
		||||
        ts_irq = t
 | 
			
		||||
        send_cmd("rotate")
 | 
			
		||||
 | 
			
		||||
# def handle_btn_b1(pin):
 | 
			
		||||
#     global ts_irq
 | 
			
		||||
#     t = time.time()
 | 
			
		||||
#     if t > (ts_irq + DELAY_IRQ):
 | 
			
		||||
#         ts_irq = t
 | 
			
		||||
#         send_cmd("b1")
 | 
			
		||||
 | 
			
		||||
# def handle_btn_b2(pin):
 | 
			
		||||
#     global ts_irq
 | 
			
		||||
#     t = time.time()
 | 
			
		||||
#     if t > (ts_irq + DELAY_IRQ):
 | 
			
		||||
#         ts_irq = t
 | 
			
		||||
#         send_cmd("b2")
 | 
			
		||||
 | 
			
		||||
def send_cmd(cmd):
 | 
			
		||||
    global s
 | 
			
		||||
    print("cmd: {}".format(cmd))
 | 
			
		||||
def send_data(s, data):
 | 
			
		||||
    print("data: {}".format(data))
 | 
			
		||||
    try:
 | 
			
		||||
        s.sendto(cmd.encode(), RX_ADDR)
 | 
			
		||||
        s.sendto(data.encode(), RX_ADDR)
 | 
			
		||||
    except Exception as ex:
 | 
			
		||||
        print("Exception Type:{}, args: {}".format(type(ex).__name__, ex.args))
 | 
			
		||||
 | 
			
		||||
def main():
 | 
			
		||||
    global s
 | 
			
		||||
    wifiSTA = WifiSTA()
 | 
			
		||||
    ## initialize buttons and IRQs
 | 
			
		||||
    global wifiSTA
 | 
			
		||||
    wifiSTA = WifiSTA(SSID, PASS)
 | 
			
		||||
    # initialize buttons
 | 
			
		||||
    btn_up = machine.Pin(PIN_UP, machine.Pin.IN, machine.Pin.PULL_UP)
 | 
			
		||||
    btn_up.irq(trigger=machine.Pin.IRQ_FALLING, handler=handle_btn_up)
 | 
			
		||||
    btn_dn = machine.Pin(PIN_DN, machine.Pin.IN, machine.Pin.PULL_UP)
 | 
			
		||||
    # btn_dn.irq(trigger=machine.Pin.IRQ_FALLING, handler=handle_btn_dn)
 | 
			
		||||
    btn_le = machine.Pin(PIN_LE, machine.Pin.IN, machine.Pin.PULL_UP)
 | 
			
		||||
    # btn_le.irq(trigger=machine.Pin.IRQ_FALLING, handler=handle_btn_le)
 | 
			
		||||
    btn_ri = machine.Pin(PIN_RI, machine.Pin.IN, machine.Pin.PULL_UP)
 | 
			
		||||
    # btn_ri.irq(trigger=machine.Pin.IRQ_FALLING, handler=handle_btn_ri)
 | 
			
		||||
    btn_b1 = machine.Pin(PIN_B1, machine.Pin.IN, machine.Pin.PULL_UP)
 | 
			
		||||
    # btn_b1.irq(trigger=machine.Pin.IRQ_FALLING, handler=handle_btn_b1)
 | 
			
		||||
    btn_b2 = machine.Pin(PIN_B2, machine.Pin.IN, machine.Pin.PULL_UP)
 | 
			
		||||
    # btn_b2.irq(trigger=machine.Pin.IRQ_FALLING, handler=handle_btn_b2)
 | 
			
		||||
    # initialize socket
 | 
			
		||||
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 | 
			
		||||
 | 
			
		||||
    # runtime
 | 
			
		||||
    buttons = {"btn_up": btn_up, "btn_dn": btn_dn, "btn_le": btn_le,
 | 
			
		||||
               "btn_ri": btn_ri, "btn_b1": btn_b1, "btn_b2": btn_b2}
 | 
			
		||||
    values = {"btn_up": 0, "btn_dn": 0, "btn_le": 0,
 | 
			
		||||
              "btn_ri": 0, "btn_b1": 0, "btn_b2": 0}
 | 
			
		||||
    # loop
 | 
			
		||||
    while True:
 | 
			
		||||
        if not btn_le.value():
 | 
			
		||||
            send_cmd("left")
 | 
			
		||||
        elif not btn_ri.value():
 | 
			
		||||
            send_cmd("right")
 | 
			
		||||
        elif not btn_b1.value():
 | 
			
		||||
            send_cmd("accel")
 | 
			
		||||
        elif not btn_b2.value():
 | 
			
		||||
            send_cmd("decel")
 | 
			
		||||
        elif not btn_up.value():
 | 
			
		||||
            send_cmd("rotate")
 | 
			
		||||
        elif not btn_dn.value():
 | 
			
		||||
            send_cmd("stop")
 | 
			
		||||
        wifiSTA.check()
 | 
			
		||||
 | 
			
		||||
        for name, button in buttons.items():
 | 
			
		||||
            values[name] = not button.value()
 | 
			
		||||
 | 
			
		||||
        send_data(s, json.dumps(values))
 | 
			
		||||
        time.sleep(DELAY_CMD)
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    main()
 | 
			
		||||
    main()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										47
									
								
								src/wifi.py
									
										
									
									
									
								
							
							
						
						
									
										47
									
								
								src/wifi.py
									
										
									
									
									
								
							| 
						 | 
				
			
			@ -1,4 +1,3 @@
 | 
			
		|||
from credentials import *
 | 
			
		||||
import network
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -9,35 +8,41 @@ dns = "192.168.0.1"
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
class WifiAP:
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self.if_ap = network.WLAN(network.AP_IF)
 | 
			
		||||
    def __init__(self, ssid, password):
 | 
			
		||||
        self.ssid = ssid
 | 
			
		||||
        self.password = password
 | 
			
		||||
        # STA: disable
 | 
			
		||||
        self.if_sta = network.WLAN(network.STA_IF)
 | 
			
		||||
        self.if_sta.active(False)
 | 
			
		||||
        # AP: enable
 | 
			
		||||
        self.if_ap = network.WLAN(network.AP_IF)
 | 
			
		||||
 | 
			
		||||
    def connect(self):
 | 
			
		||||
        self.if_ap.active(True)
 | 
			
		||||
        self.if_ap.ifconfig([address, netmask, gateway, dns])
 | 
			
		||||
        self.if_ap.config(essid=SSID, password=PASS)
 | 
			
		||||
        self.if_ap.config(essid=self.ssid, password=self.password)
 | 
			
		||||
        print("network config {}".format(self.if_ap.ifconfig()))
 | 
			
		||||
 | 
			
		||||
class WifiSTA:
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
    def __init__(self, ssid, password):
 | 
			
		||||
        self.ssid = ssid
 | 
			
		||||
        self.password = password
 | 
			
		||||
        # AP: disable
 | 
			
		||||
        self.if_ap = network.WLAN(network.AP_IF)
 | 
			
		||||
        self.if_sta = network.WLAN(network.STA_IF)
 | 
			
		||||
        self.if_ap.active(False)
 | 
			
		||||
        # STA: enable
 | 
			
		||||
        self.if_sta = network.WLAN(network.STA_IF)
 | 
			
		||||
        self.if_sta.active(True)
 | 
			
		||||
        self.if_sta.connect(SSID, PASS)
 | 
			
		||||
        # self.wifi_thread = threading.Thread(target=self.wifi_thread_fn)
 | 
			
		||||
 | 
			
		||||
    def check(self):
 | 
			
		||||
        if not self.if_sta.isconnected():
 | 
			
		||||
            print("wifi down, reconnecting...")
 | 
			
		||||
            self.connect()
 | 
			
		||||
 | 
			
		||||
    def connect(self):
 | 
			
		||||
        print("connecting wifi")
 | 
			
		||||
        self.if_sta.active(True)
 | 
			
		||||
        self.if_sta.connect(self.ssid, self.password)
 | 
			
		||||
        while not self.if_sta.isconnected():
 | 
			
		||||
            time.sleep(0.1)
 | 
			
		||||
        print("network config {}".format(self.if_sta.ifconfig()))
 | 
			
		||||
        # self.wifi_thread.start()
 | 
			
		||||
 | 
			
		||||
    # def wifi_thread_fn(self):
 | 
			
		||||
    #     while True:
 | 
			
		||||
    #         if not self.if_sta.isconnected():
 | 
			
		||||
    #             print("reconnecting...")
 | 
			
		||||
    #             self.if_sta.connect(SSID, PASS)
 | 
			
		||||
    #             while not self.if_sta.isconnected():
 | 
			
		||||
    #                 time.sleep(0.1)
 | 
			
		||||
    #             print("network config {}".format(self.if_sta.ifconfig()))
 | 
			
		||||
    #         else:
 | 
			
		||||
    #             time.sleep(0.1)
 | 
			
		||||
        print("network config {}".format(self.if_sta.ifconfig()))
 | 
			
		||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue