Are you an LLM? Read llms.txt for a summary of the docs, or llms-full.txt for the full context.
Skip to content

Realtime Subscriptions

Subscribe to database changes using PostgreSQL LISTEN/NOTIFY.

Overview

Postbase SDK provides realtime subscriptions using PostgreSQL's native LISTEN/NOTIFY mechanism:

// Subscribe to changes
db.channel('users')
  .on('INSERT', (payload) => {
    console.log('New user:', payload.new)
  })
  .on('UPDATE', (payload) => {
    console.log('User updated:', payload.new)
  })
  .on('DELETE', (payload) => {
    console.log('User deleted:', payload.old)
  })
  .subscribe()

Setup

1. Create Triggers

Set up database triggers to send notifications:

postbase types realtime-triggers --database myapp --tables users,posts

Or create manually:

-- Function to notify changes
--
-- Row-level trigger function. Serialises the changed row(s) as JSON and
-- publishes the result with pg_notify on a channel named after the table
-- (TG_TABLE_NAME), so a client listening on 'users' receives changes to the
-- users table.
--
-- Payload keys: operation, table, schema, plus
--   INSERT -> new
--   UPDATE -> old, new
--   DELETE -> old
--
-- NOTE: in the default PostgreSQL configuration a NOTIFY payload must be
-- shorter than 8000 bytes. Very wide rows can exceed that limit; for such
-- tables send only the primary key and let the client re-fetch the row.
CREATE OR REPLACE FUNCTION notify_changes()
RETURNS TRIGGER AS $$
DECLARE
  payload JSON;
BEGIN
  IF TG_OP = 'INSERT' THEN
    payload := json_build_object(
      'operation', 'INSERT',
      'table', TG_TABLE_NAME,
      'schema', TG_TABLE_SCHEMA,
      'new', row_to_json(NEW)
    );
  ELSIF TG_OP = 'UPDATE' THEN
    payload := json_build_object(
      'operation', 'UPDATE',
      'table', TG_TABLE_NAME,
      'schema', TG_TABLE_SCHEMA,
      'old', row_to_json(OLD),
      'new', row_to_json(NEW)
    );
  ELSIF TG_OP = 'DELETE' THEN
    payload := json_build_object(
      'operation', 'DELETE',
      'table', TG_TABLE_NAME,
      'schema', TG_TABLE_SCHEMA,
      'old', row_to_json(OLD)
    );
  END IF;

  PERFORM pg_notify(TG_TABLE_NAME, payload::text);

  -- NEW is NULL for DELETE, so fall back to OLD. The return value is ignored
  -- for AFTER triggers, but this keeps the function safe if it is ever
  -- attached as a BEFORE trigger (returning NULL there would skip the row).
  RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
 
-- Trigger for users table
CREATE TRIGGER users_changes
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION notify_changes();

2. Subscribe in Your App

import { createClient } from '@postbase/sdk'
 
const db = createClient({
  connectionString: process.env.DATABASE_URL,
})
 
// Subscribe to users table
db.channel('users')
  .on('INSERT', (payload) => {
    console.log('New user:', payload.new)
  })
  .subscribe()

Channel API

Creating a Channel

const channel = db.channel('channel_name')

The channel name corresponds to the PostgreSQL NOTIFY channel.

Event Handlers

channel
  // Insert events
  .on('INSERT', (payload) => {
    console.log('Inserted:', payload.new)
  })
  // Update events
  .on('UPDATE', (payload) => {
    console.log('Updated from:', payload.old, 'to:', payload.new)
  })
  // Delete events
  .on('DELETE', (payload) => {
    console.log('Deleted:', payload.old)
  })
  // All events
  .on('*', (payload) => {
    console.log('Change:', payload.operation, payload)
  })

Payload Structure

/**
 * Change notification passed to channel event handlers.
 *
 * Mirrors the JSON object built by the `notify_changes()` trigger function.
 *
 * @typeParam T - Row type of the table the channel is bound to.
 */
interface RealtimePayload<T> {
  /** Kind of change that fired the trigger. */
  operation: 'INSERT' | 'UPDATE' | 'DELETE'
  /** Name of the table the change happened on. */
  table: string
  /** Schema that contains the table. */
  schema: string
  /** Row as it was before the change. */
  old?: T  // For UPDATE and DELETE
  /** Row as it is after the change. */
  new?: T  // For INSERT and UPDATE
}

Starting Subscription

// Start listening
channel.subscribe()

Stopping Subscription

// Stop listening
channel.unsubscribe()

Multiple Tables

Subscribe to multiple tables:

// Users channel
db.channel('users')
  .on('INSERT', handleNewUser)
  .subscribe()
 
// Posts channel
db.channel('posts')
  .on('*', handlePostChange)
  .subscribe()
 
// Comments channel
db.channel('comments')
  .on('INSERT', handleNewComment)
  .subscribe()

Filtered Subscriptions

Filter events in your handlers:

db.channel('orders')
  .on('INSERT', (payload) => {
    // Only process high-value orders
    if (payload.new.total > 1000) {
      notifyManager(payload.new)
    }
  })
  .on('UPDATE', (payload) => {
    // Only process status changes
    if (payload.old.status !== payload.new.status) {
      handleStatusChange(payload.old.status, payload.new.status)
    }
  })
  .subscribe()

Automatic Reconnection

The SDK handles connection drops automatically:

const channel = db.channel('users')
  .on('INSERT', handleInsert)
  .on('error', (error) => {
    console.error('Connection error:', error)
  })
  .on('reconnect', () => {
    console.log('Reconnected to database')
  })
  .subscribe()

React Integration

Custom Hook

// hooks/useRealtimeTable.ts
import { useEffect, useState } from 'react'
import { db } from '../db/client'
 
/** Shallow, order-sensitive comparison of two row arrays by element identity. */
function sameRows<T>(a: readonly T[], b: readonly T[]): boolean {
  return a === b || (a.length === b.length && a.every((row, i) => row === b[i]))
}

/** Reads the `id` property of a row without resorting to `any`. */
function rowId(row: unknown): unknown {
  return (row as { id?: unknown } | null | undefined)?.id
}

/**
 * Keeps a local copy of a table's rows in sync with realtime change events.
 *
 * Rows are matched by their `id` property: INSERT appends the new row,
 * UPDATE replaces the row with the same `id`, DELETE removes it.
 *
 * @param tableName - Channel (table) to subscribe to. Changing it tears down
 *   the old subscription and opens a new one.
 * @param initialData - Rows to start from. When the caller later passes a
 *   different set of rows (for example once an initial fetch resolves), the
 *   hook re-seeds itself from them. Pass a referentially stable array (state
 *   or a memoized value); an inline literal with fresh objects on every
 *   render would re-seed on every render.
 * @returns The current rows.
 */
export function useRealtimeTable<T>(tableName: string, initialData: T[] = []) {
  const [data, setData] = useState<T[]>(initialData)
  // Last `initialData` the hook was seeded from, used to detect a new seed.
  const [seed, setSeed] = useState<T[]>(initialData)

  // `useState` only reads its argument on the first render, so rows that the
  // caller fetches asynchronously would otherwise never show up. Adjusting
  // state during render makes React re-render immediately with the new seed.
  // The shallow comparison keeps the default `[]` (a new array each render)
  // from triggering an endless re-seed.
  if (!sameRows(seed, initialData)) {
    setSeed(initialData)
    setData(initialData)
  }

  useEffect(() => {
    const channel = db.channel(tableName)
      .on('INSERT', (payload) => {
        setData(prev => [...prev, payload.new as T])
      })
      .on('UPDATE', (payload) => {
        const updatedId = rowId(payload.new)
        setData(prev => prev.map(item =>
          rowId(item) === updatedId ? payload.new as T : item
        ))
      })
      .on('DELETE', (payload) => {
        const deletedId = rowId(payload.old)
        setData(prev => prev.filter(item => rowId(item) !== deletedId))
      })
      .subscribe()

    // Stop listening when the component unmounts or the table changes.
    return () => {
      channel.unsubscribe()
    }
  }, [tableName])

  return data
}

Using the Hook

/**
 * Renders the users table as a list: loads the current rows once on mount,
 * then hands them to `useRealtimeTable` so the list follows realtime changes.
 */
function UserList() {
  const [users, setUsers] = useState<User[]>([])

  // Fetch initial data
  useEffect(() => {
    // Set on unmount so a late response does not update an unmounted component.
    let cancelled = false

    db.from('users')
      .execute()
      .then((rows) => {
        if (!cancelled) setUsers(rows)
      })
      .catch((error: unknown) => {
        // Surface the failure instead of leaving an unhandled rejection.
        console.error('Failed to load users:', error)
      })

    return () => {
      cancelled = true
    }
  }, [])

  // Subscribe to changes
  const realtimeUsers = useRealtimeTable<User>('users', users)

  return (
    <ul>
      {realtimeUsers.map(user => (
        <li key={user.id}>{user.name}</li>
      ))}
    </ul>
  )
}

Express.js Integration

Stream changes to clients via Server-Sent Events:

import express from 'express'
import { db } from './db/client'
 
const app = express()
 
// Server-Sent Events endpoint: forwards every change on the `users` channel
// to the connected client as one SSE `data:` frame per event.
app.get('/events/users', (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream')
  res.setHeader('Cache-Control', 'no-cache')
  res.setHeader('Connection', 'keep-alive')
  // Send the headers right away. Otherwise nothing reaches the client until
  // the first change event, and an EventSource stays in the "connecting"
  // state for as long as the table is idle.
  res.flushHeaders()

  const channel = db.channel('users')
    .on('*', (payload) => {
      // A blank line terminates an SSE message, hence the double newline.
      res.write(`data: ${JSON.stringify(payload)}\n\n`)
    })
    .subscribe()

  // Release the subscription when the client disconnects.
  req.on('close', () => {
    channel.unsubscribe()
  })
})

Best Practices

1. Limit Trigger Columns

Only notify on relevant column changes:

CREATE TRIGGER users_email_change
AFTER UPDATE OF email ON users
FOR EACH ROW EXECUTE FUNCTION notify_changes();

2. Batch Updates

For high-throughput tables, batch notifications:

-- Debounced trigger (pseudocode)
--
-- Instead of notifying on every change, records the change in the
-- _change_queue table so that a separate job can send notifications in
-- batches. It serialises NEW, so attach it to INSERT/UPDATE triggers
-- (NEW is NULL for DELETE).
CREATE OR REPLACE FUNCTION batch_notify()
RETURNS TRIGGER AS $$
BEGIN
  -- Store change in queue table
  INSERT INTO _change_queue (table_name, change_data)
  VALUES (TG_TABLE_NAME, row_to_json(NEW));

  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
 
-- Process queue every minute with pg_cron
-- ('* * * * *' is five-field cron syntax, whose finest granularity is one minute)
SELECT cron.schedule(
  'process_changes',
  '* * * * *',  -- Every minute
  $$SELECT process_and_notify_changes()$$
);

3. Clean Up Subscriptions

Always unsubscribe when done:

// In React
useEffect(() => {
  const channel = db.channel('users').on('*', handler).subscribe()
  return () => channel.unsubscribe()
}, [])

4. Handle Reconnection

Monitor connection status:

db.channel('users')
  .on('reconnect', () => {
    // Refresh data to catch missed events
    fetchLatestData()
  })
  .subscribe()

Troubleshooting

No Events Received

  1. Check trigger exists: \d users in psql
  2. Test NOTIFY: NOTIFY users, '{"test": true}'
  3. Check LISTEN: run LISTEN users in another psql session before sending the test NOTIFY, and confirm the notification shows up there

Missing Events

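Events may be missed during reconnection: PostgreSQL does not queue notifications for a listener that is disconnected, so anything sent while the connection is down is lost. Always fetch latest data on reconnect.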

High Latency

  • Check database connection pooling
  • Consider batching high-frequency tables
  • Use filtered triggers to reduce volume