I have a singleton in which I want to receive data from a service. Since I’m new to Python, the only way I could figure out to do this was to have the service send the data up to the singleton via a method pointer. I run the service in a thread to prevent its infinite loop from blocking the singleton.
The actual circumstance is that I want to extend Mock.GPIO (a mock implementation of RPi.GPIO), https://github.com/codenio/Mock.GPIO, to be able to simulate a button push on a real RPi. The singleton represents a simplified RPi board.
I want to call the socket service with the following
echo -n "22:HI" > /dev/tcp/127.0.0.1/5566
I want it to result in
Service __init__ piBoardCallback 22:HI 22 HI <something that represents event>
The code actually runs now.
Instead, I get 5 errors, which is the problem at the moment!!!
Exception in thread Thread-1: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 754, in run self.__target(*self.__args, **self.__kwargs) File "/Extra/home/max/DEV/Code/workspaces/2019/Mock.GPIO/Mock/PiBoard.py", line 130, in run self.svc.listen(self.piBoardCallback) File "/Extra/home/max/DEV/Code/workspaces/2019/Mock.GPIO/Mock/PiBoard.py", line 108, in listen self.serviceThreadCallback(data) TypeError: piBoardCallback() takes exactly 2 arguments (1 given)
I’m lost when it comes to self, class, class(type) and class(object)
Please point me in the right direction.
Below are the classes I have so far. They are in no way finished.
If you have some other solution to pass a signal change to a callback method registered via the method add_event_detect, I’m open to suggestions.
Reference to the original Mock.GPIO code, https://github.com/codenio/Mock.GPIO/blob/master/Mock/GPIO.py
My Source
import GPIO as GPIO
import threading
import time
import socket as sk


class Board(object):
    """Singleton standing in for a simplified RPi board.

    Keeps the configured channels and the edge-detection events registered
    for them, and owns the ServiceThread whose socket service feeds
    "channel:level" messages (for example "22:HI") into piBoardCallback.
    """

    # Class-level registries, keyed by channel number.
    channelConfigs = {}
    channelEvents = {}
    __instance = None

    # NOTE: there is deliberately no ``global`` statement in this class body.
    # ``global piBoardCallback`` here would make the ``def piBoardCallback``
    # below bind a module-level function instead of a method, so the service
    # would call it without ``self``.

    @staticmethod
    def getInstance():
        """ Static access method. """
        if Board.__instance is None:
            Board()
        return Board.__instance

    def __init__(self):
        """Create the singleton and start its socket service thread.

        Raises:
            Exception: if a Board instance already exists.
        """
        if Board.__instance is not None:
            raise Exception("This class is a singleton!")
        self.serviceThread = ServiceThread()
        # Hand over the *bound* method so the service can call it with the
        # received message as its only argument.
        self.serviceThread.setPiBoardCallback(self.piBoardCallback)
        self.serviceThread.threadify()
        Board.__instance = self

    def piBoardCallback(self, val):
        """Print the parts of a received message and the matching event.

        Args:
            val: message text in the format "channel:level", e.g. "22:HI".

        Raises:
            IndexError: if val contains no ":".
            ValueError: if the channel part is not an integer.
            KeyError: if no event is registered for the channel.
        """
        print("piBoardCallback")
        # This assumes that val is in format {channel:[HI|LOW]}
        x = val.split(":")
        print(val)
        channel = x[0]
        edge = x[1]
        print(channel)
        print(edge)
        # Events are registered under the integer channel number (see the
        # add_event_detect call in the main section), while the message
        # carries it as text.
        event = self.channelEvents[int(channel)]
        print(event)

    def setChannelConfig(self, channel):
        """Remember a channel configuration under its ``chanel`` attribute."""
        if channel is not None:
            self.channelConfigs[channel.chanel] = channel

    def setChannelEvent(self, channel, edge, eventCallback):
        """Register an Event(edge, eventCallback) for the given channel."""
        if channel is not None:
            self.channelEvents[channel] = Event(edge, eventCallback)


class Event(object):
    """An edge-detection registration: the edge and the callback for it."""

    def __init__(self, edge, eventCallback):
        """Store the edge and the callback."""
        self.edge = edge
        self.eventCallback = eventCallback

    def getEventCallback(self):
        """Return the stored callback."""
        return self.eventCallback

    def setEventCallback(self, eventCallback):
        """Replace the stored callback."""
        self.eventCallback = eventCallback


