Batch Processing (factory)
This example demonstrates how to create a factory pipeline that processes multiple items in parallel. We'll build an article processing system that takes a list of topics and generates content for each one simultaneously, showcasing the power of factory pipelines for batch operations.
Note: This example uses create_mock tools for demonstration purposes to keep the example simple and focused on core workflow concepts. In real-world use cases, you would replace these mock tools with actual tools that perform the specific processing operations your automation requires (such as API calls, data transformations, file operations, or LLM interactions).
Pipeline Configuration
The root pipeline uses the factory pattern to process multiple topics in parallel:
pipelines:
  - name: factory_example
    title: "Example 2: Pipeline Factory"
    type: root
    workspace: examples
    sequence:
      - workflow: create_topics_workflow        # Step 1: Setup topic list
      - pipeline: article_processor_pipeline    # Step 2: Process topics in parallel

  - name: article_processor_pipeline
    title: "Batch Article Processor"
    type: factory
    parallel: true                              # Run all instances in parallel. Defaults to false.
    factory:
      workflow: generate_article_workflow       # Template workflow to execute
    iterator:
      source: ${ context.variables.TOPIC_LIST } # Array of topics to process
      label: ${ item.topic }                    # Property to use for label. Defaults to ${ item }
    namespace:
      label: Articles                           # Namespace for organization
Key Concepts:
- Factory Pipeline: Uses type: factory to create multiple workflow instances
- Parallel Execution: Setting parallel: true runs all instances simultaneously (the default is false)
- Iterator Source: source points to an array in the context
- Template Workflow: factory.workflow defines what gets executed for each item
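The fan-out behaviour listed above can be modelled in a few lines of Python. This is a conceptual sketch only, not how the framework is implemented: run_factory, generate_article, and the use of a thread pool are assumptions made for illustration. Only the topic data and the meaning of the parallel flag come from this example.

```python
from concurrent.futures import ThreadPoolExecutor

# Topic list as produced by the setup workflow in this example.
TOPIC_LIST = [
    {"topic": "AI in Healthcare", "category": "technology"},
    {"topic": "Sustainable Energy", "category": "environment"},
    {"topic": "Quantum Computing", "category": "technology"},
    {"topic": "Mental Health Apps", "category": "health"},
]


def generate_article(item):
    """Hypothetical stand-in for the template workflow: handles one item."""
    return f"This article explores the fascinating world of {item['topic']}..."


def run_factory(items, template, parallel=False):
    """Run `template` once per item; results keep the order of `items`."""
    if not parallel:
        # Sequential mode: one instance after another (the default).
        return [template(item) for item in items]
    # Parallel mode: one worker per item, so all instances start together.
    with ThreadPoolExecutor(max_workers=max(1, len(items))) as pool:
        return list(pool.map(template, items))


articles = run_factory(TOPIC_LIST, generate_article, parallel=True)
```

Both modes produce the same results; in this model the flag only changes whether the instances run one after another or at the same time.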
Setup Workflow: Create Topic List
The setup workflow creates the list of topics that will be processed:
workflows:
  - name: create_topics_workflow
    title: "Setup Topics"
    type: stateMachine
    transitions:
      - name: mock_topic_list
        from: start
        to: topics_ready
        call:
          - tool: create_mock
            arguments:
              input: 'Generate topic list for batch processing'
              output:
                - topic: "AI in Healthcare"
                  category: "technology"
                - topic: "Sustainable Energy"
                  category: "environment"
                - topic: "Quantum Computing"
                  category: "technology"
                - topic: "Mental Health Apps"
                  category: "health"
            exportContext: TOPIC_LIST

      - name: display_topics
        from: topics_ready
        to: end
        call:
          - tool: create_chat_message
            arguments:
              role: 'assistant'
              content: |
                **Batch Processing Started**
                Processing {{ context.variables.TOPIC_LIST.length }} topics in parallel:
                {{#each context.variables.TOPIC_LIST}}
                - {{ this.topic }} ({{ this.category }})
                {{/each}}
Key Concepts:
- Array Creation: Setup workflow creates the (mock) array that the factory will iterate over
- Direct Object Output: The mock tool now outputs objects directly rather than JSON strings
- Context Export: exportContext: TOPIC_LIST makes the array available to the factory pipeline
- Handlebars Templates: Uses {{}} syntax for variable interpolation and {{#each}} for iteration
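To make the template concrete, the following Python sketch builds roughly the text that the display_topics message should produce for the mock topic list. It assumes standard Handlebars behaviour for {{#each}} and .length; render_topics_message is a hypothetical helper written for this illustration, and the exact whitespace of the real rendered message may differ.

```python
# Topic list exported by the setup workflow as TOPIC_LIST.
TOPIC_LIST = [
    {"topic": "AI in Healthcare", "category": "technology"},
    {"topic": "Sustainable Energy", "category": "environment"},
    {"topic": "Quantum Computing", "category": "technology"},
    {"topic": "Mental Health Apps", "category": "health"},
]


def render_topics_message(topics):
    """Hypothetical helper: plain-Python equivalent of the chat template."""
    lines = [
        "**Batch Processing Started**",
        # {{ context.variables.TOPIC_LIST.length }} becomes the item count.
        f"Processing {len(topics)} topics in parallel:",
    ]
    # {{#each ...}} emits one line per item; `this` is the current item.
    lines += [f"- {t['topic']} ({t['category']})" for t in topics]
    return "\n".join(lines)


message = render_topics_message(TOPIC_LIST)
print(message)
```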
Sub-Workflow: Process Single Article
This workflow processes one topic and gets executed once for each item in the topic list:
workflows:
  - name: generate_article_workflow
    title: "Generate Article"
    type: stateMachine
    transitions:
      - name: generate_content
        from: start
        to: content_generated
        call:
          - tool: create_mock
            arguments:
              input: |
                Topic: {{ context.item.topic }}
                Category: {{ context.item.category }}
              output: |
                This article explores the fascinating world of {{ context.item.topic }}...
            as: ARTICLE_CONTENT

      - name: create_article
        from: content_generated
        to: article_created
        call:
          - tool: create_document
            arguments:
              document: article_document
              content:
                topic: ${ context.item.topic }
                category: ${ context.item.category }
                article: ${ ARTICLE_CONTENT }
                processed_at: "{{ currentDate }}"

      - name: display_result
        from: article_created
        to: end
        call:
          - tool: create_chat_message
            arguments:
              role: 'assistant'
              content: |
                {{ context.item.topic }} Article Completed
Key Concepts:
- Item Access: Use {{ context.item }} to access the current item being processed
- Mixed Template Syntax: Uses both Handlebars ({{}}) and expression syntax (${})
) - Document Creation: Results are stored in structured documents with defined schemas
- Progress Feedback: Each completion shows in the chat interface
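The from/to pairs in the workflow above chain into a single path from start to end. The short Python sketch below walks that chain to show the order in which the transitions fire. It is an illustrative model only: the walk function is hypothetical, and it assumes each state has at most one outgoing transition, which holds for this example but is not stated as a general rule.

```python
# (name, from, to) triples copied from generate_article_workflow.
TRANSITIONS = [
    ("generate_content", "start", "content_generated"),
    ("create_article", "content_generated", "article_created"),
    ("display_result", "article_created", "end"),
]


def walk(transitions, state="start", final="end"):
    """Follow transitions from `state` until `final`; return the names fired."""
    by_source = {source: (name, target) for name, source, target in transitions}
    fired = []
    while state != final:
        if state not in by_source:
            raise ValueError(f"no transition leaves state {state!r}")
        name, state = by_source[state]
        fired.append(name)
    return fired


fired = walk(TRANSITIONS)
```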
Document Schema
The example includes a structured document schema for consistent data storage:
documents:
  - name: article_document
    schema:
      type: object
      properties:
        topic:
          type: string
        category:
          type: string
        article:
          type: string
        processed_at:
          type: string
    ui:
      order:
        - topic
        - category
        - article
        - processed_at
      properties:
        article:
          widget: textarea-expand
Key Concepts:
- Schema Definition: Defines the structure and types of document properties
- UI Configuration: Specifies field order and display widgets
- Textarea Widget: The textarea-expand widget provides better display for longer content
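As a rough illustration of what the schema constrains, the sketch below checks a document against it in plain Python. This is a hand-rolled check that covers only the string-typed properties used here; schema_errors is a hypothetical helper, and the framework's own validation may be stricter or behave differently.

```python
# The schema portion of article_document, written as a Python dict.
ARTICLE_SCHEMA = {
    "type": "object",
    "properties": {
        "topic": {"type": "string"},
        "category": {"type": "string"},
        "article": {"type": "string"},
        "processed_at": {"type": "string"},
    },
}


def schema_errors(document, schema):
    """Return a list of problems; an empty list means the document conforms."""
    if not isinstance(document, dict):
        return ["document is not an object"]
    errors = []
    for field, rule in schema["properties"].items():
        # Only present fields are checked; this schema declares none as required.
        if field in document and rule["type"] == "string":
            if not isinstance(document[field], str):
                errors.append(f"{field}: expected string")
    return errors


good = {"topic": "Quantum Computing", "category": "technology", "article": "..."}
bad = {"topic": 42, "category": "technology"}
```

Here schema_errors(good, ARTICLE_SCHEMA) returns an empty list, while the numeric topic in bad is reported as a type mismatch.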
Data Flow Summary
Factory pipelines follow a different data flow pattern than sequential pipelines.
Key Differences from Sequential Pipelines:
- Parallel Execution: All workflow instances run simultaneously when parallel: true
- Independent Context: Each instance has its own context with context.item
- No Cross-Instance Communication: Factory workflows don't share variables between instances
- Document Storage: Use documents to persist results that can be accessed later
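The isolation rules above can be pictured with a small Python model. It is a sketch under stated assumptions, not the framework's mechanism: copying the context with deepcopy, the SCRATCH variable, and the in-memory documents list are all hypothetical stand-ins. The point it illustrates is that a variable set inside one instance never reaches the others, while persisted documents remain available afterwards.

```python
import copy
from concurrent.futures import ThreadPoolExecutor
from threading import Lock

shared_context = {
    "variables": {"TOPIC_LIST": ["AI in Healthcare", "Quantum Computing"]}
}
documents = []            # hypothetical stand-in for persisted documents
documents_lock = Lock()   # guards the shared list across threads


def run_instance(item):
    """Model one factory instance working on its own copy of the context."""
    context = copy.deepcopy(shared_context)
    context["item"] = item
    # A variable set here lives only in this instance's private copy.
    context["variables"]["SCRATCH"] = item.upper()
    # Persisting a document is how the result outlives the instance.
    with documents_lock:
        documents.append({"topic": context["item"]})


with ThreadPoolExecutor() as pool:
    # list() forces completion and surfaces any exception from a worker.
    list(pool.map(run_instance, shared_context["variables"]["TOPIC_LIST"]))
```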
Running This Example
To see the factory pipeline in action:
- Navigate to the Studio interface
- Switch to the examples workspace
- Select "Example 2: Pipeline Factory" from available automations
- Watch as:
  - The setup workflow creates the topic list
  - The factory pipeline launches 4 parallel workflows
  - Each workflow processes its assigned topic independently
  - Results appear in chat as each completes
Configuration Location: You can also view the complete configuration file at src/config/examples/basic/factory-example.yaml
Complete Example:
include:
  - core/tools/create-mock.yaml
  - core/tools/create-chat-message.yaml
  - core/tools/create-document.yaml
pipelines:
  - name: factory_example
    title: "Example 2: Pipeline Factory"
    type: root
    workspace: examples
    sequence:
      - workflow: create_topics_workflow        # Step 1: Setup topic list
      - pipeline: article_processor_pipeline    # Step 2: Process topics in parallel

  - name: article_processor_pipeline
    title: "Batch Article Processor"
    type: factory
    parallel: true                              # Run all instances in parallel. Defaults to false.
    factory:
      workflow: generate_article_workflow       # Template workflow to execute
    iterator:
      source: ${ context.variables.TOPIC_LIST } # Array of topics to process
      label: ${ item.topic }                    # Property to use for label. Defaults to ${ item }
    namespace:
      label: Articles                           # Namespace for organization
workflows:
  - name: create_topics_workflow
    title: "Setup Topics"
    type: stateMachine
    transitions:
      - name: mock_topic_list
        from: start
        to: topics_ready
        call:
          - tool: create_mock
            arguments:
              input: 'Generate topic list for batch processing'
              output:
                - topic: "AI in Healthcare"
                  category: "technology"
                - topic: "Sustainable Energy"
                  category: "environment"
                - topic: "Quantum Computing"
                  category: "technology"
                - topic: "Mental Health Apps"
                  category: "health"
            exportContext: TOPIC_LIST

      - name: display_topics
        from: topics_ready
        to: end
        call:
          - tool: create_chat_message
            arguments:
              role: 'assistant'
              content: |
                **Batch Processing Started**
                Processing {{ context.variables.TOPIC_LIST.length }} topics in parallel:
                {{#each context.variables.TOPIC_LIST}}
                - {{ this.topic }} ({{ this.category }})
                {{/each}}

  - name: generate_article_workflow
    title: "Generate Article"
    type: stateMachine
    transitions:
      - name: generate_content
        from: start
        to: content_generated
        call:
          - tool: create_mock
            arguments:
              input: |
                Topic: {{ context.item.topic }}
                Category: {{ context.item.category }}
              output: |
                This article explores the fascinating world of {{ context.item.topic }}...
            as: ARTICLE_CONTENT

      - name: create_article
        from: content_generated
        to: article_created
        call:
          - tool: create_document
            arguments:
              document: article_document
              content:
                topic: ${ context.item.topic }
                category: ${ context.item.category }
                article: ${ ARTICLE_CONTENT }
                processed_at: "{{ currentDate }}"

      - name: display_result
        from: article_created
        to: end
        call:
          - tool: create_chat_message
            arguments:
              role: 'assistant'
              content: |
                {{ context.item.topic }} Article Completed
documents:
  - name: article_document
    schema:
      type: object
      properties:
        topic:
          type: string
        category:
          type: string
        article:
          type: string
        processed_at:
          type: string
    ui:
      order:
        - topic
        - category
        - article
        - processed_at
      properties:
        article:
          widget: textarea-expand