Multithreading Raspberry Pi Game Controller in Python

Multithreading Raspberry Pi Game Controller for Robot Control

Multithreading on the Raspberry Pi is an excellent way to keep the main thread of your program running as efficiently as possible. To demonstrate, I show how you can move game controller polling routines to a separate thread. Find the Python example code for robot control at the end of this feature.

In the last article, featuring this controller, I include example code to show how you might interface with the device using python code. Unfortunately, while using the inputs Python package, the Python program will only run one program cycle between each controller event update. This code blocking event meant that the program would not perform other tasks while the controller is inactive. So to fix this code blocking, I am going to add multithreading to the new example code; which I include at the end of this feature.

Multithreading Raspberry Pi Game Controller Robot Control
Multithreading Raspberry Pi Game Controller for Robot Control

Despite the code execution blocking of the Python inputs package, I was still able to code an excellent remote control robot vehicle. Variable speed and steering control were very responsive with no noticeable lag. And I can probably add a lot more functions to the code without the need to use multithreading. However, it will not be possible to add tasks that you expect to run during controller idle times. For example:

  • Robot sensor scans and updates to a display cannot be made during controller idle times.
  • An autonomous robot control function will not be able to poll events from an idle controller without becoming blocked.
  • It will not be possible to switch from an active autonomous function to manual control without the independent function polling controller events.
  • OpenCV applications can be less efficient if processing is restricted to fit between controller event polling.
Multithreading Performance
HTOP Terminal Application
HTOP Terminal Application

The Terminal application above can help check the multithreading performance of your Python application. You can install the program by entering the following command at the Terminal command prompt of the Raspberry Pi:

sudo apt-get install htop

To run the program just enter the following:

htop

Then open a second Terminal session to execute a Python program.

Multithreading

Ideally, I want the main thread of my program to run as efficiently as possible while looping continuously; executing function after function. However, connected devices such as game controllers, LCDs and many kinds of external sensors can drastically slow down or block the main program. This slowdown is due to slow IO interfaces rather than processor utilisation limits.

The game controller can restrict the scope of my robot programming in ways explained above. If I use multithreading programming techniques, I can run the game controller class methods in a separate thread from the main program. I can then use custom class methods to poll the game controller indirectly, thus avoiding the code execution blocking methods of the game controller class.

Running IO devices, in separate program threads, will allow more processor utilisation in the program’s main thread.

Multithreading Example Code

The example Python program shows one way to implement multithreading. However, this implementation may not suit all situations so research for a suitable implementation for your specific application. The example program is experimental and is so far working on a Raspberry Pi 3.

Note that multithreading works best with IO devices if the thread remains active for long durations, this is due to IO devices like the game controller having very low processor utilisation.

Robot Control Controller Button Map
Robot Control Raspberry Pi Controller Button Map for Python Inputs Module
Code Snippets

Here I will make a few notes on code snippets taken from the full Python program.

gamepadInputs = {'ABS_X': 128, 
				'ABS_RZ': 127, 
				'BTN_SOUTH': 0, 
				'BTN_WEST': 0,
				'BTN_START': 0}

Using the button reference in the game controller image above we can create a list of buttons we want to use. The default value is included to indicate the controller button resting position. The dictionary data structure allows access to button values utilising the button name as the key. The button names are referred to as commands throughout the program.

gamepad = ThreadedInputs()

We initialise the class that contains the gamepad and multithreading methods.

for gamepadInput in gamepadInputs:
	gamepad.append_command(gamepadInput, gamepadInputs[gamepadInput])

Before we start the gamepad object, we need to load it with controller buttons from the list created earlier. The object multithreading method will only record gamepad events from the controller buttons included in the list.

gamepad.start()

We execute the object start method to begin the multithreading. This object method will start the polling of the game controller in a separate thread from the main program. Therefore, the idle game controller will no longer stop the main program thread from executing.

commandInput, commandValue = gamepad.read()

The gamepad object read method returns any new controller event update; which returns button name and value. If there is no event update, the read method will return ‘No Match’.

