Python API¶
The Python client provides a Pythonic interface to FoundationDB with decorators, context managers, and async support.
Installation¶
Prerequisites
The FoundationDB client library must be installed on your system. See Installation.
Version Matching
Always install a Python client version that matches your FoundationDB server version. For example, use foundationdb==7.3.x with a 7.3 server.
Quick Start¶
import fdb
# REQUIRED: Set API version before any other FDB calls
fdb.api_version(730)
# Open the default database
db = fdb.open()
# Simple transaction using decorator
@fdb.transactional
def hello_world(tr):
    tr[b'hello'] = b'world'
    return tr[b'hello']
result = hello_world(db)
print(result) # b'world'
Core Concepts¶
API Versioning¶
Always set the API version once at program startup, for example with fdb.api_version(730) as in the Quick Start.
Version Lock
Once set, the API version cannot be changed. Call api_version() before any other FDB operations.
Opening the Database¶
# Default cluster file location
db = fdb.open()
# Custom cluster file
db = fdb.open('/path/to/fdb.cluster')
# With event loop (for async)
db = fdb.open(event_model='asyncio')
Transactions¶
Using the Decorator¶
The @fdb.transactional decorator is the recommended approach:
@fdb.transactional
def transfer_funds(tr, from_acct, to_acct, amount):
    """Transfer funds between accounts atomically."""
    from_key = fdb.tuple.pack(('accounts', from_acct))
    to_key = fdb.tuple.pack(('accounts', to_acct))
    from_balance = int(tr[from_key] or b'0')
    to_balance = int(tr[to_key] or b'0')
    if from_balance < amount:
        raise ValueError("Insufficient funds")
    tr[from_key] = str(from_balance - amount).encode()
    tr[to_key] = str(to_balance + amount).encode()
# Pass database as first argument
transfer_funds(db, 'alice', 'bob', 100)
Automatic Retry
The decorator automatically retries the function on conflicts and other retryable errors, so the function body may run more than once.
Context Manager¶
For more control, use the context manager:
with db.transaction() as tr:
    tr[b'key1'] = b'value1'
    tr[b'key2'] = b'value2'
# Commits automatically on exit
Manual Transactions¶
For fine-grained control:
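tr = db.create_transaction()
while True:
    try:
        tr[b'key'] = b'value'
        tr.commit().wait()
        break
    except fdb.FDBError as e:
        # on_error() backs off and resets the transaction if the error is
        # retryable, so the loop tries again; otherwise it re-raises.
        tr.on_error(e).wait()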
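The pattern above can be wrapped in a small reusable helper. The sketch below is an illustration rather than part of the binding: `run_transaction` and its `error_type` parameter are hypothetical names, and in real use `error_type` would be `fdb.FDBError`. It relies only on `create_transaction()`, `commit().wait()`, and `on_error(e).wait()` as used in this section.

```python
def run_transaction(db, body, error_type):
    """Run body(tr) in a transaction, retrying until the commit succeeds.

    Hypothetical helper: pass error_type=fdb.FDBError in real code so that
    only FoundationDB errors are routed through on_error().
    """
    tr = db.create_transaction()
    while True:
        try:
            result = body(tr)
            tr.commit().wait()
            return result
        except error_type as e:
            # For a retryable error, on_error() waits and resets the
            # transaction; for anything else it re-raises and ends the loop.
            tr.on_error(e).wait()
```

Because `body` may run several times, it should not have side effects outside the transaction.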
Reading Data¶
Single Key¶
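A single key is read by indexing the transaction, as in the Quick Start. The sketch below assumes the returned object exposes `present()`, as the binding's value futures do. `read_or_default` is a hypothetical helper name, and in real use the function would be decorated with `@fdb.transactional`.

```python
def read_or_default(tr, key, default=b''):
    """Return the value stored at key, or default if the key is not set."""
    value = tr[key]  # shorthand for tr.get(key)
    if value.present():
        return value
    return default
```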
Range Reads¶
@fdb.transactional
def get_range(tr, start, end):
    """Read all key-value pairs in range."""
    return list(tr.get_range(start, end))
@fdb.transactional
def get_all_users(tr):
    """Read all users using tuple keys."""
    # fdb.tuple.range() covers every key whose tuple starts with ('users',).
    # None encodes as the smallest tuple element, so it cannot mark the end.
    users_range = fdb.tuple.range(('users',))
    users = []
    for key, value in tr.get_range(users_range.start, users_range.stop):
        _, user_id = fdb.tuple.unpack(key)
        users.append((user_id, value.decode()))
    return users
Range Options¶
@fdb.transactional
def get_with_options(tr, start, end):
    # Limit results
    result = tr.get_range(start, end, limit=100)
    # Reverse order
    result = tr.get_range(start, end, reverse=True)
    # Streaming mode for large ranges
    result = tr.get_range(start, end, streaming_mode=fdb.StreamingMode.want_all)
GetMappedRange (7.1+)¶
Experimental
GetMappedRange is available in FoundationDB 7.1 and later, but it remains experimental in all current releases (7.1, 7.3, and 7.4). It is only allowed when the read uses snapshot isolation and the READ_YOUR_WRITES_DISABLE transaction option is set; calling it in any other configuration raises an error. The on-the-wire format and mapper syntax may still change in future releases.
See the upstream Everything about GetMappedRange wiki for details.
GetMappedRange allows efficient secondary index lookups by fetching related data in a single operation. Because of the requirements above, the read must go through tr.snapshot and the transaction must disable read-your-writes:
@fdb.transactional
def get_users_by_city(tr, city):
    """Fetch users using a city index with GetMappedRange (experimental)."""
    # Index: ('city_index', city, user_id) -> b''
    # Data: ('users', user_id) -> user_data
    # Required: GetMappedRange only works with snapshot reads and
    # READ_YOUR_WRITES_DISABLE.
    tr.options.set_read_your_writes_disable()
    # All index entries for this city. None sorts first in the tuple
    # encoding, so it cannot serve as an upper bound.
    index_range = fdb.tuple.range(('city_index', city))
    # The mapper is itself a packed tuple; '{K[2]}' is replaced by element 2
    # of each index key (the user_id) to form the key to look up.
    mapper = fdb.tuple.pack(('users', '{K[2]}'))
    results = tr.snapshot.get_mapped_range(
        index_range.start, index_range.stop,
        mapper=mapper
    )
    return [(key, value) for key, value in results]
Writing Data¶
Set and Clear¶
@fdb.transactional
def write_data(tr):
    # Set a value
    tr[b'key'] = b'value'
    # Clear a key
    del tr[b'key']
    # Clear a range
    tr.clear_range(b'prefix\x00', b'prefix\xff')
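The range from `b'prefix\x00'` to `b'prefix\xff'` above does not include `b'prefix'` itself or keys such as `b'prefix\xff\x01'`. To cover every key that starts with a prefix, the end of the range should be the first key that sorts after all of them. The function below is a pure-Python sketch of that computation, and `prefix_end` is a hypothetical name. The binding is understood to ship its own helper for this (`fdb.strinc`), which should be preferred in real code.

```python
def prefix_end(prefix: bytes) -> bytes:
    """Return the smallest key that sorts after every key starting with prefix."""
    stripped = prefix.rstrip(b'\xff')
    if not stripped:
        raise ValueError('prefix must contain a byte other than 0xff')
    # Increment the last remaining byte and drop everything after it.
    return stripped[:-1] + bytes([stripped[-1] + 1])

# Every key with this prefix falls in [prefix, prefix_end(prefix)), e.g.:
#   tr.clear_range(b'prefix', prefix_end(b'prefix'))
```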
Atomic Operations¶
import struct
@fdb.transactional
def atomic_ops(tr, key):
    # Atomic add (for counters); the operand is a little-endian integer
    tr.add(key, struct.pack('<q', 1))
    # Bitwise operations
    tr.bit_and(key, b'\xff\x00')
    tr.bit_or(key, b'\x00\xff')
    tr.bit_xor(key, b'\xaa\x55')
    # Min/max
    tr.min(key, struct.pack('<q', 10))
    tr.max(key, struct.pack('<q', 100))
    # Compare and clear
    tr.compare_and_clear(key, b'expected')
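The operand passed to `add` is a byte string that the database interprets as a little-endian integer, which is why the example uses `struct.pack('<q', ...)`. The helpers below are a sketch of that encoding. The names `encode_i64`, `decode_i64`, and `simulate_add` are hypothetical, and `simulate_add` only imitates the wrap-around arithmetic locally for illustration; it does not talk to a cluster.

```python
import struct

def encode_i64(n: int) -> bytes:
    """Encode a signed 64-bit integer as 8 little-endian bytes."""
    return struct.pack('<q', n)

def decode_i64(raw: bytes) -> int:
    """Decode 8 little-endian bytes into a signed 64-bit integer."""
    return struct.unpack('<q', raw)[0]

def simulate_add(current: bytes, operand: bytes) -> bytes:
    """Local imitation of an 8-byte little-endian add with wrap-around."""
    total = int.from_bytes(current, 'little') + int.from_bytes(operand, 'little')
    return (total % (1 << 64)).to_bytes(8, 'little')
```

With this encoding a negative operand decrements the counter, since the sum wraps around modulo 2^64.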
Tuple Layer¶
The tuple layer provides structured key encoding:
# Use the qualified name fdb.tuple; "from fdb import tuple" would shadow
# Python's built-in tuple type.
# Pack tuples into keys
key = fdb.tuple.pack(('users', 'alice', 'profile'))
# Result: b'\x02users\x00\x02alice\x00\x02profile\x00'
# Unpack keys into tuples
values = fdb.tuple.unpack(key)
# Result: ('users', 'alice', 'profile')
# Supported types: str, bytes, int, float, bool, None, UUID
key = fdb.tuple.pack(('data', 42, 3.14, True, None))
Directory Layer¶
Organize keys hierarchically:
import time
# Create or open the application's top-level directory
directory = fdb.directory.create_or_open(db, ('myapp',))
# Create or open subdirectories; each one also acts as a subspace
users = directory.create_or_open(db, ('users',))
orders = directory.create_or_open(db, ('orders',))
# Use the subspace to build keys
@fdb.transactional
def create_user(tr, user_id, name):
    tr[users.pack((user_id, 'name'))] = name.encode()
    tr[users.pack((user_id, 'created'))] = str(time.time()).encode()
Error Handling¶
Retryable Errors¶
@fdb.transactional
def safe_operation(tr):
    # Transactional decorator handles retries
    value = tr[b'counter']
    tr[b'counter'] = str(int(value or b'0') + 1).encode()
Non-Retryable Errors¶
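try:
    result = some_operation(db)  # any @fdb.transactional function
except fdb.FDBError as e:
    # Retryable errors such as 1007 and 1009 are retried by the decorator,
    # so errors that reach this point need explicit handling.
    if e.code == 1031:  # transaction_timed_out
        print("Transaction timed out")
    elif e.code == 2101:  # transaction_too_large
        print("Transaction exceeds the size limit")
    else:
        raise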
Common Error Codes¶
| Code | Name | Description |
|---|---|---|
| 1007 | transaction_too_old | Transaction is too old to read or commit (it ran too long) |
| 1009 | future_version | Requested read version is ahead of what the servers can serve yet |
| 1020 | not_committed | Conflict during commit |
| 1021 | commit_unknown_result | Commit may have succeeded |
| 2000 | client_invalid_operation | Invalid API usage |
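When logging or branching on errors, it can help to keep the codes listed in the table in one place. The mapping below is a sketch: `KNOWN_ERRORS` and `describe_error` are hypothetical names, the retryable flags reflect my understanding of which of these errors `on_error()` retries, and the only attribute assumed on the exception is `code`.

```python
# code -> (name, whether on_error() is understood to retry it)
KNOWN_ERRORS = {
    1007: ('transaction_too_old', True),
    1009: ('future_version', True),
    1020: ('not_committed', True),
    1021: ('commit_unknown_result', True),
    2000: ('client_invalid_operation', False),
}

def describe_error(error) -> str:
    """Return a short label for an exception that carries a .code attribute."""
    entry = KNOWN_ERRORS.get(error.code)
    if entry is None:
        return f'{error.code} (not in table)'
    name, retryable = entry
    suffix = 'retryable' if retryable else 'not retryable'
    return f'{error.code} {name} ({suffix})'
```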
Async Support¶
With asyncio¶
import asyncio
import fdb
fdb.api_version(730)
db = fdb.open(event_model='asyncio')  # event model from "Opening the Database"
async def async_operation():
    @fdb.transactional
    async def read_write(tr):
        value = await tr[b'key']
        tr[b'key2'] = value + b'_modified'
        return value
    return await read_write(db)
asyncio.run(async_operation())
Common Patterns¶
Counters¶
import struct
@fdb.transactional
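def increment(tr, counter_key):
    tr.add(counter_key, struct.pack('<q', 1))
@fdb.transactional
def get_count(tr, counter_key):
    value = tr[counter_key]
    # tr[key] returns a value object, never the None singleton, so test
    # for a missing key with present().
    if not value.present():
        return 0
    return struct.unpack('<q', value)[0]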
Presence Checking¶
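Presence checking is taken here to mean testing whether a key exists without caring about its value, for example to model set membership. The sketch below stores an empty value per member and checks for it with `present()`. `add_member` and `is_member` are hypothetical names, the key layout (prefix followed by member) is an assumption, and in real use both functions would be decorated with `@fdb.transactional`.

```python
def add_member(tr, set_prefix: bytes, member: bytes) -> None:
    """Record membership by writing an empty value under prefix + member."""
    tr[set_prefix + member] = b''

def is_member(tr, set_prefix: bytes, member: bytes) -> bool:
    """Return True if the membership key exists, whatever its value."""
    return tr[set_prefix + member].present()
```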
Pagination¶
@fdb.transactional
def get_page(tr, prefix, page_size, last_key=None):
    start = last_key + b'\x00' if last_key else prefix
    end = fdb.KeySelector.first_greater_or_equal(prefix + b'\xff')
    results = list(tr.get_range(start, end, limit=page_size + 1))
    has_more = len(results) > page_size
    if has_more:
        results = results[:page_size]
    return results, has_more
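The `get_page` function returns one page plus a `has_more` flag, so a caller can walk the whole range by feeding the last key of each page back in. The generator below is a sketch of that loop. `iter_all` and `fetch_page` are hypothetical names, and `fetch_page(last_key)` stands for something like `lambda last: get_page(db, prefix, 50, last)`.

```python
def iter_all(fetch_page):
    """Yield every (key, value) pair by requesting pages until none remain.

    fetch_page(last_key) must return (results, has_more), where last_key is
    None for the first page.
    """
    last_key = None
    while True:
        results, has_more = fetch_page(last_key)
        for key, value in results:
            yield key, value
            last_key = key
        # Stop when the source reports no further pages; also stop on an
        # empty page so a misbehaving source cannot loop forever.
        if not has_more or not results:
            return
```

Each page is fetched in its own transaction, so the pages together are not a single consistent snapshot of the range.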
Best Practices¶
Do
- Use the @fdb.transactional decorator for automatic retry
- Keep transactions short (< 5 seconds)
- Use the tuple layer for structured keys
- Handle FDBError exceptions appropriately
Don't
- Don't perform I/O inside transactions
- Don't hold transactions open during user input
- Don't ignore conflict errors in critical paths
- Don't store values > 100KB (use chunking)
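The chunking mentioned in the last item can be sketched as splitting a large value across several keys that sort in chunk order. The helpers below are an illustration under stated assumptions: `split_chunks`, `join_chunks`, and `CHUNK_SIZE` are hypothetical names, the 10,000-byte chunk size is an arbitrary choice below the limit above, and the key layout (base key plus a big-endian index) is one possible scheme.

```python
import struct

CHUNK_SIZE = 10_000  # assumed chunk size, well under the 100 KB limit above

def split_chunks(base_key: bytes, data: bytes, chunk_size: int = CHUNK_SIZE):
    """Return (key, chunk) pairs whose keys sort in chunk order."""
    return [
        # A big-endian index keeps byte order equal to numeric order.
        (base_key + struct.pack('>I', index), data[offset:offset + chunk_size])
        for index, offset in enumerate(range(0, len(data), chunk_size))
    ]

def join_chunks(pairs) -> bytes:
    """Reassemble a value from (key, chunk) pairs supplied in key order."""
    return b''.join(chunk for _, chunk in pairs)
```

Inside a transaction, each pair would be written with `tr[key] = chunk`, and the value read back with a range read over the base key before calling `join_chunks`. An empty value produces no chunks, so callers that need to distinguish empty from missing would have to store a marker separately.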