class Service(object):
    """Blocking TCP service that forwards each received message to a callback."""

    def __init__(self):
        print("Service __init__")
        self.serviceThreadCallback = None

    def listen(self, serviceThreadCallback):
        """Serve clients on port 5566 until a client sends 'stop\\n'.

        Every non-empty message other than 'quit\\n' and 'stop\\n' is passed
        to the callback as text. 'quit\\n' ends the current connection,
        'stop\\n' ends the service. This call blocks, so run it in a thread.

        Args:
            serviceThreadCallback: callable taking the message text.
        """
        self.serviceThreadCallback = serviceThreadCallback
        connection = sk.socket(sk.AF_INET, sk.SOCK_STREAM)
        try:
            connection.setsockopt(sk.SOL_SOCKET, sk.SO_REUSEADDR, 1)
            connection.bind(('0.0.0.0', 5566))
            connection.listen(10)
            while True:
                current_connection, address = connection.accept()
                try:
                    if not self._serve(current_connection):
                        return
                finally:
                    # Also closes connections the peer ended on its own.
                    current_connection.close()
        finally:
            connection.close()

    def _serve(self, current_connection):
        """Handle one client; return False when the whole service should stop."""
        while True:
            data = current_connection.recv(2048)
            if not data:
                # Empty read: the peer closed the connection.
                return True
            # recv() yields bytes on Python 3; compare and dispatch as text.
            message = data.decode("utf-8", "replace")
            if message == 'quit\n':
                current_connection.shutdown(sk.SHUT_WR)
                return True
            if message == 'stop\n':
                current_connection.shutdown(sk.SHUT_WR)
                return False
            self.serviceThreadCallback(message)

    def setCallback(self, serviceThreadCallback):
        """Replace the callback used for subsequent messages."""
        self.serviceThreadCallback = serviceThreadCallback

    def getCallback(self):
        """Return the callback currently in use."""
        return self.serviceThreadCallback


class ServiceThread(object):
    """Runs a Service in a daemon thread so listen() cannot block the caller."""

    def __init__(self, interval=1):
        self.interval = interval
        self.piBoardCallback = None
        self.thread = None
        self.svc = None

    def run(self):
        """Thread body: create the Service and serve until it is stopped."""
        self.svc = Service()
        self.svc.listen(self.piBoardCallback)

    def setPiBoardCallback(self, piBoardCallback):
        """Set the callable that receives each message from the service."""
        self.piBoardCallback = piBoardCallback

    def getPiBoardCallback(self):
        """Return the callable that receives each message."""
        return self.piBoardCallback

    def threadify(self):
        """Start run() in a daemon thread."""
        self.thread = threading.Thread(target=self.run)
        self.thread.daemon = True  # Daemonize thread
        self.thread.start()  # Start the execution

    def serviceThreadCallback(self, val):
        """Forward a message to the registered board callback."""
        self.piBoardCallback(val)


def ext_callback(val):
    """Example event callback: print a marker and the value it was given."""
    print("ext_callback")
    print(val)


if __name__ == '__main__':
    # Register once; the loop below only keeps the main thread alive so the
    # daemon service thread can keep running.
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(22, GPIO.IN, 0, GPIO.PUD_UP)
    GPIO.add_event_detect(22, GPIO.FALLING, ext_callback, bouncetime=1500)
    try:
        while True:
            time.sleep(1000)
    except KeyboardInterrupt:
        pass
My GPIO code
""" Mock Library for RPi.GPIO """
import time
import logging
import os
# import yaml
import PiBoard

logger = logging.getLogger(__name__)

# Accepted LOG_LEVEL values; any other value leaves the logger level untouched.
_LOG_LEVELS = {
    "Info": logging.INFO,
    "Debug": logging.DEBUG,
    "Warning": logging.WARNING,
    "Error": logging.ERROR,
    "Critical": logging.CRITICAL,
}

log_level = os.getenv('LOG_LEVEL')
if log_level is not None:
    if log_level in _LOG_LEVELS:
        logger.setLevel(_LOG_LEVELS[log_level])
else:
    logger.setLevel(logging.ERROR)

stream_formatter = logging.Formatter('%(asctime)s:%(levelname)s: %(message)s')
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(stream_formatter)
logger.addHandler(stream_handler)

BCM = 11
BOARD = 10
BOTH = 33
FALLING = 32
HARD_PWM = 43
HIGH = 1
I2C = 42
IN = 1
LOW = 0
OUT = 0
PUD_DOWN = 21
PUD_OFF = 20
PUD_UP = 22
RISING = 31
RPI_INFO = {'MANUFACTURER': 'Sony', 'P1_REVISION': 3, 'PROCESSOR': 'BCM2837', 'RAM': '1G', 'REVISION': 'a020d3', 'TYPE': 'Pi 3 Model B+'}
RPI_REVISION = 3
SERIAL = 40
SPI = 41
UNKNOWN = -1
VERSION = '0.7.0'