We can now poll the game controller as many times as we want without blocking the main program.

if commandInput == 'ABS_X' or commandInput == 'ABS_RZ':
	# Drive and steering
	drive_control()
elif commandInput == 'BTN_SOUTH':
	# Fire Nerf dart button for example
	fire_nerf_dart(commandInput, commandValue)
elif commandInput == 'BTN_WEST':
	# Switch the LED Beacon for example
	led_beacon(commandInput, commandValue)
elif commandInput == 'BTN_START':
	# Exit the while loop - this program is closing
	break

If there is a new event update from the game controller, the gamepad object read method will return the name of the button. Python ‘if’ statements is an excellent to apply actions to individual controller buttons.

sleep(0.01)

I slow the program without affecting current functions. Reducing processing loops will help save battery power on a project such as a robot vehicle.

Class Methods
def read(self):
	# Return the latest command from gamepad event
	if not self.q.empty():
		newCommand = self.q.get()
		while not self.q.empty():
			trashBin = self.q.get()
			
		return newCommand, self.gamepadInputs[newCommand]
	else:
		return self.NOMATCH, 0

So, here is the gamepad read method that will return any new game controller updates. Updates are put in the queue object, but the read method will only pass the last entry in the queue (LIFO – last in first out). Therefore, the read method will flush any older entries from the queue object. The read method will return a ‘No Match’ if the queue is empty.

def command_value(self, commandKey):
	# Get command value
	if commandKey in self.gamepadInputs:
		return self.gamepadInputs[commandKey]
	else:
		return None

The above method will return the latest value for any button in the list.

def gamepad_update(self):
	while True:
		# Should the thread exit?
		if self.stopped:
			return
		# Code execution stops at the following line until a gamepad event occurs.
		events = get_gamepad()
		for event in events:
			event_test = self.gamepadInputs.get(event.code, self.NOMATCH)
			if event_test != self.NOMATCH:
				self.gamepadInputs[event.code] = event.state
				self.lastEventCode = event.code
				self.q.put(event.code)

The start method starts the above method to run in a separate thread. Every game controller event is collected and filtered here. If the idle game controller blocks this method, the main program thread will continue to execute.

The Full Multithreading Program

You can also get the full multithreading Python code example from GitHub here.

Or download the code from the Raspberry Pi Terminal by entering the following at the command prompt.

wget https://github.com/MarkAHeywood/bluetin/blob/master/multithreading-raspberry-pi-game-controller-in-python/multithreading_gamepad.py

Copy and paste the above line to the Terminal if possible.

#!/usr/bin/env python3
"""
multithreading_gamepad.py
Mark Heywood, 2018
www.bluetin.io
10/01/2018
"""

__author__ = "Mark Heywood"
__version__ = "0.1.1"
__license__ = "MIT"


from time import sleep
from threading import Thread
import queue
import time

from inputs import get_gamepad


class ThreadedInputs:
	NOMATCH = 'No Match'
	
	def __init__(self):
		# Initialise gamepad command dictionary.
		# Add gamepad commands using the append method before executing the start method.
		self.gamepadInputs = {}
		self.lastEventCode = self.NOMATCH
		# Initialise the thread status flag
		self.stopped = False
		self.q = queue.LifoQueue()

	def start(self):
		# Start the thread to poll gamepad event updates
		t = Thread(target=self.gamepad_update, args=())
		t.daemon = True
		t.start()
		
	def gamepad_update(self):
		while True:
			# Should the thread exit?
			if self.stopped:
				return
			# Code execution stops at the following line until a gamepad event occurs.
			events = get_gamepad()
			for event in events:
				event_test = self.gamepadInputs.get(event.code, self.NOMATCH)
				if event_test != self.NOMATCH:
					self.gamepadInputs[event.code] = event.state
					self.lastEventCode = event.code
					self.q.put(event.code)

	def read(self):
		# Return the latest command from gamepad event
		if not self.q.empty():
			newCommand = self.q.get()
			while not self.q.empty():
				trashBin = self.q.get()
	
			return newCommand, self.gamepadInputs[newCommand]
		else:
			return self.NOMATCH, 0

	def stop(self):
		# Stop the game pad thread
		self.stopped = True
		
	def append_command(self, newCommand, newValue):
		# Add new controller command to the list
		if newCommand not in self.gamepadInputs:
			self.gamepadInputs[newCommand] = newValue
		else:
			print('New command already exists')
		
	def delete_command(self, commandKey):
		# Remove controller command from list
		if commandKey in self.gamepadInputs:
			del self.gamepadInputs[commandKey]
		else:
			print('No command to delete')

	def command_value(self, commandKey):
		# Get command value
		if commandKey in self.gamepadInputs:
			return self.gamepadInputs[commandKey]
		else:
			return None


def drive_control():
	# Function to drive robot motors
	print('Speed -> {} || Value -> {}'.format('ABS_RZ', gamepad.command_value('ABS_RZ')))
	print('Direction -> {} || Value -> {}'.format('ABS_X', gamepad.command_value('ABS_X')))
    

def fire_nerf_dart(commandInput, commandValue):
	# Function to fire Nerf dart gun on the robot
	print('Fire Nerf Dart -> {} Value -> {}'.format(commandInput, commandValue))


def led_beacon(commandInput, commandValue):
	# Function to switch led beacon on/off on the robot
	print('Switch LED Beacon -> {} Value -> {}'.format(commandInput, commandValue))

#-----------------------------------------------------------

# Dictionary of game controller buttons we want to include.
gamepadInputs = {'ABS_X': 128, 
				'ABS_RZ': 127, 
				'BTN_SOUTH': 0, 
				'BTN_WEST': 0,
				'BTN_START': 0}

# Initialise the gamepad object using the gamepad inputs Python package
gamepad = ThreadedInputs()

def main():
	""" Main entry point of this program """
	# Load the object with gamepad buttons we want to catch 
	for gamepadInput in gamepadInputs:
		gamepad.append_command(gamepadInput, gamepadInputs[gamepadInput])
	# Start the gamepad event update thread
	gamepad.start()

	while 1:
		#timeCheck = time.time()
		# Get the next gamepad button event
		commandInput, commandValue = gamepad.read()
		# Gamepad button command filter
		if commandInput == 'ABS_X' or commandInput == 'ABS_RZ':
			# Drive and steering
			drive_control()
		elif commandInput == 'BTN_SOUTH':
			# Fire Nerf dart button for example
			fire_nerf_dart(commandInput, commandValue)
		elif commandInput == 'BTN_WEST':
			# Switch the LED Beacon for example
			led_beacon(commandInput, commandValue)
		elif commandInput == 'BTN_START':
			# Exit the while loop - this program is closing
			break 

		sleep(0.01)
		#print(commandInput, commandValue)
		#print(1/(time.time() - timeCheck))

	# Stop the gamepad thread and close this program
	gamepad.stop()
	exit()
	
	
#-----------------------------------------------------------

if __name__ == "__main__":
	""" This is executed when run from the command line """
	main()

Related Articles

Raspberry Pi Controller for Robot Control Review

Raspberry Pi Controller For Robot Control Review Link.

Robot Control with Raspberry Pi and Python

Robot Control with Raspberry Pi and Python – Link.

Buying Featured Items

The purchase price is going to vary greatly depending on how quickly you want the items. Therefore shop around checking out Amazon, Ebay, Adafruit and local electronic stores.

The Pi Hut

  • Raspberry Pi Compatible Wireless Gamepad / Controller – Link

UK Searches:

UK Amazon:

  • Raspberry Pi – Search
  • MicroSD Card – Search
  • Raspberry Pi Compatible Wi-Fi Dongle – Search

US Searches:

US Amazon:

  • Raspberry Pi – Search
  • MicroSD Card – Search
  • Raspberry Pi Compatible Wi-Fi Dongle – Search

On Closing

I hope you find this article useful – Multithreading Raspberry Pi Game Controller in Python, please like and share.

2 thoughts on “Multithreading Raspberry Pi Game Controller in Python”

Comments are closed.