# Pipeline
The Pipeline skill is how agents read and write Maestro’s internal Pipeline (the contacts + activities tables that back the Pipelines UI). Every other skill in the v1 hero score talks to the world — Apollo finds leads, Compose drafts, Gmail sends. The Pipeline skill is the one that records what happened so the Pipelines UI shows the operator what the agent did.
Three operations:
| Operation | Purpose | Status |
|---|---|---|
| `find_contact` | Look up a contact by email within a pipeline. Returns the contact or null. Use before `add_contact` to avoid duplicates. | Shipped |
| `add_contact` | Create a new contact in a pipeline. Logs a `created` activity. Auto-links to a company by domain (creates one if missing). | Shipped |
| `log_activity` | Append an activity to a contact's timeline. Optionally advances the contact's stage in the same call. | Shipped |
## How agents typically use it
Cold-leads agent flow:
```
apollo.find_leads          → list of candidates
for each candidate:
  pipeline.find_contact    → does this email already exist?
  if exists:               → skip, move to next
  apollo.enrich_domain     → company context
  compose.draft_opener     → subject + body
  gmail.send_email         → fire it off
  pipeline.add_contact     → create the contact, stage='contacted'
  pipeline.log_activity    → kind='contacted', payload={subject, preview}
```
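The find-then-write loop above can be sketched in Python. Everything here is a hypothetical stand-in for the real skill clients (the `FakePipeline` class, its method signatures, and `run_cold_leads` are invented for illustration, not the shipped SDK); the point is the dedupe-before-write shape:

```python
class FakePipeline:
    """In-memory stand-in for the Pipeline skill (illustrative only)."""

    def __init__(self):
        self.contacts = {}    # (pipeline_id, email) -> contact dict
        self.activities = []  # appended activity records

    def find_contact(self, pipeline_id, email):
        return self.contacts.get((pipeline_id, email))

    def add_contact(self, pipeline_id, email, stage="new"):
        contact = {"pipeline_id": pipeline_id, "email": email, "stage": stage}
        self.contacts[(pipeline_id, email)] = contact
        return contact

    def log_activity(self, pipeline_id, email, kind, payload):
        self.activities.append({"email": email, "kind": kind, "payload": payload})


def run_cold_leads(pipeline, candidates, pipeline_id="pl_saas"):
    """Mirror the flow: skip known contacts, send, then record what happened."""
    sent = []
    for email in candidates:
        if pipeline.find_contact(pipeline_id, email):
            continue  # already tracked -> skip, move to next
        # (enrich_domain / draft_opener / send_email would happen here)
        pipeline.add_contact(pipeline_id, email, stage="contacted")
        pipeline.log_activity(pipeline_id, email, kind="contacted",
                              payload={"subject": "intro", "preview": "..."})
        sent.append(email)
    return sent


pipeline = FakePipeline()
pipeline.add_contact("pl_saas", "known@example.com", stage="replied")
sent = run_cold_leads(pipeline, ["known@example.com", "new@example.com"])
print(sent)  # ['new@example.com'] -- the known contact was skipped
```

The `find_contact` check before `add_contact` is what keeps the loop idempotent across reruns.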
Reply-triage agent flow:
```
gmail.list_inbox is:unread → recent inbound mail
for each thread:
  gmail.read_thread        → full message contents
  pipeline.find_contact    → match the sender to a known contact
  if no match:             → skip (not from a Maestro-tracked contact)
  compose.classify_reply   → intent, confidence, explanation
  pipeline.log_activity    → kind='triaged', new_stage based on intent,
                             payload={intent, confidence, explanation}
  gmail.label_thread       → apply Maestro/<intent> so we don't reprocess
```
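The "new_stage based on intent" step is a small mapping. A sketch of what that could look like — the intent names here are invented for illustration (the real set belongs to `compose.classify_reply`), only the stage names come from this document:

```python
# Hypothetical intent -> new_stage mapping for the triage step.
INTENT_TO_STAGE = {
    "interested": "triaged",
    "meeting_request": "booked",
    "not_interested": "disqualified",
    "unsubscribe": "unsubscribed",
}


def triage_stage(intent, current_stage):
    """Pick the new_stage to pass to pipeline.log_activity.

    Unknown intents leave the stage alone -- the activity is still
    logged, but the contact doesn't move.
    """
    return INTENT_TO_STAGE.get(intent, current_stage)


print(triage_stage("unsubscribe", "contacted"))  # 'unsubscribed'
print(triage_stage("newsletter", "contacted"))   # 'contacted' (unchanged)
```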
The Pipelines UI streams these writes live via Postgres LISTEN/NOTIFY, so the operator sees the contact appear, advance stages, accumulate activities — all in real time as the agent works.
## Setup
The Pipeline skill needs no manifest secrets — it talks directly to the same Postgres database the runtime is already connected to (via `DATABASE_URL` from `.env`). It opens a small per-skill connection pool (1–4 connections) lazily on first call.
If the skill ever returns `internal: DATABASE_URL is not set`, it means your runtime's `.env` doesn't have the database URL. That's already required for the runtime itself to start — so this should never happen in a normally-configured install.
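For reference, the relevant `.env` line looks like this — the user, password, host, and database name below are placeholders, not Maestro defaults:

```ini
DATABASE_URL=postgres://maestro:secret@localhost:5432/maestro
```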
## Pipeline IDs
Every operation takes `pipeline_id` (or `contact_id`, which transitively determines the pipeline). Agents pass them explicitly — usually hardcoded in the agent's instructions:

“Write all contacts to pipeline `pl_saas`.”

This keeps agent ↔ pipeline relationships visible in plain text, which makes them easy to audit and easy to change without touching code.
## What the operations record
`add_contact` writes:
- A `contacts` row at the requested stage (default `new`).
- A `companies` row if `company_domain` is provided and the company doesn't exist yet (de-duped by domain within the workspace).
- A `created` activity row capturing that the agent created the contact.
- `last_activity_at` is set to `now()`.
`log_activity` writes:
- An `activities` row with the given `kind` and optional JSON `payload`.
- `contact.last_activity_at = now()` and `contact.updated_at = now()`.
- If `new_stage` is provided, `contact.stage = new_stage` in the same transaction.
Both are transactional — if any step fails, the whole write is rolled back. No half-written contacts.
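The all-or-nothing behavior can be demonstrated with a toy transaction — `sqlite3` standing in for Postgres, and a deliberately injected failure between the two inserts (the schema here is invented, not Maestro's):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE contacts (email TEXT PRIMARY KEY, stage TEXT)")
conn.execute("CREATE TABLE activities (email TEXT, kind TEXT)")


def add_contact(conn, email, stage="new", fail_after_contact=False):
    """Contact row plus 'created' activity in one transaction."""
    with conn:  # commits on success, rolls back on any exception
        conn.execute("INSERT INTO contacts VALUES (?, ?)", (email, stage))
        if fail_after_contact:
            raise RuntimeError("simulated failure mid-write")
        conn.execute("INSERT INTO activities VALUES (?, 'created')", (email,))


add_contact(conn, "a@example.com")
try:
    add_contact(conn, "b@example.com", fail_after_contact=True)
except RuntimeError:
    pass  # the whole write for 'b' was rolled back

count = conn.execute("SELECT COUNT(*) FROM contacts").fetchone()[0]
print(count)  # 1 -- no half-written contact for 'b'
```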
## Activity kinds
Same enum as the database column:
```
created, enriched, contacted, replied, triaged, booked,
note, bounced, unsubscribed, excluded
```
Most kinds map 1:1 to a stage of the same name. Convention: when the agent records a `contacted` activity, it also passes `new_stage='contacted'` in the same call. Skipping the stage update is rare but legal — useful when an activity is informational (e.g. a `note` doesn't advance stage).
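The convention in call form — the `contact_id` value and payload contents below are made-up placeholders:

```python
# A 'contacted' activity and its matching stage advance, in one call.
contacted_args = {
    "contact_id": "ct_123",        # placeholder id
    "kind": "contacted",
    "new_stage": "contacted",      # convention: stage mirrors kind
    "payload": {"subject": "Quick question", "preview": "Hi Sarah..."},
}

# An informational note, by contrast, omits new_stage entirely.
note_args = {
    "contact_id": "ct_123",
    "kind": "note",
    "payload": {"text": "LinkedIn shows a new role"},
}
```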
## Stage transitions
The valid stages in v1 (defined by the B2B SaaS Outbound score):
```
new → enriching → ready → contacted → replied → triaged → booked
disqualified
unsubscribed
```
`disqualified` and `unsubscribed` are terminal. The Pipeline skill doesn't enforce transition order — an agent could go from `new` straight to `booked` if it had reason to. That flexibility is intentional; rule enforcement belongs in the score definition, not in the writes.
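If a score did want to enforce ordering, a checker might look like this. The stage names come from this document; the function itself is a sketch of where such a rule could live, not shipped code:

```python
STAGES = ["new", "enriching", "ready", "contacted",
          "replied", "triaged", "booked"]
TERMINAL = {"disqualified", "unsubscribed"}


def transition_allowed(old, new, enforce_order=False):
    """Score-level rule: terminal stages never change; ordering is optional."""
    if old in TERMINAL:
        return False        # terminal stages stay put
    if new in TERMINAL:
        return True         # any contact can be disqualified/unsubscribed
    if not enforce_order:
        return True         # the Pipeline skill's own behavior: anything goes
    return STAGES.index(new) > STAGES.index(old)  # forward-only


print(transition_allowed("new", "booked"))            # True (skill default)
print(transition_allowed("unsubscribed", "new"))      # False (terminal)
```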
## Failure modes
- `bad_input` — duplicate email: `add_contact` rejects when an email already exists in the pipeline. Always call `find_contact` first. The error message tells the LLM exactly what to do.
- `bad_input` — invalid stage / kind: input outside the allowed enums. Pydantic catches this in the SDK before the SQL runs.
- `not_found` — pipeline or contact: `pipeline_id` doesn't exist (e.g. typo in the agent's instructions), or `contact_id` was deleted between writes.
- `internal` — `DATABASE_URL is not set`: env config issue, see Setup above.
- `remote_unavailable` — Postgres is down or the connection pool can't open. Retryable.
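Of these, only `remote_unavailable` is worth retrying. A minimal backoff loop — the error shape (`{"error": ...}`) and the stubbed call are illustrative, not the SDK's actual return type:

```python
import time


def with_retry(call, attempts=3, base_delay=0.01):
    """Retry only remote_unavailable; surface every other error immediately."""
    for attempt in range(attempts):
        result = call()
        if result.get("error") != "remote_unavailable":
            return result
        time.sleep(base_delay * 2 ** attempt)  # exponential backoff
    return result


# Stub that fails once, then succeeds -- stands in for a Pipeline call.
calls = []
def flaky_find_contact():
    calls.append(1)
    if len(calls) == 1:
        return {"error": "remote_unavailable"}
    return {"contact": None}


result = with_retry(flaky_find_contact)
print(result)  # {'contact': None} after one retry
```

A `bad_input` or `not_found`, by contrast, should go straight back to the agent — retrying won't fix a duplicate email or a typo'd `pipeline_id`.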
## Verifying
Skills → Pipeline → click Test next to `find_contact`. Default pipelines from the seed data:
- `pl_saas` — the SaaS pipeline with 8 seeded contacts
- `pl_health` — empty Healthcare pipeline (good for safe writes)
Try `find_contact { pipeline_id: "pl_saas", email: "[email protected]" }` — should return the seeded Sarah Chen contact. Try `find_contact { pipeline_id: "pl_health", email: "[email protected]" }` — should return `{contact: null}`.
## Why Pipeline writes are a skill
Pipeline writes are explicit operations the agent calls — not implicit side-effects of other actions. Three reasons this matters:
- Predictable composition. Agents combine skills freely; without hidden side-effects, what an agent does is exactly what its instructions say.
- Intent stays explicit. Not every email is part of a tracked outreach campaign. Pairing a `gmail.send_email` with a `pipeline.add_contact` should be a deliberate choice, not an automatic one.
- Symmetry. Reply-triage reads from the Pipeline (looks up senders, checks stages) just as cold-leads writes to it. Modeling both sides as skills keeps the architecture consistent end-to-end.