API Documentation

JUG: Coarse Level Parallelisation for Python

The main use of jug is from the command line:

jug status jugfile.py
jug execute jugfile.py

Where jugfile.py is a Python script using the jug library.

class jug.Task(f, dep0, dep1, ..., kw_arg0=kw_val0, kw_arg1=kw_val1, ...)

Defines a task, which will call:

f(dep0, dep1,..., kw_arg0=kw_val0, kw_arg1=kw_val1, ...)

See also

TaskGenerator
function

Attributes

result Result value
store  

Methods

can_load() Returns whether result is available.
can_run() Returns true if all the dependencies have their results available.
dependencies() Iterates over all the first-level dependencies of the task.
hash() Returns the hash for this task.
invalidate() Equivalent to t.store.remove(t.hash()).
is_loaded() Returns True if the task is already loaded
is_locked() Returns whether the task appears to be locked (this check is not atomic).
load() Loads the results from the storage backend.
lock() Tries to lock the task for the current process.
run([force, save]) Performs the task.
unload() Unload results (can be useful for saving memory).
unload_recursive() Equivalent to calling unload() on every task in recursive_dependencies(t).
unlock() Releases the lock.
value()
can_load()

Returns whether result is available.

can_run()

Returns true if all the dependencies have their results available.

dependencies()
for dep in task.dependencies():

Iterates over all the first-level dependencies of task t

Parameters:

self : Task

Returns:

deps : generator

A generator over all of self’s dependencies

See also

recursive_dependencies
retrieve dependencies recursively
hash()

Returns the hash for this task.

The results are cached, so the first call can be much slower than subsequent calls.

invalidate()

Equivalent to t.store.remove(t.hash()). Useful for interactive use (i.e., in jug shell mode).

is_loaded()

Returns True if the task is already loaded

is_locked()

Returns whether the task appears to be locked. This check is not atomic: only calling lock() and checking its result tests and acquires the lock in one atomic step. This function can be much faster, though, and is therefore sometimes useful.

Returns:

is_locked : boolean

Whether the task appears to be locked.

See also

lock
create lock
unlock
destroy lock
load()

Loads the results from the storage backend.

This function always loads from the backend even if the task is already loaded. You can use is_loaded as a check if you want to avoid this behaviour.

Returns: Nothing
lock()

Tries to lock the task for the current process.

Returns True if the lock was acquired. The correct usage pattern is:

locked = task.lock()
if locked:
    task.run()
else:
    # someone else is already running this task!
    pass

Note that checking for the lock before calling lock() (for example, with is_locked()) can lead to race conditions. The above is the only fully correct method.

Returns:

locked : boolean

Whether the lock was obtained.
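The reason lock() must be called and its result checked can be illustrated with a small stand-alone sketch. It does not use jug and is not jug's implementation; the helper names try_lock and unlock and the lock-file approach are assumptions made for illustration. The point is that testing for the lock and acquiring it happen in one atomic step, so two processes cannot both succeed.

```python
import os
import tempfile


def try_lock(lock_path):
    """Try to create the lock file; return True only if this call created it."""
    try:
        # O_CREAT | O_EXCL makes "check and create" a single atomic step:
        # the call fails if the file already exists.
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    os.close(fd)
    return True


def unlock(lock_path):
    """Remove the lock file (only call this if try_lock returned True)."""
    os.unlink(lock_path)


with tempfile.TemporaryDirectory() as tmpdir:
    lock_path = os.path.join(tmpdir, "task.lock")
    first = try_lock(lock_path)    # acquires the lock
    second = try_lock(lock_path)   # fails: the lock is already held
    unlock(lock_path)
    third = try_lock(lock_path)    # succeeds again after the release
    unlock(lock_path)
```

A separate "is it locked?" test followed by a "create the lock" step would leave a window in which another process could take the lock in between, which is the race condition mentioned above.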

result

Result value

run(force=False, save=True)

Performs the task.

Parameters:

force : boolean, optional

if true, always run the task (even if it ran before) (default: False)

save : boolean, optional

if true, save the result to the store (default: True)

unload()

Unload results (can be useful for saving memory).

unload_recursive()

Equivalent to:

for tt in recursive_dependencies(t): tt.unload()
unlock()

Releases the lock.

If the lock was not held, this may remove another thread’s lock!

class jug.Tasklet(base, f)

A Tasklet is a light-weight Task.

It looks like a Task, behaves like a Task, but its results are not saved in the backend.

It is useful for very simple functions and is automatically generated on subscripting a Task object:

t = Task(f, 1)
tlet = t[0]

tlet will be a Tasklet

See also

Task

Methods

can_load()
dependencies()
unload()
unload_recursive()
value()
class jug.TaskGenerator(f)

@TaskGenerator
def f(arg0, arg1, ...)

Turns f from a function into a task generator.

This means that calling f(arg0, arg1) results in: Task(f, arg0, arg1). This can make your jug-based code feel very similar to what you do with traditional Python.

Methods

__call__(*args, **kwargs)
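The idea of a task generator can be pictured with a toy model in plain Python. This is only a sketch of the concept and not jug's code: the names Deferred and deferred_generator are hypothetical, and nothing is stored in a backend. Calling a decorated function records the call instead of running it, and arguments that are themselves deferred calls become dependencies.

```python
class Deferred:
    """Hypothetical stand-in for a task: records a call without running it."""

    def __init__(self, f, *args, **kwargs):
        self.f = f
        self.args = args
        self.kwargs = kwargs

    def run(self):
        # Resolve any arguments that are themselves deferred calls first.
        args = [a.run() if isinstance(a, Deferred) else a for a in self.args]
        kwargs = {k: (v.run() if isinstance(v, Deferred) else v)
                  for k, v in self.kwargs.items()}
        return self.f(*args, **kwargs)


def deferred_generator(f):
    """Decorator: calling the wrapped function returns a Deferred instead."""
    def wrapper(*args, **kwargs):
        return Deferred(f, *args, **kwargs)
    return wrapper


@deferred_generator
def double(x):
    return 2 * x


@deferred_generator
def add(a, b):
    return a + b


# Reads like ordinary Python, but nothing has been computed yet.
total = add(double(3), double(4))
result = total.run()
```

In jug itself the deferred objects are Task instances and execution is driven by jug execute rather than by calling a method by hand.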
class jug.iteratetask(base, n)

Examples:

a,b = iteratetask(task, 2)
for a in iteratetask(task, n):
    ...

This creates an iterator over the sequence task[0], task[1], ..., task[n-1].

Parameters:

task : Task(let)

n : integer

Returns:

iterator

jug.value(obj)

Loads a task object recursively. This correctly handles lists, dictionaries and any other type handled by the tasks themselves.

Parameters:

obj : object

Anything that can be pickled or a Task

Returns:

value : object

The result of the task obj

jug.CachedFunction(f, *args, **kwargs)

is equivalent to:

task = Task(f, *args, **kwargs)
if not task.can_load():
    task.run()
value = task.value()

That is, it calls the function if the value is not yet available, and caches the result for the future.

You can often use bvalue to achieve similar results:

task = Task(f, *args, **kwargs)
value = bvalue(task)

This alternative method is more flexible, but will only be executed lazily. In particular, a jug status will not see past the bvalue call until jug execute is called to execute f, while a CachedFunction object will always execute.

Parameters:

f : function

Any function except unnamed (lambda) functions

Returns:

value : result

Result of calling f(*args,**kwargs)
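The run-if-missing, then load pattern described above can be sketched without jug. This is an illustration under stated assumptions, not jug's implementation: cached_call is a hypothetical helper, the results are pickled into a temporary directory, and the cache key is a simple SHA-1 of the function name and argument repr rather than jug's task hash.

```python
import hashlib
import os
import pickle
import tempfile


def cached_call(cache_dir, f, *args):
    """Run f(*args) unless a saved result exists; return the result either way."""
    key = hashlib.sha1(repr((f.__name__, args)).encode("utf-8")).hexdigest()
    path = os.path.join(cache_dir, key + ".pkl")
    if not os.path.exists(path):          # plays the role of "not task.can_load()"
        with open(path, "wb") as out:     # plays the role of "task.run()"
            pickle.dump(f(*args), out)
    with open(path, "rb") as inp:         # plays the role of "task.value()"
        return pickle.load(inp)


calls = []


def slow_square(x):
    calls.append(x)   # record each real execution
    return x * x


with tempfile.TemporaryDirectory() as cache_dir:
    first = cached_call(cache_dir, slow_square, 7)    # computes and saves
    second = cached_call(cache_dir, slow_square, 7)   # loads the saved result
```

The function body runs once; the second call only reads the stored value.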

See also

bvalue
function An alternative way to achieve similar results to CachedFunction(f) is using bvalue.
jug.CompoundTask(f, *args, **kwargs)

f should be such that it returns a Task, which can depend on other Tasks (even recursively).

If this cannot be loaded (i.e., has not yet been run), then this becomes equivalent to:

f(*args, **kwargs)

However, if it can, then we get a pseudo-task which returns the same value without f ever being executed.

Parameters: f : function returning a jug.Task
Returns: task : jug.Task
jug.CompoundTaskGenerator(f)

@CompoundTaskGenerator
def f(arg0, arg1, ...)

Turns f from a function into a compound task generator.

This means that calling f(arg0, arg1) results in: CompoundTask(f, arg0, arg1)

See also

TaskGenerator

jug.barrier()

In a jug file, it assures that all tasks defined up to now have been completed. If not, parsing will (temporarily) stop at that point.

This ensures that, after calling barrier() you are free to call value() to get any needed results.

See also

bvalue
function Restricted version of this function. Often faster
jug.bvalue(t)

Named after barrier + value, value = bvalue(t) is similar to:

barrier()
value = value(t)

except that it only checks that t is complete (and not all tasks) and thus can be much faster than a full barrier() call.

Thus, bvalue stops interpreting the Jugfile if its argument has not run yet. When it has run, then it returns its value.

See also

barrier
Checks that all tasks have results available.
jug.set_jugdir(jugdir)

Sets the jugdir. This is the programmatic equivalent of passing --jugdir=... on the command line.

Parameters: jugdir : str
Returns: store : a jug backend
jug.init(jugfile={'jugfile'}, jugdir={'jugdata'}, on_error='exit', store=None)

Initializes jug (create backend connection, …). Imports jugfile

Parameters:

jugfile : str, optional

jugfile to import (default: ‘jugfile’)

jugdir : str, optional

jugdir to use (could be a path)

on_error : str, optional

What to do if import fails (default: exit)

store : storage object, optional

If used, this is returned as store again.

Returns:

store : storage object

jugspace : dictionary

jug.is_jug_running()

Returns True if this script is being executed by jug instead of regular Python

Task: contains the Task class.

This is the main class for using jug.

There are two main alternatives:

  • Use the Task class directly to build up tasks, such as Task(function, arg0, ...).
  • Rely on the TaskGenerator decorator as a shortcut for this.
class jug.task.Task(f, dep0, dep1, ..., kw_arg0=kw_val0, kw_arg1=kw_val1, ...)

Defines a task, which will call:

f(dep0, dep1,..., kw_arg0=kw_val0, kw_arg1=kw_val1, ...)

See also

TaskGenerator
function

Attributes

result Result value
store  

Methods

can_load() Returns whether result is available.
can_run() Returns true if all the dependencies have their results available.
dependencies() Iterates over all the first-level dependencies of the task.
hash() Returns the hash for this task.
invalidate() Equivalent to t.store.remove(t.hash()).
is_loaded() Returns True if the task is already loaded
is_locked() Returns whether the task appears to be locked (this check is not atomic).
load() Loads the results from the storage backend.
lock() Tries to lock the task for the current process.
run([force, save]) Performs the task.
unload() Unload results (can be useful for saving memory).
unload_recursive() Equivalent to calling unload() on every task in recursive_dependencies(t).
unlock() Releases the lock.
value()
can_load()

Returns whether result is available.

can_run()

Returns true if all the dependencies have their results available.

dependencies()
for dep in task.dependencies():

Iterates over all the first-level dependencies of task t

Parameters:

self : Task

Returns:

deps : generator

A generator over all of self’s dependencies

See also

recursive_dependencies
retrieve dependencies recursively
hash()

Returns the hash for this task.

The results are cached, so the first call can be much slower than subsequent calls.

invalidate()

Equivalent to t.store.remove(t.hash()). Useful for interactive use (i.e., in jug shell mode).

is_loaded()

Returns True if the task is already loaded

is_locked()

Returns whether the task appears to be locked. This check is not atomic: only calling lock() and checking its result tests and acquires the lock in one atomic step. This function can be much faster, though, and is therefore sometimes useful.

Returns:

is_locked : boolean

Whether the task appears to be locked.

See also

lock
create lock
unlock
destroy lock
load()

Loads the results from the storage backend.

This function always loads from the backend even if the task is already loaded. You can use is_loaded as a check if you want to avoid this behaviour.

Returns: Nothing
lock()

Tries to lock the task for the current process.

Returns True if the lock was acquired. The correct usage pattern is:

locked = task.lock()
if locked:
    task.run()
else:
    # someone else is already running this task!
    pass

Note that checking for the lock before calling lock() (for example, with is_locked()) can lead to race conditions. The above is the only fully correct method.

Returns:

locked : boolean

Whether the lock was obtained.

result

Result value

run(force=False, save=True)

Performs the task.

Parameters:

force : boolean, optional

if true, always run the task (even if it ran before) (default: False)

save : boolean, optional

if true, save the result to the store (default: True)

unload()

Unload results (can be useful for saving memory).

unload_recursive()

Equivalent to:

for tt in recursive_dependencies(t): tt.unload()
unlock()

Releases the lock.

If the lock was not held, this may remove another thread’s lock!

class jug.task.Tasklet(base, f)

A Tasklet is a light-weight Task.

It looks like a Task, behaves like a Task, but its results are not saved in the backend.

It is useful for very simple functions and is automatically generated on subscripting a Task object:

t = Task(f, 1)
tlet = t[0]

tlet will be a Tasklet

See also

Task

Methods

can_load()
dependencies()
unload()
unload_recursive()
value()
jug.task.recursive_dependencies(t, max_level=-1)
for dep in recursive_dependencies(t, max_level=-1):

Returns a generator that lists all recursive dependencies of task

Parameters:

t : Task

input task

max_level : integer, optional

Maximum recursion depth. Set to -1 or None for no recursion limit.

Returns:

deps : generator

A generator over all dependencies

class jug.task.TaskGenerator(f)

@TaskGenerator
def f(arg0, arg1, ...)

Turns f from a function into a task generator.

This means that calling f(arg0, arg1) results in: Task(f, arg0, arg1). This can make your jug-based code feel very similar to what you do with traditional Python.

Methods

__call__(*args, **kwargs)
class jug.task.iteratetask(base, n)

Examples:

a,b = iteratetask(task, 2)
for a in iteratetask(task, n):
    ...

This creates an iterator over the sequence task[0], task[1], ..., task[n-1].

Parameters:

task : Task(let)

n : integer

Returns:

iterator

jug.task.value(obj)

Loads a task object recursively. This correctly handles lists, dictionaries and any other type handled by the tasks themselves.

Parameters:

obj : object

Anything that can be pickled or a Task

Returns:

value : object

The result of the task obj

mapreduce: Build tasks that follow a map-reduce pattern.

jug.mapreduce.mapreduce(reducer, mapper, inputs, map_step=4, reduce_step=8)

Create a task that does roughly the following:

reduce(reducer, map(mapper, inputs))

Roughly because the order of operations might be different. In particular, reducer should be a true reducer function (i.e., commutative and associative).

Parameters:

reducer : associative, commutative function

This should map

Y_0,Y_1 -> Y’

mapper : function from X -> Y

inputs : list of X

map_step : integer, optional

Number of mapping operations to do in one go. This is what defines an inner task. (default: 4)

reduce_step : integer, optional

Number of reduce operations to do in one go. (default: 8)

Returns:

task : jug.Task object
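Why the reducer must be associative and commutative is easier to see with a plain-Python sketch of grouped map and reduce. It does not use jug and does not claim to reproduce jug's grouping order: chunked_mapreduce and chunks are hypothetical helpers, and the sketch assumes a non-empty input list.

```python
from functools import reduce
import operator


def chunks(items, size):
    """Split a list into consecutive groups of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def chunked_mapreduce(reducer, mapper, inputs, map_step=4, reduce_step=8):
    if reduce_step < 2:
        raise ValueError("reduce_step must be at least 2")
    # Map stage: each group of map_step inputs stands for one inner task.
    mapped = []
    for group in chunks(list(inputs), map_step):
        mapped.extend(mapper(x) for x in group)
    # Reduce stage: repeatedly combine groups of reduce_step partial results.
    while len(mapped) > 1:
        mapped = [reduce(reducer, group) for group in chunks(mapped, reduce_step)]
    return mapped[0]


def square(x):
    return x * x


inputs = list(range(1, 21))

# Addition is associative and commutative, so grouping does not matter.
grouped_sum = chunked_mapreduce(operator.add, square, inputs)
plain_sum = reduce(operator.add, map(square, inputs))

# Subtraction is neither, so the grouped result differs from the plain one.
grouped_sub = chunked_mapreduce(operator.sub, square, inputs, reduce_step=2)
plain_sub = reduce(operator.sub, map(square, inputs))
```

With addition the grouped and plain results agree; with subtraction they do not, which is the situation the requirement on reducer is meant to rule out.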

jug.mapreduce.map(mapper, sequence, map_step=4)

sequence’ = map(mapper, sequence, map_step=4)

Roughly equivalent to:

sequence' = [Task(mapper,s) for s in sequence]

except that the tasks are grouped in groups of map_step

Parameters:

mapper : function

function from A -> B

sequence : list of A

map_step : integer, optional

Number of elements to process per task. This should be set so that each task takes the right amount of time.

Returns:

sequence’ : list of B

sequence’[i] = mapper(sequence[i])

See also

mapreduce

currymap
function Curried version of this function
jug.mapreduce.reduce(reducer, inputs, reduce_step=8)
Parameters:

reducer : associative, commutative function

This should map

Y_0,Y_1 -> Y’

inputs : list of X

reduce_step : integer, optional

Number of reduce operations to do in one go. (default: 8)

Returns:

task : jug.Task object

See also

mapreduce

jug.compound.CompoundTask(f, *args, **kwargs)

f should be such that it returns a Task, which can depend on other Tasks (even recursively).

If this cannot be loaded (i.e., has not yet been run), then this becomes equivalent to:

f(*args, **kwargs)

However, if it can, then we get a pseudo-task which returns the same value without f ever being executed.

Parameters: f : function returning a jug.Task
Returns: task : jug.Task
jug.compound.CompoundTaskGenerator(f)

@CompoundTaskGenerator
def f(arg0, arg1, ...)

Turns f from a function into a compound task generator.

This means that calling f(arg0, arg1) results in: CompoundTask(f, arg0, arg1)

See also

TaskGenerator

jug.compound.compound_task_execute(x, h)

This is an internal function. Do not use directly.

jug.utils.timed_path(path)

Returns a Task object that simply returns path, except that it uses the path's mtime (modification time) and the file size in the hash. Thus, if the file is touched or changes size, the results are invalidated (and the invalidation propagates to all dependent tasks).

Parameters:

path : str

A filesystem path

Returns:

opath : str

A task equivalent to (lambda: path).

jug.utils.identity(x)

identity implements the identity function as a Task (i.e., value(identity(x)) == x)

This seems pointless, but if x is, for example, a very large list, then using this function might speed up some computations. Consider:

large = list(range(100000))
large = jug.utils.identity(large)
for i in range(100):
    Task(process, large, i)

This way the list large is going to get hashed just once. Without the call to jug.utils.identity, it would get hashed at each loop iteration.

https://jug.readthedocs.io/en/latest/utilities.html#identity

Parameters: x : any object
Returns: x : x
class jug.utils.CustomHash(obj, hash_function)

Set a custom hash function

This is an advanced feature and you can shoot yourself in the foot with it. Make sure you know what you are doing. In particular, hash_function should be a strong hash: hash_function(obj0) == hash_function(obj1) is taken to imply that obj0 == obj1. The hash function should return a bytes object.

You can use the helpers in the jug.hash module (in particular hash_one) to help you. The implementation of timed_path is a good example of how to use a CustomHash:

def hash_with_mtime_size(path):
    from .hash import hash_one
    st = os.stat_result(os.stat(path))
    mtime = st.st_mtime
    size = st.st_size
    return hash_one((path, mtime, size))

def timed_path(path):
    return CustomHash(path, hash_with_mtime_size)

The path object (a string or bytes) is wrapped with a hashing function which checks the file value.

Parameters:

obj : any object

hash_function : function

This should take your object and return its hash (a bytes object, as described above)
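As a further illustration, a hash function could be based on the file's contents instead of its mtime and size. The sketch below is an assumption-laden example rather than part of jug: hash_file_contents is a hypothetical name, it uses hashlib from the standard library instead of the jug.hash helpers, and the CustomHash call is shown only in a comment so that the block runs without jug installed.

```python
import hashlib
import os
import tempfile


def hash_file_contents(path):
    """Hypothetical hash function: digest of the file's bytes, returned as bytes."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # Read in blocks so large files do not need to fit in memory.
        for block in iter(lambda: f.read(65536), b""):
            digest.update(block)
    # hexdigest() returns str; encode it so the function returns bytes.
    return digest.hexdigest().encode("ascii")


# With jug installed, this would be used as (not executed here):
#     wrapped = jug.utils.CustomHash(path, hash_file_contents)

with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "data.txt")
    with open(path, "w") as f:
        f.write("first version")
    before = hash_file_contents(path)
    with open(path, "w") as f:
        f.write("second version")
    after = hash_file_contents(path)
```

Compared with the mtime and size approach, a content hash is unaffected by merely touching the file, at the cost of reading the whole file each time the hash is computed.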

jug.utils.sync_move(src, dst)

Sync the file and move it

This ensures that the move is truly atomic

Parameters:

src : filename

Source file

dst : filename

Destination file

jug.hooks.exit_checks.exit_after_n_tasks(n)

Exit after a specific number of tasks have been executed

Parameters:

n : int

Number of tasks to execute

jug.hooks.exit_checks.exit_after_time(hours=0, minutes=0, seconds=0)

Exit after a specific amount of time has elapsed

Note that this only checks the time after each task has finished executing. Thus if you are using this to limit the amount of time each process takes, make sure to specify a lower limit than what is needed.

Parameters:

hours : number, optional

minutes : number, optional

seconds : number, optional
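The note about the limit not being strict can be seen in a small stand-alone sketch. It does not use jug; run_with_time_limit is a hypothetical helper that mimics a loop which checks the clock only after each task has finished.

```python
import time


def run_with_time_limit(tasks, limit_seconds):
    """Run callables in order, checking the elapsed time only between tasks."""
    start = time.monotonic()
    finished = 0
    for task in tasks:
        task()                      # a running task is never interrupted
        finished += 1
        if time.monotonic() - start >= limit_seconds:
            break                   # the check happens after the task ends
    return finished, time.monotonic() - start


def short_task():
    time.sleep(0.05)


finished, elapsed = run_with_time_limit([short_task] * 10, limit_seconds=0.12)
```

The loop stops early, but only once the task that crossed the limit has completed, so the total running time overshoots the limit by up to the duration of one task. This is why the limit passed in should be lower than the real deadline.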

jug.hooks.exit_checks.exit_env_vars(environ={os.environ})

Set exit markers based on the environment.

The following variables are used if they are set (if they are not set, they are ignored).

JUG_MAX_TASKS: Maximum number of tasks

JUG_MAX_HOURS: Maximum hours

JUG_MAX_MINUTES: Maximum minutes

JUG_MAX_SECONDS: Maximum seconds

For the time based limits, see the comment on exit_after_time on how these limits are not strict as they are only checked after each task completion event.

If any of the variables above is set, its value should be an int or an error will be raised.

JUG_EXIT_IF_FILE_EXISTS: Set exit file name

jug.hooks.exit_checks.exit_if_file_exists(fname)

Before each task is executed, check whether the file exists. If so, exit.

Note that the check is only performed before a Task is executed. Thus, jug will not exit immediately if it is executing another long-running task.

Parameters:

fname : str

path to check

jug.hooks.exit_checks.exit_when_true(f, function_takes_Task=False)

Generic exit check.

After each task, call function f and exit if it returns true.

Parameters:

f : function

Function to call

function_takes_Task : boolean, optional

Whether to call the function with the task just executed (default: False)