Utilities¶
The module jug.utils has a few functions which are meant to be used in
writing jugfiles.
Identity¶
This is simply implemented as:
@TaskGenerator
def identity(x):
    return x
This might seem like the most pointless function, but it can be helpful in speeding things up. Consider the following case:
from glob import glob
from jug import TaskGenerator

def load(fname):
    return open(fname).readlines()

@TaskGenerator
def process(inputs, parameter):
    ...

inputs = []
for f in glob('*.data'):
    inputs.extend(load(f))
# inputs is a large list

results = {}
for p in range(1000):
    results[p] = process(inputs, p)
How is this processed? Every time process is called, a new jug.Task is
generated. This task has two arguments: inputs and an integer. When the hash
of the task is computed, both its arguments are analysed. inputs is a large
list of strings. Therefore, it is going to take a very long time to process all
of the hashes.
Consider the variation:
from jug.utils import identity
# ...
# same as above
inputs = identity(inputs)
results = {}
for p in range(1000):
    results[p] = process(inputs, p)
Now, the long list is only hashed once! It is transformed into a Task (we
reuse the name inputs to keep things clear) and each process call can
now compute its hash very fast.
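The saving can be illustrated without jug at all. The sketch below is only a toy model: ToyTask and hash_value are hypothetical stand-ins (they are not jug's classes or its hashing algorithm), and the amount of work is measured as the number of pickled bytes fed to the hash.

```python
import hashlib
import pickle

stats = {"bytes": 0}  # running total of bytes fed to the hash function

def hash_value(obj):
    """Toy hash of a plain Python value (assumption: NOT jug's real algorithm)."""
    data = pickle.dumps(obj)
    stats["bytes"] += len(data)
    return hashlib.sha1(data).hexdigest()

class ToyTask:
    """Hypothetical stand-in for a task: identified by the hash of its arguments."""
    def __init__(self, name, *args):
        self.name = name
        self.args = args
        self._hash = None

    def hash(self):
        if self._hash is None:  # computed once per task, then reused
            parts = [self.name]
            for arg in self.args:
                # A task argument contributes its (cached) hash, not its contents
                parts.append(arg.hash() if isinstance(arg, ToyTask) else hash_value(arg))
            self._hash = hashlib.sha1("".join(parts).encode("utf-8")).hexdigest()
        return self._hash

inputs = ["line %d\n" % i for i in range(10000)]

# Direct use: every task hashes the full list again
stats["bytes"] = 0
for p in range(100):
    ToyTask("process", inputs, p).hash()
direct_cost = stats["bytes"]

# Wrapped use: the list is hashed once, inside the identity-like task
stats["bytes"] = 0
wrapped = ToyTask("identity", inputs)
for p in range(100):
    ToyTask("process", wrapped, p).hash()
wrapped_cost = stats["bytes"]

print(direct_cost, wrapped_cost)
```

In this toy model the wrapped version hashes the large list a single time, so its cost is dominated by one pass over the data rather than one pass per task.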
Using identity to induce dependencies¶
identity can also be used to introduce dependencies. One can define a
helper function:
def value_after(val, token):
    from jug.utils import identity
    return identity([val, token])[0]
This function always returns its first argument, but it only runs once its second argument is available. Here is a typical use case:
- Function process takes an output file name
- Function postprocess takes as input the output filename of process
Now, you want to run process and then postprocess, but since
communication is done with files, Jug does not see that these functions depend
on each other. value_after is the solution:
token = process(input, ofile='output.txt')
postprocess(value_after('output.txt', token))
This works independently of whatever process returns (even if it is
None).
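The ordering effect can be mimicked in plain Python. This is a rough sketch only: Lazy, resolve and first are hypothetical names made up for the illustration, and the real scheduling is done by jug, not by code like this.

```python
log = []  # records the order in which the toy functions actually run

class Lazy:
    """Hypothetical stand-in for a deferred task: nothing runs until value()."""
    def __init__(self, fn, *args):
        self.fn = fn
        self.args = args

    def value(self):
        # Resolve every argument first, so dependencies always run before fn
        return self.fn(*[resolve(a) for a in self.args])

def resolve(obj):
    """Evaluate Lazy objects, including those nested inside lists."""
    if isinstance(obj, Lazy):
        return obj.value()
    if isinstance(obj, list):
        return [resolve(item) for item in obj]
    return obj

def first(pair):
    return pair[0]

def value_after(val, token):
    # Same shape as the helper above: the pair depends on token, first() picks val
    return Lazy(first, [val, token])

def process(ofile):
    log.append("process wrote " + ofile)
    return None  # the return value is irrelevant to the ordering

def postprocess(ifile):
    log.append("postprocess read " + ifile)
    return ifile

token = Lazy(process, "output.txt")
final = Lazy(postprocess, value_after("output.txt", token))
result = final.value()
print(log)
```

Even though process returns None, postprocess cannot run before it, because the pair [val, token] has to be resolved first.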
jug_execute¶
This is a simple wrapper around subprocess.call(). It adds two important
pieces of functionality:
- It checks the exit code and raises an exception if not zero (this can be disabled by passing check_exit=False).
- It takes an argument called run_after which is ignored but can be used to declare dependencies between tasks. Thus, it can be used to ensure that a specific process only runs after something else has run:
from jug.utils import jug_execute
from jug import TaskGenerator

@TaskGenerator
def my_computation(input, output_filename):
    ...

token = my_computation(input, 'output.txt')
# We want to run gzip, but **only after** `my_computation` has run:
jug_execute(['gzip', 'output.txt'], run_after=token)
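The two behaviours described for jug_execute can be sketched with the standard library alone. The function run_checked below is a hypothetical stand-in, not jug's code; in particular, the exception type and the returned exit code are assumptions of this sketch.

```python
import subprocess
import sys

def run_checked(args, check_exit=True, run_after=None):
    """Hypothetical stand-in for the behaviour described above (not jug's code)."""
    # run_after is intentionally unused. In jug, passing a task through it is
    # what creates the dependency; this plain function has no scheduler.
    ret = subprocess.call(args)
    if check_exit and ret != 0:
        # The exception type is an assumption made for this sketch
        raise RuntimeError("%r exited with code %d" % (args, ret))
    return ret

# Use the current Python interpreter as a portable external command
ok = [sys.executable, "-c", "pass"]
failing = [sys.executable, "-c", "import sys; sys.exit(3)"]

ok_code = run_checked(ok)

try:
    run_checked(failing)
    raised = False
except RuntimeError:
    raised = True

# With the check disabled, the non-zero exit code is tolerated
ignored_code = run_checked(failing, check_exit=False)
```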
cached_glob¶
cached_glob is a simple utility to perform the following common operation:
from glob import glob
from jug import CachedFunction
files = CachedFunction(glob, pattern)
files.sort()
where pattern is a glob pattern. This can be written more simply as:
from jug.utils import cached_glob
files = cached_glob(pattern)
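The sorting step is easy to try in isolation. The sketch below uses only the standard library; sorted_glob is a hypothetical name and does no caching, so it shows the glob-then-sort part of the operation and nothing else.

```python
import os
import tempfile
from glob import glob

def sorted_glob(pattern):
    """Hypothetical, uncached analogue: glob, then sort for a stable order."""
    files = glob(pattern)
    files.sort()  # glob makes no ordering promise, so sort explicitly
    return files

with tempfile.TemporaryDirectory() as d:
    # Create files out of alphabetical order, plus one that should not match
    for name in ("b.data", "a.data", "c.txt"):
        open(os.path.join(d, name), "w").close()
    found = [os.path.basename(f) for f in sorted_glob(os.path.join(d, "*.data"))]

print(found)
```

A stable order matters when the list is passed on to other tasks, since the same set of files then always produces the same argument.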
NoHash¶
This is imported from the jug.unsafe module as it bypasses the hashing
mechanism and can lead to incorrect results if used incorrectly.
This marks certain arguments to Tasks as not being part of the hash that defines the results. It can be useful for arguments that do not change the results, but nonetheless need to be passed to functions.
Example usage:
jug_execute(['SemiBin2', 'single_easy_bin',
             '--cpus', NoHash('8'),
             '-i', 'input/contigs.fna.gz',
             '--bam', 'input/mapped.bam',
             '--output', 'output'
             ])
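The idea of leaving an argument out of the hash can be shown with a toy model. Unhashed and toy_hash are hypothetical names invented for this sketch; how jug treats NoHash internally is not shown here.

```python
import hashlib

class Unhashed:
    """Hypothetical marker, loosely analogous to NoHash: keeps a value out of the hash."""
    def __init__(self, value):
        self.value = value

def toy_hash(args):
    """Toy argument hash (assumption: not jug's algorithm); skips Unhashed items."""
    h = hashlib.sha1()
    for arg in args:
        if isinstance(arg, Unhashed):
            continue  # excluded, so changing it leaves the hash untouched
        h.update(repr(arg).encode("utf-8"))
        h.update(b"\0")  # separator so adjacent arguments cannot run together
    return h.hexdigest()

def unwrap(args):
    """What the command would actually receive: markers replaced by their values."""
    return [a.value if isinstance(a, Unhashed) else a for a in args]

cmd_8 = ["SemiBin2", "single_easy_bin", "--cpus", Unhashed("8"), "-i", "input/contigs.fna.gz"]
cmd_16 = ["SemiBin2", "single_easy_bin", "--cpus", Unhashed("16"), "-i", "input/contigs.fna.gz"]
cmd_other = ["SemiBin2", "single_easy_bin", "--cpus", Unhashed("8"), "-i", "input/other.fna.gz"]

same = toy_hash(cmd_8) == toy_hash(cmd_16)          # only the CPU count differs
differs = toy_hash(cmd_8) != toy_hash(cmd_other)    # the input file differs
```

In this model, changing the number of CPUs does not invalidate a stored result, while changing the input file does. The risk mentioned above is the flip side: wrapping an argument that does affect the output would silently reuse stale results.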
- class jug.utils.CustomHash(obj, hash_function)¶
Set a custom hash function
This is an advanced feature and you can shoot yourself in the foot with it. Make sure you know what you are doing. In particular, hash_function should be a strong hash: hash_function(obj0) == hash_function(obj1) is taken to imply that obj0 == obj1. The hash function should return a bytes object.
You can use the helpers in the jug.hash module (in particular hash_one) to help you. The implementation of timed_path is a good example of how to use a CustomHash:
def hash_with_mtime_size(path):
    from .hash import hash_one
    st = os.stat_result(os.stat(path))
    mtime = st.st_mtime
    size = st.st_size
    return hash_one((path, mtime, size))

def timed_path(path):
    return CustomHash(path, hash_with_mtime_size)
The path object (a string or bytes) is wrapped with a hashing function which checks the file value.
- Parameters:
- obj: any object
- hash_function: function
This should take your object and return a str
- jug.utils.cached_glob(pat)¶
A short-hand for
from jug import CachedFunction
from glob import glob
CachedFunction(glob, pattern)
with the extra bonus that results are returned sorted
- Parameters:
- pat: Same as glob.glob
- Returns:
- files: list of str
- jug.utils.identity(x)¶
identity implements the identity function as a Task (i.e., value(identity(x)) == x).
This seems pointless, but if x is, for example, a very large list, then using this function might speed up some computations. Consider:
large = list(range(100000))
large = jug.utils.identity(large)
for i in range(100):
    Task(process, large, i)
This way the list large is going to get hashed just once. Without the call to jug.utils.identity, it would get hashed at each loop iteration.
https://jug.readthedocs.io/en/latest/utilities.html#identity
- Parameters:
- x: any object
- Returns:
- x: x
- jug.utils.sync_move(src, dst)¶
Sync the file and move it
This ensures that the move is truly atomic
- Parameters:
- src: filename
Source file
- dst: filename
Destination file
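A common way to get this kind of guarantee is to flush the file to disk before renaming it. The sketch below shows that general pattern with the standard library; fsync_then_move is a hypothetical name, and whether jug's sync_move takes exactly these steps (for example, whether it also syncs the directory) is not something this sketch claims.

```python
import os
import tempfile

def fsync_then_move(src, dst):
    """Sketch of the sync-then-rename pattern (assumption: not jug's implementation)."""
    # Ask the OS to write the file's data to disk before it becomes visible as dst
    fd = os.open(src, os.O_RDWR)
    try:
        os.fsync(fd)
    finally:
        os.close(fd)
    # Rename in a single step; src and dst should be on the same filesystem
    os.replace(src, dst)

with tempfile.TemporaryDirectory() as d:
    src = os.path.join(d, "result.tmp")
    dst = os.path.join(d, "result.txt")
    with open(src, "w") as f:
        f.write("done\n")
    fsync_then_move(src, dst)
    src_gone = not os.path.exists(src)
    with open(dst) as f:
        moved_text = f.read()
```

The point of the ordering is that a reader who sees dst should never find a file whose contents are still only in a write buffer.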
- jug.utils.timed_path(path)¶
Returns an object that returns path when passed to a jug Task, with the exception that it uses the path's mtime (modification time) and the file size in the hash. Thus, if the file is touched or changes size, this triggers an invalidation of the results (which propagates to all dependent tasks).
- Parameters:
- ipath: str
A filesystem path
- Returns:
- opath: str
A task equivalent to (lambda: ipath).
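The invalidation behaviour can be reproduced with a small standard-library sketch. mtime_size_digest is a hypothetical helper that hashes the same three ingredients (path, mtime, size) with hashlib; it stands in for the idea only and does not use jug.hash.

```python
import hashlib
import os
import tempfile

def mtime_size_digest(path):
    """Hash (path, mtime, size) rather than the file contents (hypothetical helper)."""
    st = os.stat(path)
    key = repr((path, st.st_mtime, st.st_size)).encode("utf-8")
    return hashlib.sha1(key).hexdigest().encode("utf-8")  # returned as bytes

with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "input.txt")
    with open(path, "w") as f:
        f.write("first version\n")

    before = mtime_size_digest(path)
    unchanged = mtime_size_digest(path)  # nothing touched the file in between

    with open(path, "a") as f:
        f.write("more data\n")  # the size changes, so the digest must change
    after = mtime_size_digest(path)
```

Because only metadata is read, the digest is cheap even for very large files. The trade-off is that an edit which preserves both size and mtime would go unnoticed.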