_mode = 0
channel_config = {}

# flags
setModeDone = False


class Channel:
    """Configuration record for one GPIO channel."""

    def __init__(self, channel, direction, initial=0, pull_up_down=PUD_OFF):
        # The attribute really is spelled "chanel"; PiBoard reads it by that name.
        self.chanel = channel
        self.direction = direction
        self.initial = initial
        self.current = initial
        self.pull_up_down = pull_up_down


# GPIO LIBRARY Functions
def setmode(mode):
    """
    Set up numbering mode to use for channels.
    BOARD - Use Raspberry Pi board numbers
    BCM   - Use Broadcom GPIO 00..nn numbers
    """
    # Without this declaration the assignments below would only create
    # locals and getmode() would never see the selected mode.
    global setModeDone, _mode
    getBoard()
    time.sleep(1)
    if mode in (BCM, BOARD):
        setModeDone = True
        _mode = mode
    else:
        setModeDone = False


def getmode():
    """
    Get numbering mode used for channel numbers.
    Returns BOARD or BCM, or the initial value 0 while no valid mode
    has been set.
    """
    getBoard()
    return _mode


def setwarnings(flag):
    """
    Enable or disable warning messages
    """
    getBoard()
    logger.info("Set Warings as {}".format(flag))


def setup(channel, direction, initial=0, pull_up_down=PUD_OFF):
    """
    Set up a GPIO channel or list of channels with a direction and (optional) pull/up down control
    channel        - either board pin number or BCM number depending on which mode is set.
    direction      - IN or OUT
    [pull_up_down] - PUD_OFF (default), PUD_UP or PUD_DOWN
    [initial]      - Initial value for an output channel
    """
    logger.info("setup channel : {} as {} with intial :{} and pull_up_dowm {}".format(channel, direction, initial, pull_up_down))
    board = getBoard()
    channel_config[channel] = Channel(channel, direction, initial, pull_up_down)
    board.setChannelConfig(channel_config[channel])


def output(channel, value):
    """
    Output to a GPIO channel or list of channels
    channel - either board pin number or BCM number depending on which mode is set.
    value   - 0/1 or False/True or LOW/HIGH
    """
    getBoard()
    logger.info("output channel : {} with value : {}".format(channel, value))


def input(channel):
    """
    Input from a GPIO channel.
    channel - either board pin number or BCM number depending on which mode is set.

    This mock only logs the read and returns None.
    """
    getBoard()
    logger.info("reading from chanel {}".format(channel))


def wait_for_edge(channel, edge, bouncetime=None, timeout=None):
    """
    Wait for an edge.
    channel      - either board pin number or BCM number depending on which mode is set.
    edge         - RISING, FALLING or BOTH
    [bouncetime] - time allowed between calls to allow for switchbounce
    [timeout]    - timeout in ms

    This mock only logs the request and returns None immediately.
    """
    getBoard()
    logger.info("waiting for edge : {} on channel : {} with bounce time : {} and Timeout :{}".format(edge, channel, bouncetime, timeout))


def add_event_detect(channel, edge, callback=None, bouncetime=None):
    """
    Enable edge detection events for a particular GPIO channel.
    channel      - either board pin number or BCM number depending on which mode is set.
    edge         - RISING, FALLING or BOTH
    [callback]   - A callback function for the event (optional)
    [bouncetime] - Switch bounce timeout in ms for callback
    """
    # Register the event on the simulated board so a message arriving on
    # its socket service can be matched to this channel.
    getBoard().setChannelEvent(channel, edge, callback)
    logger.info("Event detect added for edge : {} on channel : {} with bouce time : {} and callback {}".format(edge, channel, bouncetime, callback))


def event_detected(channel):
    """
    You need to enable edge detection using add_event_detect() first.
    channel - either board pin number or BCM number depending on which mode is set.

    This mock only logs the query and returns None.
    """
    getBoard()
    logger.info("Waiting for even detection on channel :{}".format(channel))


def add_event_callback(channel, callback):
    """
    Add a callback for an event already defined using add_event_detect()
    channel  - either board pin number or BCM number depending on which mode is set.
    callback - a callback function

    This mock only logs the request.
    """
    logger.info("Event Calback : {} added for channel : {}".format(callback, channel))


def remove_event_detect(channel):
    """
    Remove edge detection for a particular GPIO channel
    channel - either board pin number or BCM number depending on which mode is set.

    This mock only logs the request.
    """
    getBoard()
    logger.info("Event Detect Removed for channel : {}".format(channel))


