Server API
The TopGun server is a Rust binary built on axum and tokio. It manages client WebSocket connections, CRDT synchronization, Merkle tree delta sync, full-text search, clustering, and durable PostgreSQL storage.
Getting Started
# Run the test server (integration testing)
cargo run --bin test-server --release
# With a custom port
PORT=8080 cargo run --bin test-server --release
# With debug logging
RUST_LOG=topgun_server=debug cargo run --bin test-server --release
The test-server binary is designed for integration testing. It uses in-memory storage (NullDataStore) and a hardcoded JWT secret. See the CLI Reference for details on the test binary and future production builds.
Configuration
The Rust server is configured via two primary mechanisms: environment variables for runtime settings, and Rust structs for programmatic embedding.
Environment Variables
The server reads three environment variables at startup: PORT, TOPGUN_ADMIN_DIR, and RUST_LOG. See the CLI Reference for the full table with descriptions and defaults.
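As a rough illustration of environment-driven configuration, the sketch below resolves a listen port from an optional PORT value. The helper names `resolve_port` and `port_from_env` are hypothetical, and silently falling back on an invalid value is an assumption made for this example; the server's actual defaults and error handling are described in the CLI Reference.

```rust
use std::env;

/// Hypothetical helper (not part of the TopGun API): parse a PORT value,
/// falling back when it is unset or not a valid u16.
fn resolve_port(raw: Option<&str>, fallback: u16) -> u16 {
    raw.and_then(|value| value.trim().parse::<u16>().ok())
        .unwrap_or(fallback)
}

/// Reads PORT from the process environment using the helper above.
fn port_from_env(fallback: u16) -> u16 {
    resolve_port(env::var("PORT").ok().as_deref(), fallback)
}
```

Keeping the parsing separate from the environment lookup makes the fallback behavior easy to unit test without mutating process-wide state.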
NetworkConfig (Programmatic)
struct NetworkConfig
Parameters
- host (String): Bind address for the server. Default: "0.0.0.0"
- port (u16): Port to listen on. 0 means OS-assigned. Default: 0
- tls (Option<TlsConfig>): Optional TLS configuration with cert_path, key_path, and optional ca_cert_path. Default: None
- connection (ConnectionConfig): Per-connection settings: outbound_channel_capacity (256), send_timeout (5s), idle_timeout (60s), ws_write_buffer_size (128KB), ws_max_write_buffer_size (512KB).
- cors_origins (Vec<String>): Allowed CORS origins. Default: ["*"]
- request_timeout (Duration): Maximum time to wait for a request to complete. Default: 30s
ServerConfig (Programmatic)
struct ServerConfig
Parameters
- node_id (String): Unique identifier for this server node.
- default_operation_timeout_ms (u64): Default timeout for operations in milliseconds. Default: 30000
- max_concurrent_operations (u32): Maximum concurrent operations before load shedding. Default: 1000
- gc_interval_ms (u64): Interval between garbage collection runs in milliseconds. Default: 60000
- partition_count (u32): Number of partitions. Default: 271 (PARTITION_COUNT)
- security (SecurityConfig): Security configuration for write validation. Default: SecurityConfig::default()
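To make the load-shedding role of max_concurrent_operations concrete, here is a minimal sketch of an admission gate that rejects work once a limit is reached. The `OperationGate` type and its methods are hypothetical and only illustrate the general technique; the source does not describe how the server implements load shedding internally.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// Hypothetical admission gate, not part of the TopGun API.
struct OperationGate {
    in_flight: AtomicU32,
    max_concurrent: u32,
}

impl OperationGate {
    fn new(max_concurrent: u32) -> Self {
        Self { in_flight: AtomicU32::new(0), max_concurrent }
    }

    /// Returns true if the operation was admitted, false if it should be shed.
    fn try_acquire(&self) -> bool {
        self.in_flight
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                // Only increment while we are below the configured limit.
                if current < self.max_concurrent { Some(current + 1) } else { None }
            })
            .is_ok()
    }

    /// Marks one previously admitted operation as finished.
    /// Must be called exactly once per successful try_acquire.
    fn release(&self) {
        self.in_flight.fetch_sub(1, Ordering::AcqRel);
    }
}
```

With a limit of 1000, the first 1000 concurrent calls to `try_acquire` succeed and later ones return false until some operation calls `release`.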
Embedding the Server
use topgun_server::network::config::NetworkConfig;
use topgun_server::network::module::NetworkModule;
let config = NetworkConfig {
host: "0.0.0.0".to_string(),
port: 8080,
tls: None,
..Default::default()
};
let mut module = NetworkModule::new(config);
let port = module.start().await?;
println!("Server listening on port {port}");
module.serve(shutdown_signal()).await?;
Endpoints
The server exposes the following HTTP and WebSocket endpoints:
Health and Observability
| Endpoint | Method | Description |
|---|---|---|
| /health | GET | Detailed health JSON including server state, uptime, and connection count. |
| /health/live | GET | Kubernetes liveness probe. Returns 200 if the process is alive. |
| /health/ready | GET | Kubernetes readiness probe. Returns 200 when the server is accepting connections. |
| /metrics | GET | Prometheus metrics endpoint. Returns text/plain with counter, histogram, and gauge data. |
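The probe endpoints can be exercised with any HTTP client. The sketch below uses only the Rust standard library to send a GET request and read the status code; the function names (`build_probe_request`, `parse_status_code`, `probe`) and the 2-second read timeout are choices made for this example, not part of the server.

```rust
use std::io::{Read, Write};
use std::net::TcpStream;
use std::time::Duration;

/// Builds a minimal HTTP/1.1 GET request for a probe path such as "/health/ready".
fn build_probe_request(host: &str, path: &str) -> String {
    format!("GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n")
}

/// Extracts the numeric status code from the first line of an HTTP response.
fn parse_status_code(response: &str) -> Option<u16> {
    let status_line = response.lines().next()?;
    let mut parts = status_line.split_whitespace();
    if !parts.next()?.starts_with("HTTP/") {
        return None;
    }
    parts.next()?.parse().ok()
}

/// Connects to `addr` (for example "127.0.0.1:8080"), requests `path`,
/// and reports whether the server answered with 200.
fn probe(addr: &str, path: &str) -> std::io::Result<bool> {
    let mut stream = TcpStream::connect(addr)?;
    stream.set_read_timeout(Some(Duration::from_secs(2)))?;
    stream.write_all(build_probe_request(addr, path).as_bytes())?;
    let mut raw = Vec::new();
    stream.read_to_end(&mut raw)?;
    Ok(parse_status_code(&String::from_utf8_lossy(&raw)) == Some(200))
}
```

For instance, `probe("127.0.0.1:8080", "/health/ready")` would return `Ok(true)` once a server listening on that port reports readiness.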
Client Communication
| Endpoint | Method | Description |
|---|---|---|
| /ws | GET | WebSocket upgrade. Primary real-time connection for CRDT sync, queries, and pub/sub. Sends AUTH_REQUIRED on connect. |
| /sync | POST | Stateless HTTP sync endpoint. Accepts MsgPack-encoded batched operations, returns deltas and query results. |
Admin API
| Endpoint | Method | Auth | Description |
|---|---|---|---|
| /api/status | GET | No | Server status (version, uptime, node info). |
| /api/auth/login | POST | No | Admin login endpoint. Returns JWT for admin API access. |
| /api/admin/cluster/status | GET | Admin JWT | Cluster membership and partition status. |
| /api/admin/maps | GET | Admin JWT | List all maps with record counts and partition info. |
| /api/admin/settings | GET, PUT | Admin JWT | Read or update server configuration at runtime. |
| /api/docs | GET | No | Swagger UI for the admin API (OpenAPI spec at /api/openapi.json). |
| /admin/* | GET | No | SPA admin dashboard. Serves static files from TOPGUN_ADMIN_DIR. |
Docker
See the CLI Reference for Docker run examples with environment variable configuration.
PostgreSQL Storage
The PostgresDataStore provides durable write-through persistence via sqlx::PgPool. Every mutation is committed to PostgreSQL before the async method returns.
use sqlx::PgPool;
use topgun_server::storage::datastores::PostgresDataStore;
let pool = PgPool::connect("postgres://user:pass@localhost/topgun").await?;
let store = PostgresDataStore::new(pool, None)?; // table: "topgun_maps"
store.initialize().await?; // CREATE TABLE IF NOT EXISTS
PostgresDataStore::new(pool, table_name)
Parameters
- pool (PgPool): A sqlx connection pool. Create with PgPool::connect("postgres://...").
- table_name (Option<String>): Custom table name. Default: "topgun_maps". Validated against ^[a-zA-Z_][a-zA-Z0-9_]*$.
Call store.initialize().await? after construction to run the schema migration (CREATE TABLE IF NOT EXISTS).
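Table names cannot be bound as SQL parameters, which is presumably why the constructor validates them against a strict identifier pattern. The sketch below shows one way to check the same pattern, ^[a-zA-Z_][a-zA-Z0-9_]*$, without pulling in a regex dependency. The function name `is_valid_table_name` is hypothetical and this is not the server's own validation code.

```rust
/// Checks a name against ^[a-zA-Z_][a-zA-Z0-9_]*$ using plain character tests.
/// Hypothetical helper for illustration only.
fn is_valid_table_name(name: &str) -> bool {
    let mut chars = name.chars();
    // The first character must be an ASCII letter or underscore.
    match chars.next() {
        Some(first) if first.is_ascii_alphabetic() || first == '_' => {}
        _ => return false,
    }
    // Every remaining character must be an ASCII letter, digit, or underscore.
    chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
}
```

Checking a custom name up front lets a caller report a configuration error before constructing the store.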
POST /sync Endpoint
POST /sync
Content Type:
- application/x-msgpack (binary MsgPack)
- application/msgpack
Request Schema (HttpSyncRequest)
interface HttpSyncRequest
Parameters
- clientId (string): Client identifier. Required.
- clientHlc (Timestamp): Client's current Hybrid Logical Clock as an object: { millis: number, counter: number, nodeId: string }. Required.
- operations? (ClientOp[]): Batch of operations to push to the server. Each operation contains mapName, key, and record with value and timestamp.
- syncMaps? (SyncMapEntry[]): Maps to pull deltas for. Each entry contains mapName (string) and lastSyncTimestamp (Timestamp) indicating the last known server state for that map.
- queries? (HttpQueryRequest[]): One-shot queries to execute. Each query contains queryId (string), mapName (string), filter (object), and optional limit (number) and offset (number).
- searches? (HttpSearchRequest[]): One-shot search requests to execute.
Response Schema (HttpSyncResponse)
interface HttpSyncResponse
Parameters
- serverHlc (Timestamp): Server's current HLC timestamp as an object: { millis: number, counter: number, nodeId: string }.
- ack? ({ lastId: string, results?: AckResult[] }): Acknowledgment of processed operations. lastId is the ID of the last processed operation. results contains per-operation success/failure details with achievedLevel.
- deltas? (MapDelta[]): Delta records for requested maps. Each MapDelta contains mapName (string), records (DeltaRecord[] with key, record: LWWRecord, eventType: 'PUT' | 'REMOVE'), and serverSyncTimestamp (Timestamp).
- queryResults? (HttpQueryResult[]): Results for one-shot queries, matching the queryId from the request.
- searchResults? (HttpSearchResult[]): Results for one-shot search requests.
- errors? (HttpSyncError[]): Errors for individual operations. Each error contains code (number), message (string), and context (string, optional) identifying the failed operation.
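Both schemas carry Hybrid Logical Clock timestamps, and deltas carry LWW (last-write-wins) records. The sketch below models a timestamp in Rust and merges two records by keeping the one with the later timestamp. The comparison order (millis, then counter, then nodeId as a tie-breaker) is the conventional HLC ordering and is assumed here; the type and function names are hypothetical, and the server's actual comparison rules may differ.

```rust
use std::cmp::Ordering;

/// Illustrative mirror of the { millis, counter, nodeId } timestamp object.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Timestamp {
    millis: u64,
    counter: u32,
    node_id: String,
}

impl Ord for Timestamp {
    /// Assumed ordering: wall-clock millis, then logical counter, then node id.
    fn cmp(&self, other: &Self) -> Ordering {
        self.millis
            .cmp(&other.millis)
            .then_with(|| self.counter.cmp(&other.counter))
            .then_with(|| self.node_id.cmp(&other.node_id))
    }
}

impl PartialOrd for Timestamp {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Illustrative last-write-wins record: a value tagged with its write timestamp.
#[derive(Debug, Clone, PartialEq)]
struct LwwRecord<V> {
    value: V,
    timestamp: Timestamp,
}

/// Keeps whichever record has the later timestamp; the local record wins ties.
fn merge_lww<V>(local: LwwRecord<V>, remote: LwwRecord<V>) -> LwwRecord<V> {
    if remote.timestamp > local.timestamp { remote } else { local }
}
```

Because the ordering is total, the merge produces the same winner regardless of which replica applies it or in what order the records arrive.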
Example Request
// POST /sync
// Content-Type: application/x-msgpack
// (shown as JSON for readability)
{
"clientId": "client-1",
"clientHlc": {
"millis": 1706000000000,
"counter": 0,
"nodeId": "client-1"
},
"operations": [
{
"mapName": "todos",
"key": "t1",
"record": {
"value": { "text": "Buy milk" },
"timestamp": {
"millis": 1706000000000,
"counter": 1,
"nodeId": "client-1"
}
}
}
],
"syncMaps": [
{
"mapName": "todos",
"lastSyncTimestamp": {
"millis": 1705999000000,
"counter": 0,
"nodeId": ""
}
}
]
}
The wire format is application/x-msgpack (binary). The examples here use JSON for readability.
Example Response
// 200 OK
// Content-Type: application/msgpack
{
"serverHlc": {
"millis": 1706000001000,
"counter": 1,
"nodeId": "server-1"
},
"ack": {
"lastId": "http-op-0",
"results": [
{
"opId": "http-op-0",
"success": true,
"achievedLevel": "MEMORY"
}
]
},
"deltas": [
{
"mapName": "todos",
"records": [
{
"key": "t2",
"record": {
"value": { "text": "Walk the dog" },
"timestamp": {
"millis": 1706000000500,
"counter": 0,
"nodeId": "client-2"
}
},
"eventType": "PUT"
}
],
"serverSyncTimestamp": {
"millis": 1706000001000,
"counter": 2,
"nodeId": "server-1"
}
}
],
"queryResults": [],
"searchResults": [],
"errors": []
}
Graceful Shutdown
The server shuts down gracefully on SIGTERM or SIGINT, proceeding through these steps:
- Health state transitions to Draining
- All active WebSocket connections receive a Close frame
- In-flight requests are given up to 30 seconds to complete
- Health state transitions to Stopped
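The drain step above can be sketched as a bounded wait on an in-flight request counter. This example uses blocking standard-library primitives to stay self-contained, whereas the real server runs on tokio; the `HealthState` enum mirrors only the two states named above, and `wait_for_drain` and `shutdown` are hypothetical names.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::{Duration, Instant};

/// The two shutdown states named in this section.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HealthState {
    Draining,
    Stopped,
}

/// Polls until no requests are in flight or the grace period expires.
/// Returns true if everything finished within the grace period.
fn wait_for_drain(in_flight: &AtomicUsize, grace: Duration) -> bool {
    let deadline = Instant::now() + grace;
    loop {
        if in_flight.load(Ordering::Acquire) == 0 {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        thread::sleep(Duration::from_millis(5));
    }
}

/// Runs the shutdown sequence and returns the state transitions in order.
fn shutdown(in_flight: &AtomicUsize, grace: Duration) -> Vec<HealthState> {
    let mut transitions = vec![HealthState::Draining];
    // A real server would send Close frames to WebSocket clients here.
    let _drained = wait_for_drain(in_flight, grace);
    // Stop regardless of whether the drain completed in time.
    transitions.push(HealthState::Stopped);
    transitions
}
```

Calling `shutdown(&counter, Duration::from_secs(30))` would match the 30-second allowance described above.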