Mooncake Store Python API#
Installation#
PyPI Package#
Install the Mooncake Transfer Engine package from PyPI, which includes both Mooncake Transfer Engine and Mooncake Store Python bindings:
pip install mooncake-transfer-engine
📦 Package Details: https://pypi.org/project/mooncake-transfer-engine/
Required Service#
Only one service is required now:
mooncake_master — Master service, which now embeds the HTTP metadata server
Quick Start#
Start Master (with HTTP enabled)#
Enable the built-in HTTP metadata server when starting the master:
mooncake_master \
--enable_http_metadata_server=true \
--http_metadata_server_host=0.0.0.0 \
--http_metadata_server_port=8080
This exposes the metadata endpoint at http://<host>:<port>/metadata.
Hello World Example#
from mooncake.store import MooncakeDistributedStore
# 1. Create store instance
store = MooncakeDistributedStore()
# 2. Setup with all required parameters
store.setup(
"localhost", # Your node's address
"http://localhost:8080/metadata", # HTTP metadata server
512*1024*1024, # 512MB segment size
128*1024*1024, # 128MB local buffer
"tcp", # Use TCP (RDMA for high performance)
"", # Leave empty; Mooncake auto-picks RDMA devices when needed
"localhost:50051" # Master service
)
# 3. Store data
store.put("hello_key", b"Hello, Mooncake Store!")
# 4. Retrieve data
data = store.get("hello_key")
print(data.decode()) # Output: Hello, Mooncake Store!
# 5. Clean up
store.close()
RDMA device selection: Leave rdma_devices as "" to auto-select RDMA NICs. Provide a comma-separated list (e.g. "mlx5_0,mlx5_1") to pin to specific hardware.
Mooncake selects available ports internally at setup(), so you do not need to pin specific port numbers in these examples; ports are chosen from a dynamic range (currently 12300–14300).
P2P Hello World (preview)#
The following setup uses the new P2P handshake and does not require an HTTP metadata server. This feature is not released yet; use only if you’re testing the latest code.
from mooncake.store import MooncakeDistributedStore
store = MooncakeDistributedStore()
store.setup(
"localhost", # Your node's ip address
"P2PHANDSHAKE", # P2P handshake (no HTTP metadata)
512*1024*1024, # 512MB segment size
128*1024*1024, # 128MB local buffer
"tcp", # Use TCP (RDMA for high performance)
"", # Leave empty; Mooncake auto-picks RDMA devices when needed
"localhost:50051" # Master service
)
store.put("hello_key", b"Hello, Mooncake Store!")
print(store.get("hello_key").decode())
store.close()
Basic API Usage#
Simple Get/Put Operations#
Click to expand: Complete Get/Put example with NumPy arrays
import numpy as np
import json
from mooncake.store import MooncakeDistributedStore
# 1. Initialize
store = MooncakeDistributedStore()
store.setup("localhost",
"http://localhost:8080/metadata",
512*1024*1024,
128*1024*1024,
"tcp",
"",
"localhost:50051")
print("Store ready.")
# 2. Store data
store.put("config", b'{"model": "llama-7b", "temperature": 0.7}')
model_weights = np.random.randn(1000, 1000).astype(np.float32)
store.put("weights", model_weights.tobytes())
store.put("cache", b"some serialized cache data")
# 3. Retrieve and verify data
config = json.loads(store.get("config").decode())
weights = np.frombuffer(store.get("weights"), dtype=np.float32).reshape(1000, 1000)
print("Config OK:", config["model"])
print("Weights OK, mean =", round(float(weights.mean()), 4))
print("Cache exists?", bool(store.is_exist("cache")))
# 4. Close
store.close()
Zero-Copy API (Advanced Performance)#
For maximum performance, especially with RDMA networks, use the zero-copy API. This allows direct memory access without intermediate copies.
Memory Registration#
⚠️ Important: register_buffer is required for zero-copy RDMA operations. Without proper buffer registration, undefined behavior and memory corruption may occur.
Zero-copy operations require registering memory buffers with the store:
register_buffer()#
Register a memory buffer for direct RDMA access.
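The registration example below calls store.register_buffer(buffer_ptr, buffer.nbytes) and checks an integer result; based on that usage, the signature is approximately:
def register_buffer(self, buffer_ptr: int, size: int) -> int
Returns a status code (0 = success, non-zero = error).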
unregister_buffer()#
Unregister a previously registered buffer.
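Inferred from the example below (store.unregister_buffer(buffer_ptr)) and by analogy with the other methods, the signature is approximately:
def unregister_buffer(self, buffer_ptr: int) -> int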
Click to expand: Buffer registration example
import numpy as np
from mooncake.store import MooncakeDistributedStore
# Initialize store
store = MooncakeDistributedStore()
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "tcp", "", "localhost:50051")
# Create a large buffer
buffer = np.zeros(100 * 1024 * 1024, dtype=np.uint8) # 100MB buffer
# Register the buffer for zero-copy operations
buffer_ptr = buffer.ctypes.data
result = store.register_buffer(buffer_ptr, buffer.nbytes)
if result != 0:
print(f"Failed to register buffer: {result}")
raise RuntimeError(f"Failed to register buffer: {result}")
print("Buffer registered successfully.")
store.unregister_buffer(buffer_ptr)
Zero-Copy Operations#
Complete Zero-Copy Workflow#
⚠️ Critical: Always register buffers before zero-copy operations. Failure to register buffers will cause undefined behavior and potential memory corruption.
Here’s a complete example showing the full zero-copy workflow with proper buffer management:
Click to expand: Complete zero-copy workflow example
import numpy as np
from mooncake.store import MooncakeDistributedStore
# Initialize the store (use "rdma" instead of "tcp" for maximum performance)
store = MooncakeDistributedStore()
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 16*1024*1024, "tcp", "", "localhost:50051")
# Create data to store
original_data = np.random.randn(1000, 1000).astype(np.float32)
buffer_ptr = original_data.ctypes.data
size = original_data.nbytes
# Step 1: Register the buffer
result = store.register_buffer(buffer_ptr, size)
if result != 0:
raise RuntimeError(f"Failed to register buffer: {result}")
# Step 2: Zero-copy store
result = store.put_from("large_tensor", buffer_ptr, size)
if result == 0:
print(f"Successfully stored {size} bytes with zero-copy")
else:
raise RuntimeError(f"Store failed with code: {result}")
# Step 3: Pre-allocate buffer for retrieval
retrieved_data = np.empty((1000, 1000), dtype=np.float32)
recv_buffer_ptr = retrieved_data.ctypes.data
recv_size = retrieved_data.nbytes
# Step 4: Register receive buffer
result = store.register_buffer(recv_buffer_ptr, recv_size)
if result != 0:
raise RuntimeError(f"Failed to register receive buffer: {result}")
# Step 5: Zero-copy retrieval
bytes_read = store.get_into("large_tensor", recv_buffer_ptr, recv_size)
if bytes_read > 0:
print(f"Successfully retrieved {bytes_read} bytes with zero-copy")
# Verify the data
print(f"Data matches: {np.array_equal(original_data, retrieved_data)}")
else:
raise RuntimeError(f"Retrieval failed with code: {bytes_read}")
# Step 6: Clean up - unregister both buffers
store.unregister_buffer(buffer_ptr)
store.unregister_buffer(recv_buffer_ptr)
store.close()
put_from()#
Store data directly from a registered buffer (zero-copy).
def put_from(self, key: str, buffer_ptr: int, size: int, config=None) -> int
Parameters:
key: Object identifier
buffer_ptr: Memory address (from ctypes.data or similar)
size: Number of bytes to store
config: Optional replication configuration
get_into()#
Retrieve data directly into a registered buffer (zero-copy).
def get_into(self, key: str, buffer_ptr: int, size: int) -> int
Parameters:
key: Object identifier to retrieve
buffer_ptr: Memory address of pre-allocated buffer
size: Size of the buffer (must be >= object size)
Returns: Number of bytes read, or negative on error
ReplicateConfig Configuration#
The ReplicateConfig class allows you to control data replication behavior when storing objects in Mooncake Store. This configuration is essential for ensuring data reliability, performance optimization, and storage placement control.
Class Definition#
from mooncake.store import ReplicateConfig
# Create a configuration instance
config = ReplicateConfig()
Properties#
replica_num#
Type: int
Default: 1
Description: Specifies the total number of replicas to create for the stored object.
config = ReplicateConfig()
config.replica_num = 3 # Store 3 copies of the data
with_soft_pin#
Type: bool
Default: False
Description: Enables soft pinning for the stored object. Soft pinned objects are prioritized to remain in memory during eviction - they are only evicted when memory is insufficient and no other objects are eligible for eviction. This is useful for frequently accessed or important objects like system prompts.
config = ReplicateConfig()
config.with_soft_pin = True # Keep this object in memory longer
preferred_segment#
Type: str
Default: "" (empty string)
Description: Specifies a preferred segment (node) for data allocation. This is typically the hostname:port of a target server.
config = ReplicateConfig()
# Preferred replica location ("host:port")
config.preferred_segment = "localhost:12345" # pin to a specific machine
# Alternatively, pin to the local host
config.preferred_segment = store.get_hostname()
# Optional: speed up local transfers
# export MC_STORE_MEMCPY=1
prefer_alloc_in_same_node#
Type: bool
Default: False
Description: Prefer allocating data on the same node as the caller. Currently this is only supported for batch_put_from_multi_buffers; it does not support disk segments, and replica_num must be 1.
config = ReplicateConfig()
config.prefer_alloc_in_same_node = True
Non-Zero-Copy API (Simple Usage)#
For simpler use cases, use the standard API without memory registration:
Basic Operations#
Click to expand: Non-zero-copy API examples
from mooncake.store import MooncakeDistributedStore
# Initialize (same as zero-copy)
store = MooncakeDistributedStore()
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "tcp", "", "localhost:50051")
# Simple put/get (automatic memory management)
data = b"Hello, World!" * 1000 # ~13KB
store.put("message", data)
retrieved = store.get("message")
print(retrieved == data) # True
# Batch operations
keys = ["key1", "key2", "key3"]
values = [b"value1", b"value2", b"value3"]
store.put_batch(keys, values)
retrieved = store.get_batch(keys)
print("Retrieved all keys successfully:", retrieved == values)
Performance Notes#
Choose the appropriate API based on your use case:
Zero-copy API is beneficial when:
Working with large data transfers
RDMA network infrastructure is available and configured
Direct memory access patterns fit your application design
Non-zero-copy API is suitable for:
Development and prototyping phases
Applications without specific performance requirements
Batch operations can improve throughput for:
Multiple related operations performed together
Scenarios where network round-trip reduction is beneficial
Topology & Devices#
Auto-discovery: Disabled by default. For protocol="rdma", you must specify RDMA devices.
Enable auto-discovery (optional): MC_MS_AUTO_DISC=1 enables auto-discovery; then rdma_devices is not required.
Optionally restrict candidates with MC_MS_FILTERS, a comma-separated whitelist of NIC names, e.g. MC_MS_FILTERS=mlx5_0,mlx5_2.
If MC_MS_AUTO_DISC is not set or set to 0, auto-discovery remains disabled and rdma_devices is required for RDMA.
Examples:
# Auto-select with default settings
python - <<'PY'
from mooncake.store import MooncakeDistributedStore as S
s = S()
s.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "rdma", "", "localhost:50051")
PY
# Manual device list
unset MC_MS_AUTO_DISC
python - <<'PY'
from mooncake.store import MooncakeDistributedStore as S
s = S()
s.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "rdma", "mlx5_0,mlx5_1", "localhost:50051")
PY
# Auto-select with filters
export MC_MS_AUTO_DISC=1
export MC_MS_FILTERS=mlx5_0,mlx5_2
python - <<'PY'
from mooncake.store import MooncakeDistributedStore as S
s = S()
s.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "rdma", "", "localhost:50051")
PY
get_buffer Buffer Protocol#
The get_buffer method returns a BufferHandle object that implements the Python buffer protocol:
Click to expand: Buffer protocol usage example
# Get buffer with buffer protocol support
buffer = store.get_buffer("large_object")
if buffer:
# Access as numpy array without copy
import numpy as np
arr = np.array(buffer, copy=False)
# Direct memory access
ptr = buffer.ptr() # Memory address
size = buffer.size() # Buffer size in bytes
# Use with other libraries that accept buffer protocol
print(f"Buffer size: {len(buffer)} bytes")
# The buffer is automatically freed
Full API Reference#
Class: MooncakeDistributedStore#
The main class for interacting with Mooncake Store.
Constructor#
store = MooncakeDistributedStore()
Creates a new store instance. No parameters required.
setup()#
Initialize distributed resources and establish network connections.
def setup(
self,
local_hostname: str,
metadata_server: str,
global_segment_size: int = 16777216,
local_buffer_size: int = 1073741824,
protocol: str = "tcp",
rdma_devices: str = "",
master_server_addr: str,
) -> int
Parameters:
local_hostname(str): Required. Local hostname and port (e.g., "localhost" or "localhost:12345")
metadata_server(str): Required. Metadata server address (e.g., "http://localhost:8080/metadata")
global_segment_size(int): Memory segment size in bytes for mounting (default: 16MB = 16777216)
local_buffer_size(int): Local buffer size in bytes (default: 1GB = 1073741824)
protocol(str): Network protocol - "tcp" or "rdma" (default: "tcp")
rdma_devices(str): RDMA device name(s), e.g. "mlx5_0" or "mlx5_0,mlx5_1". Leave empty to auto-select NICs, or provide device names to pin specific NICs. Leave empty for TCP.
master_server_addr(str): Required. Master server address (e.g., "localhost:50051")
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
Click to expand: Setup examples for TCP and RDMA
# TCP initialization
store.setup("localhost", "http://localhost:8080/metadata", 1024*1024*1024, 128*1024*1024, "tcp", "", "localhost:50051")
# RDMA auto-detect
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "rdma", "", "localhost:50051")
# RDMA with explicit device list
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "rdma", "mlx5_0,mlx5_1", "localhost:50051")
setup_dummy()#
Initialize the store with a dummy client for testing purposes.
def setup_dummy(self, mem_pool_size: int, local_buffer_size: int, server_address: str) -> int
Parameters:
mem_pool_size(int): Memory pool size in bytes
local_buffer_size(int): Local buffer size in bytes
server_address(str): Server address in format "hostname:port"
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
# Initialize with dummy client
store.setup_dummy(1024*1024*256, 1024*1024*64, "localhost:8080")
put()#
Store binary data in the distributed storage.
def put(self, key: str, value: bytes, config: ReplicateConfig = None) -> int
Parameters:
key(str): Unique object identifier
value(bytes): Binary data to store
config(ReplicateConfig, optional): Replication configuration
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
Click to expand: Put operation examples
# Simple put
store.put("my_key", b"Hello, World!")
# Put with replication config
config = ReplicateConfig()
config.replica_num = 2
store.put("important_data", b"Critical information", config)
get()#
Retrieve binary data from distributed storage.
def get(self, key: str) -> bytes
Parameters:
key(str): Object identifier to retrieve
Returns:
bytes: Retrieved binary data
Note:
Returns empty bytes if the key doesn't exist (no exception is raised)
Example:
Click to expand: Get operation example
data = store.get("my_key")
if data:
print(f"Retrieved: {data.decode()}")
else:
print("Key not found")
put_batch()#
Store multiple objects in a single batch operation.
def put_batch(self, keys: List[str], values: List[bytes], config: ReplicateConfig = None) -> int
Parameters:
keys(List[str]): List of object identifiers
values(List[bytes]): List of binary data to store
config(ReplicateConfig, optional): Replication configuration for all objects
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
Click to expand: Batch put example
keys = ["key1", "key2", "key3"]
values = [b"value1", b"value2", b"value3"]
result = store.put_batch(keys, values)
get_batch()#
Retrieve multiple objects in a single batch operation.
def get_batch(self, keys: List[str]) -> List[bytes]
Parameters:
keys(List[str]): List of object identifiers to retrieve
Returns:
List[bytes]: List of retrieved binary data
Example:
Click to expand: Batch get example
keys = ["key1", "key2", "key3"]
values = store.get_batch(keys)
for key, value in zip(keys, values):
print(f"{key}: {len(value)} bytes")
remove()#
Delete an object from the storage system.
def remove(self, key: str) -> int
Parameters:
key(str): Object identifier to remove
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
result = store.remove("my_key")
if result == 0:
print("Successfully removed")
remove_by_regex()#
Remove objects from the storage system whose keys match a regular expression.
def remove_by_regex(self, regex: str) -> int
Parameters:
regex(str): The regular expression to match against object keys.
Returns:
int: The number of objects removed, or a negative value on error.
Example:
# Remove all keys starting with "user_session_"
count = store.remove_by_regex("^user_session_.*")
if count >= 0:
print(f"Removed {count} objects")
remove_all()#
Remove all objects from the storage system.
def remove_all(self) -> int
Returns:
int: Number of objects removed, or -1 on error
Example:
count = store.remove_all()
print(f"Removed {count} objects")
is_exist()#
Check if an object exists in the storage system.
def is_exist(self, key: str) -> int
Parameters:
key(str): Object identifier to check
Returns:
int: 1 = object exists, 0 = object doesn't exist, -1 = error occurred
Example:
exists = store.is_exist("my_key")
if exists == 1:
print("Object exists")
elif exists == 0:
print("Object not found")
else:
print("Error checking existence")
batch_is_exist()#
Check existence of multiple objects in a single batch operation.
def batch_is_exist(self, keys: List[str]) -> List[int]
Parameters:
keys(List[str]): List of object identifiers to check
Returns:
List[int]: List of existence results (1=exists, 0=not exists, -1=error)
Example:
keys = ["key1", "key2", "key3"]
results = store.batch_is_exist(keys)
for key, exists in zip(keys, results):
status = "exists" if exists == 1 else "not found" if exists == 0 else "error"
print(f"{key}: {status}")
get_size()#
Get the size of a stored object in bytes.
def get_size(self, key: str) -> int
Parameters:
key(str): Object identifier
Returns:
int: Size in bytes, or negative value on error
Example:
size = store.get_size("my_key")
if size >= 0:
print(f"Object size: {size} bytes")
else:
print("Error getting size or object not found")
get_buffer()#
Get object data as a buffer that implements Python’s buffer protocol.
def get_buffer(self, key: str) -> BufferHandle
Parameters:
key(str): Object identifier
Returns:
BufferHandle: Buffer object or None if not found
Example:
buffer = store.get_buffer("large_object")
if buffer:
print(f"Buffer size: {buffer.size()} bytes")
# Use with numpy without copying
import numpy as np
arr = np.array(buffer, copy=False)
put_parts()#
Store data from multiple buffer parts as a single object.
def put_parts(self, key: str, *parts, config: ReplicateConfig = None) -> int
Parameters:
key(str): Object identifier
*parts: Variable number of bytes-like objects to concatenate
config(ReplicateConfig, optional): Replication configuration
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
part1 = b"Hello, "
part2 = b"World!"
part3 = b" From Mooncake"
result = store.put_parts("greeting", part1, part2, part3)
batch_get_buffer()#
Get multiple objects as buffers that implement Python’s buffer protocol.
def batch_get_buffer(self, keys: List[str]) -> List[BufferHandle]
Parameters:
keys(List[str]): List of object identifiers to retrieve
Returns:
List[BufferHandle]: List of buffer objects, with None for keys not found
Note: This function is not supported for the dummy client.
Example:
buffers = store.batch_get_buffer(["key1", "key2", "key3"])
for i, buffer in enumerate(buffers):
if buffer:
print(f"Buffer {i} size: {buffer.size()} bytes")
alloc_from_mem_pool()#
Allocate memory from the memory pool.
def alloc_from_mem_pool(self, size: int) -> int
Parameters:
size(int): Size of memory to allocate in bytes
Returns:
int: Memory address as integer, or 0 on failure
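Example (a minimal sketch based on the documented return value, where 0 indicates failure):
ptr = store.alloc_from_mem_pool(4 * 1024 * 1024)  # request 4 MB from the pool
if ptr != 0:
    print(f"Allocated pool memory at address: {ptr}")
else:
    print("Allocation from the memory pool failed")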
init_all()#
Initialize all resources with specified protocol and device.
def init_all(self, protocol: str, device_name: str, mount_segment_size: int = 16777216) -> int
Parameters:
protocol(str): Network protocol - "tcp" or "rdma"
device_name(str): Device name for the protocol
mount_segment_size(int): Memory segment size in bytes for mounting (default: 16MB = 16777216)
Returns:
int: Status code (0 = success, non-zero = error code)
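Example (a sketch mirroring setup(); it assumes an empty device name is acceptable when using TCP):
result = store.init_all("tcp", "", 64 * 1024 * 1024)  # TCP, no device pinning, 64 MB mounted segment
if result != 0:
    raise RuntimeError(f"init_all failed with code: {result}")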
get_hostname()#
Get the hostname of the current store instance.
def get_hostname(self) -> str
Returns:
str: Hostname and port of this store instance
Example:
hostname = store.get_hostname()
print(f"Store running on: {hostname}")
get_replica_desc()#
Get descriptors of replicas for a key.
def get_replica_desc(self, key: str) -> List[Replica::Descriptor]
Parameters:
key(str): mooncake store key
Returns:
List[Replica::Descriptor]: List of replica descriptors
Example:
descriptors = store.get_replica_desc("mooncake_key")
for desc in descriptors:
print("Status:", desc.status)
if desc.is_memory_replica():
mem_desc = desc.get_memory_descriptor()
print("Memory buffer desc:", mem_desc.buffer_descriptor)
elif desc.is_disk_replica():
disk_desc = desc.get_disk_descriptor()
print("Disk path:", disk_desc.file_path, "Size:", disk_desc.object_size)
batch_get_replica_desc()#
Get descriptors of replicas for a list of keys.
def batch_get_replica_desc(self, keys: List[str]) -> Dict[str, List[Replica::Descriptor]]
Parameters:
keys(List[str]): List of mooncake store keys
Returns:
Dict[str, List[Replica::Descriptor]]: Dictionary mapping keys to their list of replica descriptors
Example:
descriptors_map = store.batch_get_replica_desc(["key1", "key2"])
for key, desc_list in descriptors_map.items():
print(f"Replicas for key: {key}")
for desc in desc_list:
if desc.is_memory_replica():
mem_desc = desc.get_memory_descriptor()
print("Memory buffer desc:", mem_desc.buffer_descriptor)
elif desc.is_disk_replica():
disk_desc = desc.get_disk_descriptor()
print("Disk path:", disk_desc.file_path, "Size:", disk_desc.object_size)
close()#
Clean up all resources and terminate connections.
def close(self) -> int
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
store.close()
put_from_with_metadata()#
Store data directly from a registered buffer with metadata (zero-copy).
def put_from_with_metadata(self, key: str, buffer_ptr: int, metadata_buffer_ptr: int, size: int, metadata_size: int, config: ReplicateConfig = None) -> int
Parameters:
key(str): Object identifier
buffer_ptr(int): Memory address of the main data buffer (from ctypes.data or similar)
metadata_buffer_ptr(int): Memory address of the metadata buffer
size(int): Number of bytes for the main data
metadata_size(int): Number of bytes for the metadata
config(ReplicateConfig, optional): Replication configuration
Returns:
int: Status code (0 = success, non-zero = error code)
Note: This function is not supported for the dummy client.
Example:
import numpy as np
# Create data and metadata
data = np.random.randn(1000).astype(np.float32)
metadata = np.array([42, 100], dtype=np.int32) # example metadata
# Register buffers
data_ptr = data.ctypes.data
metadata_ptr = metadata.ctypes.data
store.register_buffer(data_ptr, data.nbytes)
store.register_buffer(metadata_ptr, metadata.nbytes)
# Store with metadata
result = store.put_from_with_metadata("data_with_metadata", data_ptr, metadata_ptr,
data.nbytes, metadata.nbytes)
if result == 0:
print("Data with metadata stored successfully")
# Cleanup
store.unregister_buffer(data_ptr)
store.unregister_buffer(metadata_ptr)
pub_tensor()#
Publish a PyTorch tensor with configurable replication settings.
def pub_tensor(self, key: str, tensor: torch.Tensor, config: ReplicateConfig = None) -> int
Parameters:
key(str): Unique object identifier
tensor(torch.Tensor): PyTorch tensor to store
config(ReplicateConfig, optional): Replication configuration
Returns:
int: Status code (0 = success, non-zero = error code)
Note: This function requires torch to be installed and available in the environment.
Example:
import torch
from mooncake.store import ReplicateConfig
# Create a tensor
tensor = torch.randn(100, 100)
# Create replication config
config = ReplicateConfig()
config.replica_num = 3
config.with_soft_pin = True
# Publish tensor with replication settings
result = store.pub_tensor("my_tensor", tensor, config)
if result == 0:
print("Tensor published successfully")
PyTorch Tensor Operations (Tensor Parallelism)#
These methods provide direct support for storing and retrieving PyTorch tensors. They automatically handle serialization and metadata, and include built-in support for Tensor Parallelism (TP) by automatically splitting and reconstructing tensor shards.
⚠️ Note: These methods require torch to be installed and available in the environment.
put_tensor_with_tp()#
Put a PyTorch tensor into the store, optionally splitting it into shards for tensor parallelism.
The tensor is chunked immediately and stored as separate keys (e.g., key_tp_0, key_tp_1…).
def put_tensor_with_tp(self, key: str, tensor: torch.Tensor, tp_rank: int = 0, tp_size: int = 1, split_dim: int = 0) -> int
Parameters:
key(str): Base identifier for the tensor.
tensor(torch.Tensor): The PyTorch tensor to store.
tp_rank(int): Current tensor parallel rank (default: 0). Note: The method splits and stores all chunks for all ranks regardless of this value.
tp_size(int): Total tensor parallel size (default: 1). If > 1, the tensor is split into tp_size chunks.
split_dim(int): The dimension to split the tensor along (default: 0).
Returns:
int: Status code (0 = success, non-zero = error code).
pub_tensor_with_tp()#
Publish a PyTorch tensor into the store with configurable replication settings, optionally splitting it into shards for tensor parallelism.
The tensor is chunked immediately and stored as separate keys (e.g., key_tp_0, key_tp_1…).
def pub_tensor_with_tp(self, key: str, tensor: torch.Tensor, config: ReplicateConfig, tp_rank: int = 0, tp_size: int = 1, split_dim: int = 0) -> int
Parameters:
key(str): Base identifier for the tensor.
tensor(torch.Tensor): The PyTorch tensor to store.
config(ReplicateConfig): Optional replication configuration.
tp_rank(int): Current tensor parallel rank (default: 0). Note: The method splits and stores all chunks for all ranks regardless of this value.
tp_size(int): Total tensor parallel size (default: 1). If > 1, the tensor is split into tp_size chunks.
split_dim(int): The dimension to split the tensor along (default: 0).
Returns:
int: Status code (0 = success, non-zero = error code).
get_tensor_with_tp()#
Get a PyTorch tensor from the store, specifically retrieving the shard corresponding to the given Tensor Parallel rank.
def get_tensor_with_tp(self, key: str, tp_rank: int = 0, tp_size: int = 1, split_dim: int = 0) -> torch.Tensor
Parameters:
key(str): Base identifier of the tensor.
tp_rank(int): The tensor parallel rank to retrieve (default: 0). Fetches key key_tp_{rank} if tp_size > 1.
tp_size(int): Total tensor parallel size (default: 1).
split_dim(int): The dimension used during splitting (default: 0).
Returns:
torch.Tensor: The retrieved tensor (or shard). Returns None if not found.
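Example (a sketch assuming an initialized store; the shard key names follow the key_tp_{rank} convention described above):
import torch
tensor = torch.randn(8, 16)
# Split into 2 shards along dim 0; stored under "w_tp_0" and "w_tp_1".
result = store.put_tensor_with_tp("w", tensor, tp_rank=0, tp_size=2, split_dim=0)
if result != 0:
    raise RuntimeError(f"put_tensor_with_tp failed with code: {result}")
# Fetch the shard for rank 1; with this split it should have shape (4, 16).
shard = store.get_tensor_with_tp("w", tp_rank=1, tp_size=2, split_dim=0)
if shard is not None:
    print("Shard shape:", shard.shape)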
batch_put_tensor_with_tp()#
Put a batch of PyTorch tensors into the store, splitting each into shards for tensor parallelism.
def batch_put_tensor_with_tp(self, base_keys: List[str], tensors_list: List[torch.Tensor], tp_rank: int = 0, tp_size: int = 1, split_dim: int = 0) -> List[int]
Parameters:
base_keys(List[str]): List of base identifiers.
tensors_list(List[torch.Tensor]): List of tensors to store.
tp_rank(int): Current rank (default: 0).
tp_size(int): Total TP size (default: 1).
split_dim(int): Split dimension (default: 0).
Returns:
List[int]: List of status codes for each tensor operation.
batch_pub_tensor_with_tp()#
Publish a batch of PyTorch tensors into the store with configurable replication settings, splitting each into shards for tensor parallelism.
def batch_pub_tensor_with_tp(self, base_keys: List[str], tensors_list: List[torch.Tensor], config: ReplicateConfig, tp_rank: int = 0, tp_size: int = 1, split_dim: int = 0) -> List[int]
Parameters:
base_keys(List[str]): List of base identifiers.
tensors_list(List[torch.Tensor]): List of tensors to store.
config(ReplicateConfig): Optional replication configuration.
tp_rank(int): Current rank (default: 0).
tp_size(int): Total TP size (default: 1).
split_dim(int): Split dimension (default: 0).
Returns:
List[int]: List of status codes for each tensor operation.
batch_get_tensor_with_tp()#
Get a batch of PyTorch tensor shards from the store for a given Tensor Parallel rank.
def batch_get_tensor_with_tp(self, base_keys: List[str], tp_rank: int = 0, tp_size: int = 1) -> List[torch.Tensor]
Parameters:
base_keys(List[str]): List of base identifiers.
tp_rank(int): The tensor parallel rank to retrieve (default: 0).
tp_size(int): Total tensor parallel size (default: 1).
Returns:
List[torch.Tensor]: List of retrieved tensors (or shards). Contains None for missing keys.
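Example (a sketch assuming an initialized store, using the same sharding convention):
import torch
keys = ["wq", "wk"]
tensors = [torch.randn(8, 4), torch.randn(8, 4)]
# Store each tensor as 2 shards along dim 0.
codes = store.batch_put_tensor_with_tp(keys, tensors, tp_size=2, split_dim=0)
print("All stored:", all(code == 0 for code in codes))
# Retrieve the rank-0 shard of each tensor.
shards = store.batch_get_tensor_with_tp(keys, tp_rank=0, tp_size=2)
for key, shard in zip(keys, shards):
    print(key, "missing" if shard is None else tuple(shard.shape))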
put_tensor()#
Put a PyTorch tensor into the store.
def put_tensor(self, key: str, tensor: torch.Tensor) -> int
Parameters:
key(str): Object identifier
tensor(torch.Tensor): The PyTorch tensor to store
Returns:
int: Status code (0 = success, non-zero = error code)
Note: This function requires torch to be installed and available in the environment.
Example:
import torch
from mooncake.store import MooncakeDistributedStore
store = MooncakeDistributedStore()
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "tcp", "", "localhost:50051")
# Store a tensor
tensor = torch.randn(100, 100)
result = store.put_tensor("my_tensor", tensor)
if result == 0:
print("Tensor stored successfully")
get_tensor()#
Get a PyTorch tensor from the store.
def get_tensor(self, key: str) -> torch.Tensor
Parameters:
key(str): Object identifier to retrieve
Returns:
torch.Tensor: The retrieved tensor. Returns None if not found.
Note: This function requires torch to be installed and available in the environment.
Example:
import torch
from mooncake.store import MooncakeDistributedStore
store = MooncakeDistributedStore()
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "tcp", "", "localhost:50051")
# Store a tensor
tensor = torch.randn(100, 100)
store.put_tensor("my_tensor", tensor)
# Retrieve the tensor
retrieved_tensor = store.get_tensor("my_tensor")
if retrieved_tensor is not None:
print(f"Retrieved tensor with shape: {retrieved_tensor.shape}")
batch_get_tensor()#
Get a batch of PyTorch tensors from the store.
def batch_get_tensor(self, keys: List[str]) -> List[torch.Tensor]
Parameters:
keys(List[str]): List of object identifiers to retrieve
Returns:
List[torch.Tensor]: List of retrieved tensors. Contains None for missing keys.
Note: This function requires torch to be installed and available in the environment.
Example:
import torch
from mooncake.store import MooncakeDistributedStore
store = MooncakeDistributedStore()
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "tcp", "", "localhost:50051")
# Store tensors
tensor1 = torch.randn(100, 100)
tensor2 = torch.randn(50, 50)
store.put_tensor("tensor1", tensor1)
store.put_tensor("tensor2", tensor2)
# Retrieve multiple tensors
tensors = store.batch_get_tensor(["tensor1", "tensor2", "nonexistent"])
for i, tensor in enumerate(tensors):
if tensor is not None:
print(f"Tensor {i} shape: {tensor.shape}")
else:
print(f"Tensor {i} not found")
batch_put_tensor()#
Put a batch of PyTorch tensors into the store.
def batch_put_tensor(self, keys: List[str], tensors_list: List[torch.Tensor]) -> List[int]
Parameters:
keys(List[str]): List of object identifiers
tensors_list(List[torch.Tensor]): List of tensors to store
Returns:
List[int]: List of status codes for each tensor operation.
Note: This function requires torch to be installed and available in the environment.
Example:
import torch
from mooncake.store import MooncakeDistributedStore
store = MooncakeDistributedStore()
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "tcp", "", "localhost:50051")
# Create tensors
tensors = [torch.randn(100, 100), torch.randn(50, 50), torch.randn(25, 25)]
keys = ["tensor1", "tensor2", "tensor3"]
# Store multiple tensors
results = store.batch_put_tensor(keys, tensors)
for i, result in enumerate(results):
if result == 0:
print(f"Tensor {i} stored successfully")
else:
print(f"Tensor {i} failed to store with code: {result}")
batch_pub_tensor()#
Publish a batch of PyTorch tensors into the store with configurable replication settings.
def batch_pub_tensor(self, keys: List[str], tensors_list: List[torch.Tensor], config: ReplicateConfig) -> List[int]
Parameters:
keys(List[str]): List of object identifiers
tensors_list(List[torch.Tensor]): List of tensors to store
config(ReplicateConfig): Optional replication configuration.
Returns:
List[int]: List of status codes for each tensor operation.
Note: This function requires torch to be installed and available in the environment.
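Example (a sketch assuming an initialized store):
import torch
from mooncake.store import ReplicateConfig
config = ReplicateConfig()
config.replica_num = 2
tensors = [torch.randn(10, 10), torch.randn(20, 20)]
results = store.batch_pub_tensor(["t1", "t2"], tensors, config)
print("All published:", all(code == 0 for code in results))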
PyTorch Tensor Operations (Zero Copy)#
These methods retrieve PyTorch tensors directly into pre-allocated, registered buffers (zero-copy). They handle serialization and metadata automatically and include built-in support for Tensor Parallelism (TP) shard retrieval.
⚠️ Note: These methods require torch to be installed and available in the environment.
get_tensor_into()#
Get a PyTorch tensor from the store directly into a pre-allocated buffer.
def get_tensor_into(self, key: str, buffer_ptr: int, size: int) -> torch.Tensor
Parameters:
key(str): Base identifier of the tensor.
buffer_ptr(int): Pointer to a pre-allocated buffer; the buffer must be registered.
size(int): Size of the buffer in bytes.
Returns:
torch.Tensor: The retrieved tensor (or shard). Returns None if not found.
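Example (a sketch assuming "my_tensor" was stored earlier with put_tensor(); it also assumes the destination buffer must be registered and at least get_size(key) bytes):
import torch
nbytes = store.get_size("my_tensor")  # assumed lower bound for the destination buffer
scratch = torch.empty(nbytes, dtype=torch.uint8)  # pre-allocated destination buffer
store.register_buffer(scratch.data_ptr(), nbytes)
tensor = store.get_tensor_into("my_tensor", scratch.data_ptr(), nbytes)
if tensor is not None:
    print("Retrieved tensor shape:", tensor.shape)
store.unregister_buffer(scratch.data_ptr())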
batch_get_tensor_into()#
Get a batch of PyTorch tensors from the store directly into pre-allocated buffers.
def batch_get_tensor_into(self, base_keys: List[str], buffer_ptrs: List[int], sizes: List[int]) -> List[torch.Tensor]
Parameters:
base_keys(List[str]): List of base identifiers.
buffer_ptrs(List[int]): List of pointers to pre-allocated buffers; the buffers must be registered.
sizes(List[int]): List of buffer sizes in bytes.
Returns:
List[torch.Tensor]: List of retrieved tensors (or shards). Contains None for missing keys.
get_tensor_with_tp_into()#
Get a PyTorch tensor from the store, specifically retrieving the shard corresponding to the given Tensor Parallel rank, directly into the pre-allocated buffer.
def get_tensor_with_tp_into(self, key: str, buffer_ptr: int, size: int, tp_rank: int = 0, tp_size: int = 1, split_dim: int = 0) -> torch.Tensor
Parameters:
key(str): Base identifier of the tensor.
buffer_ptr(int): Pointer to a pre-allocated buffer; the buffer must be registered.
size(int): Size of the buffer in bytes.
tp_rank(int): The tensor parallel rank to retrieve (default: 0). Fetches key key_tp_{rank} if tp_size > 1.
tp_size(int): Total tensor parallel size (default: 1).
split_dim(int): The dimension used during splitting (default: 0).
Returns:
torch.Tensor: The retrieved tensor (or shard). Returns None if not found.
batch_get_tensor_with_tp_into()#
Get a batch of PyTorch tensor shards from the store for a given Tensor Parallel rank, directly into the pre-allocated buffer.
def batch_get_tensor_with_tp_into(self, base_keys: List[str], buffer_ptrs: List[int], sizes: List[int], tp_rank: int = 0, tp_size: int = 1) -> List[torch.Tensor]
Parameters:
base_keys(List[str]): List of base identifiers.
buffer_ptrs(List[int]): List of pointers to pre-allocated buffers; the buffers must be registered.
sizes(List[int]): List of buffer sizes in bytes.
tp_rank(int): The tensor parallel rank to retrieve (default: 0).
tp_size(int): Total tensor parallel size (default: 1).
Returns:
List[torch.Tensor]: List of retrieved tensors (or shards). Contains None for missing keys.
Batch Zero-Copy Operations#
batch_put_from()#
Store multiple objects from pre-registered buffers (zero-copy).
def batch_put_from(self, keys: List[str], buffer_ptrs: List[int], sizes: List[int], config: ReplicateConfig = None) -> List[int]
Parameters:
keys(List[str]): List of object identifiers
buffer_ptrs(List[int]): List of memory addresses
sizes(List[int]): List of buffer sizes
config(ReplicateConfig, optional): Replication configuration
Returns:
List[int]: List of status codes for each operation (0 = success, negative = error)
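Example (a sketch assuming an initialized store; buffers are registered before the batch call):
import numpy as np
keys = ["blob1", "blob2", "blob3"]
arrays = [np.full(1024 * 1024, i, dtype=np.uint8) for i in range(len(keys))]  # three 1 MB buffers
ptrs = [a.ctypes.data for a in arrays]
sizes = [a.nbytes for a in arrays]
for ptr, size in zip(ptrs, sizes):
    store.register_buffer(ptr, size)
codes = store.batch_put_from(keys, ptrs, sizes)
print("All stored:", all(code == 0 for code in codes))
for ptr in ptrs:
    store.unregister_buffer(ptr)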
batch_get_into()#
Retrieve multiple objects into pre-registered buffers (zero-copy).
def batch_get_into(self, keys: List[str], buffer_ptrs: List[int], sizes: List[int]) -> List[int]
Parameters:
keys(List[str]): List of object identifiers
buffer_ptrs(List[int]): List of memory addresses
sizes(List[int]): List of buffer sizes
Returns:
List[int]: List of bytes read for each operation (positive = success, negative = error)
⚠️ Buffer Registration Required: All buffers must be registered before batch zero-copy operations.
Example:
Click to expand: Batch zero-copy retrieval example
import numpy as np
# Prepare buffers
keys = ["tensor1", "tensor2", "tensor3"]
buffer_size = 1024 * 1024 # 1MB each
buffers = []
buffer_ptrs = []
for i in range(len(keys)):
buffer = np.empty(buffer_size, dtype=np.uint8)
buffers.append(buffer)
buffer_ptrs.append(buffer.ctypes.data)
store.register_buffer(buffer.ctypes.data, buffer_size)
# Batch retrieve
sizes = [buffer_size] * len(keys)
results = store.batch_get_into(keys, buffer_ptrs, sizes)
# Check results
for key, result in zip(keys, results):
if result > 0:
print(f"Retrieved {key}: {result} bytes")
else:
print(f"Failed to retrieve {key}: error {result}")
# Cleanup
for ptr in buffer_ptrs:
store.unregister_buffer(ptr)
batch_put_from_multi_buffers()#
Store multiple objects from multiple pre-registered buffers (zero-copy).
def batch_put_from_multi_buffers(self, keys: List[str], all_buffer_ptrs: List[List[int]], all_sizes: List[List[int]], config: ReplicateConfig = None) -> List[int]
Parameters:
keys(List[str]): List of object identifiers
all_buffer_ptrs(List[List[int]]): Lists of memory addresses, one list per key
all_sizes(List[List[int]]): Lists of buffer sizes, one list per key
config(ReplicateConfig, optional): Replication configuration
Returns:
List[int]: List of status codes for each operation (0 = success, negative = error)
batch_get_into_multi_buffers()#
Retrieve multiple objects into multiple pre-registered buffers (zero-copy).
def batch_get_into_multi_buffers(self, keys: List[str], all_buffer_ptrs: List[List[int]], all_sizes: List[List[int]]) -> List[int]
Parameters:
keys(List[str]): List of object identifiers
all_buffer_ptrs(List[List[int]]): Lists of memory addresses, one list per key
all_sizes(List[List[int]]): Lists of buffer sizes, one list per key
Returns:
List[int]: List of bytes read for each operation (positive = success, negative = error)
⚠️ Buffer Registration Required: All buffers must be registered before batch zero-copy operations.
Example:
Click to expand: Batch zero-copy put and get for multiple buffers example
import torch
from mooncake.store import ReplicateConfig
rank = 0  # example rank, used only to build the key names below
tensor = torch.ones(10, 61, 128*1024, dtype=torch.int8)
data_ptr = tensor.data_ptr()
store.register_buffer(data_ptr, 10*61*128*1024)
target_tensor = torch.zeros(10, 61, 128*1024, dtype=torch.int8)
target_data_ptr = target_tensor.data_ptr()
store.register_buffer(target_data_ptr, 10*61*128*1024)
all_local_addrs = []
all_remote_addrs = []
all_sizes = []
keys = []
for block_i in range(10):
local_addrs = []
remote_addrs = []
sizes = []
for _ in range(61):
local_addrs.append(data_ptr)
remote_addrs.append(target_data_ptr)
sizes.append(128*1024)
data_ptr += 128*1024
target_data_ptr += 128*1024
all_local_addrs.append(local_addrs)
all_remote_addrs.append(remote_addrs)
all_sizes.append(sizes)
keys.append(f"kv_{rank}_{block_i}")
config = ReplicateConfig()
config.prefer_alloc_in_same_node = True
store.batch_put_from_multi_buffers(keys, all_local_addrs, all_sizes, config)
store.batch_get_into_multi_buffers(keys, all_remote_addrs, all_sizes, True)
store.unregister_buffer(tensor.data_ptr())
store.unregister_buffer(target_tensor.data_ptr())
MooncakeHostMemAllocator Class#
The MooncakeHostMemAllocator class provides host memory allocation capabilities for Mooncake Store operations.
Class Definition#
from mooncake.store import MooncakeHostMemAllocator
# Create an allocator instance
allocator = MooncakeHostMemAllocator()
Methods#
alloc()#
Allocate memory from the host memory pool.
def alloc(self, size: int) -> int
Parameters:
size(int): Size of memory to allocate in bytes
Returns:
int: Memory address as integer, or 0 on failure
Example:
allocator = MooncakeHostMemAllocator()
ptr = allocator.alloc(1024 * 1024) # Allocate 1MB
if ptr != 0:
print(f"Allocated memory at address: {ptr}")
free()#
Free previously allocated memory.
def free(self, ptr: int) -> int
Parameters:
ptr(int): Memory address to free
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
result = allocator.free(ptr)
if result == 0:
print("Memory freed successfully")
bind_to_numa_node Function#
The bind_to_numa_node function binds the current thread and memory allocation preference to a specified NUMA node.
Function Definition#
from mooncake.store import bind_to_numa_node
# Bind to NUMA node
bind_to_numa_node(node: int)
Parameters:
node(int): NUMA node number to bind to
Example:
from mooncake.store import bind_to_numa_node
# Bind current thread to NUMA node 0
bind_to_numa_node(0)
Error Handling#
Most methods return integer status codes:
0: Success
Negative values: Error codes (for methods that can return data size)
For methods that return data (get, get_batch, get_buffer, get_tensor):
Return the requested data on success
Return empty/None on failure or key not found
📋 Complete Error Codes Reference: See Error Code Explanation for detailed descriptions of all error codes and their meanings.
Performance Tips#
Use batch operations when working with multiple objects to reduce network overhead
Use zero-copy APIs (put_from, get_into) for large data transfers
Register buffers once and reuse them for multiple operations
Configure replication appropriately - more replicas provide better availability but use more storage
Use soft pinning for frequently accessed objects to keep them in memory
Choose RDMA protocol when available for maximum performance