
Batch Processing (factory)

This example shows how to build a factory pipeline that processes multiple items in parallel. It defines an article processing system that takes a list of topics and runs one content-generation workflow per topic, all at the same time.

Note: This example uses create_mock tools for demonstration purposes to keep the example simple and focused on core workflow concepts. In real-world use cases, you would replace these mock tools with actual tools that perform the specific processing operations your automation requires (such as API calls, data transformations, file operations, or LLM interactions).

Pipeline Configuration

The root pipeline uses the factory pattern to process multiple topics in parallel:

pipelines:
  - name: factory_example
    title: "Example 2: Pipeline Factory"
    type: root
    workspace: examples
    sequence:
      - workflow: create_topics_workflow        # Step 1: Setup topic list
      - pipeline: article_processor_pipeline    # Step 2: Process topics in parallel

  - name: article_processor_pipeline
    title: "Batch Article Processor"
    type: factory
    parallel: true                              # Run all instances in parallel. Defaults to false.
    factory:
      workflow: generate_article_workflow       # Template workflow to execute
    iterator:
      source: ${ context.variables.TOPIC_LIST } # Array of topics to process
      label: ${ item.topic }                    # Property to use for label. Defaults to ${ item }
    namespace:
      label: Articles                           # Namespace for organization

Key Concepts:

  • Factory Pipeline: Uses type: factory to create multiple workflow instances
  • Parallel Execution: parallel: true runs all instances simultaneously (defaults to false)
  • Iterator Source: source points to an array in the context
  • Template Workflow: factory.workflow defines what gets executed for each item
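
For comparison, a sequential variant of the same factory might look like the sketch below. It reuses only keys that appear in the configuration above and assumes that iterator and namespace sit at the pipeline level alongside factory. The pipeline name article_processor_sequential and its title are hypothetical. The only functional change is parallel: false, which the configuration comment above describes as the default.

```yaml
pipelines:
  - name: article_processor_sequential          # hypothetical name, for illustration only
    title: "Sequential Article Processor"       # hypothetical title
    type: factory
    parallel: false                             # one instance at a time (the documented default)
    factory:
      workflow: generate_article_workflow       # same template workflow as the parallel version
    iterator:
      source: ${ context.variables.TOPIC_LIST } # same array of topics
      label: ${ item.topic }
    namespace:
      label: Articles
```

Omitting the parallel key should behave the same way, since it defaults to false.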

Setup Workflow: Create Topic List

The setup workflow creates the list of topics that will be processed:

workflows:
  - name: create_topics_workflow
    title: "Setup Topics"
    type: stateMachine
    transitions:
      - name: mock_topic_list
        from: start
        to: topics_ready
        call:
          - tool: create_mock
            arguments:
              input: 'Generate topic list for batch processing'
              output:
                - topic: "AI in Healthcare"
                  category: "technology"
                - topic: "Sustainable Energy"
                  category: "environment"
                - topic: "Quantum Computing"
                  category: "technology"
                - topic: "Mental Health Apps"
                  category: "health"
            exportContext: TOPIC_LIST

      - name: display_topics
        from: topics_ready
        to: end
        call:
          - tool: create_chat_message
            arguments:
              role: 'assistant'
              content: |
                📝 **Batch Processing Started**

                Processing {{ context.variables.TOPIC_LIST.length }} topics in parallel:
                {{#each context.variables.TOPIC_LIST}}
                - {{ this.topic }} ({{ this.category }})
                {{/each}}

Key Concepts:

  • Array Creation: Setup workflow creates the (mock) array that the factory will iterate over
  • Direct Object Output: The mock tool outputs objects directly rather than JSON strings
  • Context Export: exportContext: TOPIC_LIST makes the array available to the factory pipeline
  • Handlebars Templates: Uses {{}} syntax for variable interpolation and {{#each}} for iteration

Sub-Workflow: Process Single Article

This workflow processes one topic and gets executed once for each item in the topic list:

workflows:
  - name: generate_article_workflow
    title: "Generate Article"
    type: stateMachine
    transitions:
      - name: generate_content
        from: start
        to: content_generated
        call:
          - tool: create_mock
            arguments:
              input: |
                Topic: {{ context.item.topic }}
                Category: {{ context.item.category }}
              output: |
                This article explores the fascinating world of {{ context.item.topic }}...
            as: ARTICLE_CONTENT

      - name: create_article
        from: content_generated
        to: article_created
        call:
          - tool: create_document
            arguments:
              document: article_document
              content:
                topic: ${ context.item.topic }
                category: ${ context.item.category }
                article: ${ ARTICLE_CONTENT }
                processed_at: "{{ currentDate }}"

      - name: display_result
        from: article_created
        to: end
        call:
          - tool: create_chat_message
            arguments:
              role: 'assistant'
              content: |
                ✅ {{ context.item.topic }} Article Completed

Key Concepts:

  • Item Access: Use {{ context.item }} to access the current item being processed
  • Mixed Template Syntax: Uses both Handlebars ({{}}) and expression syntax (${})
  • Document Creation: Results are stored in structured documents with defined schemas
  • Progress Feedback: Each completion shows in the chat interface
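
As a small variation on item access, the sketch below iterates over a list of plain strings instead of objects. Whether the factory accepts string items is an assumption that this example does not confirm. The documented label default of ${ item } suggests it does, in which case the sub-workflow would read the value as {{ context.item }} rather than {{ context.item.topic }}. The names tag_pipeline, show_tag_workflow and TAG_LIST are hypothetical, and TAG_LIST is assumed to be exported by an earlier setup workflow.

```yaml
pipelines:
  - name: tag_pipeline                          # hypothetical name
    title: "Tag Processor"
    type: factory
    factory:
      workflow: show_tag_workflow               # hypothetical workflow, defined below
    iterator:
      source: ${ context.variables.TAG_LIST }   # assumed: an array of plain strings
      # label is omitted, so it falls back to the default, ${ item }

workflows:
  - name: show_tag_workflow
    title: "Show Tag"
    type: stateMachine
    transitions:
      - name: display_tag
        from: start
        to: end
        call:
          - tool: create_chat_message
            arguments:
              role: 'assistant'
              content: |
                Processing tag: {{ context.item }}
```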

Document Schema

The example includes a structured document schema for consistent data storage:

documents:
  - name: article_document
    schema:
      type: object
      properties:
        topic:
          type: string
        category:
          type: string
        article:
          type: string
        processed_at:
          type: string
    ui:
      order:
        - topic
        - category
        - article
        - processed_at
      properties:
        article:
          widget: textarea-expand

Key Concepts:

  • Schema Definition: Defines the structure and types of document properties
  • UI Configuration: Specifies field order and display widgets
  • Textarea Widget: The textarea-expand widget provides better display for longer content
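
To illustrate how the schema and UI sections relate, the sketch below adds one extra field to the document. The summary field is hypothetical and not part of the original example. It reuses only constructs already shown: a string property, an entry in ui.order, and the textarea-expand widget.

```yaml
documents:
  - name: article_document
    schema:
      type: object
      properties:
        topic:
          type: string
        category:
          type: string
        summary:                  # hypothetical extra field, for illustration only
          type: string
        article:
          type: string
        processed_at:
          type: string
    ui:
      order:                      # display order of the fields
        - topic
        - category
        - summary
        - article
        - processed_at
      properties:
        summary:
          widget: textarea-expand
        article:
          widget: textarea-expand
```

With a change like this, the create_document call in the sub-workflow would also need to supply a summary value in its content arguments.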

Data Flow Summary

Factory pipelines follow a different data flow pattern than sequential pipelines.

Key Differences from Sequential Pipelines:

  • Parallel Execution: All workflow instances run simultaneously when parallel: true
  • Independent Context: Each instance has its own context with context.item
  • No Cross-Instance Communication: Factory workflows don’t share variables between instances
  • Document Storage: Use documents to persist results that can be accessed later

Running This Example

To run this example and see the factory pipeline in action:

  1. Navigate to the Studio interface
  2. Switch to the examples workspace
  3. Select “Example 2: Pipeline Factory” from available automations
  4. Watch as:
     • Setup workflow creates the topic list
     • Factory pipeline launches 4 parallel workflows
     • Each workflow processes its assigned topic independently
     • Results appear in chat as each completes

Configuration Location: You can also view the complete configuration file at src/config/examples/basic/factory-example.yaml

Complete Example:

include:
  - core/tools/create-mock.yaml
  - core/tools/create-chat-message.yaml
  - core/tools/create-document.yaml

pipelines:
  - name: factory_example
    title: "Example 2: Pipeline Factory"
    type: root
    workspace: examples
    sequence:
      - workflow: create_topics_workflow        # Step 1: Setup topic list
      - pipeline: article_processor_pipeline    # Step 2: Process topics in parallel

  - name: article_processor_pipeline
    title: "Batch Article Processor"
    type: factory
    parallel: true                              # Run all instances in parallel. Defaults to false.
    factory:
      workflow: generate_article_workflow       # Template workflow to execute
    iterator:
      source: ${ context.variables.TOPIC_LIST } # Array of topics to process
      label: ${ item.topic }                    # Property to use for label. Defaults to ${ item }
    namespace:
      label: Articles                           # Namespace for organization

workflows:
  - name: create_topics_workflow
    title: "Setup Topics"
    type: stateMachine
    transitions:
      - name: mock_topic_list
        from: start
        to: topics_ready
        call:
          - tool: create_mock
            arguments:
              input: 'Generate topic list for batch processing'
              output:
                - topic: "AI in Healthcare"
                  category: "technology"
                - topic: "Sustainable Energy"
                  category: "environment"
                - topic: "Quantum Computing"
                  category: "technology"
                - topic: "Mental Health Apps"
                  category: "health"
            exportContext: TOPIC_LIST

      - name: display_topics
        from: topics_ready
        to: end
        call:
          - tool: create_chat_message
            arguments:
              role: 'assistant'
              content: |
                📝 **Batch Processing Started**

                Processing {{ context.variables.TOPIC_LIST.length }} topics in parallel:
                {{#each context.variables.TOPIC_LIST}}
                - {{ this.topic }} ({{ this.category }})
                {{/each}}

  - name: generate_article_workflow
    title: "Generate Article"
    type: stateMachine
    transitions:
      - name: generate_content
        from: start
        to: content_generated
        call:
          - tool: create_mock
            arguments:
              input: |
                Topic: {{ context.item.topic }}
                Category: {{ context.item.category }}
              output: |
                This article explores the fascinating world of {{ context.item.topic }}...
            as: ARTICLE_CONTENT

      - name: create_article
        from: content_generated
        to: article_created
        call:
          - tool: create_document
            arguments:
              document: article_document
              content:
                topic: ${ context.item.topic }
                category: ${ context.item.category }
                article: ${ ARTICLE_CONTENT }
                processed_at: "{{ currentDate }}"

      - name: display_result
        from: article_created
        to: end
        call:
          - tool: create_chat_message
            arguments:
              role: 'assistant'
              content: |
                ✅ {{ context.item.topic }} Article Completed

documents:
  - name: article_document
    schema:
      type: object
      properties:
        topic:
          type: string
        category:
          type: string
        article:
          type: string
        processed_at:
          type: string
    ui:
      order:
        - topic
        - category
        - article
        - processed_at
      properties:
        article:
          widget: textarea-expand