decouple the transformer and decoder#940
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a significant architectural change by decoupling the transformer and decoder into separate services. The data processing pipeline now operates in two distinct phases, managed by an enhanced Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request aims to decouple the transformer and decoder stages through a two-phase data transfer model, which is a significant architectural improvement. However, a critical vulnerability has been identified: the DataManager lacks authentication and proper validation of remote endpoints and memory addresses, leading to potential Server-Side Request Forgery (SSRF) and unauthorized remote memory corruption via RDMA. Additionally, the review highlights areas for improvement concerning significant code duplication within the DataManager class and potential race conditions in thread startup sequences in example service files.
| ) = self.server_socket.recv_multipart() | ||
| if bootstrap_room.decode("ascii") == "None": | ||
| continue | ||
| endpoint = endpoint.decode("ascii") | ||
| mooncake_session_id = mooncake_session_id.decode("ascii") | ||
| bootstrap_room = int(bootstrap_room.decode("ascii")) | ||
| decode_ptrs = list(struct.unpack(f"{len(decode_ptrs)//8}Q", decode_ptrs)) | ||
| logger.info( | ||
| "Transformer received ZMQ: endpoint=%s session_id=%s room=%s decode_ptrs=%s", | ||
| endpoint, | ||
| mooncake_session_id, | ||
| bootstrap_room, | ||
| decode_ptrs, | ||
| ) | ||
| self.waiting_pool[bootstrap_room] = ( | ||
| endpoint, | ||
| mooncake_session_id, | ||
| decode_ptrs, | ||
| ) | ||
| self.transfer_event.set() | ||
|
|
||
| threading.Thread(target=transformer_thread).start() | ||
|
|
||
| def transfer_thread(): | ||
| while True: | ||
| self.transfer_event.wait() | ||
| self.transfer_event.clear() | ||
| bootstrap_room_ready = self.request_pool.keys() | ||
| bootstrap_room_request = self.waiting_pool.keys() | ||
| for room in list(bootstrap_room_request): | ||
| if room not in list(bootstrap_room_ready): | ||
| continue | ||
| status = DataPoll.Transferring | ||
| self.request_status[room] = status | ||
| ( | ||
| endpoint, | ||
| mooncake_session_id, | ||
| decode_ptrs, | ||
| ) = self.waiting_pool.pop(room) | ||
| self.sync_status_to_transformer_endpoint(endpoint, room) | ||
| transformer_data_ptrs = self.request_pool.pop(room) | ||
| ret = self.send_data( | ||
| mooncake_session_id, | ||
| transformer_data_ptrs, | ||
| decode_ptrs, | ||
| ) |
There was a problem hiding this comment.
The DataManager class receives a mooncake_session_id and remote memory addresses (decode_ptrs) from an unauthenticated ZMQ PULL socket (line 249). These values are passed directly to the Mooncake transfer engine's transfer_sync_write method via send_data (lines 290-294). This allows an unauthenticated attacker to trigger RDMA writes to arbitrary memory locations on any node whose Mooncake session ID they can guess or obtain. This could lead to unauthorized data modification or memory corruption on other nodes in the cluster.
| ) = self.server_socket.recv_multipart() | ||
| if bootstrap_room.decode("ascii") == "None": | ||
| continue | ||
| endpoint = endpoint.decode("ascii") | ||
| mooncake_session_id = mooncake_session_id.decode("ascii") | ||
| bootstrap_room = int(bootstrap_room.decode("ascii")) | ||
| decode_ptrs = list(struct.unpack(f"{len(decode_ptrs)//8}Q", decode_ptrs)) | ||
| logger.info( | ||
| "Transformer received ZMQ: endpoint=%s session_id=%s room=%s decode_ptrs=%s", | ||
| endpoint, | ||
| mooncake_session_id, | ||
| bootstrap_room, | ||
| decode_ptrs, | ||
| ) | ||
| self.waiting_pool[bootstrap_room] = ( | ||
| endpoint, | ||
| mooncake_session_id, | ||
| decode_ptrs, | ||
| ) | ||
| self.transfer_event.set() | ||
|
|
||
| threading.Thread(target=transformer_thread).start() | ||
|
|
||
| def transfer_thread(): | ||
| while True: | ||
| self.transfer_event.wait() | ||
| self.transfer_event.clear() | ||
| bootstrap_room_ready = self.request_pool.keys() | ||
| bootstrap_room_request = self.waiting_pool.keys() | ||
| for room in list(bootstrap_room_request): | ||
| if room not in list(bootstrap_room_ready): | ||
| continue | ||
| status = DataPoll.Transferring | ||
| self.request_status[room] = status | ||
| ( | ||
| endpoint, | ||
| mooncake_session_id, | ||
| decode_ptrs, | ||
| ) = self.waiting_pool.pop(room) | ||
| self.sync_status_to_transformer_endpoint(endpoint, room) |
There was a problem hiding this comment.
The DataManager class is vulnerable to Server-Side Request Forgery (SSRF) because it uses an unauthenticated endpoint string from a ZMQ PULL socket (line 249) to initiate connections to arbitrary hosts in sync_status_to_transformer_endpoint (line 288). This allows attackers to probe internal networks or exhaust resources via @cache. This vulnerability is present within duplicated thread starter methods like start_phase2_transformer_thread and start_phase1_encode_thread. Consolidating these into generic private methods, such as _start_sender_thread and _start_receiver_thread, would centralize the logic for implementing crucial authentication and validation for the endpoint.
| if self.disaggregation_phase == DisaggregationPhase.PHASE1: | ||
| if self.disaggregation_mode == DisaggregationMode.ENCODE: | ||
| self.waiting_pool: WaitingPoolType = {} | ||
| self.transfer_event = threading.Event() | ||
| self.start_phase1_encode_thread() | ||
| elif self.disaggregation_mode == DisaggregationMode.TRANSFORMER: | ||
| self.start_phase1_transformer_thread() | ||
| else: | ||
| raise ValueError(f"Unsupported DisaggregationMode in this phase: {self.disaggregation_phase}, {self.disaggregation_mode}") | ||
| elif self.disaggregation_phase == DisaggregationPhase.PHASE2: | ||
| if self.disaggregation_mode == DisaggregationMode.TRANSFORMER: | ||
| self.waiting_pool: WaitingPoolType = {} | ||
| self.transfer_event = threading.Event() | ||
| self.start_phase2_transformer_thread() # TODO: start_p2_transformer_thread | ||
| elif self.disaggregation_mode == DisaggregationMode.DECODE: | ||
| self.start_phase2_decode_thread() # TODO: start_p2_decode_thread | ||
| else: | ||
| raise ValueError(f"Unsupported DisaggregationMode in this phase: {self.disaggregation_phase}, {self.disaggregation_mode}") | ||
| else: | ||
| raise ValueError(f"Unsupported DisaggregationMode: {self.disaggregation_mode}") | ||
| raise ValueError(f"Unsupported DisaggregationPhase: {self.disaggregation_phase}") |
There was a problem hiding this comment.
The initialization of self.waiting_pool and self.transfer_event is duplicated for sender modes (PHASE1/ENCODE and PHASE2/TRANSFORMER). This could be consolidated by first determining if the current configuration is a 'sender' and then performing the initialization once to improve maintainability and reduce code repetition.
No description provided.