Streaming data

Realtime subscriptions on the same engine.

Use simple stream iteration or full lifecycle subscriptions with diagnostics, slow-consumer policy, and Arrow-native batches.

xbbg provides two interfaces for real-time Bloomberg data: stream() / astream() for simple iteration, and subscribe() / asubscribe() for full lifecycle control. All four accept the same parameters; the a-prefixed variants are the canonical async implementations — the sync variants run them in a background thread.

Choosing an Interface

| Use case | Function |
| --- | --- |
| Simple iteration, no dynamic changes | stream() (sync) or astream() (async) |
| Dynamic add/remove, health inspection | subscribe() (sync) or asubscribe() (async) |
| Running inside an existing async event loop | astream() or asubscribe() |
| Script with no event loop | stream() or subscribe() |
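
The last two rows of the table come down to whether an event loop is already running in the current thread. A minimal sketch of that check, using only the standard library; the helper name `running_in_event_loop` is hypothetical and not part of xbbg:

```python
import asyncio


def running_in_event_loop() -> bool:
    """Return True when called from code running inside an asyncio event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: a plain script or sync function.
        return False
    return True


async def _check_inside_loop() -> bool:
    # Called from a coroutine, so a loop is running here.
    return running_in_event_loop()


# Plain script context: the sync variants (stream() / subscribe()) fit.
outside = running_in_event_loop()
# Inside a coroutine: the async variants (astream() / asubscribe()) fit.
inside = asyncio.run(_check_inside_loop())
print(outside, inside)
```

Library code that must work in both contexts could branch on a check like this, although picking one style per application is usually simpler.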

Every subscription is isolated — each subscribe() / asubscribe() call gets its own Bloomberg session from the pool, so topic streams cannot cross-contaminate one another.


Basic Streaming

stream() and astream() are context-manager generators. Break out of the loop to stop the subscription cleanly.

Async
python
from xbbg import blp

async def main():
    async for batch in blp.astream(['AAPL US Equity', 'MSFT US Equity'], ['LAST_PRICE', 'BID', 'ASK']):
        # batch is a narwhals DataFrame (or pyarrow.RecordBatch if raw=True)
        print(batch)

Sync
python
from xbbg import blp

for batch in blp.stream(['AAPL US Equity', 'MSFT US Equity'], ['LAST_PRICE', 'BID', 'ASK']):
    print(batch)

Each batch is a narwhals DataFrame by default, containing one or more ticks that arrived together. Use raw=True to receive raw pyarrow.RecordBatch objects for maximum throughput.

python
async for batch in blp.astream(['AAPL US Equity'], ['LAST_PRICE'], raw=True):
    # batch: pyarrow.RecordBatch
    df = batch.to_pandas()

INFO

stream() runs the async implementation in a background thread. Use astream() inside an existing event loop (FastAPI, Jupyter, etc.) to avoid thread overhead.
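
To make the thread overhead concrete, here is a rough sketch of how a sync wrapper around an async iterator can work in general: a worker thread runs its own event loop and hands items back through a thread-safe queue. This is an illustration of the pattern only, not xbbg's actual implementation; `iterate_in_thread` and `fake_ticks` are hypothetical names.

```python
import asyncio
import queue
import threading


def iterate_in_thread(make_async_iter):
    """Yield items from an async iterator synchronously, via a worker thread."""
    channel = queue.Queue()

    async def pump():
        try:
            async for item in make_async_iter():
                channel.put(("item", item))
            channel.put(("done", None))
        except Exception as exc:
            # Forward errors so they surface in the consuming thread.
            channel.put(("error", exc))

    # The worker owns a private event loop created by asyncio.run().
    worker = threading.Thread(target=lambda: asyncio.run(pump()), daemon=True)
    worker.start()

    while True:
        kind, payload = channel.get()
        if kind == "item":
            yield payload
        elif kind == "error":
            raise payload
        else:
            break
    worker.join()


async def fake_ticks():
    """Stand-in for an async stream; yields three fake batches."""
    for n in range(3):
        await asyncio.sleep(0)
        yield {"ticker": "AAPL US Equity", "LAST_PRICE": 100.0 + n}


batches = list(iterate_in_thread(fake_ticks))
print(batches)
```

Every item crosses a thread boundary and a queue, which is the cost the async variants avoid. This sketch also does not cancel the worker when the consumer stops early; a production wrapper would need that.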


Full Subscription Lifecycle

asubscribe() returns a Subscription handle that you iterate separately. This gives you dynamic add/remove and access to health metadata.

Create and iterate

python
from xbbg import blp

async def main():
    sub = await blp.asubscribe(
        ['AAPL US Equity', 'MSFT US Equity'],
        ['LAST_PRICE', 'BID', 'ASK'],
    )

    async for batch in sub:
        print(batch)

Context manager

Use async with to guarantee cleanup on exit or exception:

python
async with await blp.asubscribe(['AAPL US Equity'], ['LAST_PRICE']) as sub:
    count = 0
    async for batch in sub:
        print(batch)
        count += 1
        if count >= 100:
            break
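
The count-and-break logic above can be factored into a reusable helper. A small sketch, assuming only that the source is an async iterable; `take` and `fake_batches` are hypothetical names, and `fake_batches` stands in for a real subscription so the example runs without a Bloomberg connection:

```python
import asyncio


async def take(source, limit):
    """Yield at most `limit` items from an async iterable, then stop."""
    if limit <= 0:
        return
    count = 0
    async for item in source:
        yield item
        count += 1
        if count >= limit:
            break


async def fake_batches():
    """Stand-in for a subscription; yields an endless series of fake batches."""
    n = 0
    while True:
        yield n
        n += 1


async def main():
    # Collect the first five batches and stop iterating.
    return [batch async for batch in take(fake_batches(), 5)]


first_five = asyncio.run(main())
print(first_five)
```

With a real subscription, keep the `async with` block around the loop so that cleanup still runs once `take` stops iterating.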

Add tickers dynamically

Tickers can be added while the subscription is active. The new topics are subscribed on the same Bloomberg session:

python
sub = await blp.asubscribe(['AAPL US Equity'], ['LAST_PRICE', 'BID', 'ASK'])
should_add_more = True  # placeholder flag; replace with your own trigger condition

async for batch in sub:
    print(batch)
    if should_add_more:
        await sub.add(['MSFT US Equity', 'GOOG US Equity'])
        should_add_more = False

Remove tickers

python
await sub.remove(['AAPL US Equity'])

Removed tickers stop delivering updates immediately. The remaining tickers continue unaffected.

Unsubscribe

python
# Clean close — discard buffered data
await sub.unsubscribe()

# Drain — return any remaining buffered batches before closing
remaining = await sub.unsubscribe(drain=True)

Subscription properties

| Property | Type | Description |
| --- | --- | --- |
| sub.tickers | list[str] | Currently subscribed tickers |
| sub.fields | list[str] | Subscribed fields |
| sub.is_active | bool | Whether the subscription is open |
| sub.all_failed | bool | True if every ticker has failed or terminated |

Subscription Configuration

Per-subscription tuning is done via keyword arguments on asubscribe() and astream().

all_fields

By default, Bloomberg delivers only the fields you requested. Setting all_fields=True exposes all top-level scalar fields Bloomberg sends (including INITPAINT summary fields for the initial snapshot):

python
# Receive INITPAINT fields and any extra scalar fields Bloomberg sends
sub = await blp.asubscribe(
    ['XBTUSD Curncy'],
    ['LAST_PRICE', 'BID', 'ASK'],
    all_fields=True,
)

flush_threshold, stream_capacity, overflow_policy

These control the backpressure pipeline between the Bloomberg SDK event thread and your Python consumer.

python
from xbbg import blp, configure

# Global defaults (applied to all new subscriptions)
configure(
    subscription_flush_threshold=10,   # buffer up to 10 ticks before flushing
    subscription_stream_capacity=512,  # internal channel depth
    overflow_policy='drop_newest',     # what to do when the channel is full
)

You can also pass them per-subscription to asubscribe() and astream():

python
sub = await blp.asubscribe(
    ['AAPL US Equity'],
    ['LAST_PRICE', 'BID', 'ASK'],
    flush_threshold=5,
    stream_capacity=1024,
    overflow_policy='block',       # block the Rust thread until Python catches up
)

| Parameter | Default | Description |
| --- | --- | --- |
| flush_threshold | 1 | Ticks buffered before flushing to Python. Increase for throughput, decrease for latency. |
| stream_capacity | 256 | Backpressure buffer size (number of batches). |
| overflow_policy | 'drop_newest' | Slow-consumer policy: 'drop_newest' or 'block'. |

WARNING

overflow_policy='block' prevents data loss but will stall the Bloomberg SDK event thread if your consumer is too slow. Use it only when every tick is critical and your processing loop is fast enough to keep up.
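
The difference between the two policies can be illustrated with a bounded `asyncio.Queue`. This is a conceptual sketch only: xbbg's pipeline runs in Rust and its internals are not shown in this page, so the `offer` helper, the counter names, and the queue itself are stand-ins chosen for illustration.

```python
import asyncio


async def offer(channel, batch, policy, counters):
    """Put a batch on a bounded queue using the given slow-consumer policy."""
    if policy == "block":
        # Waits until the consumer frees a slot; nothing is lost,
        # but the producer stalls for as long as the queue stays full.
        await channel.put(batch)
        counters["enqueued"] += 1
    elif policy == "drop_newest":
        try:
            channel.put_nowait(batch)
            counters["enqueued"] += 1
        except asyncio.QueueFull:
            # Queue is full: discard the incoming batch and count the loss.
            counters["dropped"] += 1
    else:
        raise ValueError(f"unknown policy: {policy!r}")


async def demo():
    channel = asyncio.Queue(maxsize=4)  # plays the role of stream_capacity
    counters = {"enqueued": 0, "dropped": 0}
    # The producer bursts 10 batches while the consumer reads nothing.
    for batch in range(10):
        await offer(channel, batch, "drop_newest", counters)
    kept = [channel.get_nowait() for _ in range(channel.qsize())]
    return kept, counters


kept, counters = asyncio.run(demo())
print(kept, counters)
```

Under 'drop_newest' the first batches to arrive survive and later ones are discarded while the queue is full. Running the same burst with 'block' and no consumer would wait forever, which is the stall described in the warning.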


Specialized Streams

xbbg provides dedicated functions for Bloomberg's specialized subscription services. All return a Subscription handle and follow the same async iteration pattern. Sync wrappers (vwap, mktbar, depth, chains) are also available.

VWAP — avwap() / vwap()

Streaming Volume Weighted Average Price via //blp/mktvwap.

python
# Basic — default fields: RT_PX_VWAP, RT_VWAP_VOLUME
sub = await blp.avwap(['AAPL US Equity'])
async for batch in sub:
    print(batch)
await sub.unsubscribe()

python
# Custom time window
sub = await blp.avwap(
    ['AAPL US Equity', 'MSFT US Equity'],
    start_time='09:30',
    end_time='16:00',
)

python
# Request additional VWAP fields
sub = await blp.avwap(
    'AAPL US Equity',
    ['RT_PX_VWAP', 'RT_VWAP_VOLUME', 'RT_VWAP_TURNOVER'],
)

| Parameter | Default | Description |
| --- | --- | --- |
| tickers | required | One or more securities |
| fields | ['RT_PX_VWAP', 'RT_VWAP_VOLUME'] | Fields to subscribe to |
| start_time | None | VWAP calculation start (e.g., '09:30') |
| end_time | None | VWAP calculation end (e.g., '16:00') |
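
For reference, the textbook definition of VWAP over a window is the sum of price times volume divided by total volume. The sketch below computes it from a list of trades; it is a worked example of the formula, and the values Bloomberg publishes may apply additional rules (such as trade-condition filters) that are not modelled here. The `vwap` helper and the sample trades are hypothetical.

```python
def vwap(trades):
    """Volume-weighted average price for an iterable of (price, volume) pairs."""
    total_value = 0.0
    total_volume = 0.0
    for price, volume in trades:
        total_value += price * volume
        total_volume += volume
    if total_volume == 0:
        return None  # undefined when nothing has traded in the window
    return total_value / total_volume


# Two trades: 100 shares at 10.0 and 300 shares at 20.0.
sample_trades = [(10.0, 100), (20.0, 300)]
print(vwap(sample_trades))  # (10*100 + 20*300) / 400 = 17.5
```

Narrowing the window with start_time and end_time corresponds to restricting which trades enter the two sums.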

Market Bars — amktbar() / mktbar()

Streaming real-time OHLCV bars via //blp/mktbar. Like bdib(), but bars are delivered live as they form.

Default fields: OPEN, HIGH, LOW, CLOSE, VOLUME, NUM_TRADES.

python
# 1-minute bars (default interval)
sub = await blp.amktbar('AAPL US Equity')
async for batch in sub:
    print(batch)

python
# 5-minute bars with a session window
async with await blp.amktbar(
    ['AAPL US Equity', 'MSFT US Equity'],
    interval=5,
    start_time='09:30',
    end_time='16:00',
) as sub:
    async for batch in sub:
        print(batch)

| Parameter | Default | Description |
| --- | --- | --- |
| tickers | required | One or more securities |
| interval | 1 | Bar interval in minutes |
| start_time | None | Session start in HH:MM format |
| end_time | None | Session end in HH:MM format |
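
To show what the default fields mean, the sketch below collapses the trades of one interval into a single bar. It illustrates the usual OHLCV definitions only; the bars are built by Bloomberg, not by client code, and `build_bar` and the sample trades are hypothetical.

```python
def build_bar(trades):
    """Collapse one interval's (price, volume) trades into an OHLCV bar."""
    if not trades:
        return None  # no trades in the interval, so no bar
    prices = [price for price, _ in trades]
    return {
        "OPEN": prices[0],      # first trade of the interval
        "HIGH": max(prices),
        "LOW": min(prices),
        "CLOSE": prices[-1],    # last trade of the interval
        "VOLUME": sum(volume for _, volume in trades),
        "NUM_TRADES": len(trades),
    }


# Four trades within the same one-minute interval, in arrival order.
minute_trades = [(189.50, 200), (189.80, 100), (189.20, 300), (189.60, 50)]
bar = build_bar(minute_trades)
print(bar)
```

A larger interval value simply widens the set of trades that feed each bar.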

Market Depth — adepth() / depth()

WARNING

Requires a Bloomberg B-PIPE license. Not available with standard Terminal connections. Raises BlpBPipeError if the service is unavailable.

Streaming Level 2 order book data via //blp/mktdepth. Fields are implicit (provided by Bloomberg).

python
from xbbg.exceptions import BlpBPipeError
from xbbg import blp

try:
    async with await blp.adepth('AAPL US Equity') as sub:
        async for batch in sub:
            print(batch)  # Bid/ask levels and sizes
except BlpBPipeError as e:
    print(f"B-PIPE required: {e}")

Option and Futures Chains — achains() / chains()

WARNING

Requires a Bloomberg B-PIPE license. Not available with standard Terminal connections. Raises BlpBPipeError if the service is unavailable.

Streaming option or futures chain updates via //blp/mktlist.

python
# Option chain (default)
async with await blp.achains('AAPL US Equity') as sub:
    async for batch in sub:
        print(batch)

python
# Futures chain
sub = await blp.achains('ES1 Index', chain_type='FUTURES')
async for batch in sub:
    print(batch)
await sub.unsubscribe()

| Parameter | Default | Description |
| --- | --- | --- |
| underlying | required | Underlying security identifier |
| chain_type | 'OPTIONS' | 'OPTIONS' or 'FUTURES' |

Health and Observability

The Subscription object exposes several properties for runtime inspection. None of these require pausing iteration — they are safe to read at any point.

status

A combined snapshot of the subscription's operational state:

python
print(sub.status)
# {
#   'active': True,
#   'all_failed': False,
#   'tickers': ['AAPL US Equity', 'MSFT US Equity'],
#   'failed_tickers': [],
#   'topic_states': {
#       'AAPL US Equity': {'state': 'SUBSCRIBED', 'last_change_us': 1711000000000},
#       'MSFT US Equity': {'state': 'SUBSCRIBED', 'last_change_us': 1711000000001},
#   },
#   'session': {...},
#   'admin': {...},
#   'services': {...},
# }

topic_states

Per-ticker lifecycle state keyed by ticker string. Useful for confirming that tickers are in SUBSCRIBED state before processing data:

python
for ticker, state_info in sub.topic_states.items():
    print(f"{ticker}: {state_info['state']}")

failed_tickers

List of tickers Bloomberg rejected or terminated. In a mixed-subscription scenario (some tickers valid, some invalid), this will be non-empty while is_active remains True as long as at least one ticker is healthy:

python
if sub.failed_tickers:
    print("Failed:", sub.failed_tickers)

failures

Detailed failure records, each containing ticker, reason, and kind ("failure" or "terminated"):

python
for f in sub.failures:
    print(f"{f['ticker']}: {f['reason']} ({f['kind']})")
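
When many tickers are involved, it can help to group these records by kind before logging or alerting. A small sketch that relies only on the ticker, reason, and kind keys described above; the `group_failures` helper, the sample tickers, and the reason strings are invented for illustration and are not real Bloomberg messages:

```python
from collections import defaultdict


def group_failures(failures):
    """Map each failure kind to the list of tickers that hit it."""
    grouped = defaultdict(list)
    for record in failures:
        grouped[record["kind"]].append(record["ticker"])
    return dict(grouped)


# Hypothetical records shaped like the entries of sub.failures.
sample_failures = [
    {"ticker": "INVALID_TICKER", "reason": "example reason A", "kind": "failure"},
    {"ticker": "OLD1 Comdty", "reason": "example reason B", "kind": "terminated"},
    {"ticker": "BAD2 Equity", "reason": "example reason A", "kind": "failure"},
]

by_kind = group_failures(sample_failures)
print(by_kind)
```

With a live subscription, pass `sub.failures` in place of the sample list.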

events

Bounded lifecycle event history for the subscription. Each entry contains at_us (microsecond timestamp), category, level, message_type, topic, and detail:

python
for event in sub.events:
    print(f"[{event['level']}] {event['message_type']}: {event['detail']}")
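
The at_us values are easier to read once converted to datetimes. The sketch below assumes they count microseconds since the Unix epoch in UTC; the page only says "microsecond timestamp", so verify that assumption against your own data. `us_to_datetime` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

# Assumption: at_us is measured from the Unix epoch, in UTC.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def us_to_datetime(at_us):
    """Convert an integer microsecond timestamp to an aware UTC datetime."""
    # Integer timedelta arithmetic avoids float rounding on large values.
    return EPOCH + timedelta(microseconds=at_us)


stamp = us_to_datetime(1_711_000_000_000_000)
print(stamp.isoformat())
```

The same conversion applies to any other field that follows the `_us` naming convention, under the same assumption.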

stats

Subscription throughput metrics:

python
print(sub.stats)
# {
#   'messages_received': 4821,
#   'dropped_batches': 0,
#   'batches_sent': 4821,
#   'slow_consumer': False,
#   'data_loss_events': 0,
#   'last_message_us': 1711000123456,
#   'effective_overflow_policy': 'drop_newest',
# }

| Key | Description |
| --- | --- |
| messages_received | Total messages received from Bloomberg |
| dropped_batches | Batches dropped due to overflow |
| batches_sent | Batches successfully delivered to Python |
| slow_consumer | True if Bloomberg signalled DATALOSS |
| data_loss_events | Total Bloomberg data-loss signals observed |
| effective_overflow_policy | Runtime policy used by the Rust stream |
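
These counters can be turned into a simple health check. A sketch that reads only the keys listed above and assumes dropped_batches and batches_sent count disjoint sets of batches; the helper names and the 1% threshold are arbitrary choices for illustration:

```python
def drop_ratio(stats):
    """Fraction of batches lost to overflow, from a stats-shaped dict."""
    dropped = stats["dropped_batches"]
    total = dropped + stats["batches_sent"]
    return dropped / total if total else 0.0


def needs_attention(stats, max_drop_ratio=0.01):
    """True when any loss signal is set or the drop ratio exceeds the limit."""
    return bool(
        stats["slow_consumer"]
        or stats["data_loss_events"] > 0
        or drop_ratio(stats) > max_drop_ratio
    )


# Dicts shaped like sub.stats; the numbers are made up.
healthy = {"dropped_batches": 0, "batches_sent": 4821,
           "slow_consumer": False, "data_loss_events": 0}
lossy = {"dropped_batches": 50, "batches_sent": 950,
         "slow_consumer": False, "data_loss_events": 0}

print(drop_ratio(healthy), needs_attention(healthy))
print(drop_ratio(lossy), needs_attention(lossy))
```

A persistently non-zero ratio suggests raising stream_capacity or flush_threshold, or speeding up the consumer loop.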

Error Handling

Topic failure isolation

Bloomberg's subscription model isolates failures at the topic level. If one ticker is invalid or gets terminated, the other tickers in the same subscription continue delivering data. Your loop keeps running; only failed_tickers grows.

python
sub = await blp.asubscribe(
    ['AAPL US Equity', 'INVALID_TICKER'],
    ['LAST_PRICE'],
)

async for batch in sub:
    # AAPL ticks still arrive; INVALID_TICKER never produces data
    print(batch)
    print("Failed so far:", sub.failed_tickers)

Stopping when all tickers fail

Use all_failed to detect the total-failure case and exit cleanly:

python
sub = await blp.asubscribe(['AAPL US Equity', 'MSFT US Equity'], ['LAST_PRICE'])

async for batch in sub:
    process(batch)
    if sub.all_failed:
        print("All tickers failed:", sub.failed_tickers)
        break

await sub.unsubscribe()

Handling partial failures at startup

Check topic_states after a short wait to confirm which tickers successfully subscribed before committing downstream resources:

python
import asyncio

sub = await blp.asubscribe(
    ['AAPL US Equity', 'INVALID_TICKER', 'MSFT US Equity'],
    ['LAST_PRICE', 'BID', 'ASK'],
)

# Brief wait for Bloomberg's INITPAINT / subscription status events
await asyncio.sleep(2)

active = [t for t, s in sub.topic_states.items() if s['state'] == 'SUBSCRIBED']
failed = sub.failed_tickers
print(f"Active: {active}")
print(f"Failed: {failed}")

if not active:
    await sub.unsubscribe()
    raise RuntimeError("No tickers subscribed successfully")

async for batch in sub:
    process(batch)
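
A fixed sleep either waits longer than needed or not long enough. One alternative is to poll until every requested ticker has either reached SUBSCRIBED or appeared in failed_tickers, with a timeout as a fallback. The sketch below reads only topic_states and failed_tickers as documented above; `wait_until_settled` and `FakeSubscription` are hypothetical, and the fake object exists only so the example runs without a Bloomberg connection.

```python
import asyncio
import time


async def wait_until_settled(sub, expected, timeout=2.0, poll_interval=0.05):
    """Poll until every expected ticker is subscribed or failed, or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        active = [t for t, s in sub.topic_states.items() if s["state"] == "SUBSCRIBED"]
        failed = list(sub.failed_tickers)
        settled = set(active) | set(failed)
        if settled >= set(expected) or time.monotonic() >= deadline:
            return active, failed
        await asyncio.sleep(poll_interval)


class FakeSubscription:
    """Stand-in exposing only the two properties the helper reads."""

    def __init__(self):
        self.topic_states = {}
        self.failed_tickers = []


async def demo():
    sub = FakeSubscription()

    async def settle_later():
        # Simulate status events arriving shortly after subscribing.
        await asyncio.sleep(0.1)
        sub.topic_states["AAPL US Equity"] = {"state": "SUBSCRIBED"}
        sub.failed_tickers.append("INVALID_TICKER")

    task = asyncio.create_task(settle_later())
    result = await wait_until_settled(sub, ["AAPL US Equity", "INVALID_TICKER"])
    await task
    return result


active, failed = asyncio.run(demo())
print(active, failed)
```

The helper returns as soon as all tickers have settled, so the common case finishes well before the timeout.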

Connection interruption

Each subscription session is isolated. If the session drops and reconnects, Bloomberg will resend INITPAINT (initial snapshot) events. The events property records session lifecycle transitions you can inspect to detect reconnect cycles.


xbbg is independent open-source software and is not affiliated with, endorsed by, sponsored by, or approved by Bloomberg Finance L.P.