Skip to content

vllm.distributed.kv_transfer.kv_connector.v1.moriio.moriio_common

EngineId module-attribute

EngineId = str

ReqId module-attribute

ReqId = str

Transfer module-attribute

Transfer = tuple[int, float]

logger module-attribute

logger = init_logger(__name__)

HandshakeError

Bases: MoRIIOError

Exception raised when handshake fails.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
class HandshakeError(MoRIIOError):
    """Signals that a MoRIIO handshake did not succeed.

    Carries no state of its own beyond what ``MoRIIOError`` provides.
    """

LayerTransferPlan dataclass

Plan for transferring a single layer.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
@dataclass
class LayerTransferPlan:
    """Plan for transferring a single layer.

    Plain data holder: the generated ``__init__`` takes the fields in
    declaration order, and only ``use_batch`` has a default (``True``).
    """

    # Identifier of the request this plan was built for.
    request_id: str
    # Name of the layer the plan covers.
    layer_name: str
    # NOTE(review): presumably the index of the transfer session to use —
    # confirm against the code that consumes the plan.
    sess_idx: int
    # NOTE(review): the three lists below look like parallel per-chunk arrays
    # (local offset, remote offset, size); confirm units and that their
    # lengths are expected to match.
    transfer_local_offsets: list[int]
    transfer_remote_offsets: list[int]
    transfer_sizes: list[int]
    # NOTE(review): presumably selects a batched transfer call — confirm.
    use_batch: bool = True

layer_name instance-attribute

layer_name: str

request_id instance-attribute

request_id: str

sess_idx instance-attribute

sess_idx: int

transfer_local_offsets instance-attribute

transfer_local_offsets: list[int]

transfer_remote_offsets instance-attribute

transfer_remote_offsets: list[int]

transfer_sizes instance-attribute

transfer_sizes: list[int]

use_batch class-attribute instance-attribute

use_batch: bool = True

__init__

__init__(
    request_id: str,
    layer_name: str,
    sess_idx: int,
    transfer_local_offsets: list[int],
    transfer_remote_offsets: list[int],
    transfer_sizes: list[int],
    use_batch: bool = True,
) -> None

MoRIIOAgentMetadata

Bases: Struct

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
class MoRIIOAgentMetadata(
    msgspec.Struct,
    omit_defaults=True,  # type: ignore[call-arg]
    # dict=True is required for @cached_property to work on this struct.
    dict=True,
):
    """msgspec struct carrying the metadata of one MoRIIO agent.

    NOTE(review): presumably serialized and exchanged between engines during
    the handshake — confirm against the code that encodes/decodes it.
    """

    engine_id: str
    # Opaque bytes; this struct does not interpret them.
    agent_metadata: bytes
    # NOTE(review): presumably one base address per KV-cache region — confirm.
    kv_caches_base_addr: list[int]
    num_blocks: int
    # NOTE(review): presumably the size of one block in bytes — confirm unit.
    block_len: int
    attn_backend_name: str

agent_metadata instance-attribute

agent_metadata: bytes

attn_backend_name instance-attribute

attn_backend_name: str

block_len instance-attribute

block_len: int

engine_id instance-attribute

engine_id: str

kv_caches_base_addr instance-attribute

kv_caches_base_addr: list[int]

num_blocks instance-attribute

num_blocks: int

