Connecting & Subscribing
Server Environments
ThetaDataDx supports three FPSS server environments:
| Config | Ports | Use case |
|---|---|---|
| DirectConfig::production() | 20000/20001 | Live market data (NJ-A and NJ-B hosts) |
| DirectConfig::dev() | 20200/20201 | Replays a random historical trading day in an infinite loop at max speed. Use when markets are closed. |
| DirectConfig::stage() | 20100/20101 | Testing/staging servers. Frequent reboots, not stable. |
All three share the same MDDS (historical) production servers -- only FPSS hosts differ.
TLS & SPKI Pinning
FPSS TLS uses SPKI (Subject Public Key Info) pinning: the SHA-256 hash of the server's SPKI is compared in constant time against the pin captured from the ThetaData keypair. The pin survives certificate renewal as long as ThetaData keeps the same keypair; key rotation requires a coordinated client update. A MITM attacker presenting a different certificate (even a valid CA-signed one) is rejected with RustlsError::General("FPSS SPKI pin mismatch ...").
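As a rough illustration of the pin check described above, the sketch below hashes a SubjectPublicKeyInfo blob with SHA-256 and compares the digest to a stored pin in constant time. It uses only Python's standard library. The function name `spki_matches_pin` and the sample byte strings are hypothetical placeholders, not the SDK's internals or ThetaData's real key material.

```python
import hashlib
import hmac


def spki_matches_pin(spki_der: bytes, pinned_sha256: bytes) -> bool:
    """Return True when SHA-256(spki_der) equals the pinned digest.

    hmac.compare_digest takes time independent of where the inputs differ,
    so a mismatch does not leak how many leading bytes matched.
    """
    return hmac.compare_digest(hashlib.sha256(spki_der).digest(), pinned_sha256)


# Hypothetical stand-ins for real DER-encoded SubjectPublicKeyInfo bytes.
server_spki = b"placeholder-spki-for-the-pinned-key"
attacker_spki = b"placeholder-spki-for-some-other-key"

# The pin is captured once, from the key the client is meant to trust.
pin = hashlib.sha256(server_spki).digest()

pin_ok = spki_matches_pin(server_spki, pin)
pin_rejected = not spki_matches_pin(attacker_spki, pin)
```

Because the hash covers only the public key and not the rest of the certificate, a renewed certificate that reuses the same key yields the same digest, which is why the pin survives renewal.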
Connect (Production)
use thetadatadx::{ThetaDataDxClient, Credentials, DirectConfig};
use thetadatadx::fpss::{FpssData, FpssControl, FpssEvent};
use thetadatadx::fpss::protocol::Contract;
let creds = Credentials::from_file("creds.txt")?;
let client = ThetaDataDxClient::connect(&creds, DirectConfig::production()).await?;
client.start_streaming(|event: &FpssEvent| {
    match event {
        // Every data event carries an `Arc<Contract>` -- read the symbol
        // directly, no contract-ID map lookup required.
        FpssEvent::Data(FpssData::Quote { contract, bid, ask, received_at_ns, .. }) => {
            println!("Quote: {} bid={bid:.2} ask={ask:.2} rx={received_at_ns}ns", contract.symbol);
        }
        FpssEvent::Data(FpssData::Trade { contract, price, size, received_at_ns, .. }) => {
            println!("Trade: {} price={price:.2} size={size} rx={received_at_ns}ns", contract.symbol);
        }
        FpssEvent::Control(FpssControl::ContractAssigned { id, contract }) => {
            println!("Contract {id} = {contract}");
        }
        _ => {}
    }
})?;

from thetadatadx import Credentials, Config, ThetaDataDxClient
creds = Credentials.from_file("creds.txt")
client = ThetaDataDxClient(creds, Config.production())
client.start_streaming(lambda event: print(event))

auto creds = tdx::Credentials::from_file("creds.txt");
auto config = tdx::Config::production();
auto client = tdx::UnifiedClient::connect(creds, config);

import { ThetaDataDxClient } from 'thetadatadx';
const client = await ThetaDataDxClient.connectFromFile('creds.txt');
client.startStreaming((event) => console.log(event));

TIP
Every binding offers two equivalent delivery modes on the unified ThetaDataDxClient: push (start_streaming(callback)) for low-latency dispatch, and pull (start_streaming_iter() in Rust/Python/C++, startStreamingIter() in TypeScript) for the iterator idiom. Pick one per session — the modes are mutually exclusive on the client.
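To make the push/pull distinction concrete, here is a minimal self-contained sketch. `DeliverySketch` and its methods are hypothetical stand-ins invented for illustration; they do not mirror the SDK's implementation. The sketch only shows who drives the loop in each mode and why one session cannot serve both.

```python
from collections import deque
from typing import Callable, Iterator, List, Optional


class DeliverySketch:
    """Hypothetical stand-in for a streaming client; not the SDK's code."""

    def __init__(self, events: List[str]) -> None:
        self._events = deque(events)
        self._mode: Optional[str] = None

    def _claim(self, mode: str) -> None:
        # Only one delivery mode may own the event stream per session.
        if self._mode is not None:
            raise RuntimeError(f"already streaming in {self._mode} mode")
        self._mode = mode

    def start_push(self, callback: Callable[[str], None]) -> None:
        """Push: the client drives, invoking the callback for each event."""
        self._claim("push")
        while self._events:
            callback(self._events.popleft())

    def start_pull(self) -> Iterator[str]:
        """Pull: the caller drives, asking for the next event when ready."""
        self._claim("pull")
        return self._drain()

    def _drain(self) -> Iterator[str]:
        while self._events:
            yield self._events.popleft()


received: List[str] = []
pusher = DeliverySketch(["quote", "trade"])
pusher.start_push(received.append)

puller = DeliverySketch(["quote", "trade"])
pulled = list(puller.start_pull())

# Mixing modes on one session is refused.
try:
    pusher.start_pull()
    mixed_allowed = True
except RuntimeError:
    mixed_allowed = False
```

In both modes the same events arrive in the same order; the difference is only which side owns the loop.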
Connect (Dev Server)
The dev server replays historical data at maximum speed -- ideal for testing when markets are closed.
let client = ThetaDataDxClient::connect(&creds, DirectConfig::dev()).await?;

client = ThetaDataDxClient(creds, Config.dev())

auto config = tdx::Config::dev();
auto client = tdx::UnifiedClient::connect(creds, config);

// Dev server config is not yet exposed in the TypeScript SDK.
// connectFromFile() always uses production config.
const client = await ThetaDataDxClient.connectFromFile('creds.txt');

Dev server trade format
The dev server sends a simplified 8-field trade format instead of the full 16-field production format. This is handled transparently by the SDK -- your code sees the same Trade event type with missing fields zeroed out.
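The zero-filling described above can be pictured with the following sketch. Only the field counts (8 and 16) come from the text. The assumption that the dev fields occupy the leading positions of the production layout, the helper name `normalize_trade_fields`, and the sample values are all made up for illustration, so the real mapping may differ.

```python
PRODUCTION_FIELD_COUNT = 16
DEV_FIELD_COUNT = 8


def normalize_trade_fields(fields: list) -> list:
    """Pad a short dev-format trade row with zeros up to the full width.

    Assumption for illustration only: the dev fields occupy the leading
    positions of the production layout. The real mapping may differ.
    """
    if len(fields) not in (DEV_FIELD_COUNT, PRODUCTION_FIELD_COUNT):
        raise ValueError(f"unexpected trade field count: {len(fields)}")
    return list(fields) + [0] * (PRODUCTION_FIELD_COUNT - len(fields))


dev_row = [1, 2, 3, 4, 5, 6, 7, 8]  # made-up values
full_row = normalize_trade_fields(dev_row)
```

One practical consequence: on the dev server a zero in one of the padded fields most likely means "not provided" and not a genuine zero, so avoid logic that depends on those fields when testing against dev.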
Connect (Stage Server)
let client = ThetaDataDxClient::connect(&creds, DirectConfig::stage()).await?;

client = ThetaDataDxClient(creds, Config.stage())

auto config = tdx::Config::stage();
auto client = tdx::UnifiedClient::connect(creds, config);

// Stage server config is not yet exposed in the TypeScript SDK.
// connectFromFile() always uses production config.
const client = await ThetaDataDxClient.connectFromFile('creds.txt');

Flush Mode
FpssFlushMode controls the latency/syscall tradeoff on the write path:
| Mode | Flush trigger | Added latency | Best for |
|---|---|---|---|
| Batched (default) | PING frames every ~100ms | Up to 100ms | Production throughput, matches Java terminal |
| Immediate | Every frame write | None | Lowest latency trading |
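The tradeoff in the table can be seen in a toy model of the write path. `FrameWriter` below is a hypothetical class written for this illustration, with the flush counter standing in for flush syscalls; it is not how the SDK is implemented.

```python
class FrameWriter:
    """Hypothetical write path: buffers frames and counts flushes."""

    def __init__(self, immediate: bool) -> None:
        self.immediate = immediate
        self.pending: list = []  # frames buffered but not yet flushed
        self.sent: list = []     # frames that reached the "wire"
        self.flushes = 0         # stand-in for flush syscalls

    def write(self, frame: bytes, is_ping: bool = False) -> None:
        self.pending.append(frame)
        # Immediate flushes on every frame; Batched waits for a PING.
        if self.immediate or is_ping:
            self.flush()

    def flush(self) -> None:
        self.sent.extend(self.pending)
        self.pending.clear()
        self.flushes += 1


def run(immediate: bool) -> FrameWriter:
    writer = FrameWriter(immediate)
    for frame in (b"SUB AAPL", b"SUB MSFT", b"SUB TSLA"):
        writer.write(frame)
    writer.write(b"PING", is_ping=True)  # heartbeat, roughly every 100ms
    return writer


batched = run(immediate=False)
immediate = run(immediate=True)
```

Both writers deliver the same four frames in the same order. The batched writer flushes once, when the PING arrives, while the immediate writer flushes four times. That is the latency-versus-syscall tradeoff in miniature.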
use thetadatadx::config::FpssFlushMode;
let mut config = DirectConfig::production();
config.fpss_flush_mode = FpssFlushMode::Immediate; // lowest latency
let client = ThetaDataDxClient::connect(&creds, config).await?;

# Flush mode cannot currently be changed from the Python SDK.
# It defaults to Batched (flush on PING frames, ~100ms).
# Use the Rust SDK directly if you need Immediate mode.
client = ThetaDataDxClient(creds, Config.production())

auto config = tdx::Config::production();
config.set_flush_mode(tdx::FlushMode::Immediate);
auto client = tdx::UnifiedClient::connect(creds, config);

// Flush mode cannot currently be changed from the TypeScript SDK.
// It defaults to Batched (flush on PING frames, ~100ms).
const client = await ThetaDataDxClient.connectFromFile('creds.txt');

Custom FPSS Hosts
FPSS hosts are not hardcoded. You can override them:
let mut config = DirectConfig::production();
config.fpss_hosts = vec![
    ("custom-host-a.example.com".to_string(), 20000),
    ("custom-host-b.example.com".to_string(), 20000),
];
let client = ThetaDataDxClient::connect(&creds, config).await?;
// Or parse from a comma-separated string (same format as config_0.properties):
let hosts = DirectConfig::parse_fpss_hosts("host-a:20000,host-b:20001")?;

# Custom hosts are configured at the Rust level via DirectConfig or
# TOML config file. Python inherits them from the config at connection time.
# Set hosts in config.toml:
# [fpss]
# hosts = ["host-a.example.com:20000", "host-b.example.com:20001"]
client = ThetaDataDxClient(creds, Config.production())

// Custom hosts are configured at the Rust level via DirectConfig or
// TOML config file. C++ inherits them from the config at connection time.
// Set hosts in config.toml:
// [fpss]
// hosts = ["host-a.example.com:20000", "host-b.example.com:20001"]
auto config = tdx::Config::production();
auto client = tdx::UnifiedClient::connect(creds, config);

// Custom hosts are configured at the Rust level via DirectConfig or
// TOML config file. TypeScript inherits them from the config at connection time.
// Set hosts in config.toml:
// [fpss]
// hosts = ["host-a.example.com:20000", "host-b.example.com:20001"]
const client = await ThetaDataDxClient.connectFromFile('creds.txt');

Or use a TOML config file (requires config-file feature):
[fpss]
hosts = ["host-a.example.com:20000", "host-b.example.com:20001"]
# Or as CSV:
# hosts = "host-a.example.com:20000,host-b.example.com:20001"

Async/Sync Design
ThetaDataDx uses two different concurrency models for its two data paths:
| Path | Runtime | Why |
|---|---|---|
| connect() + all historical methods | async (tokio) | gRPC/tonic requires tokio for HTTP/2 multiplexing |
| start_streaming() + callbacks | sync (OS threads) | Dedicated I/O thread + LMAX Disruptor ring buffer for lowest latency |
| TypeScript EventIterator.next() | Promise (napi-rs) | Returns Promise<FpssEvent \| null> (null = timeout / drained) so Node's event loop stays unblocked during the read timeout |
What this means for your code:
- You need a tokio runtime for connect() and any historical data call (stock_history_eod, etc.).
- The streaming callback (FnMut(&FpssEvent)) runs on a plain OS thread -- no async executor involved. This eliminates all executor scheduling jitter from the hot path.
- subscribe() and unsubscribe() are synchronous -- they send a command through an internal channel to the I/O thread.
tokio runtime
  +-- connect()             async, gRPC/tonic/HTTP2
  +-- stock_history_*()     async, gRPC streaming
std::thread (fpss-io)
  +-- TLS read loop         blocking, 50ms timeout
  +-- Disruptor publish     lock-free, zero-alloc
std::thread (fpss-ping)
  +-- PING heartbeat        100ms sleep loop
Disruptor consumer thread
  +-- your callback(FnMut(&FpssEvent))

You can safely call subscribe(spec) / unsubscribe(spec) from any thread -- the command is sent through an mpsc channel and executed by the I/O thread.
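The command-channel pattern behind that guarantee can be sketched with Python's standard `queue` and `threading` modules. This is an analogy, not the SDK's code: the Rust implementation uses an mpsc channel, and the names `io_thread`, `subscribe`, and `STOP` below are hypothetical.

```python
import queue
import threading

commands: queue.Queue = queue.Queue()
applied: list = []   # touched only by the I/O thread
STOP = object()      # sentinel that shuts the I/O thread down


def io_thread() -> None:
    """Single owner of the connection: drains and executes commands in order."""
    while True:
        cmd = commands.get()
        if cmd is STOP:
            break
        applied.append(cmd)


def subscribe(spec: str) -> None:
    """Synchronous from the caller's view: enqueue and return immediately."""
    commands.put(("subscribe", spec))


io_worker = threading.Thread(target=io_thread, name="fpss-io-sketch")
io_worker.start()

# Several caller threads issue commands concurrently.
callers = [
    threading.Thread(target=subscribe, args=(symbol,))
    for symbol in ("AAPL", "MSFT", "TSLA")
]
for caller in callers:
    caller.start()
for caller in callers:
    caller.join()

commands.put(STOP)
io_worker.join()
```

Because only the I/O thread touches the connection state, callers need no locks of their own. The order in which concurrent callers' commands land in the queue is not fixed, but each command is applied exactly once.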
Subscribe
Every binding exposes a single polymorphic subscribe() method on the unified ThetaDataDxClient. Build a typed subscription spec by calling a topic helper — quote(), trade(), open_interest() — on a Contract, or full_trades() / full_open_interest() on a SecType, and hand the result to subscribe().
// Stock quotes
client.subscribe(Contract::stock("AAPL").quote())?;
// Stock trades
client.subscribe(Contract::stock("MSFT").trade())?;
// Quotes + trades in one call
client.subscribe(Contract::stock("TSLA").all())?;
// Option quotes
let opt = Contract::option("SPY", "20261218", "600", "C")?;
client.subscribe(opt.quote())?;
// Open interest
client.subscribe(Contract::stock("AAPL").open_interest())?;
// All trades for a security type (firehose)
client.subscribe(SecType::Stock.full_trades())?;
// All open interest for a security type (firehose)
client.subscribe(SecType::Option.full_open_interest())?;

client.subscribe(Contract.stock("AAPL").quote())
client.subscribe(Contract.stock("MSFT").trade())
client.subscribe(Contract.stock("SPY").open_interest())
client.subscribe(SecType.Stock.full_trades())
client.subscribe(SecType.Option.full_open_interest())

// Stock quotes
client.subscribe(tdx::Contract::stock("AAPL").quote());
// Stock trades
client.subscribe(tdx::Contract::stock("MSFT").trade());
// Open interest
client.subscribe(tdx::Contract::stock("AAPL").open_interest());
// All trades for a security type (firehose)
client.subscribe(tdx::SecType::Stock.full_trades());
// All open interest for a security type
client.subscribe(tdx::SecType::Option.full_open_interest());

// Stock quotes
client.subscribe(Contract.stock('AAPL').quote());
// Stock trades
client.subscribe(Contract.stock('MSFT').trade());
// Open interest
client.subscribe(Contract.stock('AAPL').openInterest());
// All trades for a security type (firehose)
client.subscribe(SecType.Stock.fullTrades());
// All open interest for a security type
client.subscribe(SecType.Option.fullOpenInterest());

Typed Contract on Data Events
FPSS assigns integer IDs to contracts on the wire, but the SDK resolves every data event's contract before user code sees it. Quote / Trade / OpenInterest / Ohlcvc events carry a typed contract with symbol, sec_type, expiration, strike / strike_dollars, and the option side. Each field surfaces in the language-idiomatic shape:
- Rust: Contract { symbol: String, sec_type: SecType, expiration: Option<i32>, is_call: Option<bool>, strike: Option<i32> } plus the derived accessors right() -> Option<Right> and strike_dollars() -> Option<f64>.
- Python: event.contract exposes symbol: str, sec_type: str ("STOCK" / "OPTION" / "INDEX" / "RATE"), expiration: Optional[int], right: Optional[str] ("C" / "P"), strike_dollars: Optional[float], and the wire-level strike: Optional[int].
- TypeScript: same field set as Python (secType: string, right: string | null, strikeDollars: number | null, strike: number | null).
- C / C++: TdxContract { const char* symbol; int32_t sec_type; bool has_expiration; int32_t expiration; bool has_right; char right; bool has_strike; int32_t strike; }. Strike stays in the wire integer form on the C ABI for layout stability; convert to dollars with strike / 1000.0 (or use the tdx::Contract C++ wrapper's accessors).
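The strike handling in the C / C++ bullet can be sketched as follows. `ContractSketch` is a hypothetical, trimmed-down mirror of the layout described above, written only to show the `has_strike` gate and the `strike / 1000.0` conversion. The wire value 600000 is an illustrative figure derived from that conversion rule.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ContractSketch:
    """Hypothetical, reduced mirror of the C-ABI layout described above."""

    symbol: str
    has_strike: bool = False
    strike: int = 0  # wire integer; meaningful only when has_strike is True

    def strike_dollars(self) -> Optional[float]:
        # Gate on has_strike first: stocks carry no strike at all.
        if not self.has_strike:
            return None
        return self.strike / 1000.0


stock = ContractSketch(symbol="AAPL")
option = ContractSketch(symbol="SPY", has_strike=True, strike=600_000)

stock_strike = stock.strike_dollars()
option_strike = option.strike_dollars()
```

Checking the flag before reading the integer matters because a zeroed `strike` field on a stock is a placeholder, not a zero-dollar strike.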
User code reads the symbol directly off the event without a side-table lookup.
client.start_streaming(|event: &FpssEvent| {
    match event {
        // Read the typed contract directly off the event; no integer
        // ID lookup required.
        FpssEvent::Data(FpssData::Quote { contract, bid, ask, .. }) => {
            println!("{}: bid={bid:.2} ask={ask:.2}", contract.symbol);
        }
        FpssEvent::Data(FpssData::Trade { contract, price, size, .. }) => {
            println!("{}: price={price:.2} size={size}", contract.symbol);
        }
        _ => {}
    }
})?;
// Snapshot active subscriptions -- useful for diagnostics dashboards.
let subs = client.active_subscriptions()?;
for (kind, contract) in subs {
    println!(" {kind:?}: {}", contract.symbol);
}

# Pull-iter mode: read `event.contract.symbol` directly off the
# typed event.
with client.streaming_iter() as it:
    for event in it:
        if event.kind == "quote":
            print(f"[QUOTE] {event.contract.symbol}: bid={event.bid} ask={event.ask}")
        elif event.kind == "trade":
            print(f"[TRADE] {event.contract.symbol}: price={event.price} size={event.size}")
# Snapshot active subscriptions.
for sub in client.active_subscriptions():
    print(sub)

// Pull-iter: each event carries event->quote.contract.symbol etc.
// directly -- `has_expiration` / `has_right` / `has_strike` gate
// the option-only fields.
// Snapshot active subscriptions.
auto subs = client.active_subscriptions();
for (const auto& sub : subs) {
    std::cout << " " << static_cast<int>(sub.kind)
              << ": " << sub.contract.symbol << std::endl;
}

// Each event carries the resolved typed contract directly -- read
// event.contract.symbol off the event in your callback or async
// iterator loop, no side-table lookup required.
// Snapshot active subscriptions.
const subs = client.activeSubscriptions();

Unsubscribe
Unsubscribe takes the same typed subscription spec as subscribe() — pass the matching quote() / trade() / open_interest() / full_trades() / full_open_interest() topic.
client.unsubscribe(Contract::stock("AAPL").quote())?;
client.unsubscribe(Contract::stock("MSFT").trade())?;
client.unsubscribe(Contract::stock("AAPL").open_interest())?;
client.unsubscribe(SecType::Stock.full_trades())?;
client.unsubscribe(SecType::Option.full_open_interest())?;

client.unsubscribe(Contract.stock("AAPL").quote())
client.unsubscribe(Contract.stock("MSFT").trade())
client.unsubscribe(Contract.stock("SPY").open_interest())
client.unsubscribe(SecType.Stock.full_trades())
client.unsubscribe(SecType.Option.full_open_interest())

client.unsubscribe(tdx::Contract::stock("AAPL").quote());
client.unsubscribe(tdx::Contract::stock("MSFT").trade());
client.unsubscribe(tdx::Contract::stock("AAPL").open_interest());
client.unsubscribe(tdx::SecType::Stock.full_trades());
client.unsubscribe(tdx::SecType::Option.full_open_interest());

client.unsubscribe(Contract.stock('AAPL').quote());
client.unsubscribe(Contract.stock('MSFT').trade());
client.unsubscribe(Contract.stock('AAPL').openInterest());
client.unsubscribe(SecType.Stock.fullTrades());
client.unsubscribe(SecType.Option.fullOpenInterest());

Stop Streaming
client.stop_streaming();

client.stop_streaming()

client.stop_streaming();
// RAII also handles cleanup: the ThetaDataDxClient destructor stops streaming on drop.

client.stopStreaming();