Pipe System

Learn how Open Ticket AI pipes process data through sequential execution, context sharing, dependencies, and conditional logic.

Pipes are the fundamental processing units in Open Ticket AI. Each pipe performs a specific task, receives context from previous pipes, executes its logic, and passes updated context forward.

Basic Pipeline Flow

A pipeline is a sequence of pipes that execute one after another:

flowchart TD
    Start([Start])
    Pipe1["Pipe 1 - Fetch Tickets"]
    Pipe2["Pipe 2 - Classify Tickets"]
    Pipe3["Pipe 3 - Update Tickets"]
    End([Complete])
    Start --> Pipe1
    Pipe1 --> Pipe2
    Pipe2 --> Pipe3
    Pipe3 --> End

Each pipe:

  1. Receives the PipeContext (containing results from previous pipes)
  2. Executes its specific task
  3. Creates a PipeResult with output data

What is a Pipe?

A pipe is a self-contained processing unit that:

  • Implements specific business logic (fetch data, classify, update, etc.)
  • Receives input via PipeContext
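  • Produces output as PipeResult

Before a pipe runs, the Jinja renderer turns the current PipeContext into the pipe’s concrete PipeParams:
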
flowchart LR
    subgraph Rendering
        PC["PipeContext"]:::value --> JR[["Jinja Renderer"]]:::renderer --> PP["PipeParams"]:::value
    end

    P(("Pipe")):::pipe
    PR["PipeResult"]:::value
    PP --> P --> PR
    classDef pipe fill: #0b1220, stroke: #7c4dff, stroke-width: 2px, color: #e6e7ea
    classDef renderer fill: #0b1220, stroke: #22c55e, stroke-dasharray: 5 3, stroke-width: 2px, color: #e6e7ea
    classDef value fill: #111827, stroke: #475569, color: #e6e7ea

Pipe Types

Simple Pipes

Atomic processing units that implement specific business logic:

YAML Example

- id: fetch_tickets
  use: 'base:FetchTicketsPipe'
  injects:
    ticket_system: 'otobo_znuny'
  params:
    ticket_search_criteria:
      queue:
        name: 'Support'
      limit: 10

Characteristics:

  • Runs specific logic
  • No child pipes

Specific Simple Pipes:

  • AddNotePipe — registryKey: base:AddNotePipe
  • FetchTicketsPipe — registryKey: base:FetchTicketsPipe
  • UpdateTicketPipe — registryKey: base:UpdateTicketPipe

Expression Pipe (special)

Renders an expression and returns the resulting value. If the expression renders to a FailMarker, the pipe fails.

registryKey: base:ExpressionPipe

YAML Example

- id: check_any_tickets
  use: 'base:ExpressionPipe'
  params:
    expression: >
      {{ fail() if (get_pipe_result('fetch_tickets','fetched_tickets')|length)==0 else 'ok' }}

Composite Pipes

Orchestrators that contain and execute child pipes:

Flowchart

flowchart TB
    subgraph CompositePipe
        A["Pipe #1"] --> B["Pipe #2"] --> C["Pipe #3"]
    end

    A --> R1["Result #1"]
    B --> R2["Result #2"]
    C --> R3["Result #3"]
    R1 --> U["union(Results #1, #2, #3)"]
    R2 --> U
    R3 --> U
    U --> CR["PipeResult"]

YAML Example

Composite Pipe Example

- id: ticket_workflow
  use: 'base:CompositePipe'
  params:
    threshold: 0.8
    steps:
      - id: fetch
        use: 'base:FetchTicketsPipe'
        injects: { ticket_system: 'otobo_znuny' }
        params:
          ticket_search_criteria:
            queue: { name: 'Incoming' }
            limit: 10

      - id: classify
        use: otai_hf_local:HFLocalTextClassificationPipe
        params:
          model: 'bert-base-german-cased'
          text: "{{ (get_pipe_result('fetch', 'fetched_tickets') | first)['subject'] }}"

      - id: update
        use: 'base:UpdateTicketPipe'
        injects: { ticket_system: 'otobo_znuny' }
        params:
          ticket_id: "{{ (get_pipe_result('fetch', 'fetched_tickets') | first)['id'] }}"
          updated_ticket:
            queue:
              name: "{{ get_pipe_result('classify', 'predicted_queue') }}"

Characteristics

  • Contains a steps list of child pipe configs
  • Executes children sequentially
  • Merges results
  • Children can access parent params via parent.params
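
As a rough illustration of the last point, the fragment below sketches a child step that reads a value defined on its parent composite. It assumes that the path parent.params.<name> resolves inside the child’s Jinja templates exactly as the bullet describes; the ids and the threshold value are placeholders.

```yaml
# Sketch only: assumes `parent.params.<name>` is available in child templates.
- id: ticket_workflow
  use: 'base:CompositePipe'
  params:
    threshold: 0.8            # defined once on the parent
    steps:
      - id: show_threshold    # hypothetical child step
        use: 'base:ExpressionPipe'
        params:
          # Renders the parent's threshold
          expression: "{{ parent.params.threshold }}"
```

Defining shared values once on the parent keeps child steps free of duplicated constants.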

Composite Execution:

  1. For each step:
    • Render: Render the step’s params with Jinja2 using the current context
    • Execute: Run the child pipe with the rendered params
    • Loop: Continue with the next step, stopping at the first failure
  2. Finalization:
    • Union: Merge all child results into a single PipeResult
    • Return: Return the final updated context

SimpleSequentialOrchestrator (special)

Runs its steps in an endless loop, which suits background-style cycles. It does not expose the child pipes’ results as a single pipe result.

registryKey: base:SimpleSequentialOrchestrator

Flowchart

flowchart TB
    subgraph SimpleSequentialOrchestrator
        S1["step #1"] --> S2["step #2"] --> S3["step #3"]
    end
    S3 --> Z(("sleep"))
    Z -->|loop| S1

YAML Example

SimpleSequentialOrchestrator Example
- id: orchestrator
  use: 'base:SimpleSequentialOrchestrator'
  params:
    orchestrator_sleep: 'PT0.5S'
    exception_sleep: 'PT5S'
    always_retry: true
    steps:
      - id: tick
        use: 'base:IntervalTrigger'
        params: { interval: 'PT5S' }
      - id: fetch
        use: 'base:FetchTicketsPipe'
        injects: { ticket_system: 'otobo_znuny' }
        params:
          ticket_search_criteria: { queue: { name: 'Incoming' }, limit: 1 }

SimpleSequentialRunner (special)

Takes two params, on and run, both of which are pipe configs. If on succeeds, it executes run; otherwise run is skipped.

registryKey: base:SimpleSequentialRunner

Flowchart

flowchart TB
    subgraph SimpleSequentialRunner
        ON{"on (trigger) succeeded?"} -->|yes| RUN["run (action)"]
        ON -.->|no| SKIP["skipped"]
    end

YAML Example

- id: run-when-triggered
  use: 'base:SimpleSequentialRunner'
  params:
    on:
      id: gate
      use: 'base:IntervalTrigger'
      params: { interval: 'PT60S' }
    run:
      id: do-something
      use: 'base:ExpressionPipe'
      params: { expression: 'Triggered run' }
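
The special pipes can plausibly be combined. The following is a sketch, not a confirmed configuration: it assumes that any pipe config may appear as an orchestrator step or as a runner’s run target. It wraps a CompositePipe in a SimpleSequentialRunner, which in turn runs inside a SimpleSequentialOrchestrator. All ids are placeholders.

```yaml
# Sketch only: assumes pipes can be nested freely as steps / run targets.
- id: poll-loop
  use: 'base:SimpleSequentialOrchestrator'
  params:
    orchestrator_sleep: 'PT0.5S'      # pause between loop cycles
    steps:
      - id: every-minute
        use: 'base:SimpleSequentialRunner'
        params:
          on:                          # gate: succeeds once per interval
            id: gate
            use: 'base:IntervalTrigger'
            params: { interval: 'PT60S' }
          run:                         # executed only when the gate succeeds
            id: workflow
            use: 'base:CompositePipe'
            params:
              steps:
                - id: fetch
                  use: 'base:FetchTicketsPipe'
                  injects: { ticket_system: 'otobo_znuny' }
                  params:
                    ticket_search_criteria: { queue: { name: 'Incoming' }, limit: 10 }
```

The orchestrator supplies the loop, the runner supplies the timing gate, and the composite holds the actual work.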

Context Passing Between Pipes

Field Details:

  • pipes: Contains results from all previously executed pipes, keyed by pipe ID
    • Access via get_pipe_result('pipe_id') in templates
    • Accumulated as each pipe completes
    • In CompositePipe: merged results from all child steps
  • params: The current pipe’s parameters
    • Set when the pipe is created
    • Accessible via params.* in templates
    • For nested pipes, the parent’s params are reachable via parent
  • parent: Reference to the parent’s params

PipeResult Structure

Each pipe produces a PipeResult containing execution outcome and data:

| Attribute | Data Type | Description |
| --- | --- | --- |
| succeeded | true/false | Whether the pipe completed successfully without errors |
| data | name:value pairs | Output data produced by the pipe for use by following pipes or external systems |
| was_skipped | true/false | Whether the pipe was skipped due to failed dependencies or conditional execution |
| message | text | Human-readable message describing the result or any issues |
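
To make the four attributes concrete, here is an illustrative sketch of what a failed and a skipped result might look like. The field names come from the table above; the ids, message texts, and the YAML rendering itself are made up for illustration and are not actual system output.

```yaml
# Illustrative only: values are invented to show the shape of a PipeResult.
- id: ticket_fetcher
  result:
    succeeded: false
    was_skipped: false
    message: 'Ticket system unreachable'   # hypothetical message text
    data: {}
- id: update_ticket
  result:
    succeeded: false
    was_skipped: true                       # not executed at all
    message: 'Skipped'                      # hypothetical message text
    data: {}
```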

You access these results in the params of another pipe with the functions below. Currently there is no way to read was_skipped or message.

| Function | Parameters | Returns | Errors if… |
| --- | --- | --- | --- |
| has_failed | pipe_id: text | True if the given pipe result is marked failed | Unknown pipe ID |
| get_pipe_result | pipe_id: text, data_key: text (default = value) | Value stored in previous pipe result under given data_key | Pipe or key missing |

For example, suppose the pipe result is:

- id: ticket_fetcher
  result:
    succeeded: true
    data:
      fetched_tickets:
        - id: 123
          subject: 'Help me!'
        - id: 124
          subject: 'Another ticket'

To access the fetched tickets you would use:

{{ get_pipe_result('ticket_fetcher', 'fetched_tickets') }}

Returning:

- id: 123
  subject: 'Help me!'
- id: 124
  subject: 'Another ticket'

To access the subject of the first ticket you would use:

{{ (get_pipe_result('ticket_fetcher', 'fetched_tickets') | first)['subject'] }}

Returning:

Help me!

To check if the ticket_fetcher pipe failed you would use:

{{ has_failed('ticket_fetcher') }}

Returning:

false

Combine has_failed with fail() to create guards:

- id: fail_no_tickets
  use: 'base:ExpressionPipe'
  params:
    expression: >
      {{ fail() if has_failed('ticket_fetcher') else 'ok' }}

The pipeline (flow) would stop executing at this point if the ticket_fetcher pipe failed.
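
A guard is most useful when later steps depend on it. The sketch below places a guard between a fetch and a follow-up action inside a CompositePipe, which stops on the first failure. Assumptions: AddNotePipe takes the same ticket_system inject as the other ticket pipes, and the ids and note texts are placeholders.

```yaml
# Sketch only: a guard protecting a later step inside a composite.
- id: note_workflow
  use: 'base:CompositePipe'
  params:
    steps:
      - id: ticket_fetcher
        use: 'base:FetchTicketsPipe'
        injects: { ticket_system: 'otobo_znuny' }
        params:
          ticket_search_criteria: { queue: { name: 'Support' }, limit: 1 }
      - id: fail_no_tickets          # fails when nothing was fetched
        use: 'base:ExpressionPipe'
        params:
          expression: >
            {{ fail() if (get_pipe_result('ticket_fetcher', 'fetched_tickets') | length) == 0 else 'ok' }}
      - id: add_note                 # only reached if the guard succeeded
        use: 'base:AddNotePipe'
        injects: { ticket_system: 'otobo_znuny' }   # assumed inject
        params:
          ticket_id: "{{ (get_pipe_result('ticket_fetcher', 'fetched_tickets') | first)['id'] }}"
          note:
            subject: 'Seen by automation'
            body: 'This ticket was picked up by the workflow.'
```

Without the guard, the add_note step would try to read the first element of an empty list.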

Pipe Registry Overview

| Name | registryKey | Params (names, comma-separated) |
| --- | --- | --- |
| AddNotePipe | base:AddNotePipe | ticket_id, note |
| FetchTicketsPipe | base:FetchTicketsPipe | ticket_search_criteria |
| UpdateTicketPipe | base:UpdateTicketPipe | ticket_id, updated_ticket |
| ClassificationPipe | base:ClassificationPipe | text, model_name, api_token |
| ExpressionPipe | base:ExpressionPipe | expression |
| IntervalTrigger | base:IntervalTrigger | interval |
| SimpleSequentialRunner | base:SimpleSequentialRunner | on, run |
| SimpleSequentialOrchestrator | base:SimpleSequentialOrchestrator | orchestrator_sleep, exception_sleep, always_retry, steps |
| CompositePipe | base:CompositePipe | steps |

Use these values in your YAML as: use: "<registryKey>".


Pipe Details

AddNotePipe

The Pipe AddNotePipe adds a structured note (subject/body) to a given ticket in the ticket system. It logs a short preview of the note and writes it via the ticket system service.

registryKey: base:AddNotePipe

To use it, set use to this registryKey.

- id: 'my-add_note_pipe'
  use: 'base:AddNotePipe'
  params:
    ticket_id: '<ticket id>'
    note:
      subject: '<subject>'
      body: '<body>'

FetchTicketsPipe

The Pipe FetchTicketsPipe queries tickets from the ticket system using a unified TicketSearchCriteria (queue, limit, offset, etc.) and returns them as fetched_tickets.

registryKey: base:FetchTicketsPipe

- id: 'my-fetch_tickets'
  use: 'base:FetchTicketsPipe'
  params:
    ticket_search_criteria:
      queue:
        name: '<QueueName>'
      limit: 10

UpdateTicketPipe

The Pipe UpdateTicketPipe updates an existing ticket by ID using a UnifiedTicket payload (e.g., queue, priority, fields). It returns the success state reported by the ticket system.

registryKey: base:UpdateTicketPipe

- id: 'my-update_ticket'
  use: 'base:UpdateTicketPipe'
  params:
    ticket_id: '<ticket id>'
    updated_ticket:
      queue:
        name: '<QueueName>'

ClassificationPipe

The Pipe ClassificationPipe classifies input text using a configured ClassificationService and model, returning label, confidence, and scores (if available).

registryKey: base:ClassificationPipe

- id: 'my-classify'
  use: 'base:ClassificationPipe'
  injects: { classification_service: 'hf_local' }
  params:
    text: '{{ some_text }}'
    model_name: 'softoft/otai-queue-de-bert-v1'
    api_token: '{{ env.HF_TOKEN | default(None) }}'

ExpressionPipe

The Pipe ExpressionPipe returns the value of expression; if it evaluates to a FailMarker, the pipe fails (useful for control-flow/guards).

registryKey: base:ExpressionPipe

- id: 'my-expression'
  use: 'base:ExpressionPipe'
  params:
    expression: "{{ fail() if condition else 'ok' }}"

IntervalTrigger

The Pipe IntervalTrigger gates downstream execution based on a time interval; it succeeds once the interval has passed since the last trigger, otherwise fails.

registryKey: base:IntervalTrigger

- id: 'my-interval'
  use: 'base:IntervalTrigger'
  params:
    interval: 'PT60S'
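
The interval values on this page ('PT60S', 'PT5M', 'PT0.5S', 'PT5S') have the form of ISO 8601 duration strings, where PT introduces the time part and H, M, and S stand for hours, minutes, and seconds. The fragment below sketches a few variants; which forms the parser accepts beyond those already shown on this page is an assumption, and the ids are placeholders.

```yaml
# Sketch only: ISO 8601-style durations; forms other than those used
# elsewhere on this page are assumed to be accepted.
- id: 'every-minute'
  use: 'base:IntervalTrigger'
  params:
    interval: 'PT60S'     # 60 seconds
- id: 'every-five-minutes'
  use: 'base:IntervalTrigger'
  params:
    interval: 'PT5M'      # 5 minutes
- id: 'every-ninety-minutes'
  use: 'base:IntervalTrigger'
  params:
    interval: 'PT1H30M'   # 1 hour 30 minutes (assumed to be supported)
```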

SimpleSequentialRunner

The Pipe SimpleSequentialRunner runs on and, only if it succeeds, runs run. It’s a minimal two-step control runner.

registryKey: base:SimpleSequentialRunner

- id: 'my-simple_runner'
  use: 'base:SimpleSequentialRunner'
  params:
    on:
      id: 'interval'
      use: 'base:IntervalTrigger'
      params: { interval: 'PT5M' }
    run:
      id: 'do-something'
      use: 'base:ExpressionPipe'
      params: { expression: 'go!' }

SimpleSequentialOrchestrator

The Pipe SimpleSequentialOrchestrator loops forever, executing steps in order each cycle; it optionally sleeps between cycles and on exceptions, and can auto-retry.

registryKey: base:SimpleSequentialOrchestrator

For SimpleSequentialOrchestrator you don’t need to set params, since all attributes are optional.

- id: 'my-orchestrator'
  use: 'base:SimpleSequentialOrchestrator'
  params:
    orchestrator_sleep: 'PT0.5S'
    exception_sleep: 'PT5S'
    always_retry: true
    steps:
      - id: 'step-1'
        use: 'base:ExpressionPipe'
        params: { expression: 'ok' }

CompositePipe

The Pipe CompositePipe executes its steps sequentially, stops on the first failure, and returns a union of prior results. It’s the base building block for multi-step flows.

registryKey: base:CompositePipe

For CompositePipe you don’t need to set params, since all attributes are optional.

- id: 'my-composite'
  use: 'base:CompositePipe'
  params:
    steps:
      - id: 'one'
        use: 'base:ExpressionPipe'
        params: { expression: 'first' }
      - id: 'two'
        use: 'base:ExpressionPipe'
        params: { expression: 'second' }