Tar and Transfer with Globus Compute
These examples demonstrate how to build flows that combine Globus Compute and Globus Transfer to process and move data.
Each of these examples creates an archive file from the user’s files and transfers that archive to a destination. In one case the source data is already on the server running Globus Connect Server and Globus Compute, and in the other it is on a source collection owned by the end user.
Prerequisites
To run these examples, you must have a properly configured server and some local software installed.
You must have a co-located Globus Connect Server Collection and Globus Compute endpoint, either hosted on the same server or at least with access to a shared filesystem.
- Globus Connect Server Collection
  You can follow this guide for setting up a Globus Connect Server Collection to install Globus Connect Server and configure a collection.
  For ease of use, we recommend using a Guest Collection.
- Globus Compute Endpoint
  This guide for setting up a Globus Compute Endpoint covers installation of the Globus Compute software.
  This Compute endpoint must have read/write permissions on the same storage location where the Globus Connect Server Collection is hosted.
- Globus CLI
  You will also need the Globus CLI installed (CLI installation docs).
  The Globus CLI documentation recommends installation with pipx, as in pipx install globus-cli.
- Globus Compute SDK
  You must have the globus-compute-sdk Python package available. We strongly recommend using a virtual environment for this installation, installing with pip install globus-compute-sdk.
  You can follow the Globus Compute install documentation to install the Compute SDK client package in a virtualenv.
Register the do_tar Compute Function
For users executing these examples, the next step is to register the do_tar Compute function.
This is the Compute function which the example flows will invoke to create archives.
You can run the provided Python script from the environment with globus-compute-sdk installed:
python ./register_compute_function.py
Make sure to note the Compute function's UUID that this prints; it will be needed later!
#!/usr/bin/env python
def do_tar(
    src_paths: list[str],
    dest_path: str,
    gcs_base_path: str = "/",
) -> str:
    """
    Create a tar.gz archive from source files or directories and save it to the
    given destination.

    Parameters:
        src_paths (list[str]): Source paths of files or directories to be archived.
        dest_path (str): Destination path where the tar.gz archive will be
                         written. This can be either an existing directory or a
                         file path (with the parent directory existing).
        gcs_base_path (str): The shared GCS collection's configured base path.
                             Default is "/".

    Returns:
        str: The output tar archive file path.

    Raises:
        ValueError: If src_paths is empty, dest_path is None, any provided path
                    does not begin with the expected prefix, or if any source
                    path or destination (or its parent) is invalid.
        RuntimeError: If an error occurs during the creation of the tar archive.

    Example:
        >>> output = do_tar(
        ...     src_paths=["/file1.txt", "/dir1/file2.txt"],
        ...     dest_path="/tar_output",
        ...     gcs_base_path="/path/to/root/"
        ... )
        >>> print(output)
        /tar_output/7f9c3f9a-2d75-4d2f-8b0a-0f0d7e6b1e3a.tar.gz
    """
    import tarfile
    import uuid
    from pathlib import Path

    def transform_path_to_absolute(path: str) -> str:
        """Transform a GCS-style path to an absolute filesystem path."""
        if not path.startswith("/"):
            raise ValueError(
                f"Path '{path}' does not start with the expected prefix '/'."
            )
        return path.replace("/", gcs_base_path, 1)

    def transform_path_from_absolute(path: str) -> str:
        """Transform an absolute filesystem path back to a GCS-style path."""
        if not path.startswith(gcs_base_path):
            raise ValueError(
                f"Path '{path}' does not start with "
                f"the expected prefix '{gcs_base_path}'."
            )
        return path.replace(gcs_base_path, "/", 1)

    # Validate src_paths and dest_path
    if not src_paths:
        raise ValueError("src_paths must not be empty.")
    if dest_path is None:
        raise ValueError("dest_path must not be None.")

    # Transform destination path
    transformed_dest_path = Path(transform_path_to_absolute(dest_path))

    # Transform and validate all source paths
    transformed_src_paths = []
    for src_path in src_paths:
        transformed_src_path = Path(transform_path_to_absolute(src_path))
        if not transformed_src_path.exists():
            raise ValueError(f"Source path '{src_path}' does not exist.")
        transformed_src_paths.append(transformed_src_path)

    # Validate transformed_dest_path
    if transformed_dest_path.exists():
        if not (transformed_dest_path.is_dir() or transformed_dest_path.is_file()):
            raise ValueError(
                f"Destination path '{dest_path}' is neither a directory nor a file."
            )
    else:
        if not transformed_dest_path.parent.exists():
            raise ValueError(
                f"Parent directory of destination path '{dest_path}' does not exist."
            )

    # Determine the final tar file path.
    if transformed_dest_path.exists() and transformed_dest_path.is_dir():
        # If destination is an existing directory, generate a unique tar file name.
        tar_file_name = f"{uuid.uuid4()}.tar.gz"
        transformed_dest_tar_path = transformed_dest_path / tar_file_name
    else:
        # Destination is treated as a file path.
        fn = transformed_dest_path.name
        if fn.endswith(".gz"):
            transformed_dest_tar_path = transformed_dest_path
        elif fn.endswith(".tar"):
            transformed_dest_tar_path = transformed_dest_path.with_name(fn + ".gz")
        else:
            transformed_dest_tar_path = transformed_dest_path.with_name(fn + ".tar.gz")

    # Informative message (could be replaced with logging.info in production code)
    print(
        f"Creating tar file at {transformed_dest_tar_path.absolute()} "
        f"with {len(transformed_src_paths)} source(s)"
    )

    # Create the tar.gz archive with exception handling.
    try:
        with tarfile.open(transformed_dest_tar_path, "w:gz") as tar:
            for src_path in transformed_src_paths:
                tar.add(src_path, arcname=src_path.name)
    except Exception as e:
        # Attempt to remove any incomplete tar file.
        if transformed_dest_tar_path.exists():
            try:
                transformed_dest_tar_path.unlink()
            except Exception as unlink_err:
                print(
                    f"Warning: Failed to remove incomplete tar file "
                    f"'{transformed_dest_tar_path}': {unlink_err}"
                )
        raise RuntimeError(f"Failed to create tar archive: {e}") from e

    # Transform the output path back to a GCS-style path and return.
    result_path = transform_path_from_absolute(
        str(transformed_dest_tar_path.absolute())
    )
    return result_path


if __name__ == "__main__":
    from globus_compute_sdk import Client

    gcc = Client()
    do_tar_fuid = gcc.register_function(do_tar)
    print(f"Tar func UUID is {do_tar_fuid}")
For the best results, use the same Python version for registration as the one running on your Globus Compute Endpoint.
The function is serialized when it is registered, so matching Python versions gives the most consistent behavior between the two environments.
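Before wiring the function into a flow, you can optionally smoke-test it directly with the Compute SDK's Executor. This is a minimal sketch: the endpoint UUID, function UUID, and paths below are placeholders you must supply, and it assumes those paths exist on the endpoint's filesystem.

from globus_compute_sdk import Executor

COMPUTE_ENDPOINT_ID = "your-compute-endpoint-uuid"  # placeholder
DO_TAR_FUNCTION_ID = "your-do-tar-function-uuid"    # UUID printed above

with Executor(endpoint_id=COMPUTE_ENDPOINT_ID) as gce:
    # Submit a single invocation of the already-registered function.
    future = gce.submit_to_registered_function(
        DO_TAR_FUNCTION_ID,
        kwargs={
            "src_paths": ["/input_files/"],
            "dest_path": "/tar_output/",
            "gcs_base_path": "/path/to/root/",
        },
    )
    # Blocks until the endpoint runs the task and returns the result.
    print(future.result())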
do_tar Behaviors
do_tar is a function which compiles multiple files into a tar archive.
It accepts source paths (multiple files) and a destination where the new tar file should be created.
It is also aware that it is meant to operate in the same location as a Globus Connect Server collection.
Parameter | Description
---|---
src_paths | List of paths to the files/directories to be archived.
dest_path | Where to write the tar archive (directory or file path).
gcs_base_path | The co-located Globus Connect Server collection's configured base path. (default: /)
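As a quick sanity check, do_tar can also be called as a plain Python function. A minimal sketch, assuming /tmp/example/file1.txt and /tmp/example/output/ already exist on the local filesystem and the collection's base path is the default /:

from register_compute_function import do_tar  # the script shown above

# With gcs_base_path="/", collection paths and filesystem paths coincide,
# so this archives /tmp/example/file1.txt into /tmp/example/output/.
out = do_tar(
    src_paths=["/tmp/example/file1.txt"],
    dest_path="/tmp/example/output/",
    gcs_base_path="/",
)
print(out)  # e.g. /tmp/example/output/<random-uuid>.tar.gz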
Collection Base Paths
The gcs_base_path parameter is provided to the Compute function to allow it to transform the user input paths to absolute paths.
If the collection has a configured base_path, this option is needed in order to handle the path rewrites this entails.
For example,
- The collection has configured its base path to /path/to/root/.
- A user wants to tar the files at the absolute path /path/to/root/input_files/.
- This path therefore appears as /input_files/ on the collection.
- The do_tar function will access the same storage filesystem, and will need to know that /input_files/ maps to /path/to/root/input_files/.
In this scenario, gcs_base_path would be set to /path/to/root/, allowing the function to match the behavior of the collection on the same filesystem.
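To make the rewrite concrete, here is the transformation in isolation, using the same replace-first-occurrence logic as transform_path_to_absolute in the script above (the file name is hypothetical):

gcs_base_path = "/path/to/root/"

# A path as the user sees it on the collection:
collection_path = "/input_files/data.csv"

# Replace the leading "/" with the collection's base path to get the
# absolute filesystem path that do_tar actually reads:
absolute_path = collection_path.replace("/", gcs_base_path, 1)
print(absolute_path)  # /path/to/root/input_files/data.csv

# And the inverse, used to report the archive location back to the flow:
round_trip = absolute_path.replace(gcs_base_path, "/", 1)
print(round_trip)  # /input_files/data.csv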
Example Flow 1
In this first example, the Compute and Transfer flow takes a user-provided list of files that already exist in a preconfigured source collection.
The flow creates a tarfile from those files and transfers the tarfile to a user-provided destination collection.
The flow will:
- Set constants for the run
- Create an output directory named after the run's ID on the source collection
- Invoke the do_tar function to create a tar archive from the input source files and save it in the output directory
- Transfer the resulting tarfile to the destination collection provided in the flow input
- Delete the output directory on the source collection
Create the Flow
- Edit compute_transfer_example_1_definition.json and replace the placeholder values:
  - gcs_endpoint_id: The source collection ID
  - compute_endpoint_id: The Compute endpoint ID
  - compute_function_id: The UUID of the registered do_tar function
  If the collection has a configured base path, also edit gcs_base_path.
- Create the flow:
  globus flows create "Compute and Transfer Flow Example 1" \
      ./compute_transfer_example_1_definition.json \
      --input-schema ./compute_transfer_example_1_schema.json
- Save the flow ID returned by this command.
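If you prefer the Globus Python SDK over the CLI, the same step can be scripted. A sketch, assuming globus-sdk is installed and that you have registered your own native app client ID (the CLIENT_ID below is a placeholder):

import json

import globus_sdk

CLIENT_ID = "your-native-app-client-id"  # placeholder: register your own app

app = globus_sdk.UserApp("tar-flow-example", client_id=CLIENT_ID)
flows = globus_sdk.FlowsClient(app=app)

# Load the edited definition and schema files from the current directory.
with open("compute_transfer_example_1_definition.json") as f:
    definition = json.load(f)
with open("compute_transfer_example_1_schema.json") as f:
    input_schema = json.load(f)

flow = flows.create_flow(
    title="Compute and Transfer Flow Example 1",
    definition=definition,
    input_schema=input_schema,
)
print(flow["id"])  # the flow ID to save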
{
  "StartAt": "SetConstants",
  "States": {
    "SetConstants": {
      "Type": "ExpressionEval",
      "Parameters": {
        "gcs_endpoint_id": "<INSERT YOUR GCS ENDPOINT ID HERE>",
        "gcs_base_path": "/",
        "compute_endpoint_id": "<INSERT YOUR COMPUTE ENDPOINT ID HERE>",
        "compute_function_id": "<INSERT YOUR COMPUTE FUNCTION ID HERE>",
        "compute_output_directory.=": "'/' + _context.run_id + '/'"
      },
      "ResultPath": "$.constants",
      "Next": "MakeComputeWorkingDir"
    },
    "MakeComputeWorkingDir": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/mkdir",
      "Parameters": {
        "endpoint_id.$": "$.constants.gcs_endpoint_id",
        "path.$": "$.constants.compute_output_directory"
      },
      "ResultPath": "$.mkdir_result",
      "Next": "RunComputeFunction"
    },
    "RunComputeFunction": {
      "Type": "Action",
      "ActionUrl": "https://compute.actions.globus.org/v3",
      "Parameters": {
        "endpoint_id.$": "$.constants.compute_endpoint_id",
        "tasks": [
          {
            "function_id.$": "$.constants.compute_function_id",
            "args": [],
            "kwargs": {
              "src_paths.$": "$.source_paths",
              "dest_path.$": "$.constants.compute_output_directory",
              "gcs_base_path.$": "$.constants.gcs_base_path"
            }
          }
        ]
      },
      "ResultPath": "$.compute_result",
      "Next": "TransferFromComputeEndpoint"
    },
    "TransferFromComputeEndpoint": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/transfer",
      "Parameters": {
        "source_endpoint.$": "$.constants.gcs_endpoint_id",
        "destination_endpoint.$": "$.destination_endpoint_id",
        "DATA": [
          {
            "source_path.=": "compute_result.details.result[0]",
            "destination_path.$": "$.destination_path"
          }
        ]
      },
      "ResultPath": "$.transfer_to_dest_result",
      "Next": "CleanupComputeEndpoint"
    },
    "CleanupComputeEndpoint": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/delete",
      "Parameters": {
        "endpoint.$": "$.constants.gcs_endpoint_id",
        "recursive": true,
        "DATA": [
          {
            "path.$": "$.constants.compute_output_directory"
          }
        ]
      },
      "ResultPath": "$.delete_compute_output_result",
      "End": true
    }
  }
}
{
  "type": "object",
  "required": [
    "source_paths",
    "destination_path",
    "destination_endpoint_id"
  ],
  "properties": {
    "source_paths": {
      "type": "array",
      "title": "Source Collection Paths",
      "description": "A list of paths on the source collection for the data"
    },
    "destination_path": {
      "type": "string",
      "title": "Destination Collection Path",
      "description": "The path on the destination collection for the data"
    },
    "destination_endpoint_id": {
      "type": "string",
      "title": "Destination Collection Endpoint ID",
      "description": "The endpoint id of the destination collection"
    }
  },
  "additionalProperties": false
}
Run the Flow
- Create the flow input JSON file:
  {
    "source_paths": ["/path/to/file1", "/path/to/file2"],
    "destination_path": "/path/to/your/destination/file.tar.gz",
    "destination_endpoint_id": "your-destination-endpoint-uuid"
  }
- Start the flow:
  globus flows start "$FLOW_ID" \
      --input "<FLOW INPUT FILE>" \
      --label "Compute and Transfer Flow Example 1 Run"
  Save the run ID for use in the next command.
- Monitor the run progress:
  globus flows run show "<RUN_ID>"
- At this point, the run may become INACTIVE, depending on the type of collection being used.
- For inactive runs due to data access requirements, this can be resolved by resuming the run and following the prompts:
  globus flows run resume "<RUN_ID>"
  When prompted, run globus session consent and then rerun globus flows run resume to resume the run.
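The run can also be started from Python. A minimal sketch, assuming the same hypothetical UserApp setup as in the create-flow sketch above and substituting your own flow ID:

import globus_sdk

CLIENT_ID = "your-native-app-client-id"  # placeholder
FLOW_ID = "your-flow-id"                 # saved from `globus flows create`

app = globus_sdk.UserApp("tar-flow-example", client_id=CLIENT_ID)
flow = globus_sdk.SpecificFlowClient(FLOW_ID, app=app)

run = flow.run_flow(
    body={
        "source_paths": ["/path/to/file1", "/path/to/file2"],
        "destination_path": "/path/to/your/destination/file.tar.gz",
        "destination_endpoint_id": "your-destination-endpoint-uuid",
    },
    label="Compute and Transfer Flow Example 1 Run",
)
print(run["run_id"])  # save this to monitor the run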
Example Flow 2
In this second example, the flow takes a user-provided list of source files that exist on a user-provided source collection, distinct from the collection which is co-located with the Compute endpoint.
The flow transfers the files to the collection which is co-located with the Compute endpoint, creates a tarfile from them, and transfers the tarfile to a user-provided destination collection.
The flow will:
- Set constants for the run
- Create an output directory named after the run's ID on the intermediate collection
- Compute destination paths for the input files on the intermediate collection
- Transfer the source files to the newly created output directory on the intermediate collection
- Invoke the do_tar function to create a tar archive from the source files and save it in the output directory
- Transfer the resulting tarfile to the destination collection provided in the flow input
- Delete the output directory on the intermediate collection
Computing destination paths (step 3) is implemented using six different states in the flow (SetSourcePathsIteratorVariables, EvalShouldIterateSourcePaths, IterateSourcePaths, EvalGetSourcePath, GetSourcePathInfo, and EvalSourcePathInfo).
These states loop over each path. An alternative approach, with fewer states, would be to create a separate Compute function to handle this work.
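For illustration, such a Compute function might look like the sketch below. It is hypothetical (not part of the example flows), and it derives names with posixpath.basename where the flow uses the stat action's details.name, so it is a simplification:

def plan_transfer_items(source_paths: list[str], output_directory: str) -> dict:
    """Hypothetical replacement for the six iteration states: map each
    source path to a destination under the run's working directory."""
    import posixpath

    transfer_from_src_data = []
    compute_src_paths = []
    for src in source_paths:
        name = posixpath.basename(src.rstrip("/"))
        dest = output_directory + name
        transfer_from_src_data.append(
            {"source_path": src, "destination_path": dest}
        )
        compute_src_paths.append(dest)
    # The flow would read these two lists from the Compute action's result.
    return {
        "transfer_from_src_data": transfer_from_src_data,
        "compute_src_paths": compute_src_paths,
    }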
Create the Flow
- Edit compute_transfer_example_2_definition.json and replace the placeholder values:
  - gcs_endpoint_id: The intermediate collection ID
  - compute_endpoint_id: The Compute endpoint ID
  - compute_function_id: The UUID of the registered do_tar function
- Create the flow:
  globus flows create "Compute and Transfer Flow Example 2" \
      ./compute_transfer_example_2_definition.json \
      --input-schema ./compute_transfer_example_2_schema.json
  Or update the existing flow from example 1:
  globus flows update <FLOW_ID> \
      --title "Compute and Transfer Flow Example 2" \
      --definition ./compute_transfer_example_2_definition.json \
      --input-schema ./compute_transfer_example_2_schema.json
- Save the flow ID returned by this command.
{
  "StartAt": "SetConstants",
  "States": {
    "SetConstants": {
      "Type": "ExpressionEval",
      "Parameters": {
        "gcs_endpoint_id": "<INSERT YOUR GCS ENDPOINT ID HERE>",
        "gcs_base_path": "/",
        "compute_endpoint_id": "<INSERT YOUR COMPUTE ENDPOINT ID HERE>",
        "compute_function_id": "<INSERT YOUR COMPUTE FUNCTION ID HERE>",
        "compute_output_directory.=": "'/' + _context.run_id + '/'"
      },
      "ResultPath": "$.constants",
      "Next": "MakeComputeWorkingDir"
    },
    "MakeComputeWorkingDir": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/mkdir",
      "Parameters": {
        "endpoint_id.$": "$.constants.gcs_endpoint_id",
        "path.$": "$.constants.compute_output_directory"
      },
      "ResultPath": "$.mkdir_result",
      "Next": "SetSourcePathsIteratorVariables"
    },
    "SetSourcePathsIteratorVariables": {
      "Type": "ExpressionEval",
      "Parameters": {
        "src_paths_iterator_pos": 0,
        "compute_src_paths": [],
        "transfer_from_src_data": []
      },
      "ResultPath": "$.iterator_vars",
      "Next": "EvalShouldIterateSourcePaths"
    },
    "EvalShouldIterateSourcePaths": {
      "Type": "ExpressionEval",
      "Parameters": {
        "should_iterate.=": "len(source_paths) > iterator_vars.src_paths_iterator_pos"
      },
      "ResultPath": "$.iterate_eval",
      "Next": "IterateSourcePaths"
    },
    "IterateSourcePaths": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.iterate_eval.should_iterate",
          "BooleanEquals": true,
          "Next": "EvalGetSourcePath"
        }
      ],
      "Default": "TransferToComputeEndpoint"
    },
    "EvalGetSourcePath": {
      "Type": "ExpressionEval",
      "Parameters": {
        "source_path.=": "source_paths[iterator_vars.src_paths_iterator_pos]"
      },
      "ResultPath": "$.get_path",
      "Next": "GetSourcePathInfo"
    },
    "GetSourcePathInfo": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/stat",
      "Parameters": {
        "endpoint_id.$": "$.source_endpoint",
        "path.$": "$.get_path.source_path"
      },
      "ResultPath": "$.source_path_stat_result",
      "Next": "EvalSourcePathInfo"
    },
    "EvalSourcePathInfo": {
      "Type": "ExpressionEval",
      "Parameters": {
        "compute_src_paths.=": "iterator_vars.compute_src_paths + [constants.compute_output_directory + source_path_stat_result.details.name]",
        "transfer_from_src_data.=": "iterator_vars.transfer_from_src_data + [{'source_path': get_path.source_path, 'destination_path': constants.compute_output_directory + source_path_stat_result.details.name}]",
        "src_paths_iterator_pos.=": "iterator_vars.src_paths_iterator_pos + 1"
      },
      "ResultPath": "$.iterator_vars",
      "Next": "EvalShouldIterateSourcePaths"
    },
    "TransferToComputeEndpoint": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/transfer",
      "Parameters": {
        "source_endpoint.$": "$.source_endpoint",
        "destination_endpoint.$": "$.constants.gcs_endpoint_id",
        "DATA.$": "$.iterator_vars.transfer_from_src_data"
      },
      "ResultPath": "$.transfer_from_src_result",
      "Next": "RunComputeFunction"
    },
    "RunComputeFunction": {
      "Type": "Action",
      "ActionUrl": "https://compute.actions.globus.org/v3",
      "Parameters": {
        "endpoint_id.$": "$.constants.compute_endpoint_id",
        "tasks": [
          {
            "function_id.$": "$.constants.compute_function_id",
            "args": [],
            "kwargs": {
              "src_paths.$": "$.iterator_vars.compute_src_paths",
              "dest_path.$": "$.constants.compute_output_directory",
              "gcs_base_path.$": "$.constants.gcs_base_path"
            }
          }
        ]
      },
      "ResultPath": "$.compute_result",
      "Next": "TransferFromComputeEndpoint"
    },
    "TransferFromComputeEndpoint": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/transfer",
      "Parameters": {
        "source_endpoint.$": "$.constants.gcs_endpoint_id",
        "destination_endpoint.$": "$.destination.id",
        "DATA": [
          {
            "source_path.=": "compute_result.details.result[0]",
            "destination_path.$": "$.destination.path"
          }
        ]
      },
      "ResultPath": "$.transfer_to_dest_result",
      "Next": "CleanupComputeEndpoint"
    },
    "CleanupComputeEndpoint": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/delete",
      "Parameters": {
        "endpoint.$": "$.constants.gcs_endpoint_id",
        "recursive": true,
        "DATA": [
          {
            "path.$": "$.constants.compute_output_directory"
          }
        ]
      },
      "ResultPath": "$.delete_compute_output_result",
      "End": true
    }
  }
}
{
  "type": "object",
  "required": [
    "source_endpoint",
    "source_paths",
    "destination"
  ],
  "properties": {
    "source_endpoint": {
      "type": "string",
      "title": "Source Endpoint ID",
      "description": "The endpoint id of the source collection"
    },
    "source_paths": {
      "type": "array",
      "title": "Source Collection Paths",
      "description": "A list of paths on the source collection for the data"
    },
    "destination": {
      "type": "object",
      "title": "Destination",
      "format": "globus-collection",
      "required": [
        "id",
        "path"
      ],
      "properties": {
        "id": {
          "type": "string",
          "title": "Destination Collection ID",
          "format": "uuid",
          "description": "The endpoint id of the destination collection"
        },
        "path": {
          "type": "string",
          "title": "Destination Collection Path",
          "description": "The path on the destination collection to transfer the compute output"
        }
      },
      "description": "The destination for the data",
      "additionalProperties": false
    }
  },
  "additionalProperties": false
}
Run the Flow
- Create the flow input JSON file:
  {
    "source_endpoint": "your-source-endpoint-uuid",
    "source_paths": ["/path/to/file1", "/path/to/file2"],
    "destination": {
      "id": "your-destination-endpoint-uuid",
      "path": "/path/to/your/destination/archive.tar.gz"
    }
  }
- Start the flow:
  globus flows start "$FLOW_ID" \
      --input <YOUR FLOW INPUT FILE> \
      --label "Compute and Transfer Flow Example 2 Run"
  Save the run ID for use in the next command.
- Monitor the run progress:
  globus flows run show "<RUN_ID>"
- At this point, the run may become INACTIVE, depending on the type of collection being used.
- For inactive runs due to data access requirements, this can be resolved by resuming the run and following the prompts:
  globus flows run resume "<RUN_ID>"
  When prompted, run globus session consent and then rerun globus flows run resume to resume the run.
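If you are monitoring many runs, polling from Python may be more convenient than rerunning the CLI command. A minimal sketch, assuming the same hypothetical UserApp setup as in the earlier SDK sketches:

import time

import globus_sdk

CLIENT_ID = "your-native-app-client-id"  # placeholder
RUN_ID = "your-run-id"                   # saved from `globus flows start`

app = globus_sdk.UserApp("tar-flow-example", client_id=CLIENT_ID)
flows = globus_sdk.FlowsClient(app=app)

# Poll until the run reaches a terminal state. INACTIVE means the run is
# waiting (e.g. for a data access consent) and must be resumed as above.
while True:
    run = flows.get_run(RUN_ID)
    print(run["status"])
    if run["status"] in ("SUCCEEDED", "FAILED"):
        break
    time.sleep(10)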