MoRIIOConfig dataclass

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
@dataclass
class MoRIIOConfig:
    """Network endpoints and parallel-rank info for one MoRIIO connector.

    Instances are normally produced by :meth:`from_vllm_config`.
    """

    local_ip: str
    local_kv_port: int
    proxy_ip: str
    local_ping_port: int
    proxy_ping_port: int
    http_port: int
    handshake_port: int
    notify_port: int
    tp_rank: int
    dp_rank: int
    dp_size: int
    tp_size: int

    @classmethod
    def from_vllm_config(cls, vllm_config: VllmConfig) -> "MoRIIOConfig":
        """Build the connector configuration from a vLLM config.

        Port roles:
            local_ping_port  -- outgoing heartbeat to the proxy
            proxy_ping_port  -- remote proxy's heartbeat ingress port
            http_port        -- this instance's HTTP service endpoint
            local_kv_port    -- service port for the mori engine
            notify_port      -- synchronizing stages between prefill and decode
            handshake_port   -- initial handshake between mori engines

        ``proxy_ip``, ``proxy_ping_port``, ``http_port``, ``handshake_port``
        and ``notify_port`` are read from ``kv_connector_extra_config`` with
        plain subscripting, so a missing key propagates the mapping's lookup
        error. ``local_kv_port`` and ``local_ping_port`` come from
        ``get_open_port()``.

        Raises:
            AssertionError: if ``vllm_config.kv_transfer_config`` is ``None``.
        """
        # TODO : merge notify_port and handshake_port to simplify port management
        #        supports non-contiguous ports
        assert vllm_config.kv_transfer_config is not None, (
            "kv_transfer_config must be set for MoRIIOConnector"
        )
        extra = vllm_config.kv_transfer_config.kv_connector_extra_config

        tp_rank = get_tensor_model_parallel_rank()
        parallel = vllm_config.parallel_config
        dp_rank = parallel.data_parallel_rank
        notify_base = int(extra["notify_port"])
        dp_size = parallel.data_parallel_size
        tp_size = get_tensor_model_parallel_world_size()

        # NOTE(review): tp_size is not forwarded, so get_port_offset falls back
        # to its own default for that argument — confirm this is intended and
        # matches how peers derive the same port.
        rank_offset = get_port_offset(dp_rank, tp_rank)

        return cls(
            local_ip=get_ip(),
            local_kv_port=get_open_port(),
            proxy_ip=extra["proxy_ip"],
            local_ping_port=get_open_port(),
            proxy_ping_port=int(extra["proxy_ping_port"]),
            http_port=int(extra["http_port"]),
            handshake_port=int(extra["handshake_port"]),
            notify_port=notify_base + rank_offset,
            tp_rank=tp_rank,
            dp_rank=dp_rank,
            dp_size=dp_size,
            tp_size=tp_size,
        )

dp_rank instance-attribute

dp_rank: int

dp_size instance-attribute

dp_size: int

handshake_port instance-attribute

handshake_port: int

http_port instance-attribute

http_port: int

local_ip instance-attribute

local_ip: str

local_kv_port instance-attribute

local_kv_port: int

local_ping_port instance-attribute

local_ping_port: int

notify_port instance-attribute

notify_port: int

proxy_ip instance-attribute

proxy_ip: str

proxy_ping_port instance-attribute

proxy_ping_port: int

tp_rank instance-attribute

tp_rank: int

tp_size instance-attribute

tp_size: int

__init__

__init__(
    local_ip: str,
    local_kv_port: int,
    proxy_ip: str,
    local_ping_port: int,
    proxy_ping_port: int,
    http_port: int,
    handshake_port: int,
    notify_port: int,
    tp_rank: int,
    dp_rank: int,
    dp_size: int,
    tp_size: int,
) -> None

from_vllm_config classmethod

from_vllm_config(vllm_config: VllmConfig) -> MoRIIOConfig
Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
@classmethod
def from_vllm_config(cls, vllm_config: VllmConfig) -> "MoRIIOConfig":
    """Build the connector configuration from a vLLM config.

    Port roles:
        local_ping_port  -- outgoing heartbeat to the proxy
        proxy_ping_port  -- remote proxy's heartbeat ingress port
        http_port        -- this instance's HTTP service endpoint
        local_kv_port    -- service port for the mori engine
        notify_port      -- synchronizing stages between prefill and decode
        handshake_port   -- initial handshake between mori engines

    Values taken from ``kv_connector_extra_config`` are read with plain
    subscripting, so a missing key propagates the mapping's lookup error.

    Raises:
        AssertionError: if ``vllm_config.kv_transfer_config`` is ``None``.
    """
    # TODO : merge notify_port and handshake_port to simplify port management
    #        supports non-contiguous ports
    assert vllm_config.kv_transfer_config is not None, (
        "kv_transfer_config must be set for MoRIIOConnector"
    )
    extra = vllm_config.kv_transfer_config.kv_connector_extra_config

    tp_rank = get_tensor_model_parallel_rank()
    parallel = vllm_config.parallel_config
    dp_rank = parallel.data_parallel_rank
    notify_base = int(extra["notify_port"])
    dp_size = parallel.data_parallel_size
    tp_size = get_tensor_model_parallel_world_size()

    # NOTE(review): tp_size is not forwarded to get_port_offset, so that
    # function's default for the argument applies — confirm this is intended.
    rank_offset = get_port_offset(dp_rank, tp_rank)

    return cls(
        local_ip=get_ip(),
        local_kv_port=get_open_port(),
        proxy_ip=extra["proxy_ip"],
        local_ping_port=get_open_port(),
        proxy_ping_port=int(extra["proxy_ping_port"]),
        http_port=int(extra["http_port"]),
        handshake_port=int(extra["handshake_port"]),
        notify_port=notify_base + rank_offset,
        tp_rank=tp_rank,
        dp_rank=dp_rank,
        dp_size=dp_size,
        tp_size=tp_size,
    )

MoRIIOConnectorMetadata

Bases: KVConnectorMetadata

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
class MoRIIOConnectorMetadata(KVConnectorMetadata):
    """Request bookkeeping for the MoRIIO connector.

    Holds three dicts keyed by request id: ``reqs_to_recv`` and
    ``reqs_to_save`` (filled by :meth:`add_new_req`) and ``reqs_to_send``
    (not written by any method of this class).
    """

    def __init__(self):
        """Create the metadata object with all three maps empty."""
        self.reqs_to_recv: dict[ReqId, ReqMeta] = {}
        self.reqs_to_save: dict[ReqId, ReqMeta] = {}
        self.reqs_to_send: dict[ReqId, float] = {}

    def __repr__(self):
        """Return a one-line debug summary of the recv and send maps.

        Layout: ``MoRIIOConnectorMetadata:reqs_to_recv:<entries>,``
        ``reqs_to_send:<entries>``, where entries of one section are joined
        with ``;``. Each section is labelled exactly once.
        """
        # The loop variable names are part of the output: the ``{name = }``
        # f-string form prints the expression text, so keep them as they are.
        recv_str = ";".join(
            f"{req_id = },{req_meta.local_block_ids = },"
            f"{req_meta.remote_host = },{req_meta.remote_port = },"
            f"{req_meta.remote_engine_id = },{req_meta.tp_size = }"
            for req_id, req_meta in self.reqs_to_recv.items()
        )
        send_str = ";".join(
            f"{req_id = },{expiry = }"
            for req_id, expiry in self.reqs_to_send.items()
        )
        # NOTE(review): reqs_to_save is not part of the summary (it was not in
        # the previous implementation either) — confirm whether it should be.
        return (
            f"MoRIIOConnectorMetadata:reqs_to_recv:{recv_str},"
            f"reqs_to_send:{send_str}"
        )

    def add_new_req(
        self,
        request_id: ReqId,
        local_block_ids: list[int],
        kv_transfer_params: dict[str, Any],
        write_mode: bool = False,
    ) -> None:
        """Register a request built from ``kv_transfer_params``.

        Args:
            request_id: Key under which the request is stored.
            local_block_ids: Stored as ``ReqMeta.local_block_ids``.
            kv_transfer_params: Must contain ``remote_block_ids``,
                ``remote_engine_id``, ``remote_host``, ``remote_port``,
                ``remote_handshake_port`` and ``remote_notify_port``;
                ``tp_size`` and ``remote_dp_size`` are optional and default
                to 1.
            write_mode: When true the request goes into ``reqs_to_save``,
                otherwise into ``reqs_to_recv``.

        Raises:
            KeyError: If one of the required keys is absent.
        """
        _req = ReqMeta(
            local_block_ids=local_block_ids,
            remote_block_ids=kv_transfer_params["remote_block_ids"],
            remote_engine_id=kv_transfer_params["remote_engine_id"],
            remote_host=kv_transfer_params["remote_host"],
            remote_port=kv_transfer_params["remote_port"],
            remote_handshake_port=kv_transfer_params["remote_handshake_port"],
            remote_notify_port=kv_transfer_params["remote_notify_port"],
            tp_size=kv_transfer_params.get("tp_size", 1),
            remote_dp_size=kv_transfer_params.get("remote_dp_size", 1),
        )
        if write_mode:
            self.reqs_to_save[request_id] = _req
        else:
            self.reqs_to_recv[request_id] = _req

reqs_to_recv instance-attribute

reqs_to_recv: dict[ReqId, ReqMeta] = {}

reqs_to_save instance-attribute

reqs_to_save: dict[ReqId, ReqMeta] = {}

reqs_to_send instance-attribute

reqs_to_send: dict[ReqId, float] = {}

__init__

__init__()
Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
def __init__(self):
    """Create the metadata object with all three request maps empty."""
    # Each map is keyed by request id.
    self.reqs_to_recv: dict[ReqId, ReqMeta] = {}
    self.reqs_to_save: dict[ReqId, ReqMeta] = {}
    # Values are floats. NOTE(review): presumably an expiry time — confirm.
    self.reqs_to_send: dict[ReqId, float] = {}

__repr__

__repr__()
Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
def __repr__(self):
    """Return a one-line debug summary of the recv and send maps.

    Layout: ``MoRIIOConnectorMetadata:reqs_to_recv:<entries>,``
    ``reqs_to_send:<entries>``, where entries of one section are joined with
    ``;``. Each section is labelled exactly once.
    """
    # The loop variable names are part of the output: the ``{name = }``
    # f-string form prints the expression text, so keep them as they are.
    recv_str = ";".join(
        f"{req_id = },{req_meta.local_block_ids = },"
        f"{req_meta.remote_host = },{req_meta.remote_port = },"
        f"{req_meta.remote_engine_id = },{req_meta.tp_size = }"
        for req_id, req_meta in self.reqs_to_recv.items()
    )
    send_str = ";".join(
        f"{req_id = },{expiry = }" for req_id, expiry in self.reqs_to_send.items()
    )
    return (
        f"MoRIIOConnectorMetadata:reqs_to_recv:{recv_str},"
        f"reqs_to_send:{send_str}"
    )

add_new_req

add_new_req(
    request_id: ReqId,
    local_block_ids: list[int],
    kv_transfer_params: dict[str, Any],
    write_mode=False,
)
Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
def add_new_req(
    self,
    request_id: ReqId,
    local_block_ids: list[int],
    kv_transfer_params: dict[str, Any],
    write_mode=False,
):
    """Register a request built from ``kv_transfer_params``.

    The six ``remote_*`` entries read with subscripting are required;
    ``tp_size`` and ``remote_dp_size`` fall back to 1 when absent. The new
    ``ReqMeta`` is stored under ``request_id`` in ``reqs_to_save`` when
    ``write_mode`` is true, otherwise in ``reqs_to_recv``.
    """
    params = kv_transfer_params
    meta = ReqMeta(
        local_block_ids=local_block_ids,
        remote_block_ids=params["remote_block_ids"],
        remote_engine_id=params["remote_engine_id"],
        remote_host=params["remote_host"],
        remote_port=params["remote_port"],
        remote_handshake_port=params["remote_handshake_port"],
        remote_notify_port=params["remote_notify_port"],
        tp_size=params.get("tp_size", 1),
        remote_dp_size=params.get("remote_dp_size", 1),
    )
    # Pick the destination map only after the metadata was built successfully.
    destination = self.reqs_to_save if write_mode else self.reqs_to_recv
    destination[request_id] = meta

MoRIIOConstants

Constants for MoRIIO connector.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
class MoRIIOConstants:
    """Constants for MoRIIO connector."""

    # ZMQ message types
    GET_META_MSG = b"get_meta_msg"
    POP_DONE_RECV = b"pop_done_recv"
    OVER = b"OVER"
    # NOTE(review): unlike the byte-string messages above this is a str; it
    # looks like a request-id prefix rather than a message — confirm usage.
    COMPLETION_PREFIX = "cmpl"

    # NOTE(review): presumably seconds between pings and the maximum number of
    # attempts — confirm against the ping loop.
    PING_INTERVAL = 5
    MAX_PING_RETRIES = 100
    # The default ports are stored as strings, not ints.
    DEFAULT_HANDSHAKE_PORT = "6301"
    DEFAULT_NOTIFY_PORT = "61005"

    # NOTE(review): presumably a timeout in seconds (3600 s = 1 h) — confirm unit.
    VLLM_MORI_READ_ABORT_REQUEST_TIMEOUT = 3600

COMPLETION_PREFIX class-attribute instance-attribute

COMPLETION_PREFIX = 'cmpl'

DEFAULT_HANDSHAKE_PORT class-attribute instance-attribute

DEFAULT_HANDSHAKE_PORT = '6301'

DEFAULT_NOTIFY_PORT class-attribute instance-attribute

DEFAULT_NOTIFY_PORT = '61005'

GET_META_MSG class-attribute instance-attribute

GET_META_MSG = b'get_meta_msg'

MAX_PING_RETRIES class-attribute instance-attribute

MAX_PING_RETRIES = 100

OVER class-attribute instance-attribute

OVER = b'OVER'

PING_INTERVAL class-attribute instance-attribute

PING_INTERVAL = 5

POP_DONE_RECV class-attribute instance-attribute

POP_DONE_RECV = b'pop_done_recv'

VLLM_MORI_READ_ABORT_REQUEST_TIMEOUT class-attribute instance-attribute

VLLM_MORI_READ_ABORT_REQUEST_TIMEOUT = 3600

MoRIIOError

Bases: Exception

Base exception for MoRIIO operations.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
class MoRIIOError(Exception):
    """Common base class for errors raised by MoRIIO operations.

    Adds nothing to ``Exception``; it exists so callers can catch every
    MoRIIO-specific failure with a single ``except`` clause.
    """

MoRIIOMode

Bases: Enum

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
class MoRIIOMode(Enum):
    """The two MoRIIO connector modes; values are the strings "read"/"write"."""

    READ = "read"
    WRITE = "write"

READ class-attribute instance-attribute

READ = 'read'

WRITE class-attribute instance-attribute

WRITE = 'write'

ROLE

Bases: Enum

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
class ROLE(Enum):
    """Connector role: producer, consumer, or not yet initialized."""

    PRODUCER = "producer"
    CONSUMER = "consumer"
    # Placeholder member for the state before a real role has been assigned.
    NOTINIT = "notinit"

CONSUMER class-attribute instance-attribute

CONSUMER = 'consumer'

NOTINIT class-attribute instance-attribute

NOTINIT = 'notinit'

PRODUCER class-attribute instance-attribute

PRODUCER = 'producer'

RemoteAllocInfo dataclass

Information about remote block allocation.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
@dataclass
class RemoteAllocInfo:
    """Information about remote block allocation.

    Only ``block_ids`` is required; the remaining fields have defaults.
    """

    block_ids: list[int]
    # Starts at 0. NOTE(review): presumably counts completed writes — confirm.
    writes_done: int = 0
    decode_dp_rank: int = 0
    # None until set. NOTE(review): the three lists presumably correspond to
    # (local offsets, remote offsets, sizes) — confirm order with the producer.
    transfer_offset: tuple[list[int], list[int], list[int]] | None = None

block_ids instance-attribute

block_ids: list[int]

decode_dp_rank class-attribute instance-attribute

decode_dp_rank: int = 0

transfer_offset class-attribute instance-attribute

transfer_offset: (
    tuple[list[int], list[int], list[int]] | None
) = None

writes_done class-attribute instance-attribute

writes_done: int = 0

__init__

__init__(
    block_ids: list[int],
    writes_done: int = 0,
    decode_dp_rank: int = 0,
    transfer_offset: tuple[list[int], list[int], list[int]]
    | None = None,
) -> None

ReqMeta dataclass

Metadata for a single request.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
@dataclass
class ReqMeta:
    """Metadata for a single request.

    All fields are required; none has a default.
    """

    # Block ids on the local side and on the remote side.
    local_block_ids: list[int]
    remote_block_ids: list[int]
    # Remote endpoint: host plus three separate ports.
    remote_host: str
    remote_port: int
    remote_handshake_port: int
    remote_notify_port: int
    remote_engine_id: str
    # NOTE(review): unclear from this class alone whether tp_size refers to
    # the local or the remote side — confirm.
    tp_size: int
    remote_dp_size: int

local_block_ids instance-attribute

local_block_ids: list[int]

remote_block_ids instance-attribute

remote_block_ids: list[int]

remote_dp_size instance-attribute

remote_dp_size: int

remote_engine_id instance-attribute

remote_engine_id: str

remote_handshake_port instance-attribute

remote_handshake_port: int

remote_host instance-attribute

remote_host: str

remote_notify_port instance-attribute

remote_notify_port: int

remote_port instance-attribute

remote_port: int

tp_size instance-attribute

tp_size: int

__init__

__init__(
    local_block_ids: list[int],
    remote_block_ids: list[int],
    remote_host: str,
    remote_port: int,
    remote_handshake_port: int,
    remote_notify_port: int,
    remote_engine_id: str,
    tp_size: int,
    remote_dp_size: int,
) -> None

RoleManager

Manages role state across the connector.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
class RoleManager:
    """Holds the connector's current ``ROLE``.

    A single shared instance is created lazily and stored on the class; use
    :meth:`get_instance` to obtain it.
    """

    _instance: "RoleManager | None" = None
    _lock = threading.Lock()

    def __init__(self) -> None:
        self._role: ROLE = ROLE.NOTINIT

    @classmethod
    def get_instance(cls) -> "RoleManager":
        """Return the shared instance, creating it on first use."""
        # Fast path: no lock needed once the instance exists.
        if cls._instance is not None:
            return cls._instance
        with cls._lock:
            # Re-check under the lock; another thread may have created it.
            if cls._instance is None:
                cls._instance = cls()
            return cls._instance

    def get_role(self) -> ROLE:
        """Return the role currently stored (no lock is taken)."""
        return self._role

    def set_role(self, role: ROLE) -> None:
        """Store ``role`` as the current role while holding the class lock."""
        with self._lock:
            self._role = role

_instance class-attribute instance-attribute

_instance: RoleManager | None = None

_lock class-attribute instance-attribute

_lock = Lock()

_role instance-attribute

_role: ROLE = NOTINIT

__init__

__init__() -> None
Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
def __init__(self) -> None:
    """Start with the role set to ``ROLE.NOTINIT``."""
    self._role: ROLE = ROLE.NOTINIT

get_instance classmethod

get_instance() -> RoleManager
Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
@classmethod
def get_instance(cls) -> "RoleManager":
    """Return the shared instance stored on ``cls``, creating it on first use."""
    # Fast path: no lock needed once the instance exists.
    if cls._instance is not None:
        return cls._instance
    with cls._lock:
        # Re-check under the lock; another thread may have created it.
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

get_role

get_role() -> ROLE

Get the current role.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
def get_role(self) -> ROLE:
    """Get the current role.

    Returns the stored ``_role`` attribute directly; no lock is taken here.
    """
    return self._role

set_role

set_role(role: ROLE) -> None

Set the current role.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
def set_role(self, role: ROLE) -> None:
    """Set the current role.

    Args:
        role: Value stored on ``self._role``.
    """
    # The assignment is performed while holding ``self._lock``.
    with self._lock:
        self._role = role

TransferError

Bases: MoRIIOError

Exception raised when transfer fails.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
class TransferError(MoRIIOError):
    """Signals that a MoRIIO transfer did not succeed.

    Carries no state of its own beyond what ``MoRIIOError`` provides.
    """

WriteTask dataclass

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
@dataclass
class WriteTask:
    """Description of one queued write.

    NOTE(review): field meanings below are inferred from their names; confirm
    against the code that produces and consumes WriteTask.
    """

    request_id: str
    dst_engine_id: str
    local_block_ids: list[int]
    # May be None. NOTE(review): presumably filled in once the remote block
    # ids are known — confirm.
    remote_block_ids_hint: list[int] | None
    layer_name: str
    # NOTE(review): presumably recorded so the writer can wait for the layer's
    # data to be ready — confirm.
    event: torch.cuda.Event
    remote_notify_port: int
    remote_ip: str
    # default_factory: time.perf_counter() is called for each new instance,
    # not once at class-definition time.
    enqueue_time: float = field(default_factory=time.perf_counter)
    # Starts at 0. NOTE(review): presumably a retry counter — confirm.
    retried: int = 0

dst_engine_id instance-attribute

dst_engine_id: str

enqueue_time class-attribute instance-attribute

enqueue_time: float = field(default_factory=perf_counter)

event instance-attribute

event: Event

layer_name instance-attribute

layer_name: str

local_block_ids instance-attribute

local_block_ids: list[int]

remote_block_ids_hint instance-attribute

remote_block_ids_hint: list[int] | None

remote_ip instance-attribute

remote_ip: str

remote_notify_port instance-attribute

remote_notify_port: int

request_id instance-attribute

request_id: str

retried class-attribute instance-attribute

retried: int = 0

__init__

__init__(
    request_id: str,
    dst_engine_id: str,
    local_block_ids: list[int],
    remote_block_ids_hint: list[int] | None,
    layer_name: str,
    event: Event,
    remote_notify_port: int,
    remote_ip: str,
    enqueue_time: float = perf_counter(),
    retried: int = 0,
) -> None

get_moriio_mode

get_moriio_mode() -> MoRIIOMode
Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
def get_moriio_mode() -> MoRIIOMode:
    """Map ``envs.VLLM_MORIIO_CONNECTOR_READ_MODE`` to a ``MoRIIOMode``.

    Returns ``MoRIIOMode.READ`` when the setting is truthy and
    ``MoRIIOMode.WRITE`` otherwise. The raw value is logged at debug level.
    """
    read_mode = envs.VLLM_MORIIO_CONNECTOR_READ_MODE
    logger.debug("MoRIIO Connector read_mode: %s", read_mode)
    return MoRIIOMode.READ if read_mode else MoRIIOMode.WRITE

get_port_offset

get_port_offset(
    dp_rank: int, tp_rank: int, tp_size: int = 1
) -> int
Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
def get_port_offset(dp_rank: int, tp_rank: int, tp_size: int = 1) -> int:
    """Return the port offset ``dp_rank * tp_size + tp_rank``.

    With the default ``tp_size=1`` this reduces to ``dp_rank + tp_rank``, so
    different (dp_rank, tp_rank) pairs can yield the same offset.
    """
    dp_base = dp_rank * tp_size
    return dp_base + tp_rank

get_role

get_role() -> ROLE

Get the global role.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
def get_role() -> ROLE:
    """Return the role held by the shared ``RoleManager`` instance."""
    manager = RoleManager.get_instance()
    return manager.get_role()

set_role

set_role(role: ROLE)

Set the global role.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
def set_role(role: ROLE):
    """Store ``role`` on the shared ``RoleManager`` instance."""
    manager = RoleManager.get_instance()
    manager.set_role(role)

zmq_ctx

zmq_ctx(socket_type: Any, addr: str) -> Iterator[Socket]

Context manager for a ZMQ socket

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
@contextlib.contextmanager
def zmq_ctx(socket_type: Any, addr: str) -> Iterator[zmq.Socket]:
    """Context manager for a ZMQ socket"""

    if socket_type not in (zmq.ROUTER, zmq.REQ, zmq.DEALER):
        raise ValueError(f"Unexpected socket type: {socket_type}")

    ctx: zmq.Context | None = None
    try:
        ctx = zmq.Context()  # type: ignore[attr-defined]
        yield make_zmq_socket(
            ctx=ctx, path=addr, socket_type=socket_type, bind=socket_type == zmq.ROUTER
        )
    finally:
        if ctx is not None:
            ctx.destroy(linger=0)