Mooncake Store Python API#
Installation#
PyPI Package#
Install the Mooncake Transfer Engine package from PyPI; it includes Python bindings for both the Transfer Engine and Mooncake Store:
pip install mooncake-transfer-engine
📦 Package Details: https://pypi.org/project/mooncake-transfer-engine/
Required Service#
Only one service is required now:
- `mooncake_master`: the master service, which now embeds the HTTP metadata server
Quick Start#
Start Master (with HTTP enabled)#
Enable the built-in HTTP metadata server when starting the master:
```shell
mooncake_master \
  --enable_http_metadata_server=true \
  --http_metadata_server_host=0.0.0.0 \
  --http_metadata_server_port=8080
```
This exposes the metadata endpoint at http://<host>:<port>/metadata.
Hello World Example#
```python
from mooncake.store import MooncakeDistributedStore

# 1. Create store instance
store = MooncakeDistributedStore()

# 2. Setup with all required parameters
store.setup(
    "localhost",                       # Your node's address
    "http://localhost:8080/metadata",  # HTTP metadata server
    512*1024*1024,                     # 512MB segment size
    128*1024*1024,                     # 128MB local buffer
    "tcp",                             # Use TCP (RDMA for high performance)
    "",                                # Leave empty; Mooncake auto-picks RDMA devices when needed
    "localhost:50051"                  # Master service
)

# 3. Store data
store.put("hello_key", b"Hello, Mooncake Store!")

# 4. Retrieve data
data = store.get("hello_key")
print(data.decode())  # Output: Hello, Mooncake Store!

# 5. Clean up
store.close()
```
RDMA device selection: Leave `rdma_devices` as `""` to auto-select RDMA NICs. Provide a comma-separated list (e.g. `"mlx5_0,mlx5_1"`) to pin to specific hardware.
Mooncake selects available ports internally at `setup()`, so you do not need to fix specific port numbers in these examples. Ports are chosen from a dynamic range (currently 12300-14300).
P2P Hello World (preview)#
The following setup uses the new P2P handshake and does not require an HTTP metadata server. This feature is not released yet; use only if you’re testing the latest code.
```python
from mooncake.store import MooncakeDistributedStore

store = MooncakeDistributedStore()
store.setup(
    "localhost",       # Your node's IP address
    "P2PHANDSHAKE",    # P2P handshake (no HTTP metadata)
    512*1024*1024,     # 512MB segment size
    128*1024*1024,     # 128MB local buffer
    "tcp",             # Use TCP (RDMA for high performance)
    "",                # Leave empty; Mooncake auto-picks RDMA devices when needed
    "localhost:50051"  # Master service
)
store.put("hello_key", b"Hello, Mooncake Store!")
print(store.get("hello_key").decode())
store.close()
```
Basic API Usage#
Simple Get/Put Operations#
Example: Complete Get/Put with NumPy arrays

```python
import numpy as np
import json
from mooncake.store import MooncakeDistributedStore

# 1. Initialize
store = MooncakeDistributedStore()
store.setup("localhost",
            "http://localhost:8080/metadata",
            512*1024*1024,
            128*1024*1024,
            "tcp",
            "",
            "localhost:50051")
print("Store ready.")

# 2. Store data
store.put("config", b'{"model": "llama-7b", "temperature": 0.7}')
model_weights = np.random.randn(1000, 1000).astype(np.float32)
store.put("weights", model_weights.tobytes())
store.put("cache", b"some serialized cache data")

# 3. Retrieve and verify data
config = json.loads(store.get("config").decode())
weights = np.frombuffer(store.get("weights"), dtype=np.float32).reshape(1000, 1000)
print("Config OK:", config["model"])
print("Weights OK, mean =", round(float(weights.mean()), 4))
print("Cache exists?", bool(store.is_exist("cache")))

# 4. Close
store.close()
```
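The raw bytes returned by `get()` carry no dtype or shape information, so consumers must know how to interpret them. One pattern (not part of the Mooncake API; the helper names here are hypothetical) is to prefix the payload with a small JSON header before `put()`, so readers can reconstruct the array after `get()`. A minimal standard-library sketch:

```python
import json
import struct

def pack_with_header(payload: bytes, meta: dict) -> bytes:
    """Prefix a binary payload with a length-prefixed JSON header."""
    header = json.dumps(meta).encode()
    # 4-byte little-endian header length, then the header, then the payload
    return struct.pack("<I", len(header)) + header + payload

def unpack_with_header(blob: bytes):
    """Recover (metadata dict, payload bytes) from a packed blob."""
    (hlen,) = struct.unpack_from("<I", blob, 0)
    meta = json.loads(blob[4:4 + hlen].decode())
    return meta, blob[4 + hlen:]

blob = pack_with_header(b"\x00" * 16, {"dtype": "float32", "shape": [2, 2]})
meta, payload = unpack_with_header(blob)
print(meta["shape"], len(payload))  # [2, 2] 16
```

The packed blob can be passed directly to `store.put()`, and the header recovered after `store.get()` to rebuild the NumPy array with `np.frombuffer(...).reshape(...)`.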
Zero-Copy API (Advanced Performance)#
For maximum performance, especially with RDMA networks, use the zero-copy API. This allows direct memory access without intermediate copies.
Memory Registration#
⚠️ Important: register_buffer is required for zero-copy RDMA operations. Without proper buffer registration, undefined behavior and memory corruption may occur.
Zero-copy operations require registering memory buffers with the store:
register_buffer()#
Register a memory buffer for direct RDMA access.
unregister_buffer()#
Unregister a previously registered buffer.
Example: Buffer registration

```python
import numpy as np
from mooncake.store import MooncakeDistributedStore

# Initialize store
store = MooncakeDistributedStore()
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "tcp", "", "localhost:50051")

# Create a large buffer
buffer = np.zeros(100 * 1024 * 1024, dtype=np.uint8)  # 100MB buffer

# Register the buffer for zero-copy operations
buffer_ptr = buffer.ctypes.data
result = store.register_buffer(buffer_ptr, buffer.nbytes)
if result != 0:
    raise RuntimeError(f"Failed to register buffer: {result}")
print("Buffer registered successfully.")

store.unregister_buffer(buffer_ptr)
```
Zero-Copy Operations#
Complete Zero-Copy Workflow#
⚠️ Critical: Always register buffers before zero-copy operations. Failure to register buffers will cause undefined behavior and potential memory corruption.
Here’s a complete example showing the full zero-copy workflow with proper buffer management:
Example: Complete zero-copy workflow

```python
import numpy as np
from mooncake.store import MooncakeDistributedStore

# Initialize store (pass "rdma" instead of "tcp" for maximum performance)
store = MooncakeDistributedStore()
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 16*1024*1024, "tcp", "", "localhost:50051")

# Create data to store
original_data = np.random.randn(1000, 1000).astype(np.float32)
buffer_ptr = original_data.ctypes.data
size = original_data.nbytes

# Step 1: Register the buffer
result = store.register_buffer(buffer_ptr, size)
if result != 0:
    raise RuntimeError(f"Failed to register buffer: {result}")

# Step 2: Zero-copy store
result = store.put_from("large_tensor", buffer_ptr, size)
if result == 0:
    print(f"Successfully stored {size} bytes with zero-copy")
else:
    raise RuntimeError(f"Store failed with code: {result}")

# Step 3: Pre-allocate buffer for retrieval
retrieved_data = np.empty((1000, 1000), dtype=np.float32)
recv_buffer_ptr = retrieved_data.ctypes.data
recv_size = retrieved_data.nbytes

# Step 4: Register receive buffer
result = store.register_buffer(recv_buffer_ptr, recv_size)
if result != 0:
    raise RuntimeError(f"Failed to register receive buffer: {result}")

# Step 5: Zero-copy retrieval
bytes_read = store.get_into("large_tensor", recv_buffer_ptr, recv_size)
if bytes_read > 0:
    print(f"Successfully retrieved {bytes_read} bytes with zero-copy")
    # Verify the data
    print(f"Data matches: {np.array_equal(original_data, retrieved_data)}")
else:
    raise RuntimeError(f"Retrieval failed with code: {bytes_read}")

# Step 6: Clean up - unregister both buffers
store.unregister_buffer(buffer_ptr)
store.unregister_buffer(recv_buffer_ptr)
store.close()
```
put_from()#
Store data directly from a registered buffer (zero-copy).
def put_from(self, key: str, buffer_ptr: int, size: int, config=None) -> int
Parameters:
- `key`: Object identifier
- `buffer_ptr`: Memory address (from `ctypes.data` or similar)
- `size`: Number of bytes to store
- `config`: Optional replication configuration
get_into()#
Retrieve data directly into a registered buffer (zero-copy).
def get_into(self, key: str, buffer_ptr: int, size: int) -> int
Parameters:
- `key`: Object identifier to retrieve
- `buffer_ptr`: Memory address of pre-allocated buffer
- `size`: Size of the buffer (must be >= object size)
Returns: Number of bytes read, or negative on error
get_into_ranges()#
Retrieve multiple byte ranges from multiple objects into registered buffers (zero-copy).
```python
def get_into_ranges(
    self,
    buffer_ptrs: List[int],
    all_keys: List[List[str]],
    all_dst_offsets: List[List[List[int]]],
    all_src_offsets: List[List[List[int]]],
    all_sizes: List[List[List[int]]],
) -> List[List[List[int]]]
```
This API is buffer-major and supports multiple fragments per key.
Think of the input shape as:
- `buffer_ptrs[i]`: the i-th destination buffer
- `all_keys[i][j]`: the j-th key that writes into buffer i
- `all_dst_offsets[i][j][k]`: destination offset of fragment k for key j in buffer i
- `all_src_offsets[i][j][k]`: source offset of fragment k inside key j for buffer i
- `all_sizes[i][j][k]`: byte size of fragment k
For each triple (i, j, k), Mooncake reads the source range `[all_src_offsets[i][j][k], all_src_offsets[i][j][k] + all_sizes[i][j][k])` from object `all_keys[i][j]`, then writes it into destination buffer `buffer_ptrs[i]` at offset `all_dst_offsets[i][j][k]`.
This lets one buffer gather interleaved fragments from multiple keys, and lets one key contribute multiple disjoint fragments to the same buffer in a single call.
Parameters:
- `buffer_ptrs`: Memory addresses of pre-allocated destination buffers. Every buffer must be registered with `register_buffer()` before calling this API.
- `all_keys`: For each buffer, the ordered list of source object keys to read from.
- `all_dst_offsets`: For each buffer and key, the destination offsets of that key's fragments.
- `all_src_offsets`: For each buffer and key, the source offsets of that key's fragments inside the object.
- `all_sizes`: For each buffer and key, the byte lengths of that key's fragments.
Shape rules:
- `len(buffer_ptrs) == len(all_keys) == len(all_dst_offsets) == len(all_src_offsets) == len(all_sizes)`
- For each buffer `i`: `len(all_keys[i]) == len(all_dst_offsets[i]) == len(all_src_offsets[i]) == len(all_sizes[i])`
- For each `(buffer i, key j)`: `len(all_dst_offsets[i][j]) == len(all_src_offsets[i][j]) == len(all_sizes[i][j])`
If a top-level shape or per-key fragment shape does not match, the corresponding result entries are negative error codes.
Returns: A nested list of per-buffer, per-key, per-fragment results. results[i][j][k] is the number of bytes read for fragment k, or a negative value on error.
A successful call can still contain per-fragment failures. For example, if one key is missing but another key in the same buffer is valid, the missing key’s fragment result will be negative while the valid fragment can still succeed.
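The shape rules above can be checked up front in pure Python before issuing a call. The following validator is an illustrative helper, not part of the Mooncake API; it simply mirrors the three rules:

```python
def validate_ranges_args(buffer_ptrs, all_keys, all_dst_offsets,
                         all_src_offsets, all_sizes) -> bool:
    """Return True when the nested arguments satisfy the shape rules."""
    n = len(buffer_ptrs)
    # Rule 1: top-level lengths must match the number of destination buffers.
    if not all(len(a) == n for a in (all_keys, all_dst_offsets,
                                     all_src_offsets, all_sizes)):
        return False
    for i in range(n):
        m = len(all_keys[i])
        # Rule 2: per-buffer lists must have one entry per key.
        if not (len(all_dst_offsets[i]) == len(all_src_offsets[i])
                == len(all_sizes[i]) == m):
            return False
        for j in range(m):
            # Rule 3: per-key fragment lists must be equally long.
            if not (len(all_dst_offsets[i][j]) == len(all_src_offsets[i][j])
                    == len(all_sizes[i][j])):
                return False
    return True

print(validate_ranges_args([1], [["k"]], [[[0]]], [[[0]]], [[[8]]]))  # True
```

Failing this check locally is cheaper than receiving negative error codes back from the store.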
Typical scenarios:
- Partial read from one object: You only need a slice of a large value, such as a header, metadata block, or a small subrange of a tensor shard. In this case, use one buffer, one key, and one or more fragments under that key.
- Stitch multiple fragments from one object into one buffer: You need several non-contiguous ranges from the same object and want to pack them into one destination buffer. In this case, keep a single key entry and place multiple fragments under that key.
- Stitch data from multiple objects into one buffer: You want to assemble one logical payload from several keys. In this case, use one destination buffer and list multiple keys under that buffer, with each key contributing one or more fragments.
- Fill multiple output buffers in one call: You have several destination buffers, each with its own read plan. In this case, each top-level entry in `buffer_ptrs` and the parallel nested arrays describes one independent destination buffer.
How to use it for partial reads:
If you only want part of an object, do not call get_into() with the full object buffer size. Instead:
1. Allocate and register a destination buffer sized for the bytes you actually want to materialize.
2. Put that buffer pointer into `buffer_ptrs`.
3. Put the source key into `all_keys`.
4. Set `all_src_offsets` to the start offsets of the object ranges you want.
5. Set `all_sizes` to the lengths of those ranges.
6. Set `all_dst_offsets` to where those ranges should land in your destination buffer.
A useful way to think about the arguments is:
- `buffer_ptrs` answers: where does the data land?
- `all_keys` answers: which object does it come from?
- `all_src_offsets` and `all_sizes` answer: which bytes should be read?
- `all_dst_offsets` answers: where should each fragment be placed in the destination buffer?
If you are extracting a single contiguous slice from one object, the minimal shape is:
```python
results = store.get_into_ranges(
    [buffer_ptr],
    [["my_key"]],
    [[[0]]],
    [[[src_offset]]],
    [[[size]]],
)
```
This means:
- one destination buffer
- one source key for that buffer
- one fragment for that key
- read `size` bytes from `my_key[src_offset:src_offset + size]`
- write them into `buffer_ptr[0:size]`
If you want to read several disjoint ranges from the same object and pack them together, keep the same key and add more fragments under it. For example:
```python
results = store.get_into_ranges(
    [buffer_ptr],
    [["my_key"]],
    [[[0, 16, 40]]],
    [[[128, 4096, 8192]]],
    [[[8, 12, 4]]],
)
```
This reads three fragments from my_key and places them into the same destination buffer at offsets 0, 16, and 40. This pattern is useful when you want to assemble only the needed pieces of a large object without reading the whole value.
If you want to assemble one output buffer from multiple objects, keep one top-level buffer entry and add multiple keys under it. Each key can still contribute one or more fragments. For example, you might put a header from meta_key at the front of the buffer, then place a payload slice from data_key after it.
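The (i, j, k) gather semantics can be simulated over plain in-memory bytes, which makes the offset bookkeeping concrete. The helper below is illustrative only (not part of the Mooncake API); it mimics the per-fragment result convention, including a negative code for a missing key:

```python
def gather_ranges(objects, buffers, all_keys, all_dst_offsets,
                  all_src_offsets, all_sizes):
    """Simulate get_into_ranges over in-memory bytes objects.

    objects: dict mapping key -> bytes; buffers: list of bytearray.
    Returns nested per-fragment byte counts (negative for a missing key).
    """
    results = []
    for i, buf in enumerate(buffers):
        per_buffer = []
        for j, key in enumerate(all_keys[i]):
            per_key = []
            for k, size in enumerate(all_sizes[i][j]):
                if key not in objects:
                    per_key.append(-1)  # missing key: negative error code
                    continue
                src = all_src_offsets[i][j][k]
                dst = all_dst_offsets[i][j][k]
                # Copy the source range into the destination buffer.
                buf[dst:dst + size] = objects[key][src:src + size]
                per_key.append(size)
            per_buffer.append(per_key)
        results.append(per_buffer)
    return results

objs = {"key1": bytes(range(64)), "key2": bytes(range(64, 128))}
buf = bytearray(32)
res = gather_ranges(objs, [buf], [["key1", "key2"]],
                    [[[0, 20], [8]]], [[[1, 30], [2]]], [[[4, 3], [5]]])
print(res)  # [[[4, 3], [5]]]
```

The real API performs the same iteration against registered buffers and remote replicas instead of local bytes.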
Usage example:
```python
import ctypes

# Assumes `store` is an initialized MooncakeDistributedStore and that
# "key1" and "key2" have already been stored.
buffer_size = 32
buffer0 = (ctypes.c_ubyte * buffer_size)()
buffer1 = (ctypes.c_ubyte * buffer_size)()
buffer_ptr0 = ctypes.addressof(buffer0)
buffer_ptr1 = ctypes.addressof(buffer1)

store.register_buffer(buffer_ptr0, buffer_size)
store.register_buffer(buffer_ptr1, buffer_size)

# Buffer 0 reads:
# - from key1: two fragments -> src[1:5] -> dst[0:4], src[30:33] -> dst[20:23]
# - from key2: one fragment -> src[2:7] -> dst[8:13]
# Buffer 1 reads:
# - from key2: one fragment -> src[0:6] -> dst[4:10]
# - from key1: one fragment -> src[10:14] -> dst[16:20]
results = store.get_into_ranges(
    [buffer_ptr0, buffer_ptr1],
    [["key1", "key2"], ["key2", "key1"]],
    [[[0, 20], [8]], [[4], [16]]],
    [[[1, 30], [2]], [[0], [10]]],
    [[[4, 3], [5]], [[6], [4]]],
)
# results == [
#     [[4, 3], [5]],
#     [[6], [4]],
# ]
```
In the example above:
- `results[0][0][0] == 4`: buffer 0, key 0 (`"key1"`), fragment 0 succeeded with 4 bytes
- `results[0][0][1] == 3`: buffer 0, key 0 (`"key1"`), fragment 1 succeeded with 3 bytes
- `results[0][1][0] == 5`: buffer 0, key 1 (`"key2"`), fragment 0 succeeded with 5 bytes
Common pitfalls:
- Do not flatten all fragments for a buffer into one list. Fragments must be grouped under their corresponding key.
- `all_dst_offsets`, `all_src_offsets`, and `all_sizes` are 3D, but `all_keys` is 2D.
- Buffer overflow is checked against the registered destination buffer size.
- Source overflow is checked against the source object's size.
- Full-object `get_into()` and ranged `get_into_ranges()` are different APIs; use `get_into()` when you want the whole object into one buffer.
Current limitation: ranged reads currently require the selected source replica to be memory-backed. Whole-object reads still follow the normal full-read path, but partial reads through `get_into_ranges()` do not support non-memory replicas.
ReplicateConfig Configuration#
The ReplicateConfig class allows you to control data replication behavior when storing objects in Mooncake Store. This configuration is essential for ensuring data reliability, performance optimization, and storage placement control.
Class Definition#
```python
from mooncake.store import ReplicateConfig

# Create a configuration instance
config = ReplicateConfig()
```
Properties#
replica_num#
Type: int
Default: 1
Description: Specifies the total number of replicas to create for the stored object.
```python
config = ReplicateConfig()
config.replica_num = 3  # Store 3 copies of the data
```
with_soft_pin#
Type: bool
Default: False
Description: Enables soft pinning for the stored object. Soft pinned objects are prioritized to remain in memory during eviction - they are only evicted when memory is insufficient and no other objects are eligible for eviction. This is useful for frequently accessed or important objects like system prompts.
```python
config = ReplicateConfig()
config.with_soft_pin = True  # Keep this object in memory longer
```
with_hard_pin#
Type: bool
Default: False
Description: Enables hard pinning for the stored object. Hard pinned objects will not be evicted, giving the user manual control over the lifetime of stored objects.
```python
config = ReplicateConfig()
config.with_hard_pin = True  # Keep this object in memory; it will not be evicted
```
preferred_segment#
Type: str
Default: "" (empty string)
Description: Specifies a preferred segment (node) for data allocation. This is typically the hostname:port of a target server.
```python
config = ReplicateConfig()

# Preferred replica location ("host:port")
config.preferred_segment = "localhost:12345"  # pin to a specific machine

# Alternatively, pin to the local host
config.preferred_segment = store.get_hostname()

# Optional: speed up local transfers
# export MC_STORE_MEMCPY=1
```
prefer_alloc_in_same_node#
Type: str
Default: "" (empty string)
Description: When set, prefers allocating data on the same node. Currently this is only supported by `batch_put_from_multi_buffers`; it does not support disk segments, and `replica_num` must be 1.
```python
config = ReplicateConfig()
config.prefer_alloc_in_same_node = "True"
```
Non-Zero-Copy API (Simple Usage)#
For simpler use cases, use the standard API without memory registration:
Basic Operations#
Example: Non-zero-copy API usage

```python
from mooncake.store import MooncakeDistributedStore

# Initialize (same as zero-copy)
store = MooncakeDistributedStore()
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "tcp", "", "localhost:50051")

# Simple put/get (automatic memory management)
data = b"Hello, World!" * 1000  # ~13KB
store.put("message", data)
retrieved = store.get("message")
print(retrieved == data)  # True

# Batch operations
keys = ["key1", "key2", "key3"]
values = [b"value1", b"value2", b"value3"]
store.put_batch(keys, values)
retrieved = store.get_batch(keys)
print("Retrieved all keys successfully:", retrieved == values)
```
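When a payload is larger than a convenient single object, one pattern is to shard it across keys before `put_batch()` and reassemble it after `get_batch()`. A sketch with hypothetical helper names (the sharding scheme is an assumption, not a Mooncake convention):

```python
def shard_payload(base_key: str, data: bytes, chunk_size: int):
    """Split data into fixed-size chunks; returns parallel key/value lists."""
    keys, values = [], []
    for i in range(0, len(data), chunk_size):
        keys.append(f"{base_key}/part-{i // chunk_size}")
        values.append(data[i:i + chunk_size])
    return keys, values

def reassemble(values):
    """Concatenate chunks retrieved via get_batch() back into one payload."""
    return b"".join(values)

keys, values = shard_payload("tensor", b"x" * 10, 4)
print(keys)  # ['tensor/part-0', 'tensor/part-1', 'tensor/part-2']
print(reassemble(values) == b"x" * 10)  # True
```

The key lists returned by `shard_payload` can be passed directly to `store.put_batch(keys, values)` and later to `store.get_batch(keys)`.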
Performance Notes#
Choose the appropriate API based on your use case:
Zero-copy API is beneficial when:
Working with large data transfers
RDMA network infrastructure is available and configured
Direct memory access patterns fit your application design
Non-zero-copy API is suitable for:
Development and prototyping phases
Applications without specific performance requirements
Batch operations can improve throughput for:
Multiple related operations performed together
Scenarios where network round-trip reduction is beneficial
Topology & Devices#
- Auto-discovery: disabled by default. For `protocol="rdma"`, you must specify RDMA devices.
- Enable auto-discovery (optional): `MC_MS_AUTO_DISC=1` enables auto-discovery; then `rdma_devices` is not required.
- Optionally restrict candidates with `MC_MS_FILTERS`, a comma-separated whitelist of NIC names, e.g. `MC_MS_FILTERS=mlx5_0,mlx5_2`.
- If `MC_MS_AUTO_DISC` is not set or set to `0`, auto-discovery remains disabled and `rdma_devices` is required for RDMA.
Examples:
```shell
# Auto-select with default settings
python - <<'PY'
from mooncake.store import MooncakeDistributedStore as S
s = S()
s.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "rdma", "", "localhost:50051")
PY

# Manual device list
unset MC_MS_AUTO_DISC
python - <<'PY'
from mooncake.store import MooncakeDistributedStore as S
s = S()
s.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "rdma", "mlx5_0,mlx5_1", "localhost:50051")
PY

# Auto-select with filters
export MC_MS_AUTO_DISC=1
export MC_MS_FILTERS=mlx5_0,mlx5_2
python - <<'PY'
from mooncake.store import MooncakeDistributedStore as S
s = S()
s.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "rdma", "", "localhost:50051")
PY
```
get_buffer Buffer Protocol#
The get_buffer method returns a BufferHandle object that implements the Python buffer protocol:
Example: Buffer protocol usage

```python
# Get buffer with buffer protocol support
buffer = store.get_buffer("large_object")
if buffer:
    # Access as a numpy array without copying
    import numpy as np
    arr = np.array(buffer, copy=False)

    # Direct memory access
    ptr = buffer.ptr()    # Memory address
    size = buffer.size()  # Buffer size in bytes

    # Use with other libraries that accept the buffer protocol
    print(f"Buffer size: {len(buffer)} bytes")

# The buffer is automatically freed
```
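Independent of Mooncake, Python's buffer protocol is what makes these zero-copy views possible. A `bytearray` also implements it, so it can stand in for `BufferHandle` when illustrating the sharing behavior:

```python
# A bytearray implements the buffer protocol, like BufferHandle does for
# registered store memory; a memoryview over it shares storage, no copy.
backing = bytearray(b"mooncake")
view = memoryview(backing)  # no copy: shares memory with `backing`

backing[0:4] = b"MOON"      # mutate the underlying storage
print(bytes(view[0:4]))     # b'MOON' - the view observes the change
print(len(view))            # 8
```

`np.array(obj, copy=False)` relies on exactly this mechanism: the array wraps the exporter's memory instead of duplicating it.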
Full API Reference#
Class: MooncakeDistributedStore#
The main class for interacting with Mooncake Store.
Constructor#
store = MooncakeDistributedStore()
Creates a new store instance. No parameters required.
setup()#
Initialize distributed resources and establish network connections.
```python
def setup(
    self,
    local_hostname: str,
    metadata_server: str,
    global_segment_size: int = 16777216,
    local_buffer_size: int = 1073741824,
    protocol: str = "tcp",
    rdma_devices: str = "",
    master_server_addr: str = "",
) -> int
```
Parameters:
- `local_hostname` (str): Required. Local hostname and port (e.g., "localhost" or "localhost:12345")
- `metadata_server` (str): Required. Metadata server address (e.g., "http://localhost:8080/metadata")
- `global_segment_size` (int): Memory segment size in bytes for mounting (default: 16MB = 16777216)
- `local_buffer_size` (int): Local buffer size in bytes (default: 1GB = 1073741824)
- `protocol` (str): Network protocol, "tcp" or "rdma" (default: "tcp")
- `rdma_devices` (str): RDMA device name(s), e.g. `"mlx5_0"` or `"mlx5_0,mlx5_1"`. Leave empty to auto-select NICs; provide device names to pin specific NICs. Always empty for TCP.
- `master_server_addr` (str): Required. Master server address (e.g., "localhost:50051")
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
```python
# TCP initialization
store.setup("localhost", "http://localhost:8080/metadata", 1024*1024*1024, 128*1024*1024, "tcp", "", "localhost:50051")

# RDMA auto-detect
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "rdma", "", "localhost:50051")

# RDMA with explicit device list
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "rdma", "mlx5_0,mlx5_1", "localhost:50051")
```
setup_dummy()#
Initialize the store with a dummy client for testing purposes.
def setup_dummy(self, mem_pool_size: int, local_buffer_size: int, server_address: str) -> int
Parameters:
- `mem_pool_size` (int): Memory pool size in bytes
- `local_buffer_size` (int): Local buffer size in bytes
- `server_address` (str): Server address in format "hostname:port"
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
# Initialize with dummy client
store.setup_dummy(1024*1024*256, 1024*1024*64, "localhost:8080")
put()#
Store binary data in the distributed storage.
def put(self, key: str, value: bytes, config: ReplicateConfig = None) -> int
Parameters:
- `key` (str): Unique object identifier
- `value` (bytes): Binary data to store
- `config` (ReplicateConfig, optional): Replication configuration
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
```python
# Simple put
store.put("my_key", b"Hello, World!")

# Put with replication config
config = ReplicateConfig()
config.replica_num = 2
store.put("important_data", b"Critical information", config)
```
get()#
Retrieve binary data from distributed storage.
def get(self, key: str) -> bytes
Parameters:
- `key` (str): Object identifier to retrieve
Returns:
bytes: Retrieved binary data
Note: Returns empty bytes (`b""`) if the key doesn't exist; no exception is raised.
Example:
```python
data = store.get("my_key")
if data:
    print(f"Retrieved: {data.decode()}")
else:
    print("Key not found")
```
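Because a missing key yields empty bytes rather than an exception, callers often want a default value. Below is a hedged sketch of such a wrapper; `get_or_default` and `FakeStore` are hypothetical names, and the dict-backed stand-in exists only to mimic the empty-bytes-on-miss convention for illustration:

```python
def get_or_default(store, key: str, default: bytes) -> bytes:
    """Return the stored value, or `default` when get() yields empty bytes."""
    data = store.get(key)
    return data if data else default

class FakeStore:
    """Dict-backed stand-in mimicking the empty-bytes-on-miss convention."""
    def __init__(self):
        self._d = {}
    def put(self, key, value):
        self._d[key] = value
    def get(self, key):
        return self._d.get(key, b"")

s = FakeStore()
s.put("present", b"value")
print(get_or_default(s, "present", b"fallback"))  # b'value'
print(get_or_default(s, "absent", b"fallback"))   # b'fallback'
```

The same wrapper works unchanged against a real `MooncakeDistributedStore` instance, since only `get()` is used.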
put_batch()#
Store multiple objects in a single batch operation.
def put_batch(self, keys: List[str], values: List[bytes], config: ReplicateConfig = None) -> int
Parameters:
- `keys` (List[str]): List of object identifiers
- `values` (List[bytes]): List of binary data to store
- `config` (ReplicateConfig, optional): Replication configuration for all objects
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
```python
keys = ["key1", "key2", "key3"]
values = [b"value1", b"value2", b"value3"]
result = store.put_batch(keys, values)
```
upsert()#
Insert a new object if the key does not exist, or update the existing object in place when possible. It uses the same replication configuration model as put().
Upsert binary data in the distributed storage.
def upsert(self, key: str, value: bytes, config: ReplicateConfig = None) -> int
Parameters:
- `key` (str): Unique object identifier
- `value` (bytes): Binary data to insert or update
- `config` (ReplicateConfig, optional): Replication configuration
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
```python
config = ReplicateConfig()
config.replica_num = 2
rc = store.upsert("weights", b"new-bytes", config)
if rc == 0:
    print("Upsert succeeded")
```
upsert_from()#
Upsert object data directly from a pre-allocated buffer (zero-copy).
def upsert_from(self, key: str, buffer_ptr: int, size: int, config: ReplicateConfig = None) -> int
Parameters:
- `key` (str): Object identifier
- `buffer_ptr` (int): Memory address of the source buffer
- `size` (int): Number of bytes to insert or update
- `config` (ReplicateConfig, optional): Replication configuration
Returns:
int: Status code (0 = success, non-zero = error code)
Note: This is the zero-copy counterpart of upsert(). As with `put_from()`, register the buffer before issuing the request.
batch_upsert_from()#
Upsert multiple objects directly from pre-allocated buffers.
```python
def batch_upsert_from(self, keys: List[str], buffer_ptrs: List[int], sizes: List[int],
                      config: ReplicateConfig = None) -> List[int]
```
Parameters:
- `keys` (List[str]): List of object identifiers
- `buffer_ptrs` (List[int]): List of source buffer addresses
- `sizes` (List[int]): List of byte lengths for each buffer
- `config` (ReplicateConfig, optional): Replication configuration shared by all objects
Returns:
List[int]: List of status codes for each upsert
upsert_parts()#
Upsert data from multiple buffer parts as a single object (insert or update).
def upsert_parts(self, key: str, *parts, config: ReplicateConfig = None) -> int
Parameters:
- `key` (str): Object identifier
- `*parts`: Variable number of bytes-like objects to concatenate
- `config` (ReplicateConfig, optional): Replication configuration
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
part1 = b"Hello, "
part2 = b"World!"
result = store.upsert_parts("greeting", part1, part2)
upsert_batch()#
Upsert multiple objects in a single batch operation.
def upsert_batch(self, keys: List[str], values: List[bytes], config: ReplicateConfig = None) -> int
Parameters:
- `keys` (List[str]): List of object identifiers
- `values` (List[bytes]): List of binary data to insert or update
- `config` (ReplicateConfig, optional): Replication configuration for all objects
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
keys = ["key1", "key2", "key3"]
values = [b"value1", b"value2", b"value3"]
result = store.upsert_batch(keys, values)
get_batch()#
Retrieve multiple objects in a single batch operation.
def get_batch(self, keys: List[str]) -> List[bytes]
Parameters:
- `keys` (List[str]): List of object identifiers to retrieve
Returns:
List[bytes]: List of retrieved binary data
Example:
```python
keys = ["key1", "key2", "key3"]
values = store.get_batch(keys)
for key, value in zip(keys, values):
    print(f"{key}: {len(value)} bytes")
```
remove()#
Delete an object from the storage system.
def remove(self, key: str) -> int
Parameters:
- `key` (str): Object identifier to remove
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
```python
result = store.remove("my_key")
if result == 0:
    print("Successfully removed")
```
remove_by_regex()#
Remove objects from the storage system whose keys match a regular expression.
def remove_by_regex(self, regex: str) -> int
Parameters:
- `regex` (str): The regular expression to match against object keys
Returns:
int: The number of objects removed, or a negative value on error.
Example:
```python
# Remove all keys starting with "user_session_"
count = store.remove_by_regex("^user_session_.*")
if count >= 0:
    print(f"Removed {count} objects")
```
remove_all()#
Remove all objects from the storage system.
def remove_all(self) -> int
Returns:
int: Number of objects removed, or -1 on error
Example:
count = store.remove_all()
print(f"Removed {count} objects")
batch_remove()#
Remove multiple objects by their keys in a single batch operation.
def batch_remove(self, keys: List[str], force: bool = False) -> List[int]
Parameters:
- `keys` (List[str]): List of object identifiers to remove
- `force` (bool): If True, skip lease and replication task checks (default: False)
Returns:
List[int]: List of status codes for each key (0 = success, negative = error code)
Example:
```python
# Remove multiple keys in one batch
keys = ["key1", "key2", "key3", "key4", "key5"]
results = store.batch_remove(keys)

# Check results
for key, result in zip(keys, results):
    if result == 0:
        print(f"✓ {key} removed successfully")
    else:
        print(f"✗ {key} failed with error code: {result}")

# Force remove (bypass lease checks)
results = store.batch_remove(keys, force=True)
```
is_exist()#
Check if an object exists in the storage system.
def is_exist(self, key: str) -> int
Parameters:
- `key` (str): Object identifier to check
Returns:
int:
- 1: Object exists
- 0: Object doesn't exist
- -1: Error occurred
Example:
```python
exists = store.is_exist("my_key")
if exists == 1:
    print("Object exists")
elif exists == 0:
    print("Object not found")
else:
    print("Error checking existence")
```
batch_is_exist()#
Check existence of multiple objects in a single batch operation.
def batch_is_exist(self, keys: List[str]) -> List[int]
Parameters:
- `keys` (List[str]): List of object identifiers to check
Returns:
List[int]: List of existence results (1=exists, 0=not exists, -1=error)
Example:
```python
keys = ["key1", "key2", "key3"]
results = store.batch_is_exist(keys)
for key, exists in zip(keys, results):
    status = "exists" if exists == 1 else "not found" if exists == 0 else "error"
    print(f"{key}: {status}")
```
get_size()#
Get the size of a stored object in bytes.
def get_size(self, key: str) -> int
Parameters:
- `key` (str): Object identifier
Returns:
int: Size in bytes, or negative value on error
Example:
```python
size = store.get_size("my_key")
if size >= 0:
    print(f"Object size: {size} bytes")
else:
    print("Error getting size or object not found")
```
get_buffer()#
Get object data as a buffer that implements Python’s buffer protocol.
def get_buffer(self, key: str) -> BufferHandle
Parameters:
- `key` (str): Object identifier
Returns:
BufferHandle: Buffer object or None if not found
Example:
```python
buffer = store.get_buffer("large_object")
if buffer:
    print(f"Buffer size: {buffer.size()} bytes")
    # Use with numpy without copying
    import numpy as np
    arr = np.array(buffer, copy=False)
```
put_parts()#
Store data from multiple buffer parts as a single object.
def put_parts(self, key: str, *parts, config: ReplicateConfig = None) -> int
Parameters:
- `key` (str): Object identifier
- `*parts`: Variable number of bytes-like objects to concatenate
- `config` (ReplicateConfig, optional): Replication configuration
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
part1 = b"Hello, "
part2 = b"World!"
part3 = b" From Mooncake"
result = store.put_parts("greeting", part1, part2, part3)
batch_get_buffer()#
Get multiple objects as buffers that implement Python’s buffer protocol.
def batch_get_buffer(self, keys: List[str]) -> List[BufferHandle]
Parameters:
- `keys` (List[str]): List of object identifiers to retrieve
Returns:
List[BufferHandle]: List of buffer objects, with None for keys not found
Note: This function is not supported for dummy client.
Example:
```python
buffers = store.batch_get_buffer(["key1", "key2", "key3"])
for i, buffer in enumerate(buffers):
    if buffer:
        print(f"Buffer {i} size: {buffer.size()} bytes")
```
alloc_from_mem_pool()#
Allocate memory from the memory pool.
def alloc_from_mem_pool(self, size: int) -> int
Parameters:
- `size` (int): Size of memory to allocate in bytes
Returns:
int: Memory address as integer, or 0 on failure
init_all()#
Initialize all resources with specified protocol and device.
def init_all(self, protocol: str, device_name: str, mount_segment_size: int = 16777216) -> int
Parameters:
- `protocol` (str): Network protocol, "tcp" or "rdma"
- `device_name` (str): Device name for the protocol
- `mount_segment_size` (int): Memory segment size in bytes for mounting (default: 16MB = 16777216)
Returns:
int: Status code (0 = success, non-zero = error code)
get_hostname()#
Get the hostname of the current store instance.
def get_hostname(self) -> str
Returns:
str: Hostname and port of this store instance
Example:
hostname = store.get_hostname()
print(f"Store running on: {hostname}")
get_replica_desc()#
Get descriptors of replicas for a key.
def get_replica_desc(self, key: str) -> List[Replica::Descriptor]
Parameters:
key(str): mooncake store key
Returns:
List[Replica::Descriptor]: List of replica descriptors
Example:
descriptors = store.get_replica_desc("mooncake_key")
for desc in descriptors:
    print("Status:", desc.status)
    if desc.is_memory_replica():
        mem_desc = desc.get_memory_descriptor()
        print("Memory buffer desc:", mem_desc.buffer_descriptor)
    elif desc.is_disk_replica():
        disk_desc = desc.get_disk_descriptor()
        print("Disk path:", disk_desc.file_path, "Size:", disk_desc.object_size)
batch_get_replica_desc()#
Get descriptors of replicas for a list of keys.
def batch_get_replica_desc(self, keys: List[str]) -> Dict[str, List[Replica::Descriptor]]
Parameters:
keys(List[str]): List of mooncake store keys
Returns:
Dict[str, List[Replica::Descriptor]]: Dictionary mapping keys to their list of replica descriptors
Example:
descriptors_map = store.batch_get_replica_desc(["key1", "key2"])
for key, desc_list in descriptors_map.items():
    print(f"Replicas for key: {key}")
    for desc in desc_list:
        if desc.is_memory_replica():
            mem_desc = desc.get_memory_descriptor()
            print("Memory buffer desc:", mem_desc.buffer_descriptor)
        elif desc.is_disk_replica():
            disk_desc = desc.get_disk_descriptor()
            print("Disk path:", disk_desc.file_path, "Size:", disk_desc.object_size)
create_copy_task()#
Creates an asynchronous copy task to replicate an object to target segments.
def create_copy_task(self, key: str, targets: List[str]) -> Tuple[UUID, int]
Parameters:
key(str): Object key to copy
targets(List[str]): List of target segment names where replicas should be created
Returns:
Tuple[UUID, int]: (task UUID, error code)
If successful: (task UUID, 0)
If failed: (UUID{0, 0}, error code)
Example:
# Create an asynchronous copy task
task_id, error_code = store.create_copy_task("my_key", ["segment1", "segment2"])
if error_code == 0:
    print(f"Copy task created with ID: {task_id}")
    # Query task status later
    response, status = store.query_task(task_id)
    if status == 0:
        print(f"Task status: {response.status}")
else:
    print(f"Failed to create copy task: {error_code}")
create_move_task()#
Creates an asynchronous move task to move an object from source segment to target segment.
def create_move_task(self, key: str, source: str, target: str) -> Tuple[UUID, int]
Parameters:
key(str): Object key to move
source(str): Source segment name where the replica currently exists
target(str): Target segment name where the replica should be moved to
Returns:
Tuple[UUID, int]: (task UUID, error code)
If successful: (task UUID, 0)
If failed: (UUID{0, 0}, error code)
Example:
# Create an asynchronous move task
task_id, error_code = store.create_move_task("my_key", "old_segment", "new_segment")
if error_code == 0:
    print(f"Move task created with ID: {task_id}")
    # Query task status later
    response, status = store.query_task(task_id)
    if status == 0:
        print(f"Task status: {response.status}")
else:
    print(f"Failed to create move task: {error_code}")
query_task()#
Queries the status of an asynchronous task (copy or move).
def query_task(self, task_id: UUID) -> Tuple[QueryTaskResponse | None, int]
Parameters:
task_id(UUID): UUID of the task to query
Returns:
Tuple[QueryTaskResponse | None, int]: (QueryTaskResponse if success, error code)
If successful: (QueryTaskResponse, 0)
If failed: (None, error code)
Example:
from mooncake.store import MooncakeDistributedStore, TaskStatus
import time
# Initialize store
store = MooncakeDistributedStore()
store.setup("localhost", "http://localhost:8080/metadata",
512*1024*1024, 128*1024*1024, "tcp", "", "localhost:50051")
# Submit multiple copy tasks
tasks = []
for key in ["key1", "key2", "key3"]:
    task_id, error = store.create_copy_task(key, ["segment1", "segment2"])
    if error == 0:
        tasks.append(task_id)
        print(f"Created copy task {task_id} for {key}")
# Monitor task progress
while tasks:
    completed = []
    for task_id in tasks:
        response, status = store.query_task(task_id)
        if status == 0 and response:
            if response.status == TaskStatus.SUCCESS:
                print(f"Task {task_id} succeeded")
                completed.append(task_id)
            elif response.status == TaskStatus.FAILED:
                print(f"Task {task_id} failed: {response.message}")
                completed.append(task_id)
    # Remove completed tasks
    tasks = [t for t in tasks if t not in completed]
    if tasks:
        time.sleep(1)  # Wait before next check
print("All tasks completed")
store.close()
close()#
Clean up all resources and terminate connections.
def close(self) -> int
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
store.close()
put_from_with_metadata()#
Store data directly from a registered buffer with metadata (zero-copy).
def put_from_with_metadata(self, key: str, buffer_ptr: int, metadata_buffer_ptr: int, size: int, metadata_size: int, config: ReplicateConfig = None) -> int
Parameters:
key(str): Object identifier
buffer_ptr(int): Memory address of the main data buffer (from ctypes.data or similar)
metadata_buffer_ptr(int): Memory address of the metadata buffer
size(int): Number of bytes for the main data
metadata_size(int): Number of bytes for the metadata
config(ReplicateConfig, optional): Replication configuration
Returns:
int: Status code (0 = success, non-zero = error code)
Note: This function is not supported by the dummy client.
Example:
import numpy as np
# Create data and metadata
data = np.random.randn(1000).astype(np.float32)
metadata = np.array([42, 100], dtype=np.int32) # example metadata
# Register buffers
data_ptr = data.ctypes.data
metadata_ptr = metadata.ctypes.data
store.register_buffer(data_ptr, data.nbytes)
store.register_buffer(metadata_ptr, metadata.nbytes)
# Store with metadata
result = store.put_from_with_metadata("data_with_metadata", data_ptr, metadata_ptr,
data.nbytes, metadata.nbytes)
if result == 0:
    print("Data with metadata stored successfully")
# Cleanup
store.unregister_buffer(data_ptr)
store.unregister_buffer(metadata_ptr)
pub_tensor()#
Publish a PyTorch tensor with configurable replication settings.
def pub_tensor(self, key: str, tensor: torch.Tensor, config: ReplicateConfig = None) -> int
Parameters:
key(str): Unique object identifier
tensor(torch.Tensor): PyTorch tensor to store
config(ReplicateConfig, optional): Replication configuration
Returns:
int: Status code (0 = success, non-zero = error code)
Note: This function requires torch to be installed and available in the environment.
Example:
import torch
from mooncake.store import ReplicateConfig
# Create a tensor
tensor = torch.randn(100, 100)
# Create replication config
config = ReplicateConfig()
config.replica_num = 3
config.with_soft_pin = True
# Publish tensor with replication settings
result = store.pub_tensor("my_tensor", tensor, config)
if result == 0:
    print("Tensor published successfully")
PyTorch Tensor Operations (Tensor Parallelism)#
These methods provide direct support for storing and retrieving PyTorch tensors. They automatically handle serialization and metadata, and include built-in support for Tensor Parallelism (TP) by automatically splitting and reconstructing tensor shards.
⚠️ Note: These methods require torch to be installed and available in the environment.
put_tensor_with_tp()#
Put a PyTorch tensor into the store, optionally splitting it into shards for tensor parallelism.
The tensor is chunked immediately and stored as separate keys (e.g., key_tp_0, key_tp_1…).
def put_tensor_with_tp(self, key: str, tensor: torch.Tensor, tp_rank: int = 0, tp_size: int = 1, split_dim: int = 0) -> int
Parameters:
key(str): Base identifier for the tensor.
tensor(torch.Tensor): The PyTorch tensor to store.
tp_rank(int): Current tensor parallel rank (default: 0). Note: The method splits and stores all chunks for all ranks regardless of this value.
tp_size(int): Total tensor parallel size (default: 1). If > 1, the tensor is split into tp_size chunks.
split_dim(int): The dimension to split the tensor along (default: 0).
Returns:
int: Status code (0 = success, non-zero = error code).
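The key naming and chunking described above can be sketched without a running store. The helpers below are illustrative only (not part of the API); in particular, the exact chunk sizing is an assumption modeled on torch.chunk-style splitting:

```python
import math

def tp_shard_keys(key: str, tp_size: int) -> list[str]:
    # Mirrors the documented naming scheme: key_tp_0, key_tp_1, ...
    if tp_size <= 1:
        return [key]
    return [f"{key}_tp_{rank}" for rank in range(tp_size)]

def chunk_bounds(dim_len: int, tp_size: int) -> list[tuple[int, int]]:
    # torch.chunk-style split along split_dim: chunks of
    # ceil(dim_len / tp_size); the final chunk may be smaller.
    step = math.ceil(dim_len / tp_size)
    return [(s, min(s + step, dim_len)) for s in range(0, dim_len, step)]

print(tp_shard_keys("weights", 2))  # ['weights_tp_0', 'weights_tp_1']
print(chunk_bounds(10, 4))          # [(0, 3), (3, 6), (6, 9), (9, 10)]
```

Each `(start, end)` pair corresponds to one shard stored under the matching `key_tp_{rank}` key; consult the implementation for the exact split behavior on uneven dimensions.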
pub_tensor_with_tp()#
Publish a PyTorch tensor into the store with configurable replication settings, optionally splitting it into shards for tensor parallelism.
The tensor is chunked immediately and stored as separate keys (e.g., key_tp_0, key_tp_1…).
def pub_tensor_with_tp(self, key: str, tensor: torch.Tensor, config: ReplicateConfig, tp_rank: int = 0, tp_size: int = 1, split_dim: int = 0) -> int
Parameters:
key(str): Base identifier for the tensor.
tensor(torch.Tensor): The PyTorch tensor to store.
config(ReplicateConfig): Optional replication configuration.
tp_rank(int): Current tensor parallel rank (default: 0). Note: The method splits and stores all chunks for all ranks regardless of this value.
tp_size(int): Total tensor parallel size (default: 1). If > 1, the tensor is split into tp_size chunks.
split_dim(int): The dimension to split the tensor along (default: 0).
Returns:
int: Status code (0 = success, non-zero = error code).
get_tensor_with_tp()#
Get a PyTorch tensor from the store, specifically retrieving the shard corresponding to the given Tensor Parallel rank.
def get_tensor_with_tp(self, key: str, tp_rank: int = 0, tp_size: int = 1, split_dim: int = 0) -> torch.Tensor
Parameters:
key(str): Base identifier of the tensor.
tp_rank(int): The tensor parallel rank to retrieve (default: 0). Fetches key key_tp_{rank} if tp_size > 1.
tp_size(int): Total tensor parallel size (default: 1).
split_dim(int): The dimension used during splitting (default: 0).
Returns:
torch.Tensor: The retrieved tensor (or shard). Returns None if not found.
batch_put_tensor_with_tp()#
Put a batch of PyTorch tensors into the store, splitting each into shards for tensor parallelism.
def batch_put_tensor_with_tp(self, base_keys: List[str], tensors_list: List[torch.Tensor], tp_rank: int = 0, tp_size: int = 1, split_dim: int = 0) -> List[int]
Parameters:
base_keys(List[str]): List of base identifiers.
tensors_list(List[torch.Tensor]): List of tensors to store.
tp_rank(int): Current rank (default: 0).
tp_size(int): Total TP size (default: 1).
split_dim(int): Split dimension (default: 0).
Returns:
List[int]: List of status codes for each tensor operation.
batch_pub_tensor_with_tp()#
Publish a batch of PyTorch tensors into the store with configurable replication settings, splitting each into shards for tensor parallelism.
def batch_pub_tensor_with_tp(self, base_keys: List[str], tensors_list: List[torch.Tensor], config: ReplicateConfig, tp_rank: int = 0, tp_size: int = 1, split_dim: int = 0) -> List[int]
Parameters:
base_keys(List[str]): List of base identifiers.
tensors_list(List[torch.Tensor]): List of tensors to store.
config(ReplicateConfig): Optional replication configuration.
tp_rank(int): Current rank (default: 0).
tp_size(int): Total TP size (default: 1).
split_dim(int): Split dimension (default: 0).
Returns:
List[int]: List of status codes for each tensor operation.
batch_get_tensor_with_tp()#
Get a batch of PyTorch tensor shards from the store for a given Tensor Parallel rank.
def batch_get_tensor_with_tp(self, base_keys: List[str], tp_rank: int = 0, tp_size: int = 1) -> List[torch.Tensor]
Parameters:
base_keys(List[str]): List of base identifiers.
tp_rank(int): The tensor parallel rank to retrieve (default: 0).
tp_size(int): Total tensor parallel size (default: 1).
Returns:
List[torch.Tensor]: List of retrieved tensors (or shards). Contains None for missing keys.
put_tensor()#
Put a PyTorch tensor into the store.
def put_tensor(self, key: str, tensor: torch.Tensor) -> int
Parameters:
key(str): Object identifier
tensor(torch.Tensor): The PyTorch tensor to store
Returns:
int: Status code (0 = success, non-zero = error code)
Note: This function requires torch to be installed and available in the environment.
Example:
import torch
from mooncake.store import MooncakeDistributedStore
store = MooncakeDistributedStore()
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "tcp", "", "localhost:50051")
# Store a tensor
tensor = torch.randn(100, 100)
result = store.put_tensor("my_tensor", tensor)
if result == 0:
    print("Tensor stored successfully")
get_tensor()#
Get a PyTorch tensor from the store.
def get_tensor(self, key: str) -> torch.Tensor
Parameters:
key(str): Object identifier to retrieve
Returns:
torch.Tensor: The retrieved tensor. Returns None if not found.
Note: This function requires torch to be installed and available in the environment.
Example:
import torch
from mooncake.store import MooncakeDistributedStore
store = MooncakeDistributedStore()
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "tcp", "", "localhost:50051")
# Store a tensor
tensor = torch.randn(100, 100)
store.put_tensor("my_tensor", tensor)
# Retrieve the tensor
retrieved_tensor = store.get_tensor("my_tensor")
if retrieved_tensor is not None:
    print(f"Retrieved tensor with shape: {retrieved_tensor.shape}")
batch_get_tensor()#
Get a batch of PyTorch tensors from the store.
def batch_get_tensor(self, keys: List[str]) -> List[torch.Tensor]
Parameters:
keys(List[str]): List of object identifiers to retrieve
Returns:
List[torch.Tensor]: List of retrieved tensors. Contains None for missing keys.
Note: This function requires torch to be installed and available in the environment.
Example:
import torch
from mooncake.store import MooncakeDistributedStore
store = MooncakeDistributedStore()
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "tcp", "", "localhost:50051")
# Store tensors
tensor1 = torch.randn(100, 100)
tensor2 = torch.randn(50, 50)
store.put_tensor("tensor1", tensor1)
store.put_tensor("tensor2", tensor2)
# Retrieve multiple tensors
tensors = store.batch_get_tensor(["tensor1", "tensor2", "nonexistent"])
for i, tensor in enumerate(tensors):
    if tensor is not None:
        print(f"Tensor {i} shape: {tensor.shape}")
    else:
        print(f"Tensor {i} not found")
batch_put_tensor()#
Put a batch of PyTorch tensors into the store.
def batch_put_tensor(self, keys: List[str], tensors_list: List[torch.Tensor]) -> List[int]
Parameters:
keys(List[str]): List of object identifiers
tensors_list(List[torch.Tensor]): List of tensors to store
Returns:
List[int]: List of status codes for each tensor operation.
Note: This function requires torch to be installed and available in the environment.
Example:
import torch
from mooncake.store import MooncakeDistributedStore
store = MooncakeDistributedStore()
store.setup("localhost", "http://localhost:8080/metadata", 512*1024*1024, 128*1024*1024, "tcp", "", "localhost:50051")
# Create tensors
tensors = [torch.randn(100, 100), torch.randn(50, 50), torch.randn(25, 25)]
keys = ["tensor1", "tensor2", "tensor3"]
# Store multiple tensors
results = store.batch_put_tensor(keys, tensors)
for i, result in enumerate(results):
    if result == 0:
        print(f"Tensor {i} stored successfully")
    else:
        print(f"Tensor {i} failed to store with code: {result}")
batch_pub_tensor()#
Publish a batch of PyTorch tensors into the store with configurable replication settings.
def batch_pub_tensor(self, keys: List[str], tensors_list: List[torch.Tensor], config: ReplicateConfig) -> List[int]
Parameters:
keys(List[str]): List of object identifiers
tensors_list(List[torch.Tensor]): List of tensors to store
config(ReplicateConfig): Optional replication configuration.
Returns:
List[int]: List of status codes for each tensor operation.
Note: This function requires torch to be installed and available in the environment.
upsert_tensor()#
Insert a tensor if its key is missing, or update the existing tensor if the key already exists. The current tensor upsert helpers use the default ReplicateConfig and therefore do not take a config parameter.
def upsert_tensor(self, key: str, tensor: torch.Tensor) -> int
Parameters:
key(str): Object identifier
tensor(torch.Tensor): The PyTorch tensor to insert or update
Returns:
int: Status code (0 = success, non-zero = error code)
Note: This function requires torch to be installed and available in the environment.
upsert_tensor_from()#
Upsert a tensor directly from a pre-allocated buffer. The buffer layout must be
[TensorMetadata][tensor data], matching the layout used by
get_tensor_into().
def upsert_tensor_from(self, key: str, buffer_ptr: int, size: int) -> int
Parameters:
key(str): Object identifier
buffer_ptr(int): Buffer pointer containing serialized tensor metadata and payload
size(int): Actual serialized byte length of the tensor buffer
Returns:
int: Status code (0 = success, non-zero = error code)
Note: This function is not supported by the dummy client.
batch_upsert_tensor_from()#
Upsert multiple tensors directly from pre-allocated buffers. Each buffer must
use layout [TensorMetadata][tensor data].
def batch_upsert_tensor_from(self, keys: List[str], buffer_ptrs: List[int], sizes: List[int]) -> List[int]
Parameters:
keys(List[str]): List of object identifiers
buffer_ptrs(List[int]): List of serialized tensor buffer pointers
sizes(List[int]): List of actual serialized byte lengths
Returns:
List[int]: List of status codes for each tensor upsert
batch_upsert_tensor()#
Upsert a batch of PyTorch tensors into the store (insert or update).
def batch_upsert_tensor(self, keys: List[str], tensors_list: List[torch.Tensor]) -> List[int]
Parameters:
keys(List[str]): List of object identifiers
tensors_list(List[torch.Tensor]): List of tensors to insert or update
Returns:
List[int]: List of status codes for each tensor operation.
Note: This function requires torch to be installed and available in the environment. Not supported by the dummy client.
upsert_pub_tensor()#
Upsert a PyTorch tensor with configurable replication settings (insert or update).
def upsert_pub_tensor(self, key: str, tensor: torch.Tensor, config: ReplicateConfig = None) -> int
Parameters:
key(str): Unique object identifier
tensor(torch.Tensor): PyTorch tensor to insert or update
config(ReplicateConfig, optional): Replication configuration
Returns:
int: Status code (0 = success, non-zero = error code)
Note: This function requires torch to be installed and available in the environment. Not supported by the dummy client.
Example:
import torch
from mooncake.store import ReplicateConfig
tensor = torch.randn(100, 100)
config = ReplicateConfig()
config.replica_num = 2
config.with_soft_pin = True
result = store.upsert_pub_tensor("my_tensor", tensor, config)
if result == 0:
    print("Tensor upserted successfully")
batch_upsert_pub_tensor()#
Batch upsert PyTorch tensors with configurable replication settings (insert or update).
def batch_upsert_pub_tensor(self, keys: List[str], tensors_list: List[torch.Tensor], config: ReplicateConfig = None) -> List[int]
Parameters:
keys(List[str]): List of object identifiers
tensors_list(List[torch.Tensor]): List of tensors to insert or update
config(ReplicateConfig, optional): Replication configuration
Returns:
List[int]: List of status codes for each tensor operation.
Note: This function requires torch to be installed and available in the environment. Not supported by the dummy client.
PyTorch Tensor Operations (Zero Copy)#
These methods store and retrieve PyTorch tensors directly from and into pre-allocated, registered buffers, avoiding intermediate copies. The TP-aware variants additionally split tensors into shards or fetch the shard for a given Tensor Parallel rank.
⚠️ Note: These methods require torch to be installed and available in the environment.
get_tensor_into()#
Get a PyTorch tensor from the store directly into a pre-allocated buffer.
def get_tensor_into(self, key: str, buffer_ptr: int, size: int) -> torch.Tensor
Parameters:
key(str): Base identifier of the tensor.
buffer_ptr(int): Pre-allocated buffer pointer for the tensor; the buffer should be registered.
size(int): The size of the buffer.
Returns:
torch.Tensor: The retrieved tensor (or shard). Returns None if not found.
batch_get_tensor_into()#
Get a batch of PyTorch tensors from the store directly into pre-allocated buffers.
def batch_get_tensor_into(self, base_keys: List[str], buffer_ptrs: List[int], sizes: List[int]) -> List[torch.Tensor]
Parameters:
base_keys(List[str]): List of base identifiers.
buffer_ptrs(List[int]): List of buffer pointers pre-allocated for tensors; buffers should be registered.
sizes(List[int]): List of buffer sizes.
Returns:
List[torch.Tensor]: List of retrieved tensors (or shards). Contains None for missing keys.
get_tensor_with_tp_into()#
Get a PyTorch tensor from the store, specifically retrieving the shard corresponding to the given Tensor Parallel rank, directly into the pre-allocated buffer.
def get_tensor_with_tp_into(self, key: str, buffer_ptr: int, size: int, tp_rank: int = 0, tp_size: int = 1, split_dim: int = 0) -> torch.Tensor
Parameters:
key(str): Base identifier of the tensor.
buffer_ptr(int): Pre-allocated buffer pointer for the tensor; the buffer should be registered.
size(int): The size of the buffer.
tp_rank(int): The tensor parallel rank to retrieve (default: 0). Fetches key key_tp_{rank} if tp_size > 1.
tp_size(int): Total tensor parallel size (default: 1).
split_dim(int): The dimension used during splitting (default: 0).
Returns:
torch.Tensor: The retrieved tensor (or shard). Returns None if not found.
batch_get_tensor_with_tp_into()#
Get a batch of PyTorch tensor shards from the store for a given Tensor Parallel rank, directly into the pre-allocated buffer.
def batch_get_tensor_with_tp_into(self, base_keys: List[str], buffer_ptrs: List[int], sizes: List[int], tp_rank: int = 0, tp_size: int = 1) -> List[torch.Tensor]
Parameters:
base_keys(List[str]): List of base identifiers.
buffer_ptrs(List[int]): List of buffer pointers pre-allocated for tensors; buffers should be registered.
sizes(List[int]): List of buffer sizes.
tp_rank(int): The tensor parallel rank to retrieve (default: 0).
tp_size(int): Total tensor parallel size (default: 1).
Returns:
List[torch.Tensor]: List of retrieved tensors (or shards). Contains None for missing keys.
put_tensor_from()#
Put a PyTorch tensor into the store directly from a pre-allocated buffer (zero-copy). The buffer must contain data in the same layout as produced by get_tensor_into: [TensorMetadata][tensor data]. The buffer is only read during this call; no Python object references it.
def put_tensor_from(self, key: str, buffer_ptr: int, size: int) -> int
Parameters:
key(str): Object identifier for the tensor.
buffer_ptr(int): The buffer pointer; the buffer should be registered. Layout must be [TensorMetadata][tensor data].
size(int): Actual serialized byte length of the data in the buffer (metadata + tensor bytes), not the buffer capacity.
Returns:
int: Status code (0 = success, non-zero = error code).
batch_put_tensor_from()#
Put a batch of PyTorch tensors into the store directly from pre-allocated buffers (zero-copy). Each buffer must contain data in the layout [TensorMetadata][tensor data], same as get_tensor_into.
def batch_put_tensor_from(self, keys: List[str], buffer_ptrs: List[int], sizes: List[int]) -> List[int]
Parameters:
keys(List[str]): List of object identifiers.
buffer_ptrs(List[int]): List of buffer pointers; buffers should be registered.
sizes(List[int]): List of actual serialized byte lengths for each buffer (metadata + tensor bytes), not buffer capacities.
Returns:
List[int]: List of status codes for each tensor operation (0 = success, non-zero = error code).
put_tensor_with_tp_from()#
Put a full tensor into the store directly from a pre-allocated buffer (zero-copy), for use with Tensor Parallelism. This is the zero-copy counterpart of put_tensor_with_tp(): the buffer must contain the complete tensor in layout [TensorMetadata][tensor data], and Mooncake will split it internally and store all shards under key_tp_<rank>.
def put_tensor_with_tp_from(self, key: str, buffer_ptr: int, size: int, tp_rank: int = 0, tp_size: int = 1, split_dim: int = 0) -> int
Parameters:
key(str): Base identifier for the tensor.
buffer_ptr(int): The buffer pointer; the buffer should be registered.
size(int): Actual serialized byte length of the full tensor in the buffer.
tp_rank(int): Kept for signature compatibility with put_tensor_with_tp() (default: 0). It does not mean “only write one shard”.
tp_size(int): Total tensor parallel size (default: 1). If 1, equivalent to put_tensor_from(key, buffer_ptr, size).
split_dim(int): Dimension along which the full tensor is split before storing shards.
Returns:
int: Status code (0 = success, non-zero = error code).
batch_put_tensor_with_tp_from()#
Put a batch of full tensors into the store directly from pre-allocated buffers (zero-copy). This is the zero-copy counterpart of batch_put_tensor_with_tp(): each buffer contains one full tensor in layout [TensorMetadata][tensor data], and Mooncake splits each tensor internally and stores all TP shards.
def batch_put_tensor_with_tp_from(self, base_keys: List[str], buffer_ptrs: List[int], sizes: List[int], tp_rank: int = 0, tp_size: int = 1, split_dim: int = 0) -> List[int]
Parameters:
base_keys(List[str]): List of base identifiers.
buffer_ptrs(List[int]): List of buffer pointers; buffers should be registered.
sizes(List[int]): List of actual serialized byte lengths for each full-tensor buffer.
tp_rank(int): Kept for signature compatibility with batch_put_tensor_with_tp() (default: 0). It does not select a single shard to write.
tp_size(int): Total tensor parallel size (default: 1). If 1, equivalent to batch_put_tensor_from(base_keys, buffer_ptrs, sizes).
split_dim(int): Dimension along which each full tensor is split before storing shards.
Returns:
List[int]: List of status codes for each tensor operation (0 = success, non-zero = error code).
Batch Zero-Copy Operations#
batch_put_from()#
Store multiple objects from pre-registered buffers (zero-copy).
def batch_put_from(self, keys: List[str], buffer_ptrs: List[int], sizes: List[int], config: ReplicateConfig = None) -> List[int]
Parameters:
keys(List[str]): List of object identifiers
buffer_ptrs(List[int]): List of memory addresses
sizes(List[int]): List of buffer sizes
config(ReplicateConfig, optional): Replication configuration
Returns:
List[int]: List of status codes for each operation (0 = success, negative = error)
batch_get_into()#
Retrieve multiple objects into pre-registered buffers (zero-copy).
def batch_get_into(self, keys: List[str], buffer_ptrs: List[int], sizes: List[int]) -> List[int]
Parameters:
keys(List[str]): List of object identifiers
buffer_ptrs(List[int]): List of memory addresses
sizes(List[int]): List of buffer sizes
Returns:
List[int]: List of bytes read for each operation (positive = success, negative = error)
⚠️ Buffer Registration Required: All buffers must be registered before batch zero-copy operations.
Example:
Click to expand: Batch zero-copy retrieval example
import numpy as np
# Prepare buffers
keys = ["tensor1", "tensor2", "tensor3"]
buffer_size = 1024 * 1024  # 1MB each
buffers = []
buffer_ptrs = []
for i in range(len(keys)):
    buffer = np.empty(buffer_size, dtype=np.uint8)
    buffers.append(buffer)
    buffer_ptrs.append(buffer.ctypes.data)
    store.register_buffer(buffer.ctypes.data, buffer_size)
# Batch retrieve
sizes = [buffer_size] * len(keys)
results = store.batch_get_into(keys, buffer_ptrs, sizes)
# Check results
for key, result in zip(keys, results):
    if result > 0:
        print(f"Retrieved {key}: {result} bytes")
    else:
        print(f"Failed to retrieve {key}: error {result}")
# Cleanup
for ptr in buffer_ptrs:
    store.unregister_buffer(ptr)
batch_put_from_multi_buffers()#
Store multiple objects from multiple pre-registered buffers (zero-copy).
def batch_put_from_multi_buffers(self, keys: List[str], all_buffer_ptrs: List[List[int]], all_sizes: List[List[int]],
config: ReplicateConfig = None) -> List[int]
Parameters:
keys(List[str]): List of object identifiers
all_buffer_ptrs(List[List[int]]): Per-key lists of memory addresses
all_sizes(List[List[int]]): Per-key lists of buffer sizes
config(ReplicateConfig, optional): Replication configuration
Returns:
List[int]: List of status codes for each operation (0 = success, negative = error)
batch_get_into_multi_buffers()#
Retrieve multiple objects into multiple pre-registered buffers (zero-copy).
def batch_get_into_multi_buffers(self, keys: List[str], all_buffer_ptrs: List[List[int]], all_sizes: List[List[int]]) -> List[int]
Parameters:
keys(List[str]): List of object identifiers
all_buffer_ptrs(List[List[int]]): Per-key lists of memory addresses
all_sizes(List[List[int]]): Per-key lists of buffer sizes
Returns:
List[int]: List of bytes read for each operation (positive = success, negative = error)
⚠️ Buffer Registration Required: All buffers must be registered before batch zero-copy operations.
Example:
Click to expand: Batch zero-copy put and get for multiple buffers example
import torch
from mooncake.store import ReplicateConfig

rank = 0  # example rank used in the key names below
tensor = torch.ones(10, 61, 128*1024, dtype=torch.int8)
data_ptr = tensor.data_ptr()
store.register_buffer(data_ptr, 10*61*128*1024)
target_tensor = torch.zeros(10, 61, 128*1024, dtype=torch.int8)
target_data_ptr = target_tensor.data_ptr()
store.register_buffer(target_data_ptr, 10*61*128*1024)
all_local_addrs = []
all_remote_addrs = []
all_sizes = []
keys = []
for block_i in range(10):
    local_addrs = []
    remote_addrs = []
    sizes = []
    for _ in range(61):
        local_addrs.append(data_ptr)
        remote_addrs.append(target_data_ptr)
        sizes.append(128*1024)
        data_ptr += 128*1024
        target_data_ptr += 128*1024
    all_local_addrs.append(local_addrs)
    all_remote_addrs.append(remote_addrs)
    all_sizes.append(sizes)
    keys.append(f"kv_{rank}_{block_i}")
config = ReplicateConfig()
config.prefer_alloc_in_same_node = True
store.batch_put_from_multi_buffers(keys, all_local_addrs, all_sizes, config)
store.batch_get_into_multi_buffers(keys, all_remote_addrs, all_sizes)
store.unregister_buffer(tensor.data_ptr())
store.unregister_buffer(target_tensor.data_ptr())
MooncakeHostMemAllocator Class#
The MooncakeHostMemAllocator class provides host memory allocation capabilities for Mooncake Store operations.
Class Definition#
from mooncake.store import MooncakeHostMemAllocator
# Create an allocator instance
allocator = MooncakeHostMemAllocator()
Methods#
alloc()#
Allocate memory from the host memory pool.
def alloc(self, size: int) -> int
Parameters:
size(int): Size of memory to allocate in bytes
Returns:
int: Memory address as integer, or 0 on failure
Example:
allocator = MooncakeHostMemAllocator()
ptr = allocator.alloc(1024 * 1024) # Allocate 1MB
if ptr != 0:
    print(f"Allocated memory at address: {ptr}")
free()#
Free previously allocated memory.
def free(self, ptr: int) -> int
Parameters:
ptr(int): Memory address to free
Returns:
int: Status code (0 = success, non-zero = error code)
Example:
result = allocator.free(ptr)
if result == 0:
    print("Memory freed successfully")
bind_to_numa_node Function#
The bind_to_numa_node function binds the current thread and memory allocation preference to a specified NUMA node.
Function Definition#
from mooncake.store import bind_to_numa_node
# Bind to NUMA node
bind_to_numa_node(node: int)
Parameters:
node(int): NUMA node number to bind to
Example:
from mooncake.store import bind_to_numa_node
# Bind current thread to NUMA node 0
bind_to_numa_node(0)
Error Handling#
Most methods return integer status codes:
0: Success
Negative values: Error codes (for methods that can return data size)
For methods that return data (get, get_batch, get_buffer, get_tensor):
Return the requested data on success
Return empty/None on failure or key not found
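Since most methods return raw status codes, callers typically wrap them once. A minimal sketch of such a wrapper — the names MooncakeError and check are illustrative, not part of the API:

```python
class MooncakeError(RuntimeError):
    """Raised when a Mooncake Store call returns a non-zero status code."""
    def __init__(self, op: str, code: int):
        super().__init__(f"{op} failed with error code {code}")
        self.code = code

def check(op: str, code: int) -> None:
    # Status-code convention from this document: 0 = success,
    # non-zero = error code.
    if code != 0:
        raise MooncakeError(op, code)

# Usage against a configured store:
# check("put", store.put("my_key", b"payload"))
```

Note that this pattern only fits methods returning a single status code; batch methods return one code per element, and `get`-style methods signal failure with empty/None results instead.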
📋 Complete Error Codes Reference: See Error Code Explanation for detailed descriptions of all error codes and their meanings.
Performance Tips#
Use batch operations when working with multiple objects to reduce network overhead
Use zero-copy APIs (put_from, get_into) for large data transfers
Register buffers once and reuse them for multiple operations
Configure replication appropriately - more replicas provide better availability but use more storage
Use soft pinning for frequently accessed objects to keep them in memory
Choose RDMA protocol when available for maximum performance
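For the first tip, batching usually just means grouping keys before each batch call. A small illustrative helper (not part of the API) that slices a key list into fixed-size batches:

```python
from typing import Iterator, List

def batched(items: List[str], batch_size: int) -> Iterator[List[str]]:
    # Yield fixed-size slices so callers can issue one batch RPC per slice.
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

keys = [f"key_{i}" for i in range(10)]
for batch in batched(keys, 4):
    # e.g. store.batch_get_buffer(batch) against a configured store
    print(batch)
```

Issuing one batch call per slice amortizes the per-request network overhead compared with one call per key.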