Subscriptions Overview
The GraphQL API supports real-time subscriptions for live data updates using cursor-based streaming.
Basic Subscription Patternβ
subscription WatchAtoms(
$cursor: [atoms_stream_cursor_input]!
$batchSize: Int!
) {
atoms_stream(
cursor: $cursor
batch_size: $batchSize
) {
term_id
label
created_at
}
}
Variablesβ
{
"cursor": [{
"initial_value": { "created_at": "2024-01-01T00:00:00Z" },
"ordering": "ASC"
}],
"batchSize": 10
}
Cursor Configurationβ
- initial_value: Starting point for the stream
- ordering: Sort direction (ASC or DESC)
- batch_size: Number of items per batch
When to Use Subscriptionsβ
Use subscriptions when:
- Building real-time dashboards
- Monitoring live protocol activity
- Creating notification systems
- Data changes frequently
Use polling when:
- Data updates infrequently
- Real-time updates aren't critical
- Minimizing server connections
Available Streaming Subscriptionsβ
The API provides streaming subscriptions for all major entities:
Core Entity Streamsβ
| Subscription | Description |
|---|---|
atoms_stream | New and updated atoms |
triples_stream | New and updated triples |
accounts_stream | Account updates |
positions_stream | Position changes |
vaults_stream | Vault updates |
Activity Streamsβ
| Subscription | Description |
|---|---|
signals_stream | Real-time deposits and redemptions |
events_stream | Raw blockchain events |
deposits_stream | Deposit events |
redemptions_stream | Redemption events |
Position Tracking Streamsβ
| Subscription | Description |
|---|---|
positions_with_value_stream | Position updates with computed PnL/value |
position_changes_stream | Individual position change events |
Price & Stats Streamsβ
| Subscription | Description |
|---|---|
share_price_changes_stream | Share price updates |
chainlink_prices_stream | Oracle price updates |
fee_transfers_stream | Protocol fee transfers |
protocol_fee_accruals_stream | Protocol fee accrual events |
stats_stream | Protocol statistics updates |
Time-Series Streamsβ
| Subscription | Description |
|---|---|
share_price_change_stats_hourly_stream | Hourly price stat updates |
share_price_change_stats_daily_stream | Daily price stat updates |
signal_stats_hourly_stream | Hourly signal stat updates |
signal_stats_daily_stream | Daily signal stat updates |
term_total_state_changes_stream | Term state change updates |
Graph Structure Streamsβ
| Subscription | Description |
|---|---|
terms_stream | Term entity updates |
triple_terms_stream | Triple-term relationship updates |
triple_vaults_stream | Triple vault data updates |
subject_predicates_stream | Subject-predicate pair updates |
Leaderboard Streamsβ
| Subscription | Description |
|---|---|
pnl_leaderboard_entry_stream | PnL leaderboard updates |
pnl_leaderboard_stats_stream | Leaderboard aggregate stats updates |
account_pnl_rank_stream | Account rank changes |
Social Streamsβ
| Subscription | Description |
|---|---|
following_stream | Following relationship changes |
WebSocket Connectionβ
Connect using a WebSocket client:
import { createClient } from 'graphql-ws'
const client = createClient({
url: 'wss://mainnet.intuition.sh/v1/graphql',
})
// Subscribe to signals
const unsubscribe = client.subscribe(
{
query: `
subscription WatchSignals($cursor: [signals_stream_cursor_input]!) {
signals_stream(cursor: $cursor, batch_size: 10) {
id
signal_type
delta
account {
label
}
atom {
label
}
block_timestamp
}
}
`,
variables: {
cursor: [{
initial_value: { block_timestamp: new Date().toISOString() },
ordering: 'ASC'
}]
}
},
{
next: (data) => {
console.log('New signal:', data)
},
error: (error) => {
console.error('Subscription error:', error)
},
complete: () => {
console.log('Subscription complete')
}
}
)
Example: Live Activity Feedβ
import { createClient } from 'graphql-ws'
const client = createClient({
url: 'wss://mainnet.intuition.sh/v1/graphql',
})
function createActivityFeed(onSignal: (signal: Signal) => void) {
return client.subscribe(
{
query: `
subscription LiveActivityFeed($cursor: [signals_stream_cursor_input]!) {
signals_stream(cursor: $cursor, batch_size: 5) {
id
signal_type
delta
account {
label
image
}
atom {
label
image
}
triple {
subject { label }
predicate { label }
object { label }
}
block_timestamp
}
}
`,
variables: {
cursor: [{
initial_value: { block_timestamp: new Date().toISOString() },
ordering: 'ASC'
}]
}
},
{
next: (result) => {
const signals = result.data?.signals_stream || []
signals.forEach(onSignal)
},
error: console.error
}
)
}
Example: Position Price Alertsβ
function watchPositionPrices(
vaultId: string,
onPriceChange: (price: string) => void
) {
return client.subscribe(
{
query: `
subscription WatchVaultPrice(
$vault_id: String!
$cursor: [share_price_changes_stream_cursor_input]!
) {
share_price_changes_stream(
cursor: $cursor
batch_size: 1
where: { vault_id: { _eq: $vault_id } }
) {
new_share_price
block_timestamp
}
}
`,
variables: {
vault_id: vaultId,
cursor: [{
initial_value: { block_timestamp: new Date().toISOString() },
ordering: 'ASC'
}]
}
},
{
next: (result) => {
const changes = result.data?.share_price_changes_stream || []
changes.forEach(change => onPriceChange(change.new_share_price))
}
}
)
}
Best Practicesβ
- Store last cursor to resume after disconnection
- Use batch_size to control data flow (10-50)
- Filter subscriptions with where clauses
- Handle reconnections gracefully
- Implement exponential backoff for reconnection attempts
- Clean up subscriptions when components unmount
Relatedβ
- Real-time Positions - Position updates
- Price Updates - Price streaming
- Subscriptions vs Polling - When to use each