If you are looking to relieve pressure on your main application, background workers backed by a queue system work really well.
In this tutorial, we will build a simple job queue on top of AWS SQS that you can easily use with existing Python code. By the end, you should be able to put a Python function on the queue as a job and have a background worker process it.
The process may seem tedious, but once it is set up we can insert any Python function into the queue as a job and the worker will execute it, so please bear with me!
Here are some use cases that may benefit from a queue system:
High-Traffic Websites: During traffic surges, websites can become sluggish or even crash. Queued processing ensures tasks like data writing or notifications are handled in the background, maintaining site responsiveness.
E-commerce Transactions: During sales or peak hours, e-commerce platforms witness a surge in transaction requests. By queuing payment processing tasks, the system can handle each transaction systematically, reducing errors and improving user experience.
Batch Processing: Handling vast amounts of data at once can be resource-intensive. Breaking down large tasks into smaller queued jobs ensures efficient processing without overloading the system.
Real-time Analytics: Analyzing high-volume real-time data can strain the main application. Queues can offload data analytics tasks to background workers, ensuring the primary system remains nimble while still deriving actionable insights.
Non-Response-Essential Writes: Often, an API call includes writes to multiple tables, or the insertion of multiple records, that are not actually required to produce the response. By moving these writes to the queue, your API response time can improve drastically.
If your use case falls within these scenarios, or you think it would otherwise benefit from a queue, let's get started!
The Python file below will serve as our global client for sending and receiving queue messages. First, install boto3, the AWS SDK for Python:
pip install boto3
Then create queue_client.py:

import boto3

sqs_client = boto3.client(
    'sqs',
    aws_access_key_id='ACCESS_KEY',
    aws_secret_access_key='SECRET_ACCESS_KEY',
    region_name='us-east-1',  # replace with your queue's region
)

QUEUE_URL = 'SQS_QUEUE_URL'  # Replace with the URL of your SQS queue
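Hardcoded keys are fine for a quick demo, but in production you would normally not commit them to code. boto3 can resolve credentials on its own, so a minimal sketch of an alternative queue_client.py might look like this (the SQS_QUEUE_URL environment variable name is chosen here just for illustration):

import os

import boto3

# no keys in the code: boto3 resolves credentials on its own, from
# environment variables, ~/.aws/credentials, or an attached IAM role
sqs_client = boto3.client('sqs', region_name='us-east-1')  # replace with your region

QUEUE_URL = os.environ['SQS_QUEUE_URL']  # e.g. export SQS_QUEUE_URL=...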
Next, we will create an IAM user whose access keys the application will use to access the queue.
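For this tutorial the user needs at least the sqs:SendMessage, sqs:ReceiveMessage, and sqs:DeleteMessage permissions on the queue (plus sqs:GetQueueAttributes if you want to run the check below). Once the keys are in queue_client.py, a quick sanity check is to ask the queue for an attribute; if the credentials or permissions are wrong, the call raises a ClientError:

from botocore.exceptions import ClientError

from queue_client import sqs_client, QUEUE_URL

# quick credentials check: raises ClientError if the keys or
# permissions are wrong, prints the queue depth otherwise
try:
    attrs = sqs_client.get_queue_attributes(
        QueueUrl=QUEUE_URL,
        AttributeNames=['ApproximateNumberOfMessages'],
    )
    print('queue reachable, messages waiting:',
          attrs['Attributes']['ApproximateNumberOfMessages'])
except ClientError as exc:
    print('queue not reachable:', exc)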
Now we will create the worker file and the functions that we want to enqueue. For simplicity, the functions we want to enqueue are defined in the same file as the queue-processing function, so that everything resides in the same namespace and the worker can look the functions up easily. Here is worker.py:
import json

from queue_client import sqs_client, QUEUE_URL


# a simple function that prints whatever message we give it
def display_message(message):
    print(f'your message is: {message}')


# a simple function that prints the sum of two values
def add_values(value_1, value_2):
    print(f'your sum is {value_1 + value_2}')


# this function turns a function name into the Python function object in the
# global scope of this file/module; there are other ways to do it, but this
# is the simplest I can think of to run a function based on its name
def execute_function_by_name(func_name, *args, **kwargs):
    if func_name in globals() and callable(globals()[func_name]):
        return globals()[func_name](*args, **kwargs)
    return f"No function named '{func_name}' found."


# this function fetches messages from the queue and executes each
# function with its given parameters
def worker():
    # infinite loop to keep fetching from the queue and processing messages
    while True:
        # fetch messages, long-polling for up to 10 seconds
        response = sqs_client.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=10,  # adjust as needed
            WaitTimeSeconds=10,      # adjust as needed
            MessageAttributeNames=['All'],
        )

        messages = response.get('Messages', [])
        if not messages:
            print("No messages to process.")
            continue

        # process each message
        for message in messages:
            print(f"\r\nReceived message: function: {message['Body']} "
                  f":: parameters: {message['MessageAttributes']}")
            # recover the function name and its keyword arguments
            function_name = message['MessageAttributes']['function']['StringValue']
            kwargs = json.loads(message['MessageAttributes']['kwargs']['StringValue'])
            execute_function_by_name(function_name, **kwargs)
            # delete the message so it is not processed again
            sqs_client.delete_message(
                QueueUrl=QUEUE_URL,
                ReceiptHandle=message['ReceiptHandle'],
            )


if __name__ == "__main__":
    worker()
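One caveat with the loop above: an exception inside any job will stop the whole worker. Here is a sketch of one way to harden it, using a hypothetical process_message helper written to live inside worker.py (so it can reuse json, sqs_client, QUEUE_URL, and execute_function_by_name), where the message is deleted only after the job succeeds:

# hypothetical helper for worker.py: run one job, deleting the message
# only on success so SQS redelivers failed jobs after the queue's
# visibility timeout expires
def process_message(message):
    function_name = message['MessageAttributes']['function']['StringValue']
    kwargs = json.loads(message['MessageAttributes']['kwargs']['StringValue'])
    try:
        execute_function_by_name(function_name, **kwargs)
    except Exception as exc:
        print(f'job {function_name} failed: {exc}')
        return  # leave the message on the queue for redelivery
    sqs_client.delete_message(
        QueueUrl=QUEUE_URL,
        ReceiptHandle=message['ReceiptHandle'],
    )

If jobs can fail repeatedly, attaching a dead-letter queue to the SQS queue via its redrive policy is the usual way to cap redeliveries.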
Next we will create enqueue.py, with a function that inserts a Python function and its parameters as a job in the queue:
import json

from queue_client import sqs_client, QUEUE_URL
# notice that we import the functions from the worker, so that we can
# reference them when enqueuing
from worker import display_message, add_values


# this function inserts a function call as a message in the queue
def enqueue(function, kwargs=None):
    kwargs = kwargs or {}
    response = sqs_client.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=function.__name__,
        MessageAttributes={
            'function': {
                'DataType': 'String',
                'StringValue': function.__name__,
            },
            'kwargs': {
                'DataType': 'String',
                'StringValue': json.dumps(kwargs),
            },
        },
    )
    print(f'{function.__name__} added to queue')
    return response['MessageId']


if __name__ == "__main__":
    # insert the display_message function as a job
    enqueue(display_message, kwargs={"message": "hello"})
    # insert the add_values function as a job
    enqueue(add_values, kwargs={"value_1": 1, "value_2": 2})
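To make the contract between the two scripts concrete, this is roughly the shape of the second job as the worker receives it (a sketch of the relevant fields only; the real SQS response carries more metadata such as MessageId and MD5 checksums):

# approximate shape of the received message for the add_values job
{
    'Body': 'add_values',
    'ReceiptHandle': '...',  # opaque token from SQS, needed to delete the message
    'MessageAttributes': {
        'function': {'DataType': 'String', 'StringValue': 'add_values'},
        'kwargs': {'DataType': 'String', 'StringValue': '{"value_1": 1, "value_2": 2}'},
    },
}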
Now push the two jobs into the queue:

python enqueue.py

Then, in a separate terminal, start the worker to process them:

python worker.py
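If everything is wired up correctly, the worker's output should look roughly like this (the full attribute dump after each "Received message" line is abbreviated to {...} here):

Received message: function: display_message :: parameters: {...}
your message is: hello
Received message: function: add_values :: parameters: {...}
your sum is 3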
That's it! We were able to create a Python queue service with AWS SQS. Using this as a baseline, we can keep adding functions: anything we insert with enqueue, the worker will pick up and process.
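As a closing illustration of the non-response-essential-writes use case from earlier, here is a hedged sketch of how enqueue might be called from a web endpoint. The Flask app, route, and payload field are invented for illustration; any framework would work the same way:

from flask import Flask, request

from enqueue import enqueue
from worker import display_message

app = Flask(__name__)

# illustrative endpoint: the slow work is queued, and the response
# returns immediately instead of waiting for the job to finish
@app.route('/signup', methods=['POST'])
def signup():
    payload = request.get_json()
    enqueue(display_message, kwargs={'message': f"new signup: {payload['email']}"})
    return {'status': 'accepted'}, 202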