Cognition: Transport and Buffer

How @skytells/cognition holds events in a bounded ring buffer, batches them, compresses the payload, and delivers them to dsn.skytells.ai with automatic retries.

Every event Cognition captures — errors, runtime snapshots, security threats, trace spans, and custom events — passes through the same delivery pipeline before it reaches Skytells. Understanding this pipeline helps you tune buffer sizes, flush timing, and retry behaviour for your environment.


Pipeline Overview

This is the path every event takes from the moment it's captured to the moment it's delivered:

capture(event)
    │
    ▼
beforeSend(event)  ──null──► dropped (silently)
    │
    ▼
EventBuffer (ring buffer, bounded)
    │
    ▼  (flush timer: every 5s by default)
TransportManager
    │
    ├── drain(batchSize) → EventEnvelope
    │
    ▼
HttpTransport
    │
    ├── JSON.stringify(envelope)
    ├── gzip if payload > 1KB
    │
    ▼
https://dsn.skytells.ai
    ├── 200 OK           → success
    ├── 429 Rate Limited → stop, honor Retry-After
    ├── 4xx Client Error → drop permanently (non-retryable)
    └── 5xx Server Error → retry with exponential backoff

Event Buffer

Ring Buffer Design

The EventBuffer is a bounded circular buffer that drops the oldest event when it fills up. This gives you:

  • Predictable memory usage — never more than transport.maxBufferSize events in memory
  • Recent events kept — the newest events are the most actionable
  • No allocation overhead — pre-allocated array with head and count pointers

Buffer Operations

| Operation | Description |
| --- | --- |
| push(event) | Add an event. If the buffer is full, drops the oldest event and increments droppedCount |
| drain(max) | Remove and return up to max events in FIFO order |
| size | Current number of buffered events |
| isFull | Whether the buffer is at capacity |
| droppedCount | Total events dropped since SDK initialization |
| clear() | Remove all events and reset counters |
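The drop-oldest behaviour and the operations in the table can be illustrated with a small sketch. This is not the SDK's source: the class name `RingBuffer` and its internals are hypothetical, and only the operation names (push, drain, size, isFull, droppedCount, clear) come from the table above.

```typescript
// Hypothetical sketch of a drop-oldest ring buffer; not the actual EventBuffer.
class RingBuffer<T> {
  private readonly capacity: number;
  private readonly slots: (T | undefined)[];
  private head = 0; // index of the oldest stored item
  private count = 0; // number of stored items
  private dropped = 0; // items overwritten because the buffer was full

  constructor(capacity: number) {
    if (!Number.isInteger(capacity) || capacity <= 0) {
      throw new RangeError("capacity must be a positive integer");
    }
    this.capacity = capacity;
    // Pre-allocate once so push() never grows the array.
    this.slots = new Array<T | undefined>(capacity).fill(undefined);
  }

  get size(): number {
    return this.count;
  }

  get isFull(): boolean {
    return this.count === this.capacity;
  }

  get droppedCount(): number {
    return this.dropped;
  }

  push(item: T): void {
    if (this.isFull) {
      // Overwrite the oldest slot; the next-oldest item becomes the new head.
      this.slots[this.head] = item;
      this.head = (this.head + 1) % this.capacity;
      this.dropped++;
      return;
    }
    this.slots[(this.head + this.count) % this.capacity] = item;
    this.count++;
  }

  drain(max: number): T[] {
    const limit = Number.isNaN(max) ? 0 : Math.max(0, Math.floor(max));
    const n = Math.min(limit, this.count);
    const out: T[] = [];
    for (let i = 0; i < n; i++) {
      out.push(this.slots[this.head] as T);
      this.slots[this.head] = undefined; // release the reference
      this.head = (this.head + 1) % this.capacity;
    }
    this.count -= n;
    return out; // oldest first (FIFO)
  }

  clear(): void {
    this.slots.fill(undefined);
    this.head = 0;
    this.count = 0;
    this.dropped = 0;
  }
}
```

With a capacity of 3, pushing 1, 2, 3, 4 leaves 2, 3, 4 in the buffer and a droppedCount of 1, which is the trade-off described above: memory stays bounded and the newest events survive.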

Monitoring Buffer Health

Check buffer depth and dropped event counts at any time:

console.log(`Buffer: ${cognition.bufferSize} events`);
console.log(`Dropped: ${cognition.droppedCount} events`);

If events are being dropped:

  • Increase transport.maxBufferSize to hold more
  • Decrease transport.flushIntervalMs to flush more often
  • Use beforeSend to filter out high-volume events you don't need
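As an illustration of the last option, here is a minimal sketch of a filter that could serve as a beforeSend hook. It assumes only what the pipeline overview shows, namely that the hook receives an event and that returning null drops it. The `SketchEvent` shape and the `dropEventTypes` helper are hypothetical, and how the hook is registered is left to the Error Capture documentation.

```typescript
// Hypothetical minimal event shape; real SDK events carry more fields.
interface SketchEvent {
  type: string;
}

// Builds a beforeSend-style function that drops events of the given types
// by returning null and passes every other event through unchanged.
function dropEventTypes<E extends SketchEvent>(
  dropTypes: readonly string[],
): (event: E) => E | null {
  const blocked = new Set(dropTypes);
  return (event) => (blocked.has(event.type) ? null : event);
}

// Example: stop buffering runtime snapshots, keep everything else.
const filterSnapshots = dropEventTypes<SketchEvent>(["runtime_snapshot"]);
```

Dropping events before they reach the buffer leaves more room for the events you care about.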

Flush Behavior

Automatic Flush

The TransportManager starts a periodic flush timer (every 5 seconds by default). The timer is unref()'d, so it does not keep the Node.js process alive on its own. On each tick:

  1. Drain up to batchSize events from the buffer
  2. Sanitize event timestamps — any NaN, Infinity, or ≤ 0 timestamp is corrected to Date.now() without mutating the buffered event
  3. Wrap events in an EventEnvelope with SDK metadata
  4. Send via HttpTransport
  5. Repeat until buffer is empty
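Step 2 can be sketched as a pure function. This is an illustration under stated assumptions, not the SDK's implementation: the function name `sanitizeTimestamp` and the injectable `now` parameter are hypothetical, while the rule itself (NaN, Infinity, or ≤ 0 becomes the current time, and the buffered event is not mutated) is taken from the list above.

```typescript
// Returns the event unchanged when its timestamp is a finite positive number;
// otherwise returns a shallow copy with the timestamp replaced by `now`.
// The original object is never modified.
function sanitizeTimestamp<E extends { timestamp: number }>(
  event: E,
  now: number = Date.now(),
): E {
  const t = event.timestamp;
  const valid = typeof t === "number" && Number.isFinite(t) && t > 0;
  return valid ? event : { ...event, timestamp: now };
}
```

Returning a copy rather than patching in place keeps the buffered event intact.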

Manual Flush

Force an immediate flush of all buffered events:

await cognition.flush();

This is useful ahead of a known burst of traffic, or in tests that need to assert on delivered events.

Shutdown Flush

cognition.close() attempts to flush all remaining events with a timeout before shutting down:

await cognition.close();        // Default: 5000ms timeout
await cognition.close(10_000);  // Custom: 10s timeout

Shutdown sequence:

  1. Mark transport as closed (no new events accepted)
  2. Clear the flush interval timer
  3. Race: flush() vs. setTimeout(timeoutMs)
  4. Shut down the HTTP transport

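Call cognition.close() in your SIGTERM and SIGINT handlers so buffered events get a chance to flush before the process exits. Because the flush races against the timeout, events still unsent when the timeout elapses may be lost.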
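A minimal sketch of such signal handlers follows. The `Closable` interface is a hypothetical stand-in that models only the close(timeoutMs) call shown above, and `installShutdownHandlers` and its injectable `exit` parameter are illustrative names rather than part of the SDK.

```typescript
// Hypothetical minimal view of the client: only close() is used here.
interface Closable {
  close(timeoutMs?: number): Promise<void>;
}

function installShutdownHandlers(
  client: Closable,
  timeoutMs: number = 5000,
  exit: (code: number) => void = (code) => process.exit(code),
): void {
  let closing = false;
  const handler = (): void => {
    if (closing) return; // a second signal must not start a second close
    closing = true;
    client.close(timeoutMs).then(
      () => exit(0),
      () => exit(1), // close() rejected: exit with a failure code
    );
  };
  process.once("SIGTERM", handler);
  process.once("SIGINT", handler);
}
```

In an application you would call `installShutdownHandlers(cognition)` once at startup. Keep the timeout shorter than the grace period your orchestrator allows before it kills the process.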


HTTP Transport

Authentication Headers

Every request is authenticated automatically from your SDK configuration:

| Header | Value | Description |
| --- | --- | --- |
| x-api-key | Config apiKey | Authentication |
| x-project-id | Config projectId | Project identification |
| content-type | application/json | Always JSON |
| content-encoding | gzip | Only when payload > 1KB |
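The table maps directly onto a small header builder. The sketch below is illustrative only: `buildHeaders` and `AuthConfig` are hypothetical names, and the header names and values are the ones listed in the table.

```typescript
// Hypothetical subset of the SDK configuration used for authentication.
interface AuthConfig {
  apiKey: string;
  projectId: string;
}

// Builds the request headers; content-encoding is added only when the
// body was actually gzipped.
function buildHeaders(config: AuthConfig, compressed: boolean): Record<string, string> {
  const headers: Record<string, string> = {
    "x-api-key": config.apiKey,
    "x-project-id": config.projectId,
    "content-type": "application/json",
  };
  if (compressed) {
    headers["content-encoding"] = "gzip";
  }
  return headers;
}
```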

Compression

Payloads larger than 1024 bytes are automatically gzipped using node:zlib gzipSync. The content-encoding: gzip header is added when compression is applied.

Compression can be disabled:

{ transport: { compression: false } }
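The threshold logic can be sketched as follows. This is an assumption-laden illustration, not the SDK's code: `encodePayload` and `EncodedPayload` are hypothetical names, and measuring the threshold on the UTF-8 byte length of the serialized JSON is an assumption. Only the 1024-byte limit, the use of gzipSync from node:zlib, and the compression flag come from the text above.

```typescript
import { gzipSync } from "node:zlib";

const GZIP_THRESHOLD_BYTES = 1024;

interface EncodedPayload {
  body: Buffer;
  compressed: boolean; // true means the request needs content-encoding: gzip
}

// Serializes the envelope and gzips it only when compression is enabled
// and the serialized payload is larger than the threshold.
function encodePayload(envelope: object, compression: boolean = true): EncodedPayload {
  const raw = Buffer.from(JSON.stringify(envelope), "utf8");
  if (compression && raw.byteLength > GZIP_THRESHOLD_BYTES) {
    return { body: gzipSync(raw), compressed: true };
  }
  return { body: raw, compressed: false };
}
```

Small payloads skip compression because gzip adds fixed overhead and may not shrink them.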

Retry Strategy

| Attempt | Delay | Behavior |
| --- | --- | --- |
| 1st | 0ms | Immediate |
| 2nd | 1,000ms | Exponential backoff |
| 3rd | 2,000ms | Exponential backoff |
| 4th | 4,000ms | Exponential backoff |
| Max delay | 30,000ms | Capped |
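The delays in the table follow a doubling schedule with a cap, which can be written as a one-line formula. The sketch below reproduces the table's numbers. The function name `retryDelayMs` is hypothetical, and no jitter is modelled because the table does not mention any.

```typescript
const BASE_DELAY_MS = 1000;
const MAX_DELAY_MS = 30000;

// Delay before a given attempt (1-based). The first attempt is immediate;
// each later attempt doubles the previous delay, capped at MAX_DELAY_MS.
function retryDelayMs(attempt: number): number {
  if (attempt <= 1) return 0;
  return Math.min(BASE_DELAY_MS * 2 ** (attempt - 2), MAX_DELAY_MS);
}
```

Under this formula the cap would first apply at the 7th attempt, where the uncapped value is 32,000ms.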

Retry behavior by status:

| Status | Retried? | Action |
| --- | --- | --- |
| 200–299 | No | Success |
| 429 | No | Rate limited; honor the Retry-After header |
| 400–499 (not 429) | No | Client error; drop permanently |
| 500–599 | Yes | Server error; retry with backoff |
| Network error | Yes | Connection failure; retry with backoff |
| Timeout | Yes | AbortController timeout; retry with backoff |
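The status rows of the table can be expressed as a small classifier, sketched below with hypothetical names (`classifyStatus`, `SendOutcome`, `parseRetryAfterMs`). Network errors and timeouts never produce a status code, so they would be handled where the request promise rejects. Treating codes outside the table (1xx, 3xx) as non-retryable is an assumption, and the Retry-After parser follows the general HTTP definition of the header (delay in seconds or an HTTP date) rather than anything specific to this SDK.

```typescript
type SendOutcome = "success" | "rate_limited" | "drop" | "retry";

// Maps an HTTP status code to the action listed in the table.
function classifyStatus(statusCode: number): SendOutcome {
  if (statusCode >= 200 && statusCode <= 299) return "success";
  if (statusCode === 429) return "rate_limited";
  if (statusCode >= 400 && statusCode <= 499) return "drop";
  if (statusCode >= 500 && statusCode <= 599) return "retry";
  return "drop"; // assumption: anything outside the table is not retried
}

// Converts a Retry-After header value into milliseconds.
// Accepts delay-seconds ("120") or an HTTP date; returns undefined otherwise.
function parseRetryAfterMs(
  header: string | null,
  now: number = Date.now(),
): number | undefined {
  if (header === null) return undefined;
  const trimmed = header.trim();
  if (/^\d+$/.test(trimmed)) return Number(trimmed) * 1000;
  const date = Date.parse(trimmed);
  return Number.isNaN(date) ? undefined : Math.max(0, date - now);
}
```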

Timeout

Each request has an AbortController timeout (default 10 seconds):

{ transport: { timeoutMs: 5000 } } // 5 second timeout
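The AbortController pattern behind this option looks roughly like the sketch below. `withTimeout` is a hypothetical helper, not an SDK export. It takes the request as a callback so the same wrapper works with fetch or any other API that accepts an AbortSignal.

```typescript
// Runs `run` with an AbortSignal that fires after `timeoutMs`.
// The timer is always cleared, whether the request succeeds or fails.
async function withTimeout<T>(
  timeoutMs: number,
  run: (signal: AbortSignal) => Promise<T>,
): Promise<T> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    return await run(controller.signal);
  } finally {
    clearTimeout(timer);
  }
}

// Usage with the global fetch available in Node.js 18 and later:
//   await withTimeout(10_000, (signal) =>
//     fetch(url, { method: "POST", body, signal }));
```

When the timer fires, fetch rejects with an abort error, which is the Timeout case of the retry table.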

Wire Format

Events are sent as a JSON EventEnvelope:

{
  "sdk": {
    "name": "@skytells/cognition",
    "version": "1.0.0"
  },
  "projectId": "my-api-service",
  "events": [
    {
      "type": "error",
      "timestamp": 1719849600000,
      "error": {
        "name": "TypeError",
        "message": "Cannot read properties of undefined (reading 'id')",
        "frames": [
          {
            "function": "getUser",
            "filename": "src/handlers/auth.ts",
            "lineno": 42,
            "colno": 15,
            "in_app": true
          }
        ]
      },
      "level": "error",
      "breadcrumbs": [
        {
          "timestamp": 1719849599500,
          "category": "http",
          "message": "GET /api/user/123 → started",
          "level": "info"
        }
      ],
      "tags": { "service": "auth" },
      "extra": {},
      "environment": "production",
      "release": "2.1.0",
      "serverName": "api-pod-3a7f",
      "runtime": { "name": "node", "version": "v20.11.0" },
      "os": { "name": "linux", "version": "6.1.0" }
    },
    {
      "type": "runtime_snapshot",
      "timestamp": 1719849600100,
      "memory": {
        "rss": 104857600,
        "heapUsed": 52428800,
        "heapTotal": 67108864,
        "external": 1048576,
        "arrayBuffers": 524288
      },
      "cpu": { "user": 12.5, "system": 3.2 },
      "eventLoop": {
        "utilization": 0.35,
        "lagP50": 1.2,
        "lagP99": 8.7,
        "lagMax": 45.3
      },
      "gc": {
        "majorCount": 0,
        "minorCount": 3,
        "incrementalCount": 1,
        "totalDuration": 4.2,
        "maxPause": 2.1
      },
      "handles": { "active": 15, "requests": 2 }
    }
  ],
  "sentAt": 1719849600200
}
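The top-level shape of this payload can be captured in a couple of types. The sketch below is inferred from the example alone, so field optionality is an assumption, and `buildEnvelope`, `SdkInfo`, and the generic `EventEnvelope<E>` are illustrative declarations rather than the SDK's exported types.

```typescript
interface SdkInfo {
  name: string;
  version: string;
}

// Top-level envelope fields as they appear in the example payload.
interface EventEnvelope<E> {
  sdk: SdkInfo;
  projectId: string;
  events: E[];
  sentAt: number; // epoch milliseconds
}

// Wraps a batch of events with SDK metadata and a send timestamp.
function buildEnvelope<E>(
  events: readonly E[],
  projectId: string,
  sdk: SdkInfo,
  now: number = Date.now(),
): EventEnvelope<E> {
  return { sdk, projectId, events: [...events], sentAt: now };
}
```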

Transport Result

Every send() call returns a TransportResult:

interface TransportResult {
  status: 'success' | 'rate_limited' | 'error';
  statusCode?: number;
  retryAfterMs?: number; // Only for rate_limited
}
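Because status is a closed union, calling code can branch on it exhaustively. The sketch below repeats the interface so that it is self-contained. `describeResult` is a hypothetical helper for logging, not an SDK function.

```typescript
interface TransportResult {
  status: "success" | "rate_limited" | "error";
  statusCode?: number;
  retryAfterMs?: number; // Only for rate_limited
}

// Produces a short log line for each possible outcome.
function describeResult(result: TransportResult): string {
  const code = result.statusCode === undefined ? "no status" : `HTTP ${result.statusCode}`;
  switch (result.status) {
    case "success":
      return `delivered (${code})`;
    case "rate_limited":
      return result.retryAfterMs === undefined
        ? `rate limited (${code})`
        : `rate limited (${code}), retry after ${result.retryAfterMs}ms`;
    case "error":
      return `failed (${code})`;
  }
}
```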

Diagnostics

// Current buffer depth
cognition.bufferSize;

// Total events dropped due to buffer overflow since init
cognition.droppedCount;

  • Configuration: transport.* options (batchSize, flushIntervalMs, maxRetries, maxBufferSize, compression)
  • Error Capture: beforeSend hook for event filtering and PII scrubbing
  • Examples: microservice graceful shutdown, health check endpoint
