Skip to content

Reconnection & Error Handling

Reconnection (Rust)

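ThetaDataDx uses manual reconnection: the SDK does not reconnect on its own. When the server disconnects, you receive an FpssControl::Disconnected event with a reason code. Pass that reason to thetadatadx::fpss::reconnect_delay to find out whether you may reconnect and how long to wait first.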

rust
use thetadatadx::ThetaDataDx;
use thetadatadx::types::RemoveReason;

match thetadatadx::fpss::reconnect_delay(reason) {
    None => {
        // Permanent error (bad credentials, etc.) - do NOT retry
        eprintln!("Permanent disconnect: {:?}", reason);
    }
    Some(delay_ms) => {
        // Wait and reconnect streaming
        std::thread::sleep(std::time::Duration::from_millis(delay_ms));
        tdx.start_streaming(handler)?;
        // Re-subscribe to previous subscriptions
    }
}

WARNING

After reconnection, you must re-subscribe to all previously active streams. The server does not remember your subscriptions across connections.

Disconnect Categories

| Category | Codes | Action |
|---|---|---|
| Permanent | 0, 1, 2, 6, 9, 17, 18 | Do NOT reconnect |
| Rate-limited | 12 | Wait 130 seconds, then reconnect |
| Transient | All others | Wait 2 seconds, then reconnect |

Permanent Disconnect Reasons

Permanent disconnects indicate a problem that will not resolve by retrying:

  • Code 0, 1, 2 - Authentication failures (bad credentials, expired subscription)
  • Code 6 - Account suspended
  • Code 9 - Invalid request parameters
  • Code 17, 18 - Server-side permanent errors

Rate-Limited Disconnect

Code 12 indicates that you have exceeded the connection rate limit. Wait the full 130 seconds before attempting to reconnect; reconnecting earlier resets the cooldown.

Transient Disconnects

All other codes indicate temporary issues (network glitch, server restart, etc.). A 2-second delay before reconnection is sufficient.

Complete Example with Reconnection

rust
use thetadatadx::{ThetaDataDx, Credentials, DirectConfig};
use thetadatadx::fpss::{FpssData, FpssControl, FpssEvent};
use thetadatadx::fpss::protocol::Contract;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() -> Result<(), thetadatadx::Error> {
    let creds = Credentials::from_file("creds.txt")?;
    let tdx = ThetaDataDx::connect(&creds, DirectConfig::production()).await?;

    let contracts: Arc<Mutex<HashMap<i32, Contract>>> = Arc::new(Mutex::new(HashMap::new()));
    let contracts_clone = contracts.clone();

    tdx.start_streaming(move |event: &FpssEvent| {
        match event {
            FpssEvent::Control(FpssControl::ContractAssigned { id, contract }) => {
                contracts_clone.lock().unwrap().insert(*id, contract.clone());
            }
            FpssEvent::Data(FpssData::Quote { contract_id, bid, ask, price_type, .. }) => {
                if let Some(c) = contracts_clone.lock().unwrap().get(contract_id) {
                    let bid_p = Price::new(*bid, *price_type);
                    let ask_p = Price::new(*ask, *price_type);
                    println!("[QUOTE] {}: bid={} ask={}", c.root, bid_p, ask_p);
                }
            }
            FpssEvent::Data(FpssData::Trade { contract_id, price, size, price_type, .. }) => {
                if let Some(c) = contracts_clone.lock().unwrap().get(contract_id) {
                    let trade_p = Price::new(*price, *price_type);
                    println!("[TRADE] {}: price={} size={}", c.root, trade_p, size);
                }
            }
            FpssEvent::Control(FpssControl::Disconnected { reason }) => {
                eprintln!("Disconnected: {:?}", reason);
            }
            _ => {}
        }
    })?;

    tdx.subscribe_quotes(&Contract::stock("AAPL"))?;
    tdx.subscribe_trades(&Contract::stock("AAPL"))?;
    tdx.subscribe_quotes(&Contract::stock("MSFT"))?;

    // Block until interrupted
    std::thread::park();
    tdx.stop_streaming();
    Ok(())
}
python
from thetadatadx import Credentials, Config, ThetaDataDx
import signal
import sys

# Load credentials and build the client against the production configuration.
creds = Credentials.from_file("creds.txt")
tdx = ThetaDataDx(creds, Config.production())

# Open the streaming connection before issuing any subscriptions.
tdx.start_streaming()


def shutdown_handler(sig, frame):
    """SIGINT handler: stop streaming, then exit with status 0."""
    tdx.stop_streaming()
    sys.exit(0)


# Ctrl+C is routed to the handler above for a graceful shutdown.
signal.signal(signal.SIGINT, shutdown_handler)

# Streams to receive: quotes and trades for AAPL, quotes for MSFT.
tdx.subscribe_quotes("AAPL")
tdx.subscribe_trades("AAPL")
tdx.subscribe_quotes("MSFT")

# Contract id -> contract, filled in from "contract_assigned" events.
contracts = {}

while True:
    event = tdx.next_event(timeout_ms=5000)
    if event is None:
        # No event was returned (presumably the timeout elapsed); poll again.
        continue

    kind = event["kind"]

    if kind == "disconnected":
        # Report the reason and leave the loop; no reconnect is attempted here.
        print(f"Disconnected: {event['reason']}")
        break

    if kind == "contract_assigned":
        contracts[event["id"]] = event["contract"]
    elif kind == "quote":
        name = contracts.get(event["contract_id"], "?")
        print(f"[QUOTE] {name}: bid={event['bid']} ask={event['ask']}")
    elif kind == "trade":
        name = contracts.get(event["contract_id"], "?")
        print(f"[TRADE] {name}: price={event['price']} size={event['size']}")

tdx.stop_streaming()
go
package main

import (
    "fmt"
    "log"

    thetadatadx "github.com/userFRM/ThetaDataDx/sdks/go"
)

// main loads credentials, opens a historical client and a separate streaming
// client, subscribes to AAPL quotes and trades, and prints every event until
// NextEvent reports an error.
func main() {
    // Check the error instead of discarding it: a failed load would
    // otherwise hand an unusable creds value to Close and Connect below.
    creds, err := thetadatadx.CredentialsFromFile("creds.txt")
    if err != nil {
        log.Fatal(err)
    }
    defer creds.Close()

    config := thetadatadx.ProductionConfig()
    defer config.Close()

    // Historical client
    client, err := thetadatadx.Connect(creds, config)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Streaming client (separate connection, same credentials)
    fpss, err := thetadatadx.NewFpssClient(creds, config)
    if err != nil {
        log.Fatal(err)
    }
    defer fpss.Close()

    // Subscribe to real-time data
    fpss.SubscribeQuotes("AAPL")
    fpss.SubscribeTrades("AAPL")

    // Process events
    for {
        event, err := fpss.NextEvent(5000)
        if err != nil {
            log.Println("Error:", err)
            break
        }
        if event == nil {
            // No event was returned (presumably the timeout elapsed); poll again.
            continue
        }
        fmt.Printf("Event: %s\n", string(event))
    }

    fpss.Shutdown()
}
cpp
#include "thetadx.hpp"
#include <csignal>
#include <iostream>

namespace {

// Set by the SIGINT handler and polled by the event loop in main().
volatile std::sig_atomic_t g_stop_requested = 0;

// Signal handler: only records that shutdown was requested.
void handle_sigint(int) { g_stop_requested = 1; }

}  // namespace

// Opens a historical client and a separate streaming client, subscribes to
// AAPL quotes/trades and MSFT trades, and prints events until Ctrl+C.
int main() {
    // Without an exit condition the loop below never ends and
    // fpss.shutdown() is unreachable; Ctrl+C now ends the loop.
    std::signal(SIGINT, handle_sigint);

    auto creds = tdx::Credentials::from_file("creds.txt");
    auto config = tdx::Config::production();

    // Historical client
    auto client = tdx::Client::connect(creds, config);

    // Streaming client (separate connection, same credentials)
    tdx::FpssClient fpss(creds, config);

    // Subscribe to quotes and trades
    fpss.subscribe_quotes("AAPL");
    fpss.subscribe_trades("AAPL");
    fpss.subscribe_trades("MSFT");

    // Process events until a shutdown is requested
    while (g_stop_requested == 0) {
        auto event = fpss.next_event(5000);
        if (event.empty()) {
            // No event was returned (presumably the timeout elapsed); poll again.
            continue;
        }
        std::cout << "Event: " << event << std::endl;
    }

    fpss.shutdown();
}

Released under the GPL-3.0-or-later License.