Developing Pennsieve Processors

Everything you need to build a processor container that runs on the Pennsieve compute platform

🚧

This documentation applies to Pennsieve Workflow Services V2, which is not enabled by default in every workspace. We expect a broad roll-out in Q2 2026.

Processor Guide

This guide covers everything you need to build a processor container that runs on the Pennsieve compute platform.

For infrastructure details (deployment modes, logging, cost, security), see the Operator Guide.

Processors are reusable across workflows and datasets. A single processor image can be registered once and used in many different pipelines. Because processors communicate only through files on disk, they can be written in any language and combined freely regardless of implementation.


1. Technical Overview

A processor is a Docker container that reads input files, does some computation, and writes output files.

Compute Types

Each processor runs as either an ECS Fargate task or an AWS Lambda function. The application record declares which runtimes the processor supports via the computeTypes array:

Compute Type | Provisioning Latency | Max Duration | Best For
ecs (default) | 30-60s | Unlimited | Long-running jobs, GPU workloads, large memory
lambda | 1-10s cold start | 15 minutes | Lightweight tasks (file conversion, metadata extraction)

Both types share the same INPUT_DIR / OUTPUT_DIR contract and read/write from the same EFS file system. A single workflow can mix ECS and Lambda processors freely.

The runtime for each processor is determined by:

  1. Default: the first entry in the application's computeTypes array (falls back to ["ecs"] if not set).
  2. Override: the workflow invocation can override the compute type per processor via params.computeTypeOverrides, a map of processor UUID to desired runtime. The ASL converter validates that the override is in the application's supported list and fails fast if not.
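
As a sketch, this resolution amounts to a simple lookup against the application record. The Python below is illustrative only (the actual ASL converter is platform code):

# Illustrative sketch of compute-type resolution -- not the platform's implementation.
def resolve_compute_type(app: dict, overrides: dict) -> str:
    """Pick the runtime for one processor, honoring a per-invocation override."""
    supported = app.get("computeTypes") or ["ecs"]   # default when not set
    requested = overrides.get(app["uuid"])           # from params.computeTypeOverrides
    if requested is None:
        return supported[0]                          # first supported entry wins
    if requested not in supported:
        raise ValueError(f"{requested!r} not supported; computeTypes = {supported}")
    return requested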

2. Container Contract

Your container must:

  1. Read input from INPUT_DIR — a path on EFS containing the files to process.
  2. Write output to OUTPUT_DIR — a path on EFS where results should go.
  3. Exit 0 on success, non-zero on failure.
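
A minimal sketch of this contract (the worked example in section 5 expands on the file loop; this one just shows the exit-code behavior):

# Minimal contract sketch: read INPUT_DIR, write OUTPUT_DIR, exit non-zero on failure.
import os
import sys

def main() -> None:
    input_dir = os.environ["INPUT_DIR"]
    output_dir = os.environ["OUTPUT_DIR"]
    for name in os.listdir(input_dir):
        # ... process os.path.join(input_dir, name) and write results into output_dir ...
        pass

if __name__ == "__main__":
    try:
        main()
    except Exception as exc:
        print(f"processing failed: {exc}", file=sys.stderr)
        sys.exit(1)   # non-zero exit marks the processor as FAILED
    sys.exit(0)       # exit 0 marks it as SUCCEEDED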

3. Environment Variables

Static Values

The following environment variables are available inside every processor container, regardless of compute type:

Variable | Description | ECS | Lambda
PENNSIEVE_API_HOST | Pennsieve API base URL (e.g., https://api.pennsieve.net) | Container override | Function config
PENNSIEVE_API_HOST2 | Pennsieve API v2 base URL (e.g., https://api2.pennsieve.net) | Container override | Function config
ENVIRONMENT | Environment name (dev, prod) | Container override | Function config
REGION | AWS region | Container override | Function config
DEPLOYMENT_MODE | Security scope: basic, secure, or compliant | Container override | Function config

These static values are set as environment variables on both ECS and Lambda, so processor code using os.environ["PENNSIEVE_API_HOST"] works identically on both runtimes.

Processors can use DEPLOYMENT_MODE to adapt their behavior based on the compute node's network topology. For example, in compliant mode there is no internet access, so processors should skip external API calls or use VPC endpoints instead of failing with a timeout.
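
For example, a processor might branch on the mode before attempting any outbound call. In the sketch below, fetch_external_annotations is a hypothetical placeholder for whatever external call your processor makes:

# Sketch: adapt to the compute node's network topology via DEPLOYMENT_MODE.
import os

def fetch_external_annotations() -> dict:
    """Hypothetical placeholder for an outbound call to a third-party service."""
    return {}

deployment_mode = os.environ.get("DEPLOYMENT_MODE", "basic")

if deployment_mode == "compliant":
    annotations = {}   # no internet access: skip the external call instead of timing out
else:
    annotations = fetch_external_annotations()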

Per-Invocation Values

The following values change on every workflow run. For ECS processors, they are set as environment variables via container overrides. For Lambda processors, they are passed in the invocation payload because Lambda environment variables are part of the function configuration and cannot change per invocation.

Value | ECS env var | Lambda payload field
Input directory | INPUT_DIR | inputDir
Output directory | OUTPUT_DIR | outputDir
Workflow instance ID | WORKFLOW_INSTANCE_ID | workflowInstanceId
Session token | SESSION_TOKEN | sessionToken
Refresh token | REFRESH_TOKEN | refreshToken

Credential handling: Session and refresh tokens are never stored in the Step Functions state in plaintext. The compute gateway writes both tokens to an AWS Secrets Manager secret at the start of each execution. Before each processor runs, a ResolveToken state reads the secret and injects the tokens into the processor's environment. This ensures credentials are isolated per execution and protected at rest by Secrets Manager encryption.

The REFRESH_TOKEN allows processors to refresh an expired session token during long-running jobs (Cognito access tokens expire after 1 hour). Processors that make authenticated Pennsieve API calls should attempt a token refresh on HTTP 401 responses.
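
A hedged retry pattern is sketched below; refresh_session is a placeholder, since the exact refresh call depends on how your processor authenticates against the Pennsieve API:

# Sketch: retry an authenticated Pennsieve API call once after a 401 by refreshing the token.
# refresh_session() is a placeholder -- substitute the refresh mechanism your processor uses.
import os
import requests

def refresh_session(refresh_token: str) -> str:
    """Placeholder: exchange REFRESH_TOKEN for a new session token."""
    raise NotImplementedError

def api_get(path: str) -> requests.Response:
    url = os.environ["PENNSIEVE_API_HOST"] + path
    headers = {"Authorization": f"Bearer {os.environ['SESSION_TOKEN']}"}
    resp = requests.get(url, headers=headers)
    if resp.status_code == 401:   # session token expired mid-run
        os.environ["SESSION_TOKEN"] = refresh_session(os.environ["REFRESH_TOKEN"])
        headers["Authorization"] = f"Bearer {os.environ['SESSION_TOKEN']}"
        resp = requests.get(url, headers=headers)
    return resp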

The Lambda handler wrapper is responsible for extracting these values from the event and exporting them as INPUT_DIR, OUTPUT_DIR, etc., so that the processor logic works unchanged (see the dual-mode image example below).


4. Input and Output

How INPUT_DIR is Determined

  • First processor in the DAG (no dependencies): INPUT_DIR points to the downloaded input files from Pennsieve (/mnt/efs/{computeNodeId}/input/{workflowInstanceId}).
  • Processor with one dependency: INPUT_DIR points to the output directory of the upstream processor (/mnt/efs/{computeNodeId}/workdir/{workflowInstanceId}/{hash}).
  • Processor with multiple dependencies: INPUT_DIR points to a merged directory created by the platform before the processor starts (/mnt/efs/{computeNodeId}/workdir/{workflowInstanceId}/m_{hash}).

How Merges Work

When a processor has multiple dependencies in the DAG (e.g., a diamond pattern), the platform needs to combine their outputs into a single INPUT_DIR. This is handled automatically by a MergeInputs step that runs before the processor starts.

     [A]  ← output: /workdir/{id}/{hashA}/
    /   \
  [B]   [C]  ← outputs: /workdir/{id}/{hashB}/, /workdir/{id}/{hashC}/
    \   /
     [D]  ← input:  /workdir/{id}/m_{hashD}/  (merged)

The merge step is executed by the data-transfer Lambda and works as follows:

  1. Creates a merged target directory for the downstream processor.
  2. Iterates over each upstream dependency's output directory.
  3. For each file in each dependency output, creates a symlink in the merged directory pointing to the original file.
  4. If two dependencies produce a file with the same name, the workflow fails immediately with a collision error.

Because the merge uses symlinks rather than copies, it is fast and uses no additional EFS storage regardless of file size.
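
The behavior can be pictured with the following sketch (illustrative only; the real merge runs inside the platform's data-transfer Lambda):

# Sketch of the merge behavior: symlink every upstream file into one directory and
# fail on duplicate names. Illustrative only -- the real merge is platform code.
import os

def merge_inputs(dependency_output_dirs: list[str], merged_dir: str) -> None:
    os.makedirs(merged_dir, exist_ok=True)
    for dep_dir in dependency_output_dirs:
        for name in os.listdir(dep_dir):
            link_path = os.path.join(merged_dir, name)
            if os.path.lexists(link_path):
                raise RuntimeError(f"filename collision on {name!r} between upstream outputs")
            os.symlink(os.path.join(dep_dir, name), link_path)   # link, not copy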

Avoiding collisions: If your workflow uses a diamond DAG where multiple upstream processors may produce files with the same name, each processor should prefix its output filenames with a unique identifier (e.g., a run ID or processor name). This ensures the merge step can combine outputs without collisions.
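
One simple way to do this in a processor (sketch; the prefix choice is up to you):

# Sketch: prefix output filenames with a processor-specific identifier to avoid merge collisions.
import os
import shutil

PREFIX = "feature-extractor"   # any identifier unique among the upstream processors

input_dir = os.environ["INPUT_DIR"]
output_dir = os.environ["OUTPUT_DIR"]

for name in os.listdir(input_dir):
    # ... process the file ...
    shutil.copy(os.path.join(input_dir, name),
                os.path.join(output_dir, f"{PREFIX}_{name}"))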


5. Building Processor Images

Dockerfile Guidelines (ECS Processors)

There are no special requirements for the Dockerfile. Any image that respects INPUT_DIR / OUTPUT_DIR works.

Example:

FROM python:3.11-slim

WORKDIR /app
COPY process.py .

CMD ["python", "process.py"]
# process.py
import os, shutil

input_dir = os.environ["INPUT_DIR"]
output_dir = os.environ["OUTPUT_DIR"]

for filename in os.listdir(input_dir):
    filepath = os.path.join(input_dir, filename)
    # ... process file ...
    shutil.copy(filepath, os.path.join(output_dir, filename))

Dual-Mode Images (Lambda Processors)

Lambda processors use the same container image as ECS but have additional requirements imposed by the Lambda container runtime:

Constraint | Detail
Runtime Interface Client (RIC) | Image must include the Lambda RIC for your language
Read-only root filesystem | Only /tmp (512 MB - 10 GB) and EFS (/mnt/efs) are writable
Max image size | 10 GB uncompressed (including all layers)
Architecture | Single architecture only (no multi-arch images)
Max duration | 15 minutes

The recommended approach for dual-mode images is a runtime-detecting entrypoint that checks the AWS_LAMBDA_RUNTIME_API environment variable (set by Lambda, absent on ECS) to decide whether to run the RIC or the normal command:

FROM python:3.11-slim

# Install Lambda RIC
RUN pip install awslambdaric

WORKDIR /app
COPY process.py handler.py entrypoint.sh ./
RUN chmod +x entrypoint.sh

ENTRYPOINT ["./entrypoint.sh"]
CMD ["python", "process.py"]
#!/bin/sh
# entrypoint.sh — detects runtime and branches
if [ -n "$AWS_LAMBDA_RUNTIME_API" ]; then
    # Running on Lambda: start the RIC with our handler
    exec python -m awslambdaric handler.handler
else
    # Running on ECS: execute the default command
    exec "$@"
fi

# handler.py — Lambda handler that bridges payload → env vars
import os, subprocess

def handler(event, context):
    # Export per-invocation values as env vars so process.py works unchanged
    os.environ["INPUT_DIR"] = event["inputDir"]
    os.environ["OUTPUT_DIR"] = event["outputDir"]
    os.environ["WORKFLOW_INSTANCE_ID"] = event.get("workflowInstanceId", "")
    os.environ["SESSION_TOKEN"] = event.get("sessionToken", "")
    os.environ["REFRESH_TOKEN"] = event.get("refreshToken", "")
    # PENNSIEVE_API_HOST, PENNSIEVE_API_HOST2, ENVIRONMENT, REGION,
    # DEPLOYMENT_MODE are already set as Lambda function env vars — no action needed

    subprocess.run(["python", "process.py"], check=True)
    return {"status": "success"}

On ECS, AWS_LAMBDA_RUNTIME_API is not set, so the entrypoint falls through to exec "$@" which runs python process.py (the CMD). All values arrive as environment variables.

On Lambda, the entrypoint detects the Lambda runtime and starts the RIC, which invokes handler.handler. The handler exports the per-invocation payload fields as env vars, then runs the same process.py.


6. Resource Defaults and Limits

ECS Fargate processors:

Resource | Default | Fargate Maximum
vCPU | 0.5 | 16
Memory | 1 GB | 120 GB
Ephemeral storage | 20 GB | 200 GB
Timeout | None (runs until complete) | None

GPU instances (g4dn family) are available when the compute node is configured with GPU capacity.

Lambda processors:

Resource | Default | Lambda Maximum
Memory | 3008 MB | 10,240 MB
Timeout | 900s (15 min) | 900s (15 min)
Ephemeral storage | 512 MB | 10,240 MB
EFS mount | /mnt/efs | /mnt/efs

Lambda processors always run in the compute node's VPC with access to EFS. Note that Lambda functions in a VPC do not have internet access unless a NAT Gateway is present (secure mode). In basic mode, Lambda processors have EFS access but no internet.


7. Registration and Workflows

Registering a Processor

Processors are registered as applications in the Pennsieve platform. Each application record includes:

Field | Description
sourceUrl | The container image reference (e.g., an ECR URI or GitHub repository URL). This is the primary identifier used to resolve processors in workflows.
uuid | Auto-generated unique identifier used for status tracking and log separation.
computeTypes | Supported runtimes: ["ecs"], ["lambda"], or ["ecs", "lambda"] for dual-mode. Defaults to ["ecs"] if not set.
cpu, memory | Default resource allocation for the processor.
params | Parameter schema the processor accepts.
commandArguments | Default command-line arguments.
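
An illustrative record with these fields, shown as a Python dict (the values and units here are hypothetical; the exact registration payload shape may differ):

# Hypothetical application record -- field names follow the table above,
# values and units are illustrative only.
application = {
    "sourceUrl": "ghcr.io/my-org/feature-extractor:latest",
    "computeTypes": ["ecs", "lambda"],   # dual-mode processor
    "cpu": 1024,                         # illustrative default resource allocation
    "memory": 2048,
    "params": {},                        # parameter schema the processor accepts
    "commandArguments": [],
    # "uuid" is auto-generated by the platform at registration time
}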

Registration will use the Pennsieve platform's GitHub integration (pending Q1 2026), allowing processors to be registered directly from GitHub repositories with automatic image builds and version tracking.

Defining a Workflow

A workflow defines a DAG of processors. Each node references a processor by its sourceUrl and declares dependencies on other nodes:

{
  "name": "My Analysis Pipeline",
  "description": "Convert, extract features, and score",
  "processors": [
    {
      "id": "convert",
      "sourceUrl": "ghcr.io/my-org/converter:latest"
    },
    {
      "id": "extract",
      "sourceUrl": "ghcr.io/my-org/feature-extractor:latest",
      "dependsOn": ["convert"]
    },
    {
      "id": "score",
      "sourceUrl": "ghcr.io/my-org/scorer:latest",
      "dependsOn": ["extract"]
    }
  ]
}

The platform validates that the DAG is acyclic and computes the execution order automatically. Independent processors (no dependency between them) run in parallel.
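
Conceptually, computing the execution order is a topological sort over the dependsOn edges, as in this sketch (not the platform's implementation):

# Sketch: derive an execution order from "dependsOn" edges with a topological sort.
from graphlib import CycleError, TopologicalSorter

depends_on = {
    "convert": [],            # no dependencies
    "extract": ["convert"],
    "score":   ["extract"],
}

try:
    order = list(TopologicalSorter(depends_on).static_order())
except CycleError:
    raise ValueError("workflow DAG contains a cycle")

print(order)   # ['convert', 'extract', 'score']; independent nodes could run in parallel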

Running a Workflow

A workflow is triggered via the workflow service API. The caller specifies the dataset and packages to process, and optionally configures per-processor settings:

{
  "workflowInstanceConfiguration": {
    "workflowId": "workflow-uuid",
    "computeNodeId": "compute-node-uuid",
    "processorConfigs": [
      {
        "nodeId": "convert",
        "version": "latest",
        "executionTarget": "ecs"
      },
      {
        "nodeId": "extract",
        "version": "v1.2.0",
        "executionTarget": "lambda"
      }
    ]
  },
  "datasetId": "N:dataset:abc-123",
  "packageIds": ["pkg-1", "pkg-2"]
}

The processorConfigs array allows overriding the runtime (executionTarget) and pinning a specific version per processor. Each override is validated against the processor's computeTypes array — requesting "lambda" for a processor that only supports ["ecs"] will fail with a clear error. Processors without an override use the first value from their computeTypes array.

At execution time, the platform snapshots the workflow definition and configuration into an immutable execution record. This ensures that changes to the workflow definition do not affect running executions and provides a full audit trail of what was executed.

What Happens at Runtime

When a workflow execution starts, the compute node:

  1. Fetches the execution record (containing the full DAG and processor configs).
  2. Resolves each processor's container image from its sourceUrl.
  3. Registers ECS task definitions for ecs processors and Lambda functions for lambda processors.
  4. Generates a Step Functions state machine that executes the DAG in the correct order.
  5. Runs each processor, tracking per-processor status (NOT_STARTED → STARTED → SUCCEEDED or FAILED).

Lambda functions are named proc-lambda-{nodeIdentifier}-{imageHash} and are reused across workflow runs. They are cleaned up when the workflow definition is deleted or the compute node is destroyed.