Addressing Performance Bottlenecks: Implementing SQS + Python for Efficient Queue-Worker Services

Nic Lasdoce
16 Oct 2023, 5 minutes read

As software systems scale with growing data volumes and heavier tasks, performance bottlenecks and crashes become real problems. This article presents a baseline solution built on AWS Simple Queue Service (SQS) and Python: Python functions are queued as background jobs and run by a worker that is separate from the main application.

Introduction

If you want to relieve pressure on your main application, background workers backed by a queue system work really well.

In this tutorial, we will create a simple implementation of AWS SQS that you can easily use with existing Python code. By the end, you should be able to put a Python function into the queue as a job and have a background worker process it.

The process may seem tedious, but once it is set up we can insert any Python function defined alongside the worker into the queue as a job and the worker will execute it, so please bear with me!

Use Cases

Here are some use cases that may benefit from a queue system:

  1. High-Traffic Websites: During traffic surges, websites can become sluggish or even crash. Queued processing ensures tasks like data writing or notifications are handled in the background, maintaining site responsiveness.

  2. E-commerce Transactions: During sales or peak hours, e-commerce platforms witness a surge in transaction requests. By queuing payment processing tasks, the system can handle each transaction systematically, reducing errors and improving user experience.

  3. Batch Processing: Handling vast amounts of data at once can be resource-intensive. Breaking down large tasks into smaller queued jobs ensures efficient processing without overloading the system.

  4. Real-time Analytics: Analyzing high-volume real-time data can strain the main application. Queues can offload data analytics tasks to background workers, ensuring the primary system remains nimble while still deriving actionable insights.

  5. Non-Response-Essential Writes: Often an API call includes writes to multiple tables or multiple record insertions whose results are not needed in the response. Moving these writes or queries to the queue can drastically improve your API response time.

If your use case falls within the above scenarios, is related to them, or you simply think it would benefit from a queue, let's get started!

Queue Client Configuration

Queue connection file

The Python file here will serve as our global client to send and receive queue messages.

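  1. Install the AWS SDK
pip install boto3
  2. Create a file named queue_client.py and paste the following content:
import boto3

sqs_client = boto3.client(
    'sqs',
    region_name='us-east-1',  # assumed region; use the region where your queue lives
    aws_access_key_id='ACCESS_KEY',  # replace with your IAM user's access key
    aws_secret_access_key='SECRET_ACCESS_KEY',  # replace with your IAM user's secret key
)

QUEUE_URL = 'SQS_QUEUE_URL'  # Replace with the URL of your SQS queue
  3. Proceed to the next steps to get the credentials and the SQS_QUEUE_URL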
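
Hard-coding keys in queue_client.py is fine for a quick test, but keys committed to source control are easy to leak. Below is a minimal sketch of one alternative, assuming the values are exported as environment variables. The helper load_queue_settings and the variable name SQS_QUEUE_URL are hypothetical names chosen for this example; AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_DEFAULT_REGION are names that boto3 also reads on its own.

```python
import os


def load_queue_settings(env=None):
    """Collect SQS settings from environment variables.

    SQS_QUEUE_URL is a name assumed for this sketch, not an AWS convention.
    """
    env = os.environ if env is None else env
    required = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "SQS_QUEUE_URL"]
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    client_kwargs = {
        "aws_access_key_id": env["AWS_ACCESS_KEY_ID"],
        "aws_secret_access_key": env["AWS_SECRET_ACCESS_KEY"],
        # falls back to us-east-1 only because the console links here use it
        "region_name": env.get("AWS_DEFAULT_REGION", "us-east-1"),
    }
    return client_kwargs, env["SQS_QUEUE_URL"]


# In queue_client.py this could replace the hard-coded values:
#   client_kwargs, QUEUE_URL = load_queue_settings()
#   sqs_client = boto3.client("sqs", **client_kwargs)
```

Failing early with a clear error when a variable is missing is usually easier to debug than a credentials error raised on the first queue call.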

Prepare IAM User

Here we will create an IAM user whose credentials the application will use to access the queue.

  1. Go to https://us-east-1.console.aws.amazon.com/iamv2/home?region=us-east-1#/users and click "Create User"
  2. Under username, enter "queue_user". Remember this name, as we will use it later when setting up the queue
  3. Click Next until you reach the end (skip the other settings, as we do not need them), then create the user
  4. Now, in the list of users, select "queue_user"
  5. Go to Security credentials and find "Access keys"
  6. Click Create access key and select Command Line Interface
  7. Click Next, then Create access key, then download the .csv file
  8. Replace ACCESS_KEY and SECRET_ACCESS_KEY in queue_client.py with the values shown on your screen

Create SQS Queue

  1. Go to https://us-east-1.console.aws.amazon.com/sqs/v2/home?region=us-east-1#/queues
  2. Click on Create Queue
  3. Enter "test" as the name
  4. Scroll down to Access policy, choose Basic, then select "Only the specified AWS accounts, IAM users and roles" for both options
  5. In the text box below each "Only the specified AWS accounts, IAM users and roles" option, paste the ARN of the user we created above. You can find it under IAM -> Users -> queue_user -> Permissions -> Summary -> ARN
  6. Now create the queue
  7. After creation, copy the URL from the SQS queue details page into SQS_QUEUE_URL in queue_client.py
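
A pasted queue URL is easy to truncate, and the region inside it has to match the region the client connects to. The sketch below is a small sanity check, assuming the URL follows the common https://sqs.<region>.amazonaws.com/<account-id>/<queue-name> layout. The helper name parse_queue_url and the account ID in the example are made up for illustration, and legacy endpoint formats would be rejected by this check.

```python
from urllib.parse import urlparse


def parse_queue_url(queue_url):
    """Split a queue URL shaped like https://sqs.<region>.amazonaws.com/<account>/<name>."""
    parsed = urlparse(queue_url)
    host_parts = (parsed.hostname or "").split(".")
    path_parts = parsed.path.strip("/").split("/")
    # expect a host starting with "sqs.<region>." and exactly two path segments
    if len(host_parts) < 4 or host_parts[0] != "sqs" or len(path_parts) != 2:
        raise ValueError(f"Unexpected SQS queue URL: {queue_url!r}")
    return {
        "region": host_parts[1],
        "account_id": path_parts[0],
        "queue_name": path_parts[1],
    }


# example with a placeholder account ID
info = parse_queue_url("https://sqs.us-east-1.amazonaws.com/123456789012/test")
print(info["region"], info["queue_name"])  # prints: us-east-1 test
```

The extracted region can then be passed as region_name when creating the client, so the two cannot drift apart.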

Create the Worker

Now we will create the worker file and the functions that we want to enqueue. For simplicity, we define all the functions we want to enqueue in the same file as the queue-processing function, so that they live in the same namespace and the worker can access them easily.

  1. Create a file named worker.py
  2. Paste the code below
import json

from queue_client import sqs_client, QUEUE_URL


# a simple function that prints whatever message we give it
def display_message(message):
    print(f'your message is: {message}')


# a simple function that prints the sum of two values
def add_values(value_1, value_2):
    print(f'your sum is {value_1 + value_2}')


# this function looks up a function object by name in the global scope of this file/module and calls it
# there are other ways to do it but this is the simplest I can think of to run a function based on its name
def execute_function_by_name(func_name, *args, **kwargs):
    if func_name in globals() and callable(globals()[func_name]):
        print(func_name)
        return globals()[func_name](*args, **kwargs)
    else:
        return f"No function named '{func_name}' found."


# this function fetches messages from the queue and executes the named function with the given parameters
def worker():
    # infinite loop to keep fetching from the queue and processing messages
    while True:
        # fetch messages (long polling waits up to WaitTimeSeconds for new ones)
        response = sqs_client.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=10,  # adjust as needed
            WaitTimeSeconds=10,  # adjust as needed
            MessageAttributeNames=['All'],
        )
        # the 'Messages' key is absent when the queue returned nothing
        messages = response.get('Messages', [])
        if not messages:
            print("No messages to process.")
            continue
        # process each message
        for message in messages:
            print(f"\r\nReceived message: function: {message['Body']} :: parameters: {message['MessageAttributes']}")
            # get the function name and its keyword arguments
            function_name = message['MessageAttributes']['function']['StringValue']
            kwargs = json.loads(message['MessageAttributes']['kwargs']['StringValue'])
            print(function_name)
            print(kwargs)
            execute_function_by_name(function_name, **kwargs)
            # delete the message so it is not delivered again
            sqs_client.delete_message(
                QueueUrl=QUEUE_URL,
                ReceiptHandle=message['ReceiptHandle']
            )


if __name__ == "__main__":
    worker()
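
Looking functions up in globals() means any callable in worker.py can be triggered by a queue message, and an exception inside a job stops the worker loop. As one possible hardening, here is a minimal sketch that uses an explicit registry and reports whether a message is safe to delete. The names JOB_REGISTRY, job and process_message are hypothetical and not part of the code above, and add_values returns its result here only to keep the sketch short.

```python
import json

# explicit allowlist of functions the worker may run (hypothetical name)
JOB_REGISTRY = {}


def job(func):
    """Register a function so the worker is allowed to run it by name."""
    JOB_REGISTRY[func.__name__] = func
    return func


@job
def add_values(value_1, value_2):
    return value_1 + value_2


def process_message(message):
    """Run the job described by one SQS message; return True if it may be deleted."""
    attributes = message.get("MessageAttributes", {})
    try:
        name = attributes["function"]["StringValue"]
        kwargs = json.loads(attributes["kwargs"]["StringValue"])
        func = JOB_REGISTRY[name]
    except (KeyError, json.JSONDecodeError) as error:
        print(f"Skipping malformed or unknown job: {error!r}")
        return False
    try:
        func(**kwargs)
    except Exception as error:  # keep the worker alive when a job fails
        print(f"Job {name} failed: {error!r}")
        return False
    return True
```

In the worker loop, delete_message would then be called only when process_message returns True. A message that is not deleted becomes visible again after the queue's visibility timeout, so a failed job gets retried rather than lost.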

Insert jobs to queue

Here we will create a function that inserts Python functions and their parameters into the queue as jobs.

  1. Create a file named enqueue.py
  2. Paste the following code into the file
import json

from queue_client import sqs_client, QUEUE_URL
# notice that we import the functions from the worker so we can pass them by reference
from worker import display_message, add_values


# this function inserts a function and its keyword arguments into the queue as a message
def enqueue(func, kwargs=None):
    # avoid a mutable default argument; fall back to an empty dict
    kwargs = kwargs or {}
    response = sqs_client.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=func.__name__,
        MessageAttributes={
            'function': {
                'DataType': 'String',
                'StringValue': func.__name__
            },
            'kwargs': {
                'DataType': 'String',
                'StringValue': json.dumps(kwargs)
            }
        },
    )
    print(f'{func.__name__} added to queue')
    return response['MessageId']


if __name__ == "__main__":
    # insert the display message as job
    enqueue(display_message, kwargs={"message": "hello"})
    # insert the add values as job
    enqueue(add_values, kwargs={"value_1": 1, "value_2": 2})
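
Because the keyword arguments travel as a JSON string, only JSON-serializable values survive the trip from enqueue.py to worker.py. The sketch below isolates that round trip without touching AWS. The helper names build_job_attributes and parse_job_attributes are hypothetical, and add_values is redefined here only so the example is self-contained.

```python
import json


def build_job_attributes(func, kwargs=None):
    """Build the MessageAttributes payload the same way enqueue() does."""
    return {
        "function": {"DataType": "String", "StringValue": func.__name__},
        "kwargs": {"DataType": "String", "StringValue": json.dumps(kwargs or {})},
    }


def parse_job_attributes(attributes):
    """Recover the function name and keyword arguments on the worker side."""
    name = attributes["function"]["StringValue"]
    kwargs = json.loads(attributes["kwargs"]["StringValue"])
    return name, kwargs


def add_values(value_1, value_2):
    return value_1 + value_2


attributes = build_job_attributes(add_values, {"value_1": 1, "value_2": 2})
print(parse_job_attributes(attributes))
# prints: ('add_values', {'value_1': 1, 'value_2': 2})
```

Values that json.dumps cannot encode, such as sets or arbitrary objects, raise a TypeError at enqueue time, and tuples come back as lists, so it helps to keep job arguments to plain strings, numbers, lists and dicts.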

Test it out

  1. Go to your terminal
  2. Insert the messages into the queue
python enqueue.py
  3. Go to your SQS queue and you should now see Messages Available
  4. Open another terminal tab or window
  5. Now we will execute the jobs inside the queue
python worker.py
  6. You should see that it processed the jobs
  7. If you go back to the enqueue terminal and keep running enqueue.py, you will see the worker process the new jobs as well
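
The same flow can be dry-run without AWS credentials by swapping in an in-memory stand-in for the client. The sketch below is a simplified test double, not a faithful SQS emulation: FakeSQSClient and drain are hypothetical names, the fake ignores visibility timeouts and long polling, and it only mimics the three client calls used in this tutorial.

```python
import json
import uuid


class FakeSQSClient:
    """In-memory stand-in for the send/receive/delete calls used in this tutorial."""

    def __init__(self):
        self._messages = {}  # receipt handle -> message dict

    def send_message(self, QueueUrl, MessageBody, MessageAttributes=None):
        receipt = str(uuid.uuid4())
        message_id = str(uuid.uuid4())
        self._messages[receipt] = {
            "MessageId": message_id,
            "ReceiptHandle": receipt,
            "Body": MessageBody,
            "MessageAttributes": MessageAttributes or {},
        }
        return {"MessageId": message_id}

    def receive_message(self, QueueUrl, MaxNumberOfMessages=1, **_ignored):
        batch = list(self._messages.values())[:MaxNumberOfMessages]
        # like the real client, omit the 'Messages' key when nothing is available
        return {"Messages": batch} if batch else {}

    def delete_message(self, QueueUrl, ReceiptHandle):
        self._messages.pop(ReceiptHandle, None)


def add_values(value_1, value_2):
    return value_1 + value_2


def drain(client, queue_url, jobs):
    """Process every queued message once and return the job results."""
    results = []
    while True:
        response = client.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10)
        messages = response.get("Messages", [])
        if not messages:
            return results
        for message in messages:
            attrs = message["MessageAttributes"]
            func = jobs[attrs["function"]["StringValue"]]
            results.append(func(**json.loads(attrs["kwargs"]["StringValue"])))
            client.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])


client = FakeSQSClient()
client.send_message(
    QueueUrl="fake://test",
    MessageBody="add_values",
    MessageAttributes={
        "function": {"DataType": "String", "StringValue": "add_values"},
        "kwargs": {"DataType": "String", "StringValue": json.dumps({"value_1": 1, "value_2": 2})},
    },
)
print(drain(client, "fake://test", {"add_values": add_values}))  # prints: [3]
```

A double like this is handy for unit tests of job functions and message parsing; the real queue is still needed to verify permissions, regions and delivery behavior.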

Conclusion

That's it! We created a Python queue service with AWS SQS. Using this as a baseline, we can keep adding functions to worker.py, insert them into the queue with the enqueue function, and the worker will process them.

Bonus

If you are a founder who needs help with your software architecture or cloud infrastructure, we offer a free assessment and will tell you whether we can do it or not! Feel free to contact us:
Email: nic@triglon.tech