Streaming data
Real-time subscriptions on the same engine.
Use simple stream iteration or full lifecycle subscriptions with diagnostics, slow-consumer policy, and Arrow-native batches.
xbbg provides two interfaces for real-time Bloomberg data: stream() / astream() for simple iteration, and subscribe() / asubscribe() for full lifecycle control. All four accept the same parameters; the a-prefixed variants are the canonical async implementations — the sync variants run them in a background thread.
Choosing an Interface
| Use case | Function |
|---|---|
| Simple iteration, no dynamic changes | stream() (sync) or astream() (async) |
| Dynamic add/remove, health inspection | subscribe() (sync) or asubscribe() (async) |
| Running inside an existing async event loop | astream() or asubscribe() |
| Script with no event loop | stream() or subscribe() |
Every subscription is isolated — each subscribe() / asubscribe() call gets its own Bloomberg session from the pool, so topic streams cannot cross-contaminate one another.
Basic Streaming
stream() and astream() are context-manager generators. Break out of the loop to stop the subscription cleanly.
Async
from xbbg import blp
async def main():
    async for batch in blp.astream(['AAPL US Equity', 'MSFT US Equity'], ['LAST_PRICE', 'BID', 'ASK']):
        # batch is a narwhals DataFrame (or pyarrow.RecordBatch if raw=True)
        print(batch)

Sync
from xbbg import blp
for batch in blp.stream(['AAPL US Equity', 'MSFT US Equity'], ['LAST_PRICE', 'BID', 'ASK']):
    print(batch)

Each batch is a narwhals DataFrame by default, containing one or more ticks that arrived together. Use raw=True to receive raw pyarrow.RecordBatch objects for maximum throughput.
async for batch in blp.astream(['AAPL US Equity'], ['LAST_PRICE'], raw=True):
    # batch: pyarrow.RecordBatch
    df = batch.to_pandas()

INFO
stream() runs the async implementation in a background thread. Use astream() inside an existing event loop (FastAPI, Jupyter, etc.) to avoid thread overhead.
Full Subscription Lifecycle
asubscribe() returns a Subscription handle that you iterate separately. This gives you dynamic add/remove and access to health metadata.
Create and iterate
from xbbg import blp
async def main():
    sub = await blp.asubscribe(
        ['AAPL US Equity', 'MSFT US Equity'],
        ['LAST_PRICE', 'BID', 'ASK'],
    )
    async for batch in sub:
        print(batch)

Context manager
Use async with to guarantee cleanup on exit or exception:
async with await blp.asubscribe(['AAPL US Equity'], ['LAST_PRICE']) as sub:
    count = 0
    async for batch in sub:
        print(batch)
        count += 1
        if count >= 100:
            break

Add tickers dynamically
Tickers can be added while the subscription is active. The new topics are subscribed on the same Bloomberg session:
sub = await blp.asubscribe(['AAPL US Equity'], ['LAST_PRICE', 'BID', 'ASK'])
async for batch in sub:
    print(batch)
    if should_add_more:
        await sub.add(['MSFT US Equity', 'GOOG US Equity'])
        should_add_more = False

Remove tickers
await sub.remove(['AAPL US Equity'])

Removed tickers stop delivering updates immediately. The remaining tickers continue unaffected.
Unsubscribe
# Clean close — discard buffered data
await sub.unsubscribe()

# Drain — return any remaining buffered batches before closing
remaining = await sub.unsubscribe(drain=True)

Subscription properties
| Property | Type | Description |
|---|---|---|
| sub.tickers | list[str] | Currently subscribed tickers |
| sub.fields | list[str] | Subscribed fields |
| sub.is_active | bool | Whether the subscription is open |
| sub.all_failed | bool | True if every ticker has failed or terminated |
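These properties combine naturally into a quick health check. The helper below is a hypothetical convenience for illustration, not part of the xbbg API; feed it sub.is_active, sub.all_failed, and sub.failed_tickers from a live Subscription:

```python
def classify_subscription(is_active: bool, all_failed: bool,
                          failed_tickers: list[str]) -> str:
    """Map the documented Subscription properties onto a one-word health label."""
    if not is_active:
        return "closed"        # subscription has been shut down
    if all_failed:
        return "all-failed"    # every ticker failed or terminated
    if failed_tickers:
        return "degraded"      # some tickers failed, others still stream
    return "healthy"
```

A return of "degraded" tells you data is still flowing but some topics need attention, while "all-failed" means the iteration loop should exit.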
Subscription Configuration
Per-subscription tuning is done via keyword arguments on asubscribe() and astream().
all_fields
By default, Bloomberg delivers only the fields you requested. Setting all_fields=True exposes all top-level scalar fields Bloomberg sends (including INITPAINT summary fields for the initial snapshot):
# Receive INITPAINT fields and any extra scalar fields Bloomberg sends
sub = await blp.asubscribe(
    ['XBTUSD Curncy'],
    ['LAST_PRICE', 'BID', 'ASK'],
    all_fields=True,
)

flush_threshold, stream_capacity, overflow_policy
These control the backpressure pipeline between the Bloomberg SDK event thread and your Python consumer.
from xbbg import blp, configure
# Global defaults (applied to all new subscriptions)
configure(
    subscription_flush_threshold=10,   # buffer up to 10 ticks before flushing
    subscription_stream_capacity=512,  # internal channel depth
    overflow_policy='drop_newest',     # what to do when the channel is full
)

You can also pass them per-subscription to asubscribe() and astream():
sub = await blp.asubscribe(
    ['AAPL US Equity'],
    ['LAST_PRICE', 'BID', 'ASK'],
    flush_threshold=5,
    stream_capacity=1024,
    overflow_policy='block',  # block the Rust thread until Python catches up
)

| Parameter | Default | Description |
|---|---|---|
| flush_threshold | 1 | Ticks buffered before flushing to Python. Increase for throughput, decrease for latency. |
| stream_capacity | 256 | Backpressure buffer size (number of batches). |
| overflow_policy | 'drop_newest' | Slow-consumer policy: 'drop_newest' or 'block'. |
WARNING
overflow_policy='block' prevents data loss but will stall the Bloomberg SDK event thread if your consumer is too slow. Use it only when every tick is critical and your processing loop is fast enough to keep up.
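A rough way to reason about these knobs: each buffered batch carries up to flush_threshold ticks and the channel holds stream_capacity batches, so their product bounds how many ticks can sit in the pipeline before the overflow policy kicks in. The helper below is illustrative arithmetic under that assumption, not a library API:

```python
def max_buffered_ticks(flush_threshold: int, stream_capacity: int) -> int:
    """Upper bound on ticks the backpressure pipeline can absorb before
    overflow_policy decides what happens next (assumes full batches)."""
    return flush_threshold * stream_capacity
```

With the defaults (flush_threshold=1, stream_capacity=256) the pipeline absorbs at most 256 ticks of consumer lag; the global-configure example above (10 x 512) absorbs up to 5120.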
Specialized Streams
xbbg provides dedicated functions for Bloomberg's specialized subscription services. All return a Subscription handle and follow the same async iteration pattern. Sync wrappers (vwap, mktbar, depth, chains) are also available.
VWAP — avwap() / vwap()
Streaming Volume Weighted Average Price via //blp/mktvwap.
# Basic — default fields: RT_PX_VWAP, RT_VWAP_VOLUME
sub = await blp.avwap(['AAPL US Equity'])
async for batch in sub:
    print(batch)
await sub.unsubscribe()

# Custom time window
sub = await blp.avwap(
    ['AAPL US Equity', 'MSFT US Equity'],
    start_time='09:30',
    end_time='16:00',
)

# Request additional VWAP fields
sub = await blp.avwap(
    'AAPL US Equity',
    ['RT_PX_VWAP', 'RT_VWAP_VOLUME', 'RT_VWAP_TURNOVER'],
)

| Parameter | Default | Description |
|---|---|---|
| tickers | required | One or more securities |
| fields | ['RT_PX_VWAP', 'RT_VWAP_VOLUME'] | Fields to subscribe to |
| start_time | None | VWAP calculation start (e.g., '09:30') |
| end_time | None | VWAP calculation end (e.g., '16:00') |
Market Bars — amktbar() / mktbar()
Streaming real-time OHLCV bars via //blp/mktbar. Like bdib(), but delivered live as bars form.
Default fields: OPEN, HIGH, LOW, CLOSE, VOLUME, NUM_TRADES.
# 1-minute bars (default interval)
sub = await blp.amktbar('AAPL US Equity')
async for batch in sub:
    print(batch)

# 5-minute bars with a session window
async with await blp.amktbar(
    ['AAPL US Equity', 'MSFT US Equity'],
    interval=5,
    start_time='09:30',
    end_time='16:00',
) as sub:
    async for batch in sub:
        print(batch)

| Parameter | Default | Description |
|---|---|---|
| tickers | required | One or more securities |
| interval | 1 | Bar interval in minutes |
| start_time | None | Session start in HH:MM format |
| end_time | None | Session end in HH:MM format |
Market Depth — adepth() / depth()
WARNING
Requires a Bloomberg B-PIPE license. Not available with standard Terminal connections. Raises BlpBPipeError if the service is unavailable.
Streaming Level 2 order book data via //blp/mktdepth. Fields are implicit (provided by Bloomberg).
from xbbg.exceptions import BlpBPipeError
from xbbg import blp
try:
    async with await blp.adepth('AAPL US Equity') as sub:
        async for batch in sub:
            print(batch)  # Bid/ask levels and sizes
except BlpBPipeError as e:
    print(f"B-PIPE required: {e}")

Option and Futures Chains — achains() / chains()
WARNING
Requires a Bloomberg B-PIPE license. Not available with standard Terminal connections. Raises BlpBPipeError if the service is unavailable.
Streaming option or futures chain updates via //blp/mktlist.
# Option chain (default)
async with await blp.achains('AAPL US Equity') as sub:
    async for batch in sub:
        print(batch)

# Futures chain
sub = await blp.achains('ES1 Index', chain_type='FUTURES')
async for batch in sub:
    print(batch)
await sub.unsubscribe()

| Parameter | Default | Description |
|---|---|---|
| underlying | required | Underlying security identifier |
| chain_type | 'OPTIONS' | 'OPTIONS' or 'FUTURES' |
Health and Observability
The Subscription object exposes several properties for runtime inspection. None of these require pausing iteration — they are safe to read at any point.
status
A combined snapshot of the subscription's operational state:
print(sub.status)
# {
# 'active': True,
# 'all_failed': False,
# 'tickers': ['AAPL US Equity', 'MSFT US Equity'],
# 'failed_tickers': [],
# 'topic_states': {
# 'AAPL US Equity': {'state': 'SUBSCRIBED', 'last_change_us': 1711000000000},
# 'MSFT US Equity': {'state': 'SUBSCRIBED', 'last_change_us': 1711000000001},
# },
# 'session': {...},
# 'admin': {...},
# 'services': {...},
# }

topic_states
Per-ticker lifecycle state keyed by ticker string. Useful for confirming that tickers are in SUBSCRIBED state before processing data:
for ticker, state_info in sub.topic_states.items():
    print(f"{ticker}: {state_info['state']}")

failed_tickers
List of tickers Bloomberg rejected or terminated. In a mixed-subscription scenario (some tickers valid, some invalid), this will be non-empty while is_active remains True as long as at least one ticker is healthy:
if sub.failed_tickers:
    print("Failed:", sub.failed_tickers)

failures
Detailed failure records, each containing ticker, reason, and kind ("failure" or "terminated"):
for f in sub.failures:
    print(f"{f['ticker']}: {f['reason']} ({f['kind']})")

events
Bounded lifecycle event history for the subscription. Each entry contains at_us (microsecond timestamp), category, level, message_type, topic, and detail:
for event in sub.events:
    print(f"[{event['level']}] {event['message_type']}: {event['detail']}")

stats
Subscription throughput metrics:
print(sub.stats)
# {
# 'messages_received': 4821,
# 'dropped_batches': 0,
# 'batches_sent': 4821,
# 'slow_consumer': False,
# 'data_loss_events': 0,
# 'last_message_us': 1711000123456,
# 'effective_overflow_policy': 'drop_newest',
# }

| Key | Description |
|---|---|
| messages_received | Total messages received from Bloomberg |
| dropped_batches | Batches dropped due to overflow |
| batches_sent | Batches successfully delivered to Python |
| slow_consumer | True if Bloomberg signalled DATALOSS |
| data_loss_events | Total Bloomberg data-loss signals observed |
| last_message_us | Microsecond timestamp of the most recent message |
| effective_overflow_policy | Runtime policy used by the Rust stream |
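These counters lend themselves to a periodic health probe. The helper below is a hypothetical summary over the stats mapping shown above, not an xbbg function; pass it sub.stats:

```python
def consumer_health(stats: dict) -> dict:
    """Condense the documented stats mapping into a small report:
    the share of batches lost to overflow plus the two loss flags."""
    dropped = stats.get("dropped_batches", 0)
    total = dropped + stats.get("batches_sent", 0)
    return {
        "drop_ratio": dropped / total if total else 0.0,
        "slow_consumer": bool(stats.get("slow_consumer")),
        "data_loss": stats.get("data_loss_events", 0) > 0,
    }
```

A nonzero drop_ratio under 'drop_newest' means your consumer is lagging behind the tick flow; consider raising flush_threshold or stream_capacity before resorting to 'block'.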
Error Handling
Topic failure isolation
Bloomberg's subscription model isolates failures at the topic level. If one ticker is invalid or gets terminated, the other tickers in the same subscription continue delivering data. Your loop keeps running; only failed_tickers grows.
sub = await blp.asubscribe(
    ['AAPL US Equity', 'INVALID_TICKER'],
    ['LAST_PRICE'],
)
async for batch in sub:
    # AAPL ticks still arrive; INVALID_TICKER never produces data
    print(batch)
    print("Failed so far:", sub.failed_tickers)

Stopping when all tickers fail
Use all_failed to detect the total-failure case and exit cleanly:
sub = await blp.asubscribe(['AAPL US Equity', 'MSFT US Equity'], ['LAST_PRICE'])
async for batch in sub:
    process(batch)
    if sub.all_failed:
        print("All tickers failed:", sub.failed_tickers)
        break
await sub.unsubscribe()

Handling partial failures at startup
Check topic_states after a short wait to confirm which tickers successfully subscribed before committing downstream resources:
import asyncio

sub = await blp.asubscribe(
    ['AAPL US Equity', 'INVALID_TICKER', 'MSFT US Equity'],
    ['LAST_PRICE', 'BID', 'ASK'],
)

# Brief wait for Bloomberg's INITPAINT / subscription status events
await asyncio.sleep(2)

active = [t for t, s in sub.topic_states.items() if s['state'] == 'SUBSCRIBED']
failed = sub.failed_tickers
print(f"Active: {active}")
print(f"Failed: {failed}")

if not active:
    await sub.unsubscribe()
    raise RuntimeError("No tickers subscribed successfully")

async for batch in sub:
    process(batch)

Connection interruption
Each subscription session is isolated. If the session drops and reconnects, Bloomberg will resend INITPAINT (initial snapshot) events. The events property records session lifecycle transitions you can inspect to detect reconnect cycles.
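Since each event entry carries a message_type, the recorded history can be scanned for reconnect cycles. The sketch below assumes Bloomberg's SessionConnectionDown / SessionConnectionUp admin message names appear in the recorded events; verify against your own event log before relying on it:

```python
def reconnect_cycles(events: list[dict]) -> int:
    """Count completed down -> up transitions in a subscription's event history.

    Assumes entries shaped like the documented sub.events records, and that
    connection loss/recovery surfaces as the Bloomberg admin message types
    'SessionConnectionDown' and 'SessionConnectionUp'.
    """
    cycles = 0
    connection_down = False
    for event in events:
        message_type = event.get("message_type")
        if message_type == "SessionConnectionDown":
            connection_down = True
        elif message_type == "SessionConnectionUp" and connection_down:
            cycles += 1
            connection_down = False
    return cycles
```

A count greater than zero means at least one reconnect occurred, so expect duplicate INITPAINT snapshots in the stream around those timestamps.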
See Also
- API Reference — subscribe/asubscribe
- API Reference — stream/astream
- Engine Configuration for subscription_pool_size, global flush/overflow defaults