def gpio_function(channel):
    """
    Log the direction recorded for a channel by setup() or PWM().
    channel - either board pin number or BCM number depending on which mode is set.

    Raises KeyError if the channel has not been configured. Returns None.
    """
    getBoard()
    logger.info("GPIO function of Channel : {} is {}".format(channel, channel_config[channel].direction))


class PWM:
    """Mock software PWM on one channel; every operation is only logged."""

    # initialise PWM channel
    def __init__(self, channel, frequency):
        """
        Record the channel and frequency and register the channel in
        channel_config with the PWM class itself as its direction.
        """
        self.chanel = channel
        self.frequency = frequency
        self.dutycycle = 0
        channel_config[channel] = Channel(channel, PWM,)
        getBoard()
        logger.info("Initialized PWM for Channel : {} at frequency : {}".format(channel, frequency))

    # where dc is the duty cycle (0.0 <= dc <= 100.0)
    def start(self, dutycycle):
        """
        Start software PWM
        dutycycle - the duty cycle (0.0 to 100.0)
        """
        self.dutycycle = dutycycle
        getBoard()
        logger.info("start pwm on channel : {} with Duty cycle : {}".format(self.chanel, dutycycle))

    # where freq is the new frequency in Hz
    def ChangeFrequency(self, frequency):
        """
        Change the frequency
        frequency - frequency in Hz (freq > 1.0)
        """
        getBoard()
        logger.info("Freqency Changed for channel : {} from : {} -> to : {}".format(self.chanel, self.frequency, frequency))
        self.frequency = frequency

    # where 0.0 <= dc <= 100.0
    def ChangeDutyCycle(self, dutycycle):
        """
        Change the duty cycle
        dutycycle - between 0.0 and 100.0
        """
        getBoard()
        # Log before assigning so the "from" value is still the old one.
        logger.info("Dutycycle Changed for channel : {} from : {} -> to : {}".format(self.chanel, self.dutycycle, dutycycle))
        self.dutycycle = dutycycle

    # stop PWM generation
    def stop(self):
        """Log that PWM generation on this channel is stopped."""
        getBoard()
        logger.info("Stop pwm on channel : {} with Duty cycle : {}".format(self.chanel, self.dutycycle))


def cleanup(channel=None):
    """
    Clean up by resetting all GPIO channels that have been used by this program to INPUT with no pullup/pulldown and no event detection
    [channel] - individual channel or list/tuple of channels to clean up.
    Default - clean every channel that has been used.

    This mock only logs which channels would be cleaned.
    """
    getBoard()
    if channel is not None:
        logger.info("Cleaning Up Channel : {}".format(channel))
    else:
        logger.info("Cleaning Up all channels")


def getBoard():
    """Return the PiBoard.Board singleton; getInstance() creates it on first use."""
    return PiBoard.Board.getInstance()
As you can see, the “only” difference from the original is that my GPIO instantiates a PiBoard and gets that instance in the methods. In the method add_event_detect
I call setChannelEvent
in PiBoard.
Advertisement
Answer
The following is a butt ugly solution, but it works. It still needs some logic, but it has a socket service inside a thread living inside a singleton. When the service is called, an external method, ext_callback, is eventually called.
import threading
import time
import socket as sk


class Board(object):
    """Singleton standing in for a simplified RPi board.

    Keeps channel configurations and registered events, and owns the
    ServiceThread whose socket service delivers "channel:level" messages
    (for example "22:HI") to piBoardCallback.
    """

    # Class-level registries, keyed by integer channel number.
    channelConfigs = {}
    channelEvents = {}
    __instance = None
    serviceThread = None

    @staticmethod
    def getInstance():
        """ Static access method. """
        if Board.__instance is None:
            Board()
        return Board.__instance

    def __init__(self):
        """Create the singleton and start its socket service thread.

        Raises:
            Exception: if a Board instance already exists.
        """
        if Board.__instance is not None:
            raise Exception("This class is a singleton!")
        Board.__instance = self
        self.serviceThread = ServiceThread()
        # A bound method carries its instance, so the service can call it
        # with the message as the only argument.
        self.serviceThread.setPiBoardCallback(self.piBoardCallback)
        self.serviceThread.threadify()

    def piBoardCallback(self, _value):
        """Dispatch a received message to the event registered for its channel.

        Malformed messages and messages for channels without a registered
        event are reported and ignored, so one bad packet cannot end the
        listener thread.

        Args:
            _value: message text in the format "channel:level", e.g. "22:HI".
        """
        # This assumes that _value is in format {channel:[HI|LOW]}, i.e. 22:HI
        try:
            channel_text, level = _value.split(":", 1)
            channel = int(channel_text)
        except ValueError:
            print("Ignoring malformed message: %r" % (_value,))
            return
        event = self.channelEvents.get(channel)
        if event is None:
            print("No event registered for channel %d" % channel)
            return
        # TODO: Handle logic on whether to call event callback or not
        # (compare ``level`` with ``event.edge``).
        if event.eventCallback is not None:
            event.eventCallback(event)

    def setChannelConfig(self, channel):
        """Remember a channel configuration under its ``chanel`` attribute."""
        if channel is not None:
            self.channelConfigs[channel.chanel] = channel

    def setChannelEvent(self, _channel, _edge, _channelEventCallback):
        """Register an Event for the channel, replacing any previous one."""
        if _channel is not None:
            self.channelEvents[_channel] = Event(_edge, _channelEventCallback, _channel)


class Event(object):
    """An edge-detection registration: edge, callback and channel."""

    def __init__(self, _edge, _eventCallback, _channel):
        self.eventCallback = _eventCallback
        self.edge = _edge
        self.channel = _channel


class Service(object):
    """Blocking TCP service that forwards each received message to a callback."""

    def __init__(self):
        self.serviceThreadCallback = None
        print(self)

    def listen(self, _serviceThreadCallback):
        """Serve clients on port 5566 until a client sends 'stop\\n'.

        Every non-empty message other than 'quit\\n' and 'stop\\n' is passed
        to the callback as text. 'quit\\n' ends the current connection,
        'stop\\n' ends the service. This call blocks, so run it in a thread.

        Args:
            _serviceThreadCallback: callable taking the message text.
        """
        self.serviceThreadCallback = _serviceThreadCallback
        connection = sk.socket(sk.AF_INET, sk.SOCK_STREAM)
        try:
            connection.setsockopt(sk.SOL_SOCKET, sk.SO_REUSEADDR, 1)
            connection.bind(('0.0.0.0', 5566))
            connection.listen(10)
            while True:
                current_connection, address = connection.accept()
                try:
                    if not self._serve(current_connection):
                        return
                finally:
                    # Also closes connections the peer ended on its own.
                    current_connection.close()
        finally:
            connection.close()

    def _serve(self, current_connection):
        """Handle one client; return False when the whole service should stop."""
        while True:
            data = current_connection.recv(2048)
            if not data:
                # Empty read: the peer closed the connection.
                return True
            # recv() yields bytes on Python 3; compare and dispatch as text.
            message = data.decode("utf-8", "replace")
            if message == 'quit\n':
                current_connection.shutdown(sk.SHUT_WR)
                return True
            if message == 'stop\n':
                current_connection.shutdown(sk.SHUT_WR)
                return False
            self.serviceThreadCallback(message)

    def setCallback(self, _serviceThreadCallback):
        """Replace the callback used for subsequent messages."""
        self.serviceThreadCallback = _serviceThreadCallback


class ServiceThread(object):
    """Runs a Service in a daemon thread so listen() cannot block the caller."""

    def __init__(self, interval=1):
        self.interval = interval
        self.thread = None
        self.svc = None
        self.piBoardCallback = None

    def run(self):
        """Thread body: create the Service and serve until it is stopped."""
        self.svc = Service()
        self.svc.listen(self.piBoardCallback)

    def setPiBoardCallback(self, _piBoardCallback):
        """Set the callable that receives each message from the service."""
        self.piBoardCallback = _piBoardCallback

    def threadify(self):
        """Start run() in a daemon thread."""
        self.thread = threading.Thread(target=self.run)
        self.thread.daemon = True  # Daemonize thread
        self.thread.start()  # Start the execution


def ext_callback(_event):
    """Example event callback: print the fields of the Event it receives."""
    print("ext_callback")
    print(_event.channel)
    print(_event.edge)
    print(_event.eventCallback)


def getBoard():
    """Return the Board singleton; getInstance() creates it on first use."""
    return Board.getInstance()


class Channel:
    """Configuration record for one GPIO channel."""

    def __init__(self, channel, direction, initial=0, pull_up_down=0):
        # The attribute really is spelled "chanel"; Board reads it by that name.
        self.chanel = channel
        self.direction = direction
        self.initial = initial
        self.current = initial
        self.pull_up_down = pull_up_down


if __name__ == '__main__':
    # Register once; the loop below only keeps the main thread alive so the
    # daemon service thread can keep running.
    rpib = getBoard()
    rpib.setChannelConfig(Channel(22, 32, 0, 0))
    rpib.setChannelEvent(22, 32, ext_callback)
    try:
        while True:
            time.sleep(1000)
    except KeyboardInterrupt:
        pass