API Documentation¶
JUG: Coarse Level Parallelisation for Python¶
The main use of jug is from the command line:
jug status jugfile.py
jug execute jugfile.py
where jugfile.py is a Python script using the jug library.
- jug.CachedFunction(f, *args, **kwargs)¶
is equivalent to:
    task = Task(f, *args, **kwargs)
    if not task.can_load():
        task.run()
    value = task.value()
That is, it calls the function if the value is not yet available, and caches the result for the future.
You can often use bvalue to achieve similar results:
    task = Task(f, *args, **kwargs)
    value = bvalue(task)
This alternative method is more flexible, but will only be executed lazily. In particular, a jug status will not see past the bvalue call until jug execute is called to execute f, while a CachedFunction object will always execute.
- Parameters:
- f: function
Any function except unnamed (lambda) functions
- Returns:
- value: result
Result of calling f(*args, **kwargs)
See also
bvalue (function): an alternative way to achieve similar results to CachedFunction(f).
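The caching pattern described for CachedFunction can be sketched with the standard library alone. This is only an illustration under stated assumptions: the dict `_store` and the helper `cached_call` are hypothetical stand-ins for a storage backend and for the Task machinery, not jug's implementation. Keying on the function name is also an assumption, chosen to show why a function needs a distinguishing name for this kind of cache to work.

```python
import hashlib
import pickle

_store = {}  # hypothetical in-memory stand-in for a storage backend


def cached_call(f, *args, **kwargs):
    """Run f only if no stored result exists for this call; otherwise reuse it."""
    key = hashlib.sha1(
        pickle.dumps((f.__module__, f.__name__, args, sorted(kwargs.items())))
    ).hexdigest()
    if key not in _store:                  # plays the role of `not task.can_load()`
        _store[key] = f(*args, **kwargs)   # plays the role of `task.run()`
    return _store[key]                     # plays the role of `task.value()`


calls = []


def square(x):
    calls.append(x)  # record each real execution
    return x * x


first = cached_call(square, 4)
second = cached_call(square, 4)  # served from the store, square is not re-run
```

In this sketch the second call never reaches `square`, which mirrors the "run once, then load" behaviour described above.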
- jug.CompoundTask(f, *args, **kwargs)¶
f should be such that it returns a Task, which can depend on other Tasks (even recursively).
If this cannot be loaded (i.e., has not yet been run), then this becomes equivalent to:
f(*args, **kwargs)
However, if it can, then we get a pseudo-task which returns the same value without f ever being executed.
- Parameters:
- f: function returning a jug.Task
- Returns:
- task: jug.Task
- jug.CompoundTaskGenerator(f)¶
    @CompoundTaskGenerator
    def f(arg0, arg1, …):
        …
Turns f from a function into a compound task generator.
This means that calling f(arg0, arg1) results in: CompoundTask(f, arg0, arg1)
See also
TaskGenerator
- class jug.Task(f, dep0, dep1, ..., kw_arg0=kw_val0, kw_arg1=kw_val1, ...)¶
Defines a task, which will call:
f(dep0, dep1,..., kw_arg0=kw_val0, kw_arg1=kw_val1, ...)
See also
TaskGenerator (function)
- Attributes:
- result
Result value
- store
Methods
- can_load(): Returns whether result is available.
- can_run(): Returns true if all the dependencies have their results available.
- dependencies(): Iterates over all the first-level dependencies of the task.
- fail(): Marks the task as failed.
- hash(): Returns the hash for this task.
- invalidate(): Equivalent to t.store.remove(t.hash()).
- is_failed(): Returns whether the task is in failed state.
- is_loaded(): Returns True if the task is already loaded.
- is_locked(): Returns whether the task appears to be locked.
- load(): Loads the results from the storage backend.
- lock(): Tries to lock the task for the current process.
- run([force, save, debug_mode]): Performs the task.
- unload(): Unload results (can be useful for saving memory).
- unload_recursive(): Equivalent to calling unload() on every task in recursive_dependencies(t).
- unlock(): Releases the lock.
- value
- can_load()¶
Returns whether result is available.
- can_run()¶
Returns true if all the dependencies have their results available.
- dependencies()¶
    for dep in task.dependencies():
        …
Iterates over all the first-level dependencies of the task.
- Parameters:
- self: Task
- Returns:
- deps: generator
A generator over all of self’s dependencies
See also
recursive_dependencies: retrieve dependencies recursively
- fail()¶
Marks the task as failed
If the lock was not held, an exception will be raised.
- hash()¶
Returns the hash for this task.
The results are cached, so the first call can be much slower than subsequent calls.
- invalidate()¶
Equivalent to t.store.remove(t.hash()). Useful for interactive use (i.e., in jug shell mode).
- is_failed()¶
is_failed = t.is_failed()
- Returns:
- is_failed: boolean
Whether the task is in failed state.
- is_loaded()¶
Returns True if the task is already loaded
- is_locked()¶
Note that only calling lock() and checking its result checks for the lock atomically. This function can be much faster, though, and is therefore sometimes useful.
- Returns:
- is_locked: boolean
Whether the task appears to be locked.
- load()¶
Loads the results from the storage backend.
This function always loads from the backend even if the task is already loaded. You can use is_loaded as a check if you want to avoid this behaviour.
- Returns:
- Nothing
- lock()¶
Tries to lock the task for the current process.
Returns True if the lock was acquired. The correct usage pattern is:
    locked = task.lock()
    if locked:
        task.run()
    else:
        # someone else is already running this task!
        pass
Note that using can_lock() can lead to race conditions. The above is the only fully correct method.
- Returns:
- locked: boolean
Whether the lock was obtained.
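The reason a single "try to lock, then look at the result" step is safe, while "check, then lock" is not, can be illustrated with an atomic file creation. This is a sketch under assumptions: `try_lock`, `unlock` and the lock file path are hypothetical names, and the sketch does not claim to reproduce how any jug backend stores its locks.

```python
import os
import tempfile


def try_lock(lock_path):
    """Atomically create lock_path; return True only if this call created it."""
    try:
        # O_CREAT | O_EXCL makes "does it exist?" and "create it" one atomic step
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    os.close(fd)
    return True


def unlock(lock_path):
    """Release the lock by removing the lock file."""
    os.unlink(lock_path)


workdir = tempfile.mkdtemp()
lock_path = os.path.join(workdir, "task.lock")

first_attempt = try_lock(lock_path)    # the first caller obtains the lock
second_attempt = try_lock(lock_path)   # a competing caller is refused
unlock(lock_path)
third_attempt = try_lock(lock_path)    # available again after release
unlock(lock_path)
os.rmdir(workdir)
```

A separate existence check followed by a create would leave a window in which two processes both see "not locked" and both proceed, which is the race condition the text warns about.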
- property result¶
Result value
- run(force=False, save=True, debug_mode=False)¶
Performs the task.
- Parameters:
- force: boolean, optional
if true, always run the task (even if it ran before) (default: False)
- save: boolean, optional
if true, save the result to the store (default: True)
- debug_mode: boolean, optional
whether to run in debug mode (adds extra checks) (default: False)
- Returns:
- val: return value from Task
- unload()¶
Unload results (can be useful for saving memory).
- unload_recursive()¶
Equivalent to:
    for tt in recursive_dependencies(t):
        tt.unload()
- unlock()¶
Releases the lock.
If the lock was not held, this may remove another thread’s lock!
- class jug.TaskGenerator(f)¶
    @TaskGenerator
    def f(arg0, arg1, …):
        …
Turns f from a function into a task generator.
This means that calling f(arg0, arg1) results in: Task(f, arg0, arg1). This can make your jug-based code feel very similar to what you do with traditional Python.
Methods
- __call__(*args, **kwargs): Call self as a function.
- class jug.Tasklet(base, f)¶
A Tasklet is a light-weight Task.
It looks like a Task, behaves like a Task, but its results are not saved in the backend.
It is useful for very simple functions and is automatically generated on subscripting a Task object:
    t = Task(f, 1)
    tlet = t[0]
tlet will be a Tasklet.
See also
- Attributes:
- base
- f
Methods
can_load
dependencies
unload
unload_recursive
value
- jug.barrier()¶
In a jug file, it ensures that all tasks defined up to now have been completed. If not, parsing will (temporarily) stop at that point.
This ensures that, after calling barrier(), you are free to call value() to get any needed results.
See also
bvalue (function): restricted version of this function. Often faster.
- jug.bvalue(t)¶
Named after barrier + value, value = bvalue(t) is similar to:
    barrier()
    value = value(t)
except that it only checks that t is complete (and not all tasks) and thus can be much faster than a full barrier() call.
Thus, bvalue stops interpreting the Jugfile if its argument has not run yet. When it has run, then it returns its value.
See also
barrier: Checks that all tasks have results available.
- jug.init(jugfile={'jugfile'}, jugdir={'jugdata'}, on_error='exit', store=None)¶
Initializes jug (create backend connection, …). Imports jugfile
- Parameters:
- jugfile: str, optional
jugfile to import (default: ‘jugfile’)
- jugdir: str, optional
jugdir to use (could be a path)
- on_error: str, optional
What to do if import fails (default: exit)
- store: storage object, optional
If used, this is returned as store again.
- Returns:
- store: storage object
- jugspace: dictionary
- jug.is_jug_running()¶
Returns True if this script is being executed by jug instead of regular Python
- class jug.iteratetask(base, n)¶
Examples:
    a, b = iteratetask(task, 2)
    for a in iteratetask(task, n):
        ...
This creates an iterator over the sequence task[0], task[1], ..., task[n-1].
- Parameters:
- task: Task(let)
- n: integer
- Returns:
- iterator
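The iteration described above can be sketched without jug. `FakeTask` and `iterate_sketch` are hypothetical names used only for illustration; in particular, subscripting a real Task yields a Tasklet rather than the element itself. The explicit n is needed in this sketch because the length of a result cannot be known before the task has run, which is presumably also why iteratetask takes it.

```python
class FakeTask:
    """Hypothetical stand-in for a Task whose result is a sequence."""

    def __init__(self, result):
        self.result = result

    def __getitem__(self, i):
        # A real Task would return a Tasklet here; this sketch returns the item.
        return self.result[i]


def iterate_sketch(task, n):
    """Yield task[0], task[1], ..., task[n-1]."""
    for i in range(n):
        yield task[i]


task = FakeTask(("mean", "stddev"))
a, b = iterate_sketch(task, 2)
```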
- jug.set_jugdir(jugdir)¶
Sets the jugdir. This is the programmatic equivalent of passing --jugdir=... on the command line.
- Parameters:
- jugdir: str
- Returns:
- store: a jug backend
- jug.value(obj)¶
Loads a task object recursively. This correctly handles lists, dictionaries and any other type handled by the tasks themselves.
- Parameters:
- obj: object
Anything that can be pickled or a Task
- Returns:
- value: object
The result of the task obj
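What "loads recursively" means for nested containers can be sketched as follows. `FakeTask` and `value_sketch` are hypothetical stand-ins written for this illustration; the set of container types handled here (list, tuple, dict) is an assumption and may not match jug's exactly.

```python
class FakeTask:
    """Hypothetical stand-in for a Task that already has a result."""

    def __init__(self, result):
        self._result = result

    def value(self):
        return self._result


def value_sketch(obj):
    """Replace every FakeTask found inside obj by its result."""
    if isinstance(obj, FakeTask):
        return obj.value()
    if isinstance(obj, list):
        return [value_sketch(e) for e in obj]
    if isinstance(obj, tuple):
        return tuple(value_sketch(e) for e in obj)
    if isinstance(obj, dict):
        return {k: value_sketch(v) for k, v in obj.items()}
    return obj  # plain objects are returned unchanged


nested = {
    "sizes": [FakeTask(3), FakeTask(5)],
    "label": "run-1",
    "pair": (FakeTask("a"), 2),
}
loaded = value_sketch(nested)
```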
Task: contains the Task class.
This is the main class for using jug.
There are two main alternatives:
- Use the Task class directly to build up tasks, such as Task(function, arg0, ...).
- Rely on the TaskGenerator decorator as a shortcut for this.
- class jug.task.Task(f, dep0, dep1, ..., kw_arg0=kw_val0, kw_arg1=kw_val1, ...)¶
Defines a task, which will call:
f(dep0, dep1,..., kw_arg0=kw_val0, kw_arg1=kw_val1, ...)
See also
TaskGenerator (function)
- Attributes:
- result
Result value
- store
Methods
- can_load(): Returns whether result is available.
- can_run(): Returns true if all the dependencies have their results available.
- dependencies(): Iterates over all the first-level dependencies of the task.
- fail(): Marks the task as failed.
- hash(): Returns the hash for this task.
- invalidate(): Equivalent to t.store.remove(t.hash()).
- is_failed(): Returns whether the task is in failed state.
- is_loaded(): Returns True if the task is already loaded.
- is_locked(): Returns whether the task appears to be locked.
- load(): Loads the results from the storage backend.
- lock(): Tries to lock the task for the current process.
- run([force, save, debug_mode]): Performs the task.
- unload(): Unload results (can be useful for saving memory).
- unload_recursive(): Equivalent to calling unload() on every task in recursive_dependencies(t).
- unlock(): Releases the lock.
- value
- can_load()¶
Returns whether result is available.
- can_run()¶
Returns true if all the dependencies have their results available.
- dependencies()¶
    for dep in task.dependencies():
        …
Iterates over all the first-level dependencies of the task.
- Parameters:
- self: Task
- Returns:
- deps: generator
A generator over all of self’s dependencies
See also
recursive_dependencies: retrieve dependencies recursively
- fail()¶
Marks the task as failed
If the lock was not held, an exception will be raised.
- hash()¶
Returns the hash for this task.
The results are cached, so the first call can be much slower than subsequent calls.
- invalidate()¶
Equivalent to t.store.remove(t.hash()). Useful for interactive use (i.e., in jug shell mode).
- is_failed()¶
is_failed = t.is_failed()
- Returns:
- is_failed: boolean
Whether the task is in failed state.
- is_loaded()¶
Returns True if the task is already loaded
- is_locked()¶
Note that only calling lock() and checking its result checks for the lock atomically. This function can be much faster, though, and is therefore sometimes useful.
- Returns:
- is_locked: boolean
Whether the task appears to be locked.
- load()¶
Loads the results from the storage backend.
This function always loads from the backend even if the task is already loaded. You can use is_loaded as a check if you want to avoid this behaviour.
- Returns:
- Nothing
- lock()¶
Tries to lock the task for the current process.
Returns True if the lock was acquired. The correct usage pattern is:
    locked = task.lock()
    if locked:
        task.run()
    else:
        # someone else is already running this task!
        pass
Note that using can_lock() can lead to race conditions. The above is the only fully correct method.
- Returns:
- locked: boolean
Whether the lock was obtained.
- property result¶
Result value
- run(force=False, save=True, debug_mode=False)¶
Performs the task.
- Parameters:
- force: boolean, optional
if true, always run the task (even if it ran before) (default: False)
- save: boolean, optional
if true, save the result to the store (default: True)
- debug_mode: boolean, optional
whether to run in debug mode (adds extra checks) (default: False)
- Returns:
- val: return value from Task
- unload()¶
Unload results (can be useful for saving memory).
- unload_recursive()¶
Equivalent to:
    for tt in recursive_dependencies(t):
        tt.unload()
- unlock()¶
Releases the lock.
If the lock was not held, this may remove another thread’s lock!
- class jug.task.TaskGenerator(f)¶
    @TaskGenerator
    def f(arg0, arg1, …):
        …
Turns f from a function into a task generator.
This means that calling f(arg0, arg1) results in: Task(f, arg0, arg1). This can make your jug-based code feel very similar to what you do with traditional Python.
Methods
- __call__(*args, **kwargs): Call self as a function.
- class jug.task.Tasklet(base, f)¶
A Tasklet is a light-weight Task.
It looks like a Task, behaves like a Task, but its results are not saved in the backend.
It is useful for very simple functions and is automatically generated on subscripting a Task object:
    t = Task(f, 1)
    tlet = t[0]
tlet will be a Tasklet.
See also
- Attributes:
- base
- f
Methods
can_load
dependencies
unload
unload_recursive
value
- class jug.task.iteratetask(base, n)¶
Examples:
    a, b = iteratetask(task, 2)
    for a in iteratetask(task, n):
        ...
This creates an iterator over the sequence task[0], task[1], ..., task[n-1].
- Parameters:
- task: Task(let)
- n: integer
- Returns:
- iterator
- jug.task.recursive_dependencies(t, max_level=-1)¶
    for dep in recursive_dependencies(t, max_level=-1):
        …
Returns a generator that lists all recursive dependencies of task
- Parameters:
- t: Task
input task
- max_level: integer, optional
Maximum recursion depth. Set to -1 or None for no recursion limit.
- Returns:
- deps: generator
A generator over all dependencies
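A depth-limited traversal of this kind can be sketched on a toy dependency graph. `Node` and `recursive_dependencies_sketch` are hypothetical names; the sketch assumes that max_level=1 means "direct dependencies only", which may differ from how jug counts levels.

```python
class Node:
    """Hypothetical stand-in for a Task with first-level dependencies."""

    def __init__(self, name, deps=()):
        self.name = name
        self.deps = list(deps)

    def dependencies(self):
        return iter(self.deps)


def recursive_dependencies_sketch(t, max_level=-1):
    """Yield dependencies depth-first; -1 or None means no depth limit."""
    if max_level is None:
        max_level = -1
    if max_level == 0:
        return
    for dep in t.dependencies():
        yield dep
        # A negative level never reaches 0, so the recursion is unlimited.
        yield from recursive_dependencies_sketch(dep, max_level - 1)


raw = Node("raw")
clean = Node("clean", [raw])
report = Node("report", [clean])

all_names = [d.name for d in recursive_dependencies_sketch(report)]
first_level = [d.name for d in recursive_dependencies_sketch(report, max_level=1)]
```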
- jug.task.value(obj)¶
Loads a task object recursively. This correctly handles lists, dictionaries and any other type handled by the tasks themselves.
- Parameters:
- obj: object
Anything that can be pickled or a Task
- Returns:
- value: object
The result of the task obj
mapreduce: Build tasks that follow a map-reduce pattern.
- jug.mapreduce.map(mapper, sequence, map_step=4)¶
sequence’ = map(mapper, sequence, map_step=4)
Roughly equivalent to:
sequence' = [Task(mapper,s) for s in sequence]
except that the tasks are grouped in groups of map_step
- Parameters:
- mapper: function
function from A -> B
- sequence: list of A
- map_step: integer, optional
Number of elements to process per task. This should be set so that each task takes the right amount of time. (default: 4)
- Returns:
- sequence’: list of B
sequence’[i] = mapper(sequence[i])
See also
mapreduce
currymap (function): curried version of this function
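The grouping by map_step can be illustrated in plain Python. `chunked_map_sketch` is a hypothetical helper, not part of jug; each inner list of `chunks` corresponds to the elements that one task would process, and no Task objects are created here.

```python
def chunked_map_sketch(mapper, sequence, map_step=4):
    """Apply mapper to sequence in groups of map_step elements."""
    chunks = [sequence[i:i + map_step] for i in range(0, len(sequence), map_step)]
    # Each inner list below corresponds to the work of one task.
    per_task = [[mapper(s) for s in chunk] for chunk in chunks]
    flat = [r for chunk in per_task for r in chunk]
    return chunks, flat


def times_ten(x):
    return x * 10


chunks, results = chunked_map_sketch(times_ten, list(range(10)), map_step=4)
```

With ten inputs and map_step=4, the work is split into three groups (4, 4 and 2 elements), so a larger map_step means fewer but longer-running tasks.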
- jug.mapreduce.mapreduce(reducer, mapper, inputs, map_step=4, reduce_step=8)¶
Create a task that does roughly the following:
reduce(reducer, map(mapper, inputs))
Roughly because the order of operations might be different. In particular, reducer should be a true reducer function (i.e., commutative and associative).
- Parameters:
- reducer: associative, commutative function
This should map Y_0,Y_1 -> Y’
- mapper: function from X -> Y
- inputs: list of X
- map_step: integer, optional
Number of mapping operations to do in one go. This is what defines an inner task. (default: 4)
- reduce_step: integer, optional
Number of reduce operations to do in one go. (default: 8)
- Returns:
- task: jug.Task object
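Why the reducer must be associative and commutative can be seen by reducing in groups and comparing with a plain left-to-right reduce. `grouped_reduce_sketch` is a hypothetical helper that assumes one particular grouping strategy (fixed-size groups, repeated until one value remains, with reduce_step of at least 2); jug's actual grouping may differ.

```python
import operator
from functools import reduce


def grouped_reduce_sketch(reducer, values, reduce_step=8):
    """Reduce in groups of reduce_step, then reduce the partial results."""
    while len(values) > 1:
        values = [
            reduce(reducer, values[i:i + reduce_step])
            for i in range(0, len(values), reduce_step)
        ]
    return values[0]


inputs = list(range(1, 11))
mapped = [x * x for x in inputs]  # the "map" step: squares of 1..10

# Addition is associative and commutative: grouping does not matter.
grouped_sum = grouped_reduce_sketch(operator.add, mapped, reduce_step=3)
serial_sum = reduce(operator.add, mapped)

# Subtraction is neither: grouping changes the answer.
grouped_diff = grouped_reduce_sketch(operator.sub, mapped, reduce_step=3)
serial_diff = reduce(operator.sub, mapped)
```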
- jug.mapreduce.reduce(reducer, inputs, reduce_step=8)¶
task = reduce(reducer, inputs, reduce_step=8)
- Parameters:
- reducer: associative, commutative function
This should map Y_0,Y_1 -> Y’
- inputs: list of X
- reduce_step: integer, optional
Number of reduce operations to do in one go. (default: 8)
- Returns:
- task: jug.Task object
See also
- jug.compound.CompoundTask(f, *args, **kwargs)¶
f should be such that it returns a Task, which can depend on other Tasks (even recursively).
If this cannot be loaded (i.e., has not yet been run), then this becomes equivalent to:
f(*args, **kwargs)
However, if it can, then we get a pseudo-task which returns the same value without f ever being executed.
- Parameters:
- f: function returning a jug.Task
- Returns:
- task: jug.Task
- jug.compound.CompoundTaskGenerator(f)¶
    @CompoundTaskGenerator
    def f(arg0, arg1, …):
        …
Turns f from a function into a compound task generator.
This means that calling f(arg0, arg1) results in: CompoundTask(f, arg0, arg1)
See also
TaskGenerator
- jug.compound.compound_task_execute(x, h)¶
This is an internal function. Do not use directly.
- class jug.utils.CustomHash(obj, hash_function)¶
Set a custom hash function
This is an advanced feature and you can shoot yourself in the foot with it. Make sure you know what you are doing. In particular, hash_function should be a strong hash: hash_function(obj0) == hash_function(obj1) is taken to imply that obj0 == obj1. The hash function should return a bytes object.
You can use the helpers in the jug.hash module (in particular hash_one) to help you. The implementation of timed_path is a good example of how to use a CustomHash:
    import os

    def hash_with_mtime_size(path):
        from .hash import hash_one
        st = os.stat_result(os.stat(path))
        mtime = st.st_mtime
        size = st.st_size
        return hash_one((path, mtime, size))

    def timed_path(path):
        return CustomHash(path, hash_with_mtime_size)
The path object (a string or bytes) is wrapped with a hashing function which checks the file value.
- Parameters:
- obj: any object
- hash_function: function
This should take your object and return a str
- jug.utils.cached_glob(pat)¶
A short-hand for
    from jug import CachedFunction
    from glob import glob
    CachedFunction(glob, pat)
with the extra bonus that results are returned sorted
- Parameters:
- pat: Same as glob.glob
- Returns:
- files: list of str
- jug.utils.identity(x)¶
identity implements the identity function as a Task (i.e., value(identity(x)) == x)
This seems pointless, but if x is, for example, a very large list, then using this function might speed up some computations. Consider:
    large = list(range(100000))
    large = jug.utils.identity(large)
    for i in range(100):
        Task(process, large, i)
This way the list large is going to get hashed just once. Without the call to jug.utils.identity, it would get hashed at each loop iteration.
https://jug.readthedocs.io/en/latest/utilities.html#identity
- Parameters:
- x: any object
- Returns:
- x: x
- jug.utils.sync_move(src, dst)¶
Sync the file and move it
This ensures that the move is truly atomic
- Parameters:
- src: filename
Source file
- dst: filename
Destination file
- jug.utils.timed_path(path)¶
Returns an object that returns path when passed to a jug Task, with the exception that it uses the path’s mtime (modification time) and the file size in the hash. Thus, if the file is touched or changes size, this triggers an invalidation of the results (which propagates to all dependent tasks).
- Parameters:
- ipath: str
A filesystem path
- Returns:
- opath: str
A task equivalent to (lambda: ipath).
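The effect of including mtime and size in the hash can be demonstrated with the standard library. `hash_with_mtime_size_sketch` is a hypothetical re-creation that uses hashlib in place of jug's hash_one helper, so the digests it produces are not the ones jug would compute.

```python
import hashlib
import os
import tempfile


def hash_with_mtime_size_sketch(path):
    """Hash the path together with the file's mtime and size; returns bytes."""
    st = os.stat(path)
    payload = repr((path, st.st_mtime, st.st_size)).encode("utf-8")
    return hashlib.sha1(payload).hexdigest().encode("ascii")


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "data.txt")
    with open(path, "w") as f:
        f.write("abc")
    before = hash_with_mtime_size_sketch(path)
    unchanged = hash_with_mtime_size_sketch(path)  # same file state, same hash
    with open(path, "a") as f:
        f.write("defg")                            # the file size changes
    after = hash_with_mtime_size_sketch(path)
```

Because the hash changes whenever the file grows or is touched, any result that was keyed on the old hash is no longer found, which is the invalidation described above.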
- jug.hooks.exit_checks.exit_after_n_tasks(n)¶
Exit after a specific number of tasks have been executed
- Parameters:
- n: int
Number of tasks to execute
- jug.hooks.exit_checks.exit_after_time(hours=0, minutes=0, seconds=0)¶
Exit after a specific amount of time has elapsed
Note that this only checks the time after each task has finished executing. Thus if you are using this to limit the amount of time each process takes, make sure to specify a lower limit than what is needed.
- Parameters:
- hours: number, optional
- minutes: number, optional
- seconds: number, optional
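The overshoot that results from checking the clock only after each task can be shown with a simulated run. `run_with_time_limit` is a hypothetical helper that uses made-up task durations instead of a real clock, so the numbers are purely illustrative.

```python
def run_with_time_limit(task_durations, limit_seconds):
    """Simulate running tasks, checking the time limit only between tasks."""
    elapsed = 0.0
    executed = 0
    for duration in task_durations:
        elapsed += duration          # the task always runs to completion
        executed += 1
        if elapsed >= limit_seconds:  # the check happens only after a task ends
            break
    return executed, elapsed


executed, elapsed = run_with_time_limit([40, 40, 40, 40], limit_seconds=60)
overshoot = elapsed - 60
```

In this simulation the process stops after 80 seconds rather than 60, which is why the limit should be set lower than the hard deadline by roughly the duration of the longest task.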
- jug.hooks.exit_checks.exit_env_vars(environ={os.environ})¶
Set exit markers based on the environment.
The following variables are used if they are set (if they are not set, they are ignored).
- JUG_MAX_TASKS: Maximum nr. of tasks.
- JUG_MAX_HOURS: Maximum hours
- JUG_MAX_MINUTES: Maximum minutes
- JUG_MAX_SECONDS: Maximum seconds
For the time based limits, see the comment on exit_after_time on how these limits are not strict as they are only checked after each task completion event.
If any of the variables above is set, its value should be an int or an error will be raised.
- JUG_EXIT_IF_FILE_EXISTS: Set exit file name
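How such variables might be read from an environment mapping is sketched below. `read_limits` is a hypothetical helper, not jug's code; the use of int() and the resulting ValueError are assumptions that match the "should be an int or an error will be raised" statement above only in spirit.

```python
NUMERIC_VARS = (
    "JUG_MAX_TASKS",
    "JUG_MAX_HOURS",
    "JUG_MAX_MINUTES",
    "JUG_MAX_SECONDS",
)


def read_limits(environ):
    """Collect the exit-related settings that are present in environ."""
    limits = {}
    for name in NUMERIC_VARS:
        if name in environ:                     # unset variables are ignored
            limits[name] = int(environ[name])   # raises ValueError if not an int
    if "JUG_EXIT_IF_FILE_EXISTS" in environ:
        limits["JUG_EXIT_IF_FILE_EXISTS"] = environ["JUG_EXIT_IF_FILE_EXISTS"]
    return limits


limits = read_limits({"JUG_MAX_TASKS": "10", "JUG_MAX_MINUTES": "30", "HOME": "/tmp"})

try:
    read_limits({"JUG_MAX_HOURS": "two"})
    bad_value_rejected = False
except ValueError:
    bad_value_rejected = True
```

Passing a plain dict instead of os.environ keeps the sketch testable without touching the real environment.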
- jug.hooks.exit_checks.exit_if_file_exists(fname)¶
Before each task executes, check whether the file exists. If so, exit.
Note that the check is only performed before a Task is executed. Thus, jug will not exit immediately if it is executing another long-running task.
- Parameters:
- fname: str
path to check
- jug.hooks.exit_checks.exit_when_true(f, function_takes_Task=False)¶
Generic exit check.
After each task, call function f and exit if it returns true.
- Parameters:
- f: function
Function to call
- function_takes_Task: boolean, optional
Whether to call the function with the task just executed (default: False)
Jug.IO module
write_task_out: write out results, possibly with metadata.
- class jug.io.NoLoad(base)¶
NoLoad can be used to decorate a Task result such that when it is passed to another Task, then it is passed directly (instead of passing the result).
This is for advanced usage.
- Attributes:
- base
- f
- t
Methods
can_load
dependencies
unload
unload_recursive
value
- jug.io.print_task_summary_table(options, groups)¶
Print task summary table given tasks groups.
groups - [(group_title, {(task_name, count)})] grouped summary of tasks.
- jug.io.write_metadata(result, metadata_fname, metadata_format='yaml')¶
Write out the metadata of a Task.
- Parameters:
- result: a Task object
- metadata_fname: str
metadata will be written to this file.
- metadata_format: str, optional
What format to write data in. Currently, ‘yaml’ & ‘json’ are supported.
- jug.io.write_task_out(result, oname, metadata_fname=None, metadata_format='yaml')¶
Write out the results of a Task to file, possibly including metadata.
If metadata_fname is not None, it should be the name of a file to which to write metadata on the computation.
- Parameters:
- result: a Task object
- oname: str
The target output filename
- metadata_fname: str, optional
If not None, metadata will be written to this file.
- metadata_format: str, optional
What format to write data in. Currently, ‘yaml’ & ‘json’ are supported.