Skip to main content

Subscriptions Overview

The GraphQL API supports real-time subscriptions for live data updates using cursor-based streaming.

Basic Subscription Pattern​

subscription WatchAtoms(
$cursor: [atoms_stream_cursor_input]!
$batchSize: Int!
) {
atoms_stream(
cursor: $cursor
batch_size: $batchSize
) {
term_id
label
created_at
}
}

Variables​

{
"cursor": [{
"initial_value": { "created_at": "2024-01-01T00:00:00Z" },
"ordering": "ASC"
}],
"batchSize": 10
}

Cursor Configuration​

  • initial_value: Starting point for the stream
  • ordering: Sort direction (ASC or DESC)
  • batch_size: Number of items per batch

When to Use Subscriptions​

Use subscriptions when:

  • Building real-time dashboards
  • Monitoring live protocol activity
  • Creating notification systems
  • Data changes frequently

Use polling when:

  • Data updates infrequently
  • Real-time updates aren't critical
  • Minimizing server connections

Available Streaming Subscriptions​

The API provides streaming subscriptions for all major entities:

Core Entity Streams​

SubscriptionDescription
atoms_streamNew and updated atoms
triples_streamNew and updated triples
accounts_streamAccount updates
positions_streamPosition changes
vaults_streamVault updates

Activity Streams​

SubscriptionDescription
signals_streamReal-time deposits and redemptions
events_streamRaw blockchain events
deposits_streamDeposit events
redemptions_streamRedemption events

Position Tracking Streams​

SubscriptionDescription
positions_with_value_streamPosition updates with computed PnL/value
position_changes_streamIndividual position change events

Price & Stats Streams​

SubscriptionDescription
share_price_changes_streamShare price updates
chainlink_prices_streamOracle price updates
fee_transfers_streamProtocol fee transfers
protocol_fee_accruals_streamProtocol fee accrual events
stats_streamProtocol statistics updates

Time-Series Streams​

SubscriptionDescription
share_price_change_stats_hourly_streamHourly price stat updates
share_price_change_stats_daily_streamDaily price stat updates
signal_stats_hourly_streamHourly signal stat updates
signal_stats_daily_streamDaily signal stat updates
term_total_state_changes_streamTerm state change updates

Graph Structure Streams​

SubscriptionDescription
terms_streamTerm entity updates
triple_terms_streamTriple-term relationship updates
triple_vaults_streamTriple vault data updates
subject_predicates_streamSubject-predicate pair updates

Leaderboard Streams​

SubscriptionDescription
pnl_leaderboard_entry_streamPnL leaderboard updates
pnl_leaderboard_stats_streamLeaderboard aggregate stats updates
account_pnl_rank_streamAccount rank changes

Social Streams​

SubscriptionDescription
following_streamFollowing relationship changes

WebSocket Connection​

Connect using a WebSocket client:

import { createClient } from 'graphql-ws'

const client = createClient({
url: 'wss://mainnet.intuition.sh/v1/graphql',
})

// Subscribe to signals
const unsubscribe = client.subscribe(
{
query: `
subscription WatchSignals($cursor: [signals_stream_cursor_input]!) {
signals_stream(cursor: $cursor, batch_size: 10) {
id
signal_type
delta
account {
label
}
atom {
label
}
block_timestamp
}
}
`,
variables: {
cursor: [{
initial_value: { block_timestamp: new Date().toISOString() },
ordering: 'ASC'
}]
}
},
{
next: (data) => {
console.log('New signal:', data)
},
error: (error) => {
console.error('Subscription error:', error)
},
complete: () => {
console.log('Subscription complete')
}
}
)

Example: Live Activity Feed​

import { createClient } from 'graphql-ws'

const client = createClient({
url: 'wss://mainnet.intuition.sh/v1/graphql',
})

function createActivityFeed(onSignal: (signal: Signal) => void) {
return client.subscribe(
{
query: `
subscription LiveActivityFeed($cursor: [signals_stream_cursor_input]!) {
signals_stream(cursor: $cursor, batch_size: 5) {
id
signal_type
delta
account {
label
image
}
atom {
label
image
}
triple {
subject { label }
predicate { label }
object { label }
}
block_timestamp
}
}
`,
variables: {
cursor: [{
initial_value: { block_timestamp: new Date().toISOString() },
ordering: 'ASC'
}]
}
},
{
next: (result) => {
const signals = result.data?.signals_stream || []
signals.forEach(onSignal)
},
error: console.error
}
)
}

Example: Position Price Alerts​

function watchPositionPrices(
vaultId: string,
onPriceChange: (price: string) => void
) {
return client.subscribe(
{
query: `
subscription WatchVaultPrice(
$vault_id: String!
$cursor: [share_price_changes_stream_cursor_input]!
) {
share_price_changes_stream(
cursor: $cursor
batch_size: 1
where: { vault_id: { _eq: $vault_id } }
) {
new_share_price
block_timestamp
}
}
`,
variables: {
vault_id: vaultId,
cursor: [{
initial_value: { block_timestamp: new Date().toISOString() },
ordering: 'ASC'
}]
}
},
{
next: (result) => {
const changes = result.data?.share_price_changes_stream || []
changes.forEach(change => onPriceChange(change.new_share_price))
}
}
)
}

Best Practices​

  1. Store last cursor to resume after disconnection
  2. Use batch_size to control data flow (10-50)
  3. Filter subscriptions with where clauses
  4. Handle reconnections gracefully
  5. Implement exponential backoff for reconnection attempts
  6. Clean up subscriptions when components unmount