Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gateware.usb.request.windows: add MicrosoftOS10RequestHandler #251

Merged
merged 1 commit into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions luna/gateware/usb/request/windows/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#
# This file is part of LUNA.
#
# Copyright (c) 2024 Great Scott Gadgets <[email protected]>
# SPDX-License-Identifier: BSD-3-Clause

from amaranth import Module, Signal

from usb_protocol.emitters.descriptors.microsoft10 import MicrosoftOS10DescriptorCollection
from usb_protocol.types import USBRequestRecipient, USBRequestType

from ...usb2.request import USBRequestHandler
from .ms_descriptor import GetMicrosoftDescriptorHandlerBlock


class MicrosoftOS10RequestHandler(USBRequestHandler):
""" A platform-specific handler for Microsoft OS 1.0 requests.

Parameters
----------
descriptors: MicrosoftOS10DescriptorCollection
A collection of the platform-specific descriptors to respond to Windows with as requested.
request_code:
Request value defined in the device OS string descriptor (0xEE). This is the byte after 'MSFT100'.
Also called bMS_VendorCode in Microsoft OS 1.0 descriptor specification.
max_packet_size
The maximum packet size for the endpoint associated with this handler.
"""
def __init__(self, descriptors: MicrosoftOS10DescriptorCollection, request_code=0xee, max_packet_size=64):
self.descriptors = descriptors
self._request_code = request_code
self._max_packet_size = max_packet_size

super().__init__()

def elaborate(self, platform):
m = Module()

# Create convenience aliases for our interface components.
interface = self.interface
setup = interface.setup
handshake_generator = interface.handshakes_out
tx = interface.tx

# Handler for GET_DESCRIPTOR_SET requests.
m.submodules.ms_descriptor_handler = ms_descriptor_handler = \
GetMicrosoftDescriptorHandlerBlock(self.descriptors)
m.d.comb += [
ms_descriptor_handler.index .eq(setup.index),
ms_descriptor_handler.length .eq(setup.length),
]

#
# Handlers.
#
with m.If(
(setup.type == USBRequestType.VENDOR) &
(setup.recipient == USBRequestRecipient.DEVICE) &
((setup.index == 4) | (setup.index == 5))
):
m.d.comb += interface.claim.eq(1)

with m.FSM(domain='usb'):

# IDLE -- not handling any active request
with m.State('IDLE'):

m.d.usb += [
# Start at the beginning of our next / fresh GET_DESCRIPTOR_SET request.
ms_descriptor_handler.start_position .eq(0),

# Always start our responses with DATA1 pids, per [USB 2.0: 8.5.3].
interface.tx_data_pid .eq(1)
]

# If we've received a new setup packet, handle it.
with m.If(setup.received):

with m.Switch(setup.request):

with m.Case(self._request_code):
m.next = 'GET_MS_DESCRIPTOR'
with m.Default():
m.next = 'UNHANDLED'


# GET_MS_DESCRIPTOR -- The host is trying to request a OS Feature descriptor set
with m.State('GET_MS_DESCRIPTOR'):
# Keep track of whether we've sent a packet we're expecting an ACK to.
expecting_ack = Signal()

m.d.comb += [
ms_descriptor_handler.tx .attach(tx),
handshake_generator.stall .eq(ms_descriptor_handler.stall),
]

with m.If(interface.data_requested):
m.d.comb += ms_descriptor_handler.start.eq(1)
m.d.usb += expecting_ack.eq(1)

# Each time we receive an ACK, advance in our descriptor.
# This allows us to send descriptors with >64B of content.
with m.If(interface.handshakes_in.ack & expecting_ack):

next_start_position = ms_descriptor_handler.start_position + self._max_packet_size
m.d.usb += [
# We've received an ACK; so mark the section we've sent of the descriptor as
# received, and move forward...
ms_descriptor_handler.start_position .eq(next_start_position),

# ... and toggle our data PID.
self.interface.tx_data_pid .eq(~self.interface.tx_data_pid),

# We've got the ACK we expected.
expecting_ack .eq(0),
]

# ... and ACK our status stage.
with m.If(interface.status_requested):
m.d.comb += handshake_generator.ack.eq(1)
m.next = 'IDLE'

# If the requested descriptor doesn't exist, the request is terminated by STALLing the data stage.
with m.Elif(ms_descriptor_handler.stall):
m.d.usb += expecting_ack.eq(0)
m.next = 'IDLE'


# UNHANDLED -- we've received a request we're not prepared to handle
with m.State('UNHANDLED'):

# When we next have an opportunity to stall, do so,
# and then return to idle.
with m.If(interface.data_requested | interface.status_requested):
m.d.comb += handshake_generator.stall.eq(1)
m.next = 'IDLE'

return m
Loading