Realtime Subscriptions
Subscribe to database changes using PostgreSQL LISTEN/NOTIFY.
Overview
Postbase SDK provides realtime subscriptions using PostgreSQL's native LISTEN/NOTIFY mechanism:
// Subscribe to changes
db.channel('users')
.on('INSERT', (payload) => {
console.log('New user:', payload.new)
})
.on('UPDATE', (payload) => {
console.log('User updated:', payload.new)
})
.on('DELETE', (payload) => {
console.log('User deleted:', payload.old)
})
.subscribe()

Setup
1. Create Triggers
Set up database triggers to send notifications:
postbase types realtime-triggers --database myapp --tables users,posts

Or create manually:
-- Function to notify changes
CREATE OR REPLACE FUNCTION notify_changes()
RETURNS TRIGGER AS $$
DECLARE
payload JSON;
BEGIN
IF TG_OP = 'INSERT' THEN
payload := json_build_object(
'operation', 'INSERT',
'table', TG_TABLE_NAME,
'schema', TG_TABLE_SCHEMA,
'new', row_to_json(NEW)
);
ELSIF TG_OP = 'UPDATE' THEN
payload := json_build_object(
'operation', 'UPDATE',
'table', TG_TABLE_NAME,
'schema', TG_TABLE_SCHEMA,
'old', row_to_json(OLD),
'new', row_to_json(NEW)
);
ELSIF TG_OP = 'DELETE' THEN
payload := json_build_object(
'operation', 'DELETE',
'table', TG_TABLE_NAME,
'schema', TG_TABLE_SCHEMA,
'old', row_to_json(OLD)
);
END IF;
PERFORM pg_notify(TG_TABLE_NAME, payload::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Trigger for users table
CREATE TRIGGER users_changes
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION notify_changes();

2. Subscribe in Your App
import { createClient } from '@postbase/sdk'
const db = createClient({
connectionString: process.env.DATABASE_URL,
})
// Subscribe to users table
db.channel('users')
.on('INSERT', (payload) => {
console.log('New user:', payload.new)
})
.subscribe()

Channel API
Creating a Channel
const channel = db.channel('channel_name')

The channel name corresponds to the PostgreSQL NOTIFY channel.
Event Handlers
channel
// Insert events
.on('INSERT', (payload) => {
console.log('Inserted:', payload.new)
})
// Update events
.on('UPDATE', (payload) => {
console.log('Updated from:', payload.old, 'to:', payload.new)
})
// Delete events
.on('DELETE', (payload) => {
console.log('Deleted:', payload.old)
})
// All events
.on('*', (payload) => {
console.log('Change:', payload.operation, payload)
})

Payload Structure
interface RealtimePayload<T> {
operation: 'INSERT' | 'UPDATE' | 'DELETE'
table: string
schema: string
old?: T // For UPDATE and DELETE
new?: T // For INSERT and UPDATE
}

Starting Subscription
// Start listening
channel.subscribe()

Stopping Subscription
// Stop listening
channel.unsubscribe()

Multiple Tables
Subscribe to multiple tables:
// Users channel
db.channel('users')
.on('INSERT', handleNewUser)
.subscribe()
// Posts channel
db.channel('posts')
.on('*', handlePostChange)
.subscribe()
// Comments channel
db.channel('comments')
.on('INSERT', handleNewComment)
.subscribe()

Filtered Subscriptions
Filter events in your handlers:
db.channel('orders')
.on('INSERT', (payload) => {
// Only process high-value orders
if (payload.new.total > 1000) {
notifyManager(payload.new)
}
})
.on('UPDATE', (payload) => {
// Only process status changes
if (payload.old.status !== payload.new.status) {
handleStatusChange(payload.old.status, payload.new.status)
}
})
.subscribe()

Automatic Reconnection
The SDK handles connection drops automatically:
const channel = db.channel('users')
.on('INSERT', handleInsert)
.on('error', (error) => {
console.error('Connection error:', error)
})
.on('reconnect', () => {
console.log('Reconnected to database')
})
.subscribe()

React Integration
Custom Hook
// hooks/useRealtimeTable.ts
import { useEffect, useState } from 'react'
import { db } from '../db/client'
export function useRealtimeTable<T>(tableName: string, initialData: T[] = []) {
const [data, setData] = useState<T[]>(initialData)
useEffect(() => {
const channel = db.channel(tableName)
.on('INSERT', (payload) => {
setData(prev => [...prev, payload.new as T])
})
.on('UPDATE', (payload) => {
setData(prev => prev.map(item =>
(item as any).id === (payload.new as any).id
? payload.new as T
: item
))
})
.on('DELETE', (payload) => {
setData(prev => prev.filter(item =>
(item as any).id !== (payload.old as any).id
))
})
.subscribe()
return () => {
channel.unsubscribe()
}
}, [tableName])
return data
}

Using the Hook
function UserList() {
const [users, setUsers] = useState<User[]>([])
// Fetch initial data
useEffect(() => {
db.from('users').execute().then(setUsers)
}, [])
// Subscribe to changes
const realtimeUsers = useRealtimeTable<User>('users', users)
return (
<ul>
{realtimeUsers.map(user => (
<li key={user.id}>{user.name}</li>
))}
</ul>
)
}

Express.js Integration
Stream changes to clients via Server-Sent Events:
import express from 'express'
import { db } from './db/client'
const app = express()
app.get('/events/users', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
const channel = db.channel('users')
.on('*', (payload) => {
res.write(`data: ${JSON.stringify(payload)}\n\n`)
})
.subscribe()
req.on('close', () => {
channel.unsubscribe()
})
})

Best Practices
1. Limit Trigger Columns
Only notify on relevant column changes:
CREATE TRIGGER users_email_change
AFTER UPDATE OF email ON users
FOR EACH ROW EXECUTE FUNCTION notify_changes();

2. Batch Updates
For high-throughput tables, batch notifications:
-- Debounced trigger (pseudocode)
CREATE OR REPLACE FUNCTION batch_notify()
RETURNS TRIGGER AS $$
BEGIN
-- Store change in queue table
INSERT INTO _change_queue (table_name, change_data)
VALUES (TG_TABLE_NAME, row_to_json(NEW));
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Process queue every minute with pg_cron (pg_cron's minimum granularity is one minute)
SELECT cron.schedule(
'process_changes',
'* * * * *', -- Every minute
$$SELECT process_and_notify_changes()$$
);

3. Clean Up Subscriptions
Always unsubscribe when done:
// In React
useEffect(() => {
const channel = db.channel('users').on('*', handler).subscribe()
return () => channel.unsubscribe()
}, [])

4. Handle Reconnection
Monitor connection status:
db.channel('users')
.on('reconnect', () => {
// Refresh data to catch missed events
fetchLatestData()
})
.subscribe()

Troubleshooting
No Events Received
- Check the trigger exists: run \d users in psql
- Test NOTIFY manually: NOTIFY users, '{"test": true}'
- Check LISTEN works: run LISTEN users in another psql session
Missing Events
Events may be missed during reconnection. Always fetch latest data on reconnect.
High Latency
- Check database connection pooling
- Consider batching high-frequency tables
- Use filtered triggers to reduce volume