Tar and Transfer with Globus Compute

These examples demonstrate how to build flows that combine Globus Compute and Globus Transfer to process and move data.

Each of these examples creates an archive file from the user’s files and transfers that archive to a destination. In one case the source data is already on the server running Globus Connect Server and Globus Compute, and in the other it is on a source collection owned by the end user.

Prerequisites

To run these examples, you must have a properly configured server and some local software installed.

You must have a co-located Globus Connect Server Collection and Globus Compute endpoint, either hosted on the same server or at least with access to a shared filesystem.

Globus Connect Server Collection

You can follow this guide for setting up a Globus Connect Server Collection to install Globus Connect Server and configure a collection.

For ease of use, we recommend using a Guest Collection.

Globus Compute Endpoint

This guide for setting up a Globus Compute Endpoint covers installation of the Globus Compute software.

This Compute endpoint must have read/write permissions on the same storage location where the Globus Connect Server collection is hosted.

Globus CLI

You will also need the Globus CLI installed (CLI installation docs).

The Globus CLI documentation recommends installation with pipx, as in pipx install globus-cli.

Globus Compute SDK

You must have the globus-compute-sdk Python package available. We strongly recommend installing it in a virtual environment, with pip install globus-compute-sdk.

You can follow the Globus Compute install documentation to install the Compute SDK client package in a virtualenv.
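
After installation, a quick sanity check (standard library only, assuming the package is installed under its published name) confirms the SDK is importable:

from importlib.metadata import version

import globus_compute_sdk  # the import itself is the check

print("globus-compute-sdk", version("globus-compute-sdk"))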

Register the do_tar Compute Function

The next step is to register the do_tar Compute function. This is the function that the example flows will invoke to create archives.

You can run the provided Python script from the environment where globus-compute-sdk is installed:

python ./register_compute_function.py

and save the Compute function's UUID.

Important

Make sure to note the function's UUID; it will be needed later!

#!/usr/bin/env python


def do_tar(
    src_paths: list[str],
    dest_path: str,
    gcs_base_path: str = "/",
) -> str:
    """
    Create a tar.gz archive from source files or directories and save it to the
    given destination.

    Parameters:
        src_paths (list[str]):  Source paths of files or directories to be archived.
        dest_path (str): Destination path where the tar.gz archive will be
                         written. This can be either an existing directory or a
                         file path (with the parent directory existing).
        gcs_base_path (str): The shared GCS collection's configured base path.
                             Default is "/".

    Returns:
        str: The output tar archive file path.

    Raises:
        ValueError: If src_paths is empty, dest_path is None, any provided path
                    does not begin with the expected prefix, or if any source
                    path or destination (or its parent) is invalid.
        RuntimeError: If an error occurs during the creation of the tar archive.

    Example:
        >>> output = do_tar(
        ...     src_paths=["/file1.txt", "/dir1/file2.txt"],
        ...     dest_path="/tar_output",
        ...     gcs_base_path="/path/to/root/"
        ... )
        >>> print(output)
        /tar_output/7f9c3f9a-2d75-4d2f-8b0a-0f0d7e6b1e3a.tar.gz
    """

    import tarfile
    import uuid
    from pathlib import Path

    def transform_path_to_absolute(path: str) -> str:
        """Transform a GCS-style path to an absolute filesystem path."""
        if not path.startswith("/"):
            raise ValueError(
                f"Path '{path}' does not start with the expected prefix '/'."
            )
        return path.replace("/", gcs_base_path, 1)

    def transform_path_from_absolute(path: str) -> str:
        """Transform an absolute filesystem path back to a GCS-style path."""
        if not path.startswith(gcs_base_path):
            raise ValueError(
                f"Path '{path}' does not start with "
                f"the expected prefix '{gcs_base_path}'."
            )
        return path.replace(gcs_base_path, "/", 1)

    # Validate src_paths and dest_path
    if not src_paths:
        raise ValueError("src_paths must not be empty.")
    if dest_path is None:
        raise ValueError("dest_path must not be None.")

    # Transform destination path
    transformed_dest_path = Path(transform_path_to_absolute(dest_path))

    # Transform and validate all source paths
    transformed_src_paths = []
    for src_path in src_paths:
        transformed_src_path = Path(transform_path_to_absolute(src_path))
        if not transformed_src_path.exists():
            raise ValueError(f"Source path '{src_path}' does not exist.")
        transformed_src_paths.append(transformed_src_path)

    # Validate transformed_dest_path
    if transformed_dest_path.exists():
        if not (transformed_dest_path.is_dir() or transformed_dest_path.is_file()):
            raise ValueError(
                f"Destination path '{dest_path}' is neither a directory nor a file."
            )
    else:
        if not transformed_dest_path.parent.exists():
            raise ValueError(
                f"Parent directory of destination path '{dest_path}' does not exist."
            )

    # Determine the final tar file path.
    if transformed_dest_path.exists() and transformed_dest_path.is_dir():
        # If destination is an existing directory, generate a unique tar file name.
        tar_file_name = f"{uuid.uuid4()}.tar.gz"
        transformed_dest_tar_path = transformed_dest_path / tar_file_name
    else:
        # Destination is treated as a file path.
        fn = transformed_dest_path.name
        if fn.endswith(".gz"):
            transformed_dest_tar_path = transformed_dest_path
        elif fn.endswith(".tar"):
            transformed_dest_tar_path = transformed_dest_path.with_name(fn + ".gz")
        else:
            transformed_dest_tar_path = transformed_dest_path.with_name(fn + ".tar.gz")

    # Informative message (could be replaced with logging.info in production code)
    print(
        f"Creating tar file at {transformed_dest_tar_path.absolute()} "
        f"with {len(transformed_src_paths)} source(s)"
    )

    # Create the tar.gz archive with exception handling.
    try:
        with tarfile.open(transformed_dest_tar_path, "w:gz") as tar:
            for src_path in transformed_src_paths:
                tar.add(src_path, arcname=src_path.name)
    except Exception as e:
        # Attempt to remove any incomplete tar file.
        if transformed_dest_tar_path.exists():
            try:
                transformed_dest_tar_path.unlink()
            except Exception as unlink_err:
                print(
                    f"Warning: Failed to remove incomplete tar file "
                    f"'{transformed_dest_tar_path}': {unlink_err}"
                )
        raise RuntimeError(f"Failed to create tar archive: {e}") from e

    # Transform the output path back to a GCS-style path and return.
    result_path = transform_path_from_absolute(
        str(transformed_dest_tar_path.absolute())
    )
    return result_path


if __name__ == "__main__":
    from globus_compute_sdk import Client

    gcc = Client()
    do_tar_fuid = gcc.register_function(do_tar)
    print(f"Tar func UUID is {do_tar_fuid}")
Tip

For best results, use the same Python version for registration as the one running on your Globus Compute Endpoint.

This ensures consistent behavior between the two environments.
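
Before using the function in a flow, you can optionally smoke-test it with the Compute SDK's Executor. This is a sketch: the endpoint and function UUIDs are placeholders, and the sample paths must point at files that actually exist on the endpoint's filesystem.

from globus_compute_sdk import Executor

COMPUTE_ENDPOINT_ID = "<YOUR COMPUTE ENDPOINT UUID>"
DO_TAR_FUNCTION_ID = "<YOUR do_tar FUNCTION UUID>"

with Executor(endpoint_id=COMPUTE_ENDPOINT_ID) as ex:
    # Invoke the registered function by UUID, passing keyword arguments.
    future = ex.submit_to_registered_function(
        DO_TAR_FUNCTION_ID,
        kwargs={
            "src_paths": ["/file1.txt"],
            "dest_path": "/tar_output",
            "gcs_base_path": "/",
        },
    )
    print("Archive created at:", future.result())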

do_tar Behaviors

do_tar is a function that bundles multiple files into a tar archive. It accepts source paths (one or more files or directories) and a destination where the new tar file should be created. It is also aware that it operates on the same filesystem as a Globus Connect Server collection.

Parameter       Description
src_paths       List of paths to the files/directories to be archived.
dest_path       Where to write the tar archive (directory or file path).
gcs_base_path   The co-located Globus Connect Server collection's configured base path (default: "/").

Collection Base Paths

The gcs_base_path parameter allows the Compute function to transform user input paths into absolute filesystem paths. If the collection has a configured base path, this option is needed to handle the resulting path rewrites.

For example,

  • The collection has configured its base path to /path/to/root/.

  • A user wants to tar the files at the absolute path /path/to/root/input_files/.

  • This path therefore appears as /input_files/ on the collection.

  • The do_tar function will access the same storage filesystem, and will need to know that /input_files/ maps to /path/to/root/input_files/.

In this scenario, gcs_base_path would be set to /path/to/root/, allowing the function to match the behavior of the collection on the same filesystem.
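
The mapping itself is a simple prefix rewrite, mirroring the transform_path_to_absolute and transform_path_from_absolute helpers inside do_tar; a minimal sketch:

gcs_base_path = "/path/to/root/"

def to_absolute(collection_path: str) -> str:
    # "/input_files/" -> "/path/to/root/input_files/"
    return collection_path.replace("/", gcs_base_path, 1)

def to_collection(absolute_path: str) -> str:
    # "/path/to/root/input_files/" -> "/input_files/"
    return absolute_path.replace(gcs_base_path, "/", 1)

assert to_absolute("/input_files/") == "/path/to/root/input_files/"
assert to_collection("/path/to/root/input_files/") == "/input_files/"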

Example Flow 1

In this first example, the Compute and Transfer flow takes a user-provided list of files that already exist on a preconfigured source collection.

The flow creates a tarfile from those files and transfers the tarfile to a user-provided destination collection.

The flow will:

  1. Set constants for the run

  2. Create an output directory named after the run's ID on the source collection

  3. Invoke the do_tar function to create a tar archive from the input source files and save it in the output directory

  4. Transfer the resulting tarfile to the destination collection provided in the flow input

  5. Delete the output directory on the source collection

Create the Flow

  1. Edit compute_transfer_example_1_definition.json and replace the placeholder values:

    • gcs_endpoint_id: The source collection ID

    • compute_endpoint_id: The Compute endpoint ID

    • compute_function_id: The UUID of the registered do_tar function

If the collection has a configured base path, also edit gcs_base_path.

  2. Create the flow:

    globus flows create "Compute and Transfer Flow Example 1" \
       ./compute_transfer_example_1_definition.json \
       --input-schema ./compute_transfer_example_1_schema.json
  3. Save the flow ID returned by this command.
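
If you prefer the Python SDK to the CLI for creating the flow, a rough equivalent looks like the sketch below (assumes globus-sdk v3; the token placeholder must be replaced with a valid Flows access token, which you must obtain separately):

import json

import globus_sdk

FLOWS_TOKEN = "<YOUR FLOWS ACCESS TOKEN>"  # placeholder; obtain via your usual auth flow

flows = globus_sdk.FlowsClient(
    authorizer=globus_sdk.AccessTokenAuthorizer(FLOWS_TOKEN)
)

with open("compute_transfer_example_1_definition.json") as f:
    definition = json.load(f)
with open("compute_transfer_example_1_schema.json") as f:
    input_schema = json.load(f)

flow = flows.create_flow(
    title="Compute and Transfer Flow Example 1",
    definition=definition,
    input_schema=input_schema,
)
print("Flow ID:", flow["id"])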

{
  "StartAt": "SetConstants",
  "States": {
    "SetConstants": {
      "Type": "ExpressionEval",
      "Parameters": {
        "gcs_endpoint_id": "<INSERT YOUR GCS ENDPOINT ID HERE>",
        "gcs_base_path": "/",
        "compute_endpoint_id": "<INSERT YOUR COMPUTE ENDPOINT ID HERE>",
        "compute_function_id": "<INSERT YOUR COMPUTE FUNCTION ID HERE>",
        "compute_output_directory.=": "'/' + _context.run_id + '/'"
      },
      "ResultPath": "$.constants",
      "Next": "MakeComputeWorkingDir"
    },
    "MakeComputeWorkingDir": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/mkdir",
      "Parameters": {
        "endpoint_id.$": "$.constants.gcs_endpoint_id",
        "path.$": "$.constants.compute_output_directory"
      },
      "ResultPath": "$.mkdir_result",
      "Next": "RunComputeFunction"
    },
    "RunComputeFunction": {
      "Type": "Action",
      "ActionUrl": "https://compute.actions.globus.org/v3",
      "Parameters": {
        "endpoint_id.$": "$.constants.compute_endpoint_id",
        "tasks": [
          {
            "function_id.$": "$.constants.compute_function_id",
            "args": [],
            "kwargs": {
              "src_paths.$": "$.source_paths",
              "dest_path.$": "$.constants.compute_output_directory",
              "gcs_base_path.$": "$.constants.gcs_base_path"
            }
          }
        ]
      },
      "ResultPath": "$.compute_result",
      "Next": "TransferFromComputeEndpoint"
    },
    "TransferFromComputeEndpoint": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/transfer",
      "Parameters": {
        "source_endpoint.$": "$.constants.gcs_endpoint_id",
        "destination_endpoint.$": "$.destination_endpoint_id",
        "DATA": [
          {
            "source_path.=": "compute_result.details.result[0]",
            "destination_path.$": "$.destination_path"
          }
        ]
      },
      "ResultPath": "$.transfer_to_dest_result",
      "Next": "CleanupComputeEndpoint"
    },
    "CleanupComputeEndpoint": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/delete",
      "Parameters": {
        "endpoint.$": "$.constants.gcs_endpoint_id",
        "recursive": true,
        "DATA": [
          {
            "path.$": "$.constants.compute_output_directory"
          }
        ]
      },
      "ResultPath": "$.delete_compute_output_result",
      "End": true
    }
  }
}
{
  "type": "object",
  "required": [
    "source_paths",
    "destination_path",
    "destination_endpoint_id"
  ],
  "properties": {
    "source_paths": {
      "type": "array",
      "title": "Source Collection Paths",
      "description": "A list of paths on the source collection for the data"
    },
    "destination_path": {
      "type": "string",
      "title": "Destination Collection Path",
      "description": "The path on the destination collection for the data"
    },
    "destination_endpoint_id": {
      "type": "string",
      "title": "Destination Collection Endpoint ID",
      "description": "The endpoint id of destination collection"
    }
  },
  "additionalProperties": false
}
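
As an optional local check before starting a run, you can validate a candidate input document against this schema with the third-party jsonschema package (Flows performs its own validation server-side; this is just a convenience):

import json

from jsonschema import ValidationError, validate  # pip install jsonschema

with open("compute_transfer_example_1_schema.json") as f:
    schema = json.load(f)

flow_input = {
    "source_paths": ["/path/to/file1", "/path/to/file2"],
    "destination_path": "/path/to/your/destination/file.tar.gz",
    "destination_endpoint_id": "your-destination-endpoint-uuid",
}

try:
    validate(instance=flow_input, schema=schema)
    print("Input is valid.")
except ValidationError as err:
    print("Invalid input:", err.message)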

Run the Flow

  1. Create the flow input JSON file:

    {
        "source_paths": ["/path/to/file1", "/path/to/file2"],
        "destination_path": "/path/to/your/destination/file.tar.gz",
        "destination_endpoint_id": "your-destination-endpoint-uuid"
    }
  2. Start the flow:

    globus flows start "$FLOW_ID" \
       --input "<FLOW INPUT FILE>" \
       --label "Compute and Transfer Flow Example 1 Run"

    And save the run ID for use in the next command.

  3. Monitor the run progress:

    globus flows run show "<RUN_ID>"
    • At this point, the run may become INACTIVE, depending on the type of collection being used.

    • If the run becomes inactive due to data access requirements, resolve this by resuming the run and following the prompts:

      globus flows run resume "<RUN_ID>"

      When prompted, run globus session consent and rerun globus flows run resume to resume the run.
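
If you would rather poll the run from Python than from the CLI, a minimal sketch with the Globus Python SDK (again assuming globus-sdk v3 and a valid Flows access token) looks roughly like:

import time

import globus_sdk

FLOWS_TOKEN = "<YOUR FLOWS ACCESS TOKEN>"  # placeholder

flows = globus_sdk.FlowsClient(
    authorizer=globus_sdk.AccessTokenAuthorizer(FLOWS_TOKEN)
)

run_id = "<RUN_ID>"
while True:
    run = flows.get_run(run_id)
    print("Run status:", run["status"])
    if run["status"] in ("SUCCEEDED", "FAILED"):
        break
    time.sleep(15)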

Example Flow 2

In this second example, the flow takes a user-provided list of source files that exist on a user-provided source collection, distinct from the collection that is co-located with the Compute endpoint.

The flow transfers the files to the collection that is co-located with the Compute endpoint, creates a tarfile from them, and transfers the tarfile to a user-provided destination collection.

The flow will:

  1. Set constants for the run

  2. Create an output directory named after the run's ID on the intermediate collection

  3. Compute destination paths for the input files on the intermediate collection

  4. Transfer the source files to the newly created output directory on the intermediate collection

  5. Invoke the do_tar function to create a tar archive from the source files and save it in the output directory

  6. Transfer the resulting tarfile to the destination collection provided in the flow input

  7. Delete the output directory on the intermediate collection

Note

Computing destination paths (step 3) is implemented using six different states in the flow (SetSourcePathsIteratorVariables, EvalShouldIterateSourcePaths, IterateSourcePaths, EvalGetSourcePath, GetSourcePathInfo, and EvalSourcePathInfo).

These states loop over each path. An alternative approach, with fewer states, would be to create a separate Compute function to handle this work.
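
For reference, the bookkeeping these states perform is roughly equivalent to the following Python loop (sample values are hypothetical; the /stat action, which the flow uses to learn each entry's name, is stood in for by os.path.basename):

import os

# Hypothetical stand-ins for the run input and the SetConstants state.
source_paths = ["/data/file1.txt", "/data/dir2/"]
compute_output_directory = "/<run_id>/"

compute_src_paths = []        # paths do_tar will read on the intermediate collection
transfer_from_src_data = []   # source/destination pairs for the Transfer action

for source_path in source_paths:
    # The flow calls the /stat action here to learn each entry's name;
    # os.path.basename on the trimmed path is a local stand-in for that lookup.
    name = os.path.basename(source_path.rstrip("/"))
    compute_src_paths.append(compute_output_directory + name)
    transfer_from_src_data.append({
        "source_path": source_path,
        "destination_path": compute_output_directory + name,
    })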

Create the Flow

  1. Edit compute_transfer_example_2_definition.json and replace the placeholder values:

    • gcs_endpoint_id: The intermediate collection ID

    • compute_endpoint_id: The Compute endpoint ID

    • compute_function_id: The UUID of the registered do_tar function

  2. Create the flow:

    globus flows create "Compute and Transfer Flow Example 2" \
        ./compute_transfer_example_2_definition.json \
        --input-schema ./compute_transfer_example_2_schema.json

    Or update the existing flow from example 1:

    globus flows update <FLOW_ID> \
        --title "Compute and Transfer Flow Example 2" \
        --definition ./compute_transfer_example_2_definition.json \
        --input-schema ./compute_transfer_example_2_schema.json
  3. Save the flow ID returned by this command.

{
  "StartAt": "SetConstants",
  "States": {
    "SetConstants": {
      "Type": "ExpressionEval",
      "Parameters": {
        "gcs_endpoint_id": "<INSERT YOUR GCS ENDPOINT ID HERE>",
        "gcs_base_path": "/",
        "compute_endpoint_id": "<INSERT YOUR COMPUTE ENDPOINT ID HERE>",
        "compute_function_id": "<INSERT YOUR COMPUTE FUNCTION ID HERE>",
        "compute_output_directory.=": "'/' + _context.run_id + '/'"
      },
      "ResultPath": "$.constants",
      "Next": "MakeComputeWorkingDir"
    },
    "MakeComputeWorkingDir": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/mkdir",
      "Parameters": {
        "endpoint_id.$": "$.constants.gcs_endpoint_id",
        "path.$": "$.constants.compute_output_directory"
      },
      "ResultPath": "$.mkdir_result",
      "Next": "SetSourcePathsIteratorVariables"
    },
    "SetSourcePathsIteratorVariables": {
      "Type": "ExpressionEval",
      "Parameters": {
        "src_paths_iterator_pos": 0,
        "compute_src_paths": [],
        "transfer_from_src_data": []
      },
      "ResultPath": "$.iterator_vars",
      "Next": "EvalShouldIterateSourcePaths"
    },
    "EvalShouldIterateSourcePaths": {
      "Type": "ExpressionEval",
      "Parameters": {
        "should_iterate.=": "len(source_paths) > iterator_vars.src_paths_iterator_pos"
      },
      "ResultPath": "$.iterate_eval",
      "Next": "IterateSourcePaths"
    },
    "IterateSourcePaths": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.iterate_eval.should_iterate",
          "BooleanEquals": true,
          "Next": "EvalGetSourcePath"
        }
      ],
      "Default": "TransferToComputeEndpoint"
    },
    "EvalGetSourcePath": {
      "Type": "ExpressionEval",
      "Parameters": {
        "source_path.=": "source_paths[iterator_vars.src_paths_iterator_pos]"
      },
      "ResultPath": "$.get_path",
      "Next": "GetSourcePathInfo"
    },
    "GetSourcePathInfo": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/stat",
      "Parameters": {
        "endpoint_id.$": "$.source_endpoint",
        "path.$": "$.get_path.source_path"
      },
      "ResultPath": "$.source_path_stat_result",
      "Next": "EvalSourcePathInfo"
    },
    "EvalSourcePathInfo": {
      "Type": "ExpressionEval",
      "Parameters": {
        "compute_src_paths.=": "iterator_vars.compute_src_paths + [constants.compute_output_directory + source_path_stat_result.details.name]",
        "transfer_from_src_data.=": "iterator_vars.transfer_from_src_data + [{'source_path': get_path.source_path, 'destination_path': constants.compute_output_directory + source_path_stat_result.details.name}]",
        "src_paths_iterator_pos.=": "iterator_vars.src_paths_iterator_pos + 1"
      },
      "ResultPath": "$.iterator_vars",
      "Next": "EvalShouldIterateSourcePaths"
    },
    "TransferToComputeEndpoint": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/transfer",
      "Parameters": {
        "source_endpoint.$": "$.source_endpoint",
        "destination_endpoint.$": "$.constants.gcs_endpoint_id",
        "DATA.$": "$.iterator_vars.transfer_from_src_data"
      },
      "ResultPath": "$.transfer_from_src_result",
      "Next": "RunComputeFunction"
    },
    "RunComputeFunction": {
      "Type": "Action",
      "ActionUrl": "https://compute.actions.globus.org/v3",
      "Parameters": {
        "endpoint_id.$": "$.constants.compute_endpoint_id",
        "tasks": [
          {
            "function_id.$": "$.constants.compute_function_id",
            "args": [],
            "kwargs": {
              "src_paths.$": "$.iterator_vars.compute_src_paths",
              "dest_path.$": "$.constants.compute_output_directory",
              "gcs_base_path.$": "$.constants.gcs_base_path"
            }
          }
        ]
      },
      "ResultPath": "$.compute_result",
      "Next": "TransferFromComputeEndpoint"
    },
    "TransferFromComputeEndpoint": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/transfer",
      "Parameters": {
        "source_endpoint.$": "$.constants.gcs_endpoint_id",
        "destination_endpoint.$": "$.destination.id",
        "DATA": [
          {
            "source_path.=": "compute_result.details.result[0]",
            "destination_path.$": "$.destination.path"
          }
        ]
      },
      "ResultPath": "$.transfer_to_dest_result",
      "Next": "CleanupComputeEndpoint"
    },
    "CleanupComputeEndpoint": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/delete",
      "Parameters": {
        "endpoint.$": "$.constants.gcs_endpoint_id",
        "recursive": true,
        "DATA": [
          {
            "path.$": "$.constants.compute_output_directory"
          }
        ]
      },
      "ResultPath": "$.delete_compute_output_result",
      "End": true
    }
  }
}
{
  "type": "object",
  "required": [
    "source_endpoint",
    "source_paths",
    "destination"
  ],
  "properties": {
    "source_endpoint": {
      "type": "string",
      "title": "Source Endpoint ID",
      "description": "The endpoint id of the source collection"
    },
    "source_paths": {
      "type": "array",
      "title": "Source Collection Paths",
      "description": "A list of paths on the source collection for the data"
    },
    "destination": {
      "type": "object",
      "title": "Destination",
      "format": "globus-collection",
      "required": [
        "id",
        "path"
      ],
      "properties": {
        "id": {
          "type": "string",
          "title": "Destination Collection ID",
          "format": "uuid",
          "description": "The endpoint id of the destination collection"
        },
        "path": {
          "type": "string",
          "title": "Destination Collection Path",
          "description": "The path on the destination collection to transfer the compute output"
        }
      },
      "description": "The destination for the data",
      "additionalProperties": false
    }
  },
  "additionalProperties": false
}

Run the Flow

  1. Create the flow input JSON file:

    {
        "source_endpoint": "your-source-endpoint-uuid",
        "source_paths": ["/path/to/file1", "/path/to/file2"],
        "destination": {
            "id": "your-destination-endpoint-uuid",
            "path": "/path/to/your/destination/archive.tar.gz"
        }
    }
  2. Start the flow:

    globus flows start "$FLOW_ID" \
        --input "<FLOW INPUT FILE>" \
        --label "Compute and Transfer Flow Example 2 Run"

    And save the run ID for use in the next command.

  3. Monitor the run progress:

    globus flows run show "<RUN_ID>"
    • At this point, the run may become INACTIVE, depending on the type of collection being used.

    • If the run becomes inactive due to data access requirements, resolve this by resuming the run and following the prompts:

      globus flows run resume "<RUN_ID>"

      When prompted, run globus session consent and rerun globus flows run resume to resume the run.
