Utilities

The module jug.utils has a few functions which are meant to be used in writing jugfiles.

Identity

This is simply implemented as:

@TaskGenerator
def identity(x):
    return x

This might seem like the most pointless function, but it can be helpful in speeding things up. Consider the following case:

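from glob import glob
from jug import TaskGenerator

def load(fname):
    # Read all lines; the context manager closes the file promptly
    with open(fname) as f:
        return f.readlines()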

@TaskGenerator
def process(inputs, parameter):
    ...

inputs = []
for f in glob('*.data'):
    inputs.extend(load(f))
# inputs is a large list

results = {}
for p in range(1000):
    results[p] = process(inputs, p)

How is this processed? Every time process is called, a new jug.Task is generated. This task has two arguments: inputs and an integer. When the hash of the task is computed, both of its arguments are analysed. Since inputs is a large list of strings and it is hashed again for each of the 1000 tasks, computing all of the hashes takes a very long time.

Consider the variation:

from jug.utils import identity

# ...
# same as above

inputs = identity(inputs)
results = {}
for p in range(1000):
    results[p] = process(inputs, p)

Now, the long list is only hashed once! It is transformed into a Task (we reuse the name inputs to keep things clear) and each process call can now compute its hash very fast.
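The saving can be illustrated without Jug at all. The following is a minimal sketch, assuming a stand-in hash built from hashlib and pickle (this is not how Jug computes its hashes). CountingHasher is a hypothetical helper that counts how many bytes are fed to the hash function under each scheme.

```python
import hashlib
import pickle

class CountingHasher:
    """Hypothetical stand-in for task hashing that tracks the work done."""
    def __init__(self):
        self.bytes_hashed = 0

    def hash(self, obj):
        data = pickle.dumps(obj)
        self.bytes_hashed += len(data)
        return hashlib.sha1(data).hexdigest()

inputs = ['line %d\n' % i for i in range(10000)]

# Scheme 1: the full list is part of every task's hash
naive_hasher = CountingHasher()
naive = [naive_hasher.hash((inputs, p)) for p in range(100)]
naive_cost = naive_hasher.bytes_hashed

# Scheme 2: hash the list once, then hash only its short digest per task
cheap_hasher = CountingHasher()
inputs_digest = cheap_hasher.hash(inputs)
cheap = [cheap_hasher.hash((inputs_digest, p)) for p in range(100)]
cheap_cost = cheap_hasher.bytes_hashed
```

In both schemes every task still gets a distinct hash, but the second scheme reads the large list only once, which is the effect identity has in the jugfile above.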

Using identity to induce dependencies

identity can also be used to introduce dependencies. One can define a helper function:

def value_after(val, token):
    from jug.utils import identity
    return identity( [val, token] )[0]

This function always returns its first argument, but it only runs once its second argument is available. Here is a typical use case:

  1. Function process takes an output file name

  2. Function postprocess takes as input the output filename of process

Now, you want to run process and then postprocess, but since communication is done with files, Jug does not see that these functions depend on each other. value_after is the solution:

token = process(input, ofile='output.txt')
postprocess(value_after('output.txt', token))

This works independently of whatever process returns (even if it is None).
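The ordering argument can be checked with a toy model of a task graph. The sketch below does not use Jug: MiniTask, resolve and first are hypothetical names, and the model only assumes that a task's arguments (including tasks nested inside a list) are computed before the task itself runs.

```python
class MiniTask:
    """Toy stand-in for a task: computes its arguments, then runs func."""
    def __init__(self, func, *args):
        self.func = func
        self.args = args
        self.done = False
        self.result = None

    def value(self):
        if not self.done:
            args = [resolve(a) for a in self.args]
            self.result = self.func(*args)
            self.done = True
        return self.result

def resolve(obj):
    # Replace tasks (also inside lists) by their computed values
    if isinstance(obj, MiniTask):
        return obj.value()
    if isinstance(obj, list):
        return [resolve(o) for o in obj]
    return obj

log = []

def process(ofile):
    log.append('process')      # stands in for writing the output file
    return None

def postprocess(ifile):
    log.append('postprocess')  # stands in for reading the output file
    return ifile.upper()

def first(pair):
    return pair[0]

def value_after(val, token):
    # val is only delivered once token has been computed
    return MiniTask(first, [val, token])

token = MiniTask(process, 'output.txt')
final = MiniTask(postprocess, value_after('output.txt', token))
result = final.value()
```

Even though process returns None, it is forced to run before postprocess receives the file name.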

jug_execute

This is a simple wrapper around subprocess.call(). It adds two important pieces of functionality:

  1. It checks the exit code and raises an exception if it is not zero (this can be disabled by passing check_exit=False).

  2. It takes an argument called run_after which is ignored but can be used to declare dependencies between tasks. Thus, it can be used to ensure that a specific process only runs after something else has run:

    from jug.utils import jug_execute
    from jug import TaskGenerator
    
    @TaskGenerator
    def my_computation(input, output_filename):
        ...
    
    token = my_computation(input, 'output.txt')
    # We want to run gzip, but **only after** `my_computation` has run:
    jug_execute(['gzip', 'output.txt'], run_after=token)
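
The two behaviours described above can be sketched in plain Python. This is only an illustration, not Jug's implementation: execute_sketch is a hypothetical name, and the choice of RuntimeError and of returning the exit code are assumptions made for the example.

```python
import subprocess
import sys

def execute_sketch(args, check_exit=True, run_after=None):
    """Run a command and optionally fail on a non-zero exit code.

    run_after is deliberately unused: in a task system, passing a task
    here is enough to make this call depend on that task.
    """
    ret = subprocess.call(args)
    if check_exit and ret != 0:
        raise RuntimeError(
            'Command {!r} failed with exit code {}'.format(args, ret))
    return ret

# A command that succeeds; the token is accepted and ignored
ok = execute_sketch([sys.executable, '-c', 'pass'], run_after='some token')

# A command that fails, with the exit-code check disabled
failed = execute_sketch(
    [sys.executable, '-c', 'import sys; sys.exit(3)'], check_exit=False)
```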
    

cached_glob

cached_glob is a simple utility to perform the following common operation:

from glob import glob
from jug import CachedFunction
files = CachedFunction(glob, pattern)
files.sort()

Here pattern is a glob pattern. The same operation can be written simply as:

from jug.utils import cached_glob
files = cached_glob(pattern)

class jug.utils.CustomHash(obj, hash_function)

Set a custom hash function

This is an advanced feature and you can shoot yourself in the foot with it. Make sure you know what you are doing. In particular, hash_function should be a strong hash: hash_function(obj0) == hash_function(obj1) is taken to imply that obj0 == obj1. The hash function should return a bytes object.

You can use the helpers in the jug.hash module (in particular hash_one) to help you. The implementation of timed_path is a good example of how to use a CustomHash:

def hash_with_mtime_size(path):
    from .hash import hash_one
    st = os.stat_result(os.stat(path))
    mtime = st.st_mtime
    size = st.st_size
    return hash_one((path, mtime, size))

def timed_path(path):
    return CustomHash(path, hash_with_mtime_size)

The path object (a string or bytes) is wrapped with a hashing function that uses the file's modification time and size in addition to the path itself.
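To see why such a hash reacts to file changes, here is a minimal standalone sketch. It assumes hashlib in place of jug.hash.hash_one, and hash_path_mtime_size is a hypothetical name, so the digests will not match the ones Jug computes.

```python
import hashlib
import os
import tempfile

def hash_path_mtime_size(path):
    """Hash the path together with the file's mtime and size."""
    st = os.stat(path)
    key = repr((path, st.st_mtime, st.st_size)).encode('utf-8')
    # Return bytes, as required of a custom hash function
    return hashlib.sha1(key).hexdigest().encode('utf-8')

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'data.txt')
    with open(path, 'w') as f:
        f.write('first version\n')
    before = hash_path_mtime_size(path)

    # Appending changes the size, so the hash must change
    with open(path, 'a') as f:
        f.write('more data\n')
    after = hash_path_mtime_size(path)

    # Hashing again without touching the file gives the same value
    unchanged = hash_path_mtime_size(path)
```

Note that the file contents are never read, so hashing stays cheap even for very large files. The trade-off is that a change which preserves both mtime and size goes unnoticed.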

Parameters:
obj: any object
hash_function: function

This should take your object and return its hash (a bytes object, as noted above)

jug.utils.cached_glob(pat)

A short-hand for

from jug import CachedFunction
from glob import glob
CachedFunction(glob, pattern)

with the extra bonus that results are returned sorted
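
Sorting matters because glob.glob returns its matches in arbitrary order, so an unsorted list could differ between runs on the same files. The sketch below shows only the sorting part using the standard library. sorted_glob is a hypothetical name, and no caching is performed here.

```python
import os
import tempfile
from glob import glob

def sorted_glob(pattern):
    """Return the matches for pattern in sorted order (no caching)."""
    return sorted(glob(pattern))

with tempfile.TemporaryDirectory() as tmp:
    # Create the files in a deliberately unsorted order
    for name in ['b.data', 'a.data', 'c.data', 'notes.txt']:
        open(os.path.join(tmp, name), 'w').close()
    matches = sorted_glob(os.path.join(tmp, '*.data'))
    files = [os.path.basename(f) for f in matches]
```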

Parameters:
pat: Same as glob.glob
Returns:
files: list of str

jug.utils.identity(x)

identity implements the identity function as a Task (i.e., value(identity(x)) == x)

This seems pointless, but if x is, for example, a very large list, then using this function might speed up some computations. Consider:

from jug import Task
import jug.utils

large = list(range(100000))
large = jug.utils.identity(large)
for i in range(100):
    Task(process, large, i)

This way the list large is going to get hashed just once. Without the call to jug.utils.identity, it would get hashed at each loop iteration.

https://jug.readthedocs.io/en/latest/utilities.html#identity

Parameters:
x: any object
Returns:
x: x

jug.utils.sync_move(src, dst)

Sync the file and move it

This ensures that the move is truly atomic

Parameters:
src: filename

Source file

dst: filename

Destination file
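
The idea of syncing before moving can be sketched with the standard library. This is an illustration under stated assumptions, not Jug's implementation: sync_move_sketch is a hypothetical name, and the rename is only atomic when src and dst are on the same filesystem (as is the case for os.replace on POSIX systems).

```python
import os
import tempfile

def sync_move_sketch(src, dst):
    """Flush src to disk, then rename it over dst."""
    fd = os.open(src, os.O_RDWR)
    try:
        os.fsync(fd)  # make sure the data has reached the disk
    finally:
        os.close(fd)
    os.replace(src, dst)  # rename, overwriting dst if it exists

with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, 'result.tmp')
    dst = os.path.join(tmp, 'result.txt')
    with open(src, 'w') as f:
        f.write('done\n')
    sync_move_sketch(src, dst)
    src_exists = os.path.exists(src)
    with open(dst) as f:
        content = f.read()
```

Writing to a temporary name and moving it into place means that a reader never sees a partially written destination file.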

jug.utils.timed_path(path)

Returns an object that returns path when passed to a jug Task, except that it uses the path's mtime (modification time) and the file size in the hash. Thus, if the file is touched or changes size, the results are invalidated (and the invalidation propagates to all dependent tasks).

Parameters:
ipath: str

A filesystem path

Returns:
opath: str

A task equivalent to (lambda: ipath).