Automate Your Content Pipeline
Build an automated content pipeline using n8n (no-code) or a Python script that generates and schedules content from a weekly calendar.
The goal
Create a system that:
- Reads a content calendar (a spreadsheet or JSON file)
- Generates video + music for each entry automatically
- Saves the outputs to a folder or cloud storage
- (Optionally) notifies you via email or Slack when ready
You schedule it weekly — it runs while you sleep.
Pipeline overview
Option A — n8n (no-code)
n8n is an open-source workflow automation tool with a visual editor. It has a native HTTP Request node that can call any REST API — including Skytells.
Step 1: Set up n8n
# Docker (recommended)
docker run -d --name n8n -p 5678:5678 \
-v ~/.n8n:/home/node/.n8n \
n8nio/n8n
# Or with npm
npm install -g n8n && n8n start

Open http://localhost:5678.
Step 2: Create the workflow
Nodes you'll need:
- Schedule Trigger — runs every Monday at 9am
- Read Binary File (or Google Sheets) — loads the content calendar
- SplitInBatches — processes one item at a time
- HTTP Request — calls
POST https://api.skytells.ai/v1/predictions
- Wait → HTTP Request (loop) — polls until complete
- HTTP Request — downloads output
- Write Binary File — saves to disk
- Slack — notifies you when done
Step 3: Configure the HTTP Request node
{
"url": "https://api.skytells.ai/v1/predictions",
"method": "POST",
"headers": {
"x-api-key": "={{ $env.SKYTELLS_API_KEY }}",
"Content-Type": "application/json"
},
"body": {
"model": "truefusion-video-pro",
"input": {
"prompt": "={{ $json.prompt }}",
"duration_seconds": 5,
"aspect_ratio": "9:16"
}
}
}

Option B — Python script
For developers who prefer code, here's a complete automated pipeline:
Content calendar format (JSON)
[
{
"week": "2025-W03",
"slug": "morning-coffee",
"video_prompt": "A barista making pour-over coffee, morning light, 9:16, cinematic",
"music_prompt": "Calm morning café ambience, acoustic guitar, warm, 10 seconds",
"post_date": "2025-01-13"
},
{
"week": "2025-W03",
"slug": "product-launch",
"video_prompt": "Minimalist product reveal on marble, slow rotation, clean studio",
"music_prompt": "Corporate announcement sting, brass and strings, dramatic build, 8 seconds",
"post_date": "2025-01-15"
}
]

The pipeline script
#!/usr/bin/env python3
"""
content_pipeline.py — automated content generation pipeline
Usage: python content_pipeline.py content_calendar.json
"""
import os
import sys
import json
import time
import urllib.request
import subprocess
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
# API key is required: os.environ[...] (not .get) fails fast with KeyError at
# import time if SKYTELLS_API_KEY is unset, rather than mid-pipeline.
API_KEY = os.environ["SKYTELLS_API_KEY"]
# Base URL for all Skytells REST endpoints.
BASE = "https://api.skytells.ai/v1"
# All generated assets land here; created eagerly at import time (idempotent).
OUTPUT_DIR = Path("generated")
OUTPUT_DIR.mkdir(exist_ok=True)
def create_prediction(model: str, input_data: dict, timeout: float = 30.0) -> str:
    """Create a prediction via POST /predictions and return its ID.

    Args:
        model: Skytells model identifier (e.g. "truefusion-video-pro").
        input_data: Model-specific input payload, sent as the "input" field.
        timeout: Socket timeout in seconds for the HTTP call. Without it an
            unattended (cron-driven) pipeline could hang forever on a stalled
            connection.

    Returns:
        The prediction ID from the API response.

    Raises:
        urllib.error.URLError / HTTPError: on network or API failure.
        KeyError: if the response JSON has no "id" field.
    """
    req = urllib.request.Request(
        f"{BASE}/predictions",
        data=json.dumps({"model": model, "input": input_data}).encode(),
        headers={"x-api-key": API_KEY, "Content-Type": "application/json"},
    )
    # Request() with a data payload defaults to the POST method.
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read())["id"]
def wait_for_prediction(
    prediction_id: str, interval: int = 5, max_wait: float = 3600.0
) -> str:
    """Poll until the prediction finishes and return its first output URL.

    Args:
        prediction_id: ID returned by create_prediction().
        interval: Seconds to sleep between polls.
        max_wait: Overall deadline in seconds. The original loop was unbounded
            (`while True`), so one stuck prediction could hang the whole
            scheduled run forever.

    Returns:
        The first entry of the prediction's "output" list.

    Raises:
        RuntimeError: if the prediction reports "failed" or "canceled".
        TimeoutError: if it is still pending after max_wait seconds.
    """
    deadline = time.monotonic() + max_wait
    while True:
        req = urllib.request.Request(
            f"{BASE}/predictions/{prediction_id}",
            headers={"x-api-key": API_KEY},
        )
        # Per-request timeout so a single stalled poll can't block the loop.
        with urllib.request.urlopen(req, timeout=30) as resp:
            p = json.loads(resp.read())
        if p["status"] == "succeeded":
            return p["output"][0]
        if p["status"] in ("failed", "canceled"):
            raise RuntimeError(f"Prediction {prediction_id} failed: {p.get('error')}")
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Prediction {prediction_id} still pending after {max_wait}s"
            )
        time.sleep(interval)
def download(url: str, path: Path) -> None:
    """Fetch *url* and write the response body to *path*."""
    with urllib.request.urlopen(url) as response, open(path, "wb") as sink:
        # Stream in 64 KiB chunks so large videos never sit fully in memory.
        while chunk := response.read(64 * 1024):
            sink.write(chunk)
def merge_video_audio(video_path: Path, audio_path: Path, output_path: Path) -> None:
    """Mux the generated video with the music track via ffmpeg (raises on failure)."""
    subprocess.run(
        [
            "ffmpeg", "-y",                 # -y: overwrite the output file without prompting
            "-i", str(video_path),          # input 0: the generated video
            "-i", str(audio_path),          # input 1: the generated music
            "-c:v", "copy", "-c:a", "aac",  # pass video through untouched; encode audio to AAC
            # Take input 1's audio ([1:a]), duck it to 35% volume, then apad
            # extends it with silence so it cannot run out before the video.
            "-filter_complex", "[1:a]volume=0.35[m];[m]apad[out]",
            "-map", "0:v", "-map", "[out]",  # output = input 0's video + the filtered audio
            "-shortest", str(output_path),   # -shortest trims the padded audio to video length
        ],
        check=True,           # raise CalledProcessError if ffmpeg exits non-zero
        capture_output=True,  # keep ffmpeg's verbose stderr out of the pipeline log
    )
def process_entry(entry: dict) -> str:
    """Generate, download, and merge the video + music for one calendar entry.

    Expects the entry dict to carry "slug", "video_prompt", and "music_prompt".
    Returns the path of the final merged clip as a string.
    """
    slug = entry["slug"]
    print(f"[{slug}] Starting generation...")

    def _generate(model: str, payload: dict) -> str:
        # Create the prediction, then block until its output URL is ready.
        return wait_for_prediction(create_prediction(model, payload))

    # Video and music render concurrently; exiting the pool waits for both.
    with ThreadPoolExecutor(max_workers=2) as pool:
        clip_job = pool.submit(
            _generate,
            "truefusion-video-pro",
            {
                "prompt": entry["video_prompt"],
                "duration_seconds": 10,
                "aspect_ratio": "9:16",
            },
        )
        track_job = pool.submit(
            _generate,
            "beatfusion-2.0",
            {
                "prompt": entry["music_prompt"],
                "duration_seconds": 12,
            },
        )
        video_url = clip_job.result()
        music_url = track_job.result()

    print(f"[{slug}] Downloading...")
    video_path = OUTPUT_DIR / f"{slug}_video.mp4"
    music_path = OUTPUT_DIR / f"{slug}_music.mp3"
    final_path = OUTPUT_DIR / f"{slug}_final.mp4"
    download(video_url, video_path)
    download(music_url, music_path)

    print(f"[{slug}] Merging...")
    merge_video_audio(video_path, music_path, final_path)
    print(f"[{slug}] Done → {final_path}")
    return str(final_path)
def main():
    """Load the content calendar and fan out entry processing across threads."""
    calendar_path = "content_calendar.json" if len(sys.argv) < 2 else sys.argv[1]
    with open(calendar_path) as fh:
        calendar = json.load(fh)
    print(f"Processing {len(calendar)} entries from {calendar_path}")

    # At most 3 calendar entries are generated concurrently; one entry's
    # failure is reported without aborting the others.
    with ThreadPoolExecutor(max_workers=3) as pool:
        pending = {pool.submit(process_entry, item): item["slug"] for item in calendar}
        for done in as_completed(pending):
            slug = pending[done]
            try:
                print(f"✓ {slug}: {done.result()}")
            except Exception as e:
                print(f"✗ {slug}: {e}")
if __name__ == "__main__":
    main()

Run it
SKYTELLS_API_KEY=sk-... python content_pipeline.py content_calendar.json

Schedule it (cron)
# Add to crontab: run every Monday at 9am
crontab -e

0 9 * * 1 cd /path/to/project && SKYTELLS_API_KEY=sk-... python content_pipeline.py content_calendar.json >> pipeline.log 2>&1

Sending notifications
Add a Slack notification when the pipeline finishes:
import os
import json
import urllib.request

def notify_slack(message: str) -> None:
    """Post *message* to a Slack incoming webhook.

    Reads the webhook URL from SLACK_WEBHOOK_URL and silently does nothing
    when it is unset, so notifications remain optional. (The snippet needs
    `import os` — the original version used os without importing it.)
    """
    webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
    if not webhook_url:
        return
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps({"text": message}).encode(),
        headers={"Content-Type": "application/json"},
    )
    # Use a context manager: the original bare urlopen() call never closed
    # the response, leaking the socket. A timeout keeps cron runs from hanging.
    with urllib.request.urlopen(request, timeout=10) as resp:
        resp.read()
# At the end of main():
notify_slack(f"✅ Content pipeline complete — {len(calendar)} videos generated")

Storing in the cloud
Instead of saving locally, upload directly to S3 or Cloudflare R2:
import boto3
def upload_to_s3(local_path: Path, key: str) -> str:
    """Upload *local_path* to the bucket named in $S3_BUCKET under *key*.

    Returns the object's s3.amazonaws.com URL. Requires boto3 credentials to
    be configured in the environment (standard AWS credential chain).
    """
    s3 = boto3.client("s3")
    s3.upload_file(
        str(local_path),
        os.environ["S3_BUCKET"],  # bucket name comes from the environment
        key,
        # Set Content-Type so browsers stream the video instead of downloading.
        ExtraArgs={"ContentType": "video/mp4"},
    )
    return f"https://{os.environ['S3_BUCKET']}.s3.amazonaws.com/{key}"

Summary
You've completed the AI Video & Audio for Creators path. Your automated content pipeline:
- Reads a content calendar
- Generates video clips and music in parallel
- Merges them with ffmpeg
- Saves outputs to disk or cloud storage
- Notifies you via Slack when done
All triggered weekly by a cron job or n8n schedule — hands-free content production.
Next steps:
- Build an AI Image Studio — add image generation to your toolkit
- Generative AI Patterns — advanced automation patterns