Ep. 27 uWSGI Decorators

10 Apr 2019

Many of you will be familiar with uWSGI and typically use it as an application or web server for your Python apps, à la Flask or Django.

But did you know that uWSGI has WAY more in store?

After spending more time with uWSGI and digging through the documentation, I've come to understand why it's called the uWSGI project...

Task queues, cron jobs, file/directory monitoring, threads, spools, locks, mules, timers and more, all with a simple Python decorator!

uWSGI's functionality is vast, ranging from extremely low level to high level. In this guide, I'm going to give you an introduction to some of the awesome decorators available in this package, using Flask.

Installing uWSGI

Before installing uWSGI, I highly recommend you create a virtual environment in a new directory and activate it:

python -m venv env --prompt UWSGI
source env/bin/activate

Install Flask and uWSGI with pip:

pip install flask uwsgi

Flask skeleton

With our dependencies installed, we can build our Flask application skeleton in run.py:

from flask import Flask, request

app = Flask(__name__)

@app.route("/")
def index():
    return "Hello world"

if __name__ == "__main__":
    app.run()

uWSGI config file

We can start our app on the command line with the uwsgi command and pass it some arguments, but for simplicity, we'll create a configuration file called app.ini:

[uwsgi]
strict = true
http = :8080

wsgi-file = run.py
callable = app

master = true
processes = 4
threads = 8

We're going to be running our app on localhost, port 8080.

Feel free to change the number of processes/threads to better match your machine specs. We're going to come back to this file shortly.

Running the app with uWSGI

To run the application with uWSGI, simply call the uwsgi command and pass it the name of the configuration file:

uwsgi app.ini

You should be able to go to http://localhost:8080/ and see Hello world in the browser.

To stop uWSGI, simply hit Ctrl + c in the terminal.

uWSGI decorators

uWSGI comes with a range of useful decorators which become available ONLY when running with uWSGI.

This means you're unable to use these decorators when running your application with the Flask development server.
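
If you want run.py to remain importable outside uWSGI (for example under the Flask development server), one option is to guard the import. This is a minimal sketch of my own, not from the original guide; it relies on the fact that uwsgidecorators itself imports the uwsgi module, which only exists inside a uWSGI process:

try:
    from uwsgidecorators import timer, filemon, cron
    RUNNING_UNDER_UWSGI = True
except ImportError:
    # Not running under uWSGI (e.g. the Flask dev server); the decorators are unavailable
    RUNNING_UNDER_UWSGI = False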

So that we don't have to keep on stopping and starting our app from the command line, we can add something to our app.ini file to reload our app as we change it.

Note - This should only be used in development

Open up app.ini and add the following:

py-autoreload = 2

This will watch our application for changes every 2 seconds.

To access the uWSGI decorators, we need to import them. We'll import everything for now, but feel free to import only the functions you need:

from uwsgidecorators import *
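
For example, to import only the decorators used in this guide:

from uwsgidecorators import timer, filemon, cron, mulefunc, spool, spoolraw, spoolforever, thread, postfork, lock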

We'll also import time:

import time

@timer

The first decorator we're going to look at is timer.

This decorator allows us to execute a function at regular intervals:

@timer(3)
def three_seconds(num):
    # num is the uWSGI signal number assigned to this timer
    print("3 seconds elapsed")

We set the interval in seconds by passing the number of seconds to the @timer decorator.

After running the app and waiting for 10 seconds or so, you'll see the following output in the terminal:

3 seconds elapsed
3 seconds elapsed
3 seconds elapsed
3 seconds elapsed
3 seconds elapsed
3 seconds elapsed

This function will keep on running at regular 3 second intervals for as long as your application is running.

@filemon

The filemon decorator will execute a function every time a file or directory is modified.

We're going to create a directory named log containing a file called test.log in the same directory as run.py and app.ini:

mkdir log
cd log
touch test.log

We'll create two decorated functions using the filemon decorator, one to watch a file and one to watch a directory:

@filemon("log/test.log")
def file_has_been_modified(name):
    print("test.log has been modified")

@filemon("log")
def directory_has_been_modified(name):
    print("The log directory has been modified")

With the app running, go ahead and edit test.log. You'll see output in the terminal:

test.log has been modified

Adding or removing a file/directory in the log directory will trigger the directory_has_been_modified function:

The log directory has been modified

@cron

The cron decorator allows us to easily register cron jobs. It takes five arguments (minute, hour, day, month, weekday), where -1 acts as a wildcard meaning "any".

We'll create a cron job to run every minute:

@cron(-1, -1, -1, -1, -1)
def cron_every_minute(num):
    print("Running cron on the minute")

And another cron job to run at 17:30 every day:

@cron(30, 17, -1, -1, -1)
def cron_at_five_thirty(num):
    print("it's 17:30pm. Time to go home!")

Fortunately for us, it's just turned 17:30 and our cron job has just run:

Running cron on the minute!
Running cron on the minute!
it's 17:30. Time to go home!
Running cron on the minute!
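
The fifth argument (weekday) works the same way. As an extra illustration of my own (not in the original guide), assuming crontab-style weekday numbering where 0 is Sunday, a job for 9:00 every Monday would look like:

@cron(0, 9, -1, -1, 1)
def cron_monday_morning(num):
    # minute=0, hour=9, any day, any month, weekday=1 (Monday)
    print("It's 9:00 on Monday")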

@mulefunc

Mules can be thought of as a primitive task queue: we can offload work to a mule to be executed in the background, letting our application return a response while the mule handles the task.

Before we can use the mulefunc decorator, we need to declare a mule in app.ini:

mule = true

There are lots of interesting things we can do with mules; however, in this example we're just going to create one. Read more about mules in the uWSGI docs.

To create a mule function, decorate a function with the @mulefunc decorator, passing any arguments into the function itself.

We'll create a simple mulefunc that takes an integer as an argument:

@mulefunc
def mule(num):
    for i in range(num):
        print(i)
        time.sleep(1)

We'll also create a new route in our app to trigger the mule:

@app.route("/mule")
def add_mule():
    num = request.args.get("n", None)
    if num:
        mule(int(num))
    return "Mule"

We can trigger the mulefunc by sending a query string in the URL with a value for n, for example http://localhost:8080/mule?n=8.

Sending a request to this URL will return the text Mule immediately, whilst the function is executed in the background.

In the terminal, you'll see:

0
[pid: 7750|app: 0|req: 1/4] 127.0.0.1 () {40 vars in 913 bytes} [Wed Apr 10 17:45:41 2019] GET /mule?n=8 => generated 4 bytes in 2 msecs (HTTP/1.1 200) 2 headers in 78 bytes (1 switches on core 0)
1
2
3
4
5
6
7
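
As a side note (my own addition, not from the original guide), uWSGI can run several mules, and @mulefunc accepts a mule number to pin a function to a specific mule. A minimal sketch, assuming mules = 2 is set in app.ini instead of a single mule:

@mulefunc(2)
def pinned_task(num):
    # This task will only ever be executed by mule 2
    print("Mule 2 handling", num)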

@spool

The uWSGI spooler is a task queue/manager that works like many other popular task queue systems, allowing us to return a response whilst a task is offloaded to be processed in the background.

A spooler works by defining a directory that "spool files" are written to. Spool functions are then run when the spooler finds a file in that directory.

As with mules, there are lots of advanced things you can do with spoolers, and we're only going to cover the basics. To learn more, read the uWSGI spooler docs.

Spooling has a few advantages over mules, including:

  • Spooled tasks will be restarted/retried if uWSGI crashes or is stopped, as task information is stored in files

  • Spooled tasks are not limited to a 64 KB parameter size

  • Spoolers generally offer more flexibility and configuration

To work with the spooler, we first need to create the spool directory. We'll call it tasks:

mkdir tasks

We then need to tell uWSGI about our spool directory. We can do so in our app.ini file:

spooler = tasks

With the directory created and configuration file updated, we can use the @spool decorator.

We'll start by creating a basic spool that doesn't require any arguments when called:

@spool
def a_basic_spool_function(args):
    print(args)
    for _ in range(5):
        print("Spool task triggered with no args")
        time.sleep(0.5)

You'll notice we've passed args to the function; we'll cover that shortly.

We'll create a new route to trigger the spooler:

@app.route("/spool")
def add_spool():
    a_basic_spool_function.spool()
    return "Spooled"

You'll notice we're calling a_basic_spool_function.spool() without passing in any arguments.

Go to http://localhost:8080/spool to trigger the spooler and keep an eye on the terminal.

The value for args:

{'spooler_task_name': 'uwsgi_spoolfile_on_jnwt_8029_1_844293360_1554916124_156079', 'ud_spool_func': 'a_basic_spool_function', 'ud_spool_ret': '-2'}

Information about the request:

[pid: 8029|app: 0|req: 1/1] 127.0.0.1 () {40 vars in 908 bytes} [Wed Apr 10 18:08:44 2019] GET /spool => generated 7 bytes in 4 msecs (HTTP/1.1 200) 2 headers in 78 bytes (1 switches on core 0)

The spool function output:

Spool task triggered with no args
Spool task triggered with no args
Spool task triggered with no args
Spool task triggered with no args
Spool task triggered with no args

To pass arguments to a @spool function, we can add pass_arguments=True and pass in any values supported by the pickle module.

Let's create another function that takes an int as an argument. We'll use the /spool route to trigger it:

@spool(pass_arguments=True)
def background_task(num):
    print("Background task triggered with args")
    for i in range(num):
        print(i)
        time.sleep(1)

@app.route("/spool")
def add_spool():
    background_task.spool(5)
    return "Spooled"

Trigger the function by heading to /spool. You'll see in the terminal:

Background task triggered with args
0
1
2
3
4
[spooler /mnt/c/wsl/projects/pythonise/tutorials/flask_series/ep_26_uwsgi_decorators/decorators/tasks pid: 8555] done with task uwsgi_spoolfile_on_jnwt_8574_1_490298766_1554918176_882267 after 5 seconds

The route returned an immediate response whilst our function was executed in the background.

We can in fact pass any kind of Python object to a spool function, provided it can be pickled:

@spool(pass_arguments=True)
def spool_with_args(*args, **kwargs):
    print(args)
    print(kwargs)
    print("Background task triggered")
    for i in range(5):
        print(i)
        time.sleep(1)

We can trigger the spooled function with:

@app.route("/spool")
def add_spool():
    spool_with_args.spool(name="uwsgi", data=["a", "b", "c"], dt=datetime.datetime.utcnow())
    return "Spooled"

Accessing the route will print the following:

()
{'name': 'uwsgi', 'data': ['a', 'b', 'c'], 'dt': datetime.datetime(2019, 4, 10, 20, 12, 54, 757350)}
Background task triggered
0
1
2
3
4

When passing arguments to a spooled function, some arguments have a special meaning and must be bytes (an example using at follows the list):

  • spooler: the absolute path of the spooler that has to manage this task

  • at: the unix time at which the task must be executed (the task will not be run until the at time has passed)

  • priority: the subdirectory of the spooler directory in which the task will be placed. You can use this trick to give a good-enough prioritisation to tasks (for a better approach, use multiple spoolers)
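
For example, to schedule a task to run a minute from now, we could pass at as a unix timestamp encoded to bytes. This is a sketch of my own (the /spool-later route is hypothetical), reusing the background_task function from earlier:

@app.route("/spool-later")
def add_delayed_spool():
    # at must be bytes: a unix timestamp 60 seconds in the future
    run_at = str(int(time.time()) + 60).encode()
    background_task.spool(5, at=run_at)
    return "Spooled for later"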

Spooler priority

One of the nice things about spoolers is the ability to set a simple priority queue, using numbers to indicate the priority.

Providing a priority argument gives order to the spooler's scanning, creating numbered subdirectories in your spool directory, each containing their respective tasks.

To set up a priority queue, we need to add a couple more options to our uWSGI ini config:

spooler-ordered = true
spooler-frequency = 3

Priority queues only work when spooler-ordered is enabled, allowing the spooler to scan the directories in alphabetical order (the spooler will do its best to maintain the priority order).

spooler-frequency isn't required, but sets how often (in seconds) the spooler scans for new tasks.
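
For reference, collecting everything we've added so far, our app.ini now looks like this:

[uwsgi]
strict = true
http = :8080

wsgi-file = run.py
callable = app

master = true
processes = 4
threads = 8

py-autoreload = 2

mule = true
spooler = tasks
spooler-ordered = true
spooler-frequency = 3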

For now, we'll just create a simple spool function and call it from the /spool route:

@spool(pass_arguments=True)
def some_task(*args, **kwargs):
    print(args)
    print(kwargs)
    time.sleep(2)
    print(kwargs.get("name"), "done!")

In our route, we'll call the spool function 4 times, setting a priority for each call:

@app.route("/spool")
def add_spool():

    some_task.spool(name="No priority", data=[9, 7, 8])
    some_task.spool(priority=b"3", name="Priority 3", data={"foo": "bar"})
    some_task.spool(priority=b"2", name="Priority 2", data=["a", "b", "c"])
    some_task.spool(priority=b"1", name="Priority 1", data=[1, 2, 3])

    return "Spooled"

You'll notice we've provided the special priority parameter as bytes (b"1", b"2", b"3"); since the spooler scans the priority directories in alphabetical order, lower numbers are processed first.

When we request this route, the spool functions will be called and a directory will be created for each level of priority within the tasks directory (the "spool directory" we created earlier).

The spooler will do its best to run the spooled functions in order of priority, but (from my initial testing) this can't be guaranteed.

Accessing this route, we see the following output:

{'name': 'Priority 3', 'data': {'foo': 'bar'}}
Priority 3 done!

{'name': 'No priority', 'data': [9, 7, 8]}
No priority done!

{'name': 'Priority 1', 'data': [1, 2, 3]}
Priority 1 done!

{'name': 'Priority 2', 'data': ['a', 'b', 'c']}
Priority 2 done!

Not quite in the order of priority, but I'm sure there's something I'm missing (this was just after some initial testing).

Another area where I've had mixed results is spool function return values.

Looking through the documentation, a spool function has the option to return 3 values:

  • uwsgi.SPOOL_OK - The task has been completed and the spool file will be removed

  • uwsgi.SPOOL_RETRY - Something went wrong and the task will be retried in the next spooler iteration

  • uwsgi.SPOOL_IGNORE - Ignore the task

My initial testing and thoughts:

import uwsgi

@spool(pass_arguments=True)
def spool_ok(*args, **kwargs):
    time.sleep(2)
    print("Spool OK")
    return uwsgi.SPOOL_OK

@spool(pass_arguments=True)
def spool_retry(*args, **kwargs):
    time.sleep(2)
    print("Spool retry")
    return uwsgi.SPOOL_RETRY

@spool(pass_arguments=True)
def spool_ignore(*args, **kwargs):
    time.sleep(2)
    print("Spool ignored")
    return uwsgi.SPOOL_IGNORE

My idea was to call each spool function, expecting the spool file for spool_retry to remain in the spool directory:

@app.route("/spool")
def add_spool():
    spool_ok.spool(id="1")
    spool_retry.spool(id="2")
    spool_ignore.spool(id="3")
    return "Spooled"

However, it turns out I had missed something in the documentation: we can use the @spoolraw decorator to control the return value of a spool!

@spoolraw

To control the return value of a spool, we can use the spoolraw decorator, which recognises 3 possible return values:

  • uwsgi.SPOOL_OK - The task has been completed and the spool file will be removed

  • uwsgi.SPOOL_RETRY - Something went wrong and the task will be retried in the next spooler iteration

  • uwsgi.SPOOL_IGNORE - Ignore the task. If multiple languages are loaded in the instance, all of them will fight to manage the task; this return value allows you to skip a task in specific languages

Let's re-run the same tests as above using the spoolraw decorator:

@spoolraw(pass_arguments=True)
def spool_ok(*args, **kwargs):
    time.sleep(2)
    print("Spool OK")
    return uwsgi.SPOOL_OK

@spoolraw(pass_arguments=True)
def spool_retry(*args, **kwargs):
    time.sleep(2)
    print("Spool retry")
    return uwsgi.SPOOL_RETRY

@spoolraw(pass_arguments=True)
def spool_ignore(*args, **kwargs):
    time.sleep(2)
    print("Spool ignored")
    return uwsgi.SPOOL_IGNORE

Calling the functions:

@app.route("/spool")
def add_spool():
    spool_ok.spool(id="1")
    spool_retry.spool(id="2")
    spool_ignore.spool(id="3")
    return "Spooled"

And now, as expected:

  • spool_ok - Ran successfully and the spool file was removed

  • spool_retry - Ran, but returned the retry signal. The spool file was kept and the task retried every 3 seconds (the spooler-frequency we set in the ini file)

  • spool_ignore - Was ignored and the spool file remained, producing the following output every 3 seconds:

Spool ignored
unable to find the spooler function, have you loaded it into the spooler process ?

Which makes sense, as we told uWSGI to ignore it.

These options make it easy for us to retry a task if a condition isn't met or an exception is raised in the function, for example:

@spoolraw
def critical_function(path):
    try:
        # compress_images is a placeholder for the real work
        compress_images(path)
    except Exception as e:
        # log_error is a placeholder for your error logging
        log_error(e)
        return uwsgi.SPOOL_RETRY
    return uwsgi.SPOOL_OK

@spoolforever

Need a function to run forever? Use the @spoolforever decorator.

@spoolforever
def forever_and_ever(args):
    print(args)
    for i in range(10):
        print(i)
        time.sleep(0.5)

Calling it from our route:

@app.route("/spool")
def add_spool():
    forever_and_ever.spool()
    return "Spooled"

The forever_and_ever function will now run forever, even after stopping and starting the application.

If you need to remove a spoolforever task, you'll have to delete the spool file found in the spool folder.
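
For example, with the tasks spool directory from this guide (the file names follow the uwsgi_spoolfile_on_* pattern we saw earlier):

ls tasks/
# remove all pending spool files, including the spoolforever task
rm tasks/uwsgi_spoolfile_on_*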

@thread

The thread decorator can be used to execute a function in a separate thread.

To enable threading, you must add it as an option in your ini file or pass it to uwsgi as an argument on the CLI:

enable-threads = true
# or
threads = <n>

If you're following along, we already set a value for threads in app.ini.

Let's decorate 3 functions with the @thread decorator and call them from the index route:

@thread
def thread_func_a():
    for i in range(5):
        print("Thread a running")

@thread
def thread_func_b():
    for i in range(5):
        print("Thread b running")

@thread
def thread_func_c(adj):
    for i in range(5):
        print("Thread c running with args ", adj)

We'll call the functions from the index route:

@app.route("/")
def index():
    thread_func_a()
    thread_func_b()
    thread_func_c("AWESOME!")
    return "Hello world"

Upon requesting the route, we see the following output:

Thread a running
Thread a running
Thread b running
Thread c running with args  AWESOME!
Thread c running with args  AWESOME!
Thread c running with args  AWESOME!
Thread b running
Thread c running with args  AWESOME!
Thread c running with args  AWESOME!
Thread b running
Thread b running

@postfork

The postfork decorator allows us to decorate functions that will be executed when uWSGI forks the application.

From the uWSGI docs:

"uWSGI is a preforking (or “fork-abusing”) server, so you might need to execute a fixup task after each fork(). The postfork decorator is just the ticket. You can declare multiple postfork tasks. Each decorated function will be executed in sequence after each fork()."

For example, you may want to reconnect to a database after forking:

@postfork
def connect_to_db():
    # Reconnect after fork (db is a placeholder for your database client)
    db.connect()
    print("Connected to db after fork")

Any functions decorated with @postfork will be executed sequentially. Let's add another one:

@postfork
def second_postfork_func():
    print("Running second postfork function")

When we first start up our app, uWSGI will fork based on how many processes we set in the ini file:

Connected to db after fork
Running second postfork function
spawned uWSGI worker 1 (pid: 11087, cores: 8)
Connected to db after fork
Running second postfork function
spawned uWSGI worker 2 (pid: 11096, cores: 8)
Connected to db after fork
Running second postfork function
spawned uWSGI worker 3 (pid: 11105, cores: 8)
Connected to db after fork
Running second postfork function
spawned uWSGI worker 4 (pid: 11114, cores: 8)

@lock

The lock decorator will execute a function in a fully locked environment.

From the uWSGI docs:

"This decorator will execute a function in fully locked environment, making it impossible for other workers or threads (or the master, if you’re foolish or brave enough) to run it simultaneously."

To create a lock function, simply decorate it with @lock:

@lock
def locked_function():
    print("Concurrency is for fools!")

We'll call it from the index route:

@app.route("/")
def index():
    locked_function()
    return "Hello world"

Requesting the index route, we see:

Concurrency is for fools!

To better illustrate the @lock decorator, we can combine it with the @timer decorator:

@timer(2)
@lock
def locked_function(num):
    print("Concurrency is for fools!", time.time())

As expected, locked_function will run every 2 seconds and print Concurrency is for fools! to the terminal:

Concurrency is for fools! 1554936572.0891757
Concurrency is for fools! 1554936574.0893521
Concurrency is for fools! 1554936576.08939
Concurrency is for fools! 1554936578.0899732
Concurrency is for fools! 1554936580.0901859

However, if we modify the function to include a delay:

@timer(2)
@lock
def locked_function(num):
    time.sleep(4)
    print("Concurrency is for fools!", time.time())

The timer will attempt to run locked_function every 2 seconds, but due to the @lock decorator and the added 4 second delay, each run has to wait for the previous one to release the lock.

We can see this in the terminal output:

Concurrency is for fools! 1554936725.4628816
Concurrency is for fools! 1554936729.4640946
Concurrency is for fools! 1554936733.4651072
Concurrency is for fools! 1554936737.4664443
Concurrency is for fools! 1554936741.4673338

If you have a function that must not be called by any other process, @lock is your friend.

Other decorators

Some other interesting decorators, out of the scope of this guide, include:

  • @harakiri(n) - Kill the worker if the decorated call takes longer than n seconds

  • @rpc('x') - Used for remotely calling functions using the uWSGI RPC stack

  • @signal(n) - Registers a handler for the given uWSGI signal number (see the sketch below)
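
As a brief sketch of my own (not from the original guide), a @signal handler is registered against a signal number and can then be raised manually with uwsgi.signal:

@signal(17)
def my_signal_handler(num):
    print("uWSGI signal 17 received")

# somewhere in the app, raise the signal:
uwsgi.signal(17)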

Be sure to read the uWSGI decorator docs here

Wrapping up

This guide was just an introduction to some of the useful decorators available in uWSGI. I highly recommend you check out the documentation, have a play around and do some testing for yourself.

Also, you may want to check out this awesome package/repo for working with many of the uWSGI tasks.

Reference: https://pythonise.com/series/learning-flask/exploring-uwsgi-decorators