Utilities¶
The module jug.utils has a few functions which are meant to be used in writing jugfiles.
Identity¶
This is simply implemented as:
@TaskGenerator
def identity(x):
    return x
This might seem like the most pointless function, but it can be helpful in speeding things up. Consider the following case:
from glob import glob
from jug import TaskGenerator

def load(fname):
    return open(fname).readlines()

@TaskGenerator
def process(inputs, parameter):
    ...

inputs = []
for f in glob('*.data'):
    inputs.extend(load(f))
# inputs is a large list

results = {}
for p in range(1000):
    results[p] = process(inputs, p)
How is this processed? Every time process is called, a new jug.Task is generated. This task has two arguments: inputs and an integer. When the hash of the task is computed, both its arguments are analysed. Because inputs is a large list of strings, hashing it again for each of the 1000 tasks takes a very long time.
Consider the variation:
from jug.utils import identity

# ...
# same as above

inputs = identity(inputs)
results = {}
for p in range(1000):
    results[p] = process(inputs, p)
Now, the long list is only hashed once! It is transformed into a Task (we reuse the name inputs to keep things clear) and each process call can now compute its hash very fast.
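The effect can be illustrated without Jug at all. The following is a minimal sketch, not Jug's actual hashing code: ToyTask, toy_hash and the counts dictionary are hypothetical names, and the only assumption carried over from the text is that a task caches its own hash while a plain list has to be hashed in full every time it appears as an argument.

```python
import hashlib
import pickle

# Counts how often a full list is serialised and hashed.
counts = {"list_hashes": 0}


class ToyTask:
    """Hypothetical stand-in for a task; it caches its own digest."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args
        self._digest = None

    def digest(self):
        if self._digest is None:
            h = hashlib.sha1(self.func.__name__.encode())
            for arg in self.args:
                h.update(toy_hash(arg))
            self._digest = h.digest()
        return self._digest


def toy_hash(obj):
    """Hash a task by its cached digest, anything else by its pickled bytes."""
    if isinstance(obj, ToyTask):
        return obj.digest()
    if isinstance(obj, list):
        counts["list_hashes"] += 1
    return hashlib.sha1(pickle.dumps(obj)).digest()


def identity(x):
    return x


def process(inputs, parameter):
    return len(inputs) + parameter


inputs = [str(i) for i in range(10000)]

# Passing the raw list: the list is hashed once per task.
for p in range(100):
    ToyTask(process, inputs, p).digest()
direct = counts["list_hashes"]

# Wrapping the list first: it is hashed once, then the cached digest is reused.
counts["list_hashes"] = 0
wrapped = ToyTask(identity, inputs)
for p in range(100):
    ToyTask(process, wrapped, p).digest()
via_identity = counts["list_hashes"]

print(direct, via_identity)  # prints: 100 1
```

In this toy model the saving comes entirely from the cached digest of the wrapping task, which is the same reason the text gives for using identity.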
Using identity to induce dependencies¶
identity can also be used to introduce dependencies. One can define a helper function:
def value_after(val, token):
    from jug.utils import identity
    return identity([val, token])[0]
This function always returns its first argument, but only runs once its second argument is available. Here is a typical use case:
- Function process takes an output file name
- Function postprocess takes as input the output filename of process
Now, you want to run process and then postprocess, but since communication is done with files, Jug does not see that these functions depend on each other. value_after is the solution:
token = process(input, ofile='output.txt')
postprocess(value_after('output.txt', token))
This works independently of whatever process returns (even if it is None).
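The ordering guarantee can be mimicked in plain Python. The sketch below is not how Jug schedules tasks: Lazy is a hypothetical deferred-call class, and this value_after is a simplified stand-in that passes the token as an ignored argument instead of using identity. It only shows that making the token an argument forces process to run before postprocess, even though process returns None.

```python
log = []


class Lazy:
    """Hypothetical deferred call: runs its Lazy arguments first, then itself."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args
        self.done = False
        self.result = None

    def run(self):
        if not self.done:
            resolved = [a.run() if isinstance(a, Lazy) else a for a in self.args]
            self.result = self.func(*resolved)
            self.done = True
        return self.result


def value_after(val, token):
    # Evaluates to val, but only after token has been computed.
    return Lazy(lambda v, _token: v, val, token)


def process(ofile):
    log.append("process wrote " + ofile)
    return None  # the return value does not matter


def postprocess(ifile):
    log.append("postprocess read " + ifile)
    return ifile


token = Lazy(process, "output.txt")
final = Lazy(postprocess, value_after("output.txt", token))
result = final.run()
```

After final.run(), log records the process step before the postprocess step, and result is the filename that was passed through.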
jug_execute¶
This is a simple wrapper around subprocess.call(). It adds two important pieces of functionality:
- It checks the exit code and raises an exception if it is not zero (this can be disabled by passing check_exit=False).
- It takes an argument called run_after which is ignored but can be used to declare dependencies between tasks. Thus, it can be used to ensure that a specific process only runs after something else has run:

from jug.utils import jug_execute
from jug import TaskGenerator

@TaskGenerator
def my_computation(input, output_filename):
    ...

token = my_computation(input, 'output.txt')
# We want to run gzip, but **only after** `my_computation` has run:
jug_execute(['gzip', 'output.txt'], run_after=token)
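The two behaviours described above can be sketched with the standard library alone. This is not the source of jug_execute: execute_checked is a hypothetical name, and both the exception type and the returned exit code are assumptions made for illustration. The child commands use the current Python interpreter so that the sketch does not depend on gzip being installed.

```python
import subprocess
import sys


def execute_checked(args, check_exit=True, run_after=None):
    """Run a command and optionally fail on a non-zero exit code.

    run_after is deliberately unused: in a task system, passing a value
    here is enough to make this call depend on whatever produced it.
    """
    ret = subprocess.call(args)
    if check_exit and ret != 0:
        raise RuntimeError("command %r exited with code %d" % (args, ret))
    return ret


# A command that succeeds.
ok = execute_checked([sys.executable, "-c", "pass"])

# A command that fails, with the check disabled.
failing = [sys.executable, "-c", "import sys; sys.exit(3)"]
ignored = execute_checked(failing, check_exit=False)
```

With check_exit left at its default, the second command would raise instead of returning its exit code.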
cached_glob¶
cached_glob is a simple utility to perform the following common operation:
from glob import glob
from jug import CachedFunction
files = CachedFunction(glob, pattern)
files.sort()
where pattern is a glob pattern. This can be written simply as:
from jug.utils import cached_glob
files = cached_glob(pattern)
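The sorting step matters because glob.glob returns matches in arbitrary order, so the same directory could otherwise yield differently ordered lists on different runs. A minimal sketch of the glob-then-sort part, without any caching and without Jug (sorted_glob is a hypothetical helper name):

```python
import os
import tempfile
from glob import glob


def sorted_glob(pat):
    """Return the matches for pat in a deterministic (sorted) order."""
    return sorted(glob(pat))


with tempfile.TemporaryDirectory() as tmp:
    # Create the files in non-alphabetical order.
    for name in ("b.data", "a.data", "c.txt"):
        open(os.path.join(tmp, name), "w").close()
    matches = sorted_glob(os.path.join(tmp, "*.data"))
    files = [os.path.basename(f) for f in matches]
```

Here files contains only the two .data files, in alphabetical order, regardless of the order in which the filesystem lists them.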
- class jug.utils.CustomHash(obj, hash_function)¶
Set a custom hash function
This is an advanced feature and you can shoot yourself in the foot with it. Make sure you know what you are doing. In particular, hash_function should be a strong hash: hash_function(obj0) == hash_function(obj1) is taken to imply that obj0 == obj1. The hash function should return a bytes object.
You can use the helpers in the jug.hash module (in particular hash_one) to help you. The implementation of timed_path is a good example of how to use a CustomHash:
def hash_with_mtime_size(path):
    from .hash import hash_one
    st = os.stat_result(os.stat(path))
    mtime = st.st_mtime
    size = st.st_size
    return hash_one((path, mtime, size))

def timed_path(path):
    return CustomHash(path, hash_with_mtime_size)
The path object (a string or bytes) is wrapped with a hashing function that depends on the file's modification time and size.
- Parameters:
- obj: any object
- hash_function: function
This should take your object and return its hash as a bytes object
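To make the idea of a stat-based hash concrete, here is a standalone sketch that uses hashlib instead of jug.hash.hash_one. stat_digest is a hypothetical name and the repr-based encoding is an assumption chosen for brevity; the point is only that the digest is a bytes object that stays the same while the file is untouched and changes when its size (or modification time) changes.

```python
import hashlib
import os
import tempfile


def stat_digest(path):
    """Digest of (path, mtime, size): changes when the file is modified."""
    st = os.stat(path)
    payload = repr((path, st.st_mtime, st.st_size)).encode()
    return hashlib.sha1(payload).digest()


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "input.data")
    with open(path, "w") as f:
        f.write("first\n")
    before = stat_digest(path)
    unchanged = stat_digest(path)  # nothing happened in between

    with open(path, "a") as f:
        f.write("second\n")  # the file grows, so its size changes
    after = stat_digest(path)
```

Note that such a hash never reads the file contents, so an edit that preserves both size and modification time would go unnoticed. That trade-off is what the "strong hash" warning above is about.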
- jug.utils.cached_glob(pat)¶
A short-hand for
from jug import CachedFunction
from glob import glob
CachedFunction(glob, pat)
with the extra bonus that results are returned sorted
- Parameters:
- pat: Same as glob.glob
- Returns:
- files: list of str
- jug.utils.identity(x)¶
identity implements the identity function as a Task (i.e., value(identity(x)) == x).
This seems pointless, but if x is, for example, a very large list, then using this function might speed up some computations. Consider:
large = list(range(100000))
large = jug.utils.identity(large)
for i in range(100):
    Task(process, large, i)
This way the list large is going to get hashed just once. Without the call to jug.utils.identity, it would get hashed at each loop iteration.
https://jug.readthedocs.io/en/latest/utilities.html#identity
- Parameters:
- x: any object
- Returns:
- x: x
- jug.utils.sync_move(src, dst)¶
Sync the file and move it
This ensures that the move is truly atomic
- Parameters:
- src: filename
Source file
- dst: filename
Destination file
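One way to picture "sync, then move" with the standard library is shown below. This is a sketch under assumptions, not the code of jug.utils.sync_move: sync_then_move is a hypothetical name, and the rename is only atomic when source and destination are on the same filesystem.

```python
import os
import tempfile


def sync_then_move(src, dst):
    """Flush src to disk, then rename it onto dst in a single step."""
    with open(src, "rb+") as f:
        os.fsync(f.fileno())  # make sure the data has reached the disk
    os.replace(src, dst)      # rename; overwrites dst if it exists


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "result.tmp")
    dst = os.path.join(tmp, "result.txt")
    with open(src, "w") as f:
        f.write("done\n")

    sync_then_move(src, dst)

    moved = os.path.exists(dst) and not os.path.exists(src)
    with open(dst) as f:
        content = f.read()
```

Syncing before the rename means that a reader who sees the destination name should not find a partially written file behind it.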
- jug.utils.timed_path(path)¶
Returns an object that evaluates to path when passed to a jug Task, except that its hash uses the path's mtime (modification time) and the file size. Thus, if the file is touched or changes size, this triggers an invalidation of the results (which propagates to all dependent tasks).
- Parameters:
- ipath: str
A filesystem path
- Returns:
- opath: str
A task equivalent to (lambda: ipath).