diff --git a/controller/Dockerfile.windows-native b/controller/Dockerfile.windows-native index 6c6c47d1ce..db781a0231 100644 --- a/controller/Dockerfile.windows-native +++ b/controller/Dockerfile.windows-native @@ -3,24 +3,47 @@ # buildx targets, and this one requires legacy build. # Maybe one day: https://github.com/moby/buildkit/issues/616 ARG BUILDER_IMAGE + +FROM --platform=windows/amd64 ${BUILDER_IMAGE} as pktmon-builder +WORKDIR C:\\retina + # mcr.microsoft.com/oss/go/microsoft/golang:1.23.1-windowsservercore-ltsc2022 FROM --platform=windows/amd64 mcr.microsoft.com/oss/go/microsoft/golang@sha256:e2d55093522b5f4a311494255d0598145b1f13da5ae2354a09c7f7c1355f3ad9 AS builder WORKDIR C:\\retina COPY go.mod . COPY go.sum . RUN go mod download + + +SHELL ["powershell", "-Command", "$ErrorActionPreference = 'Stop'; $ProgressPreference = 'SilentlyContinue';"] + +RUN [Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; \ + Invoke-WebRequest -UseBasicParsing -uri "https://github.com/msys2/msys2-installer/releases/download/2024-05-07/msys2-base-x86_64-20240507.sfx.exe" -OutFile msys2.exe; \ + .\msys2.exe -y -oC:\; \ + Remove-Item msys2.exe ; \ + function msys() { C:\msys64\usr\bin\bash.exe @('-lc') + @Args; } \ + msys ' '; \ + msys 'pacman --noconfirm -Syuu'; \ + msys 'pacman --noconfirm -S mingw-w64-x86_64-gcc'; \ + msys 'pacman --noconfirm -Scc'; + +# pure magic: https://github.com/MicrosoftDocs/Virtualization-Documentation/blob/3f7c7ed7ef8d582c74ec740414c54f25bf5850c0/windows-container-samples/golang/Dockerfile#L15C1-L15C179 +RUN setx path "C:\msys64\mingw64\bin" + ADD . . +RUN powershell -Command "Remove-Item -Recurse -Force C:\retina\pkg\plugin\windows\pktmon\stream" +COPY --from=pktmon-builder C:\\pktmon\\stream C:\\retina\\pkg\\plugin\\windows\\pktmon\\stream + ARG VERSION ARG APP_INSIGHTS_ID SHELL ["cmd", "/S", "/C"] ENV VERSION=$VERSION ENV APP_INSIGHTS_ID=$APP_INSIGHTS_ID +ENV CGO_ENABLED=1 RUN go build -v -o controller.exe -ldflags="-X github.com/microsoft/retina/internal/buildinfo.Version=%VERSION% -X github.com/microsoft/retina/internal/buildinfo.ApplicationInsightsID=%APP_INSIGHTS_ID%" .\controller RUN go build -v -o captureworkload.exe -ldflags="-X github.com/microsoft/retina/internal/buildinfo.Version=%VERSION% -X github.com/microsoft/retina/internal/buildinfo.ApplicationInsightsID=%APP_INSIGHTS_ID%" .\captureworkload -FROM --platform=windows/amd64 ${BUILDER_IMAGE} as pktmon-builder -WORKDIR C:\\retina FROM --platform=windows/amd64 mcr.microsoft.com/windows/nanoserver:ltsc2022 AS final ADD https://github.com/microsoft/etl2pcapng/releases/download/v1.10.0/etl2pcapng.exe /etl2pcapng.exe diff --git a/pkg/plugin/windows/pktmon/pktmon_plugin_windows.go b/pkg/plugin/windows/pktmon/pktmon_plugin_windows.go index 21e9b6f3df..4861cb119d 100644 --- a/pkg/plugin/windows/pktmon/pktmon_plugin_windows.go +++ b/pkg/plugin/windows/pktmon/pktmon_plugin_windows.go @@ -2,53 +2,49 @@ package pktmon import ( "context" - "encoding/json" - "fmt" - "os" - "os/exec" "github.com/pkg/errors" - observerv1 "github.com/cilium/cilium/api/v1/observer" + "github.com/cilium/cilium/api/v1/flow" v1 "github.com/cilium/cilium/pkg/hubble/api/v1" + "github.com/google/gopacket" kcfg "github.com/microsoft/retina/pkg/config" "github.com/microsoft/retina/pkg/enricher" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/metrics" "github.com/microsoft/retina/pkg/plugin/api" + "github.com/microsoft/retina/pkg/plugin/windows/pktmon/stream" "github.com/microsoft/retina/pkg/utils" "go.uber.org/zap" - "go.uber.org/zap/zapio" - "golang.org/x/sync/errgroup" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/status" ) var ( - ErrNilEnricher = errors.New("enricher is nil") - ErrUnexpectedExit = errors.New("unexpected exit") - ErrNilGrpcClient = errors.New("grpc client is nil") - - socket = "/temp/retina-pktmon.sock" + ErrNilEnricher = errors.New("enricher is nil") + ErrNotSupported = errors.New("not supported") ) const ( - Name = "pktmon" - connectionRetryAttempts = 5 - eventChannelSize = 1000 + Name = "pktmon" + eventChannelSize = 1000 + + defaultBufferMultiplier = 10 + defaultTruncationSize = 256 + defaultBufferSize = 9000 ) +type PktMonConn interface { + Initialize() error + PrintAndResetMissedWrite(sessionID string) + PrintAndResetMissedRead(sessionID string) + ParseDNS(fl *flow.Flow, metadata *utils.RetinaMetadata, packet gopacket.Packet) error + GetNextPacket(ctx context.Context) (*flow.Flow, *utils.RetinaMetadata, gopacket.Packet, error) +} + type Plugin struct { enricher enricher.EnricherInterface externalChannel chan *v1.Event l *log.ZapLogger - pktmonCmd *exec.Cmd - stdWriter *zapio.Writer - errWriter *zapio.Writer - - grpcClient *GRPCClient - stream observerv1.Observer_GetFlowsClient + pkt PktMonConn } func (p *Plugin) Init() error { @@ -59,156 +55,17 @@ func (p *Plugin) Name() string { return "pktmon" } -type GRPCClient struct { - observerv1.ObserverClient -} - -func newGRPCClient() (*GRPCClient, error) { - retryPolicy := map[string]any{ - "methodConfig": []map[string]any{ - { - "waitForReady": true, - "retryPolicy": map[string]any{ - "MaxAttempts": connectionRetryAttempts, - "InitialBackoff": ".01s", - "MaxBackoff": ".01s", - "BackoffMultiplier": 1.0, - "RetryableStatusCodes": []string{"UNAVAILABLE"}, - }, - }, - }, - } - - bytes, err := json.Marshal(retryPolicy) - if err != nil { - return nil, errors.Wrapf(err, "failed to marshal retry policy") - } - - retryPolicyStr := string(bytes) - - conn, err := grpc.Dial(fmt.Sprintf("%s:%s", "unix", socket), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicyStr)) - if err != nil { - return nil, errors.Wrapf(err, "failed to dial pktmon server:") - } - - return &GRPCClient{observerv1.NewObserverClient(conn)}, nil -} - -func (p *Plugin) RunPktMonServer(ctx context.Context) error { - p.stdWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.InfoLevel} - defer p.stdWriter.Close() - p.errWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.ErrorLevel} - defer p.errWriter.Close() - - pwd, err := os.Getwd() - if err != nil { - return errors.Wrapf(err, "failed to get current working directory for pktmon") - } - - cmd := pwd + "\\" + "controller-pktmon.exe" - - p.pktmonCmd = exec.CommandContext(ctx, cmd) - p.pktmonCmd.Dir = pwd - p.pktmonCmd.Args = append(p.pktmonCmd.Args, "--socketpath", socket) - p.pktmonCmd.Env = os.Environ() - p.pktmonCmd.Stdout = p.stdWriter - p.pktmonCmd.Stderr = p.errWriter - - p.l.Info("calling start on pktmon stream server", zap.String("cmd", p.pktmonCmd.String())) - - // block this thread, and should it ever return, it's a problem - err = p.pktmonCmd.Run() - if err != nil { - return errors.Wrapf(err, "pktmon server exited when it should not have") - } - - // we never want to return happy from this - return errors.Wrapf(ErrUnexpectedExit, "pktmon server exited unexpectedly") -} - func (p *Plugin) Start(ctx context.Context) error { p.enricher = enricher.Instance() if p.enricher == nil { return ErrNilEnricher } - g, ctx := errgroup.WithContext(ctx) + p.pkt = stream.NewWinPktMonStreamer(p.l, defaultTruncationSize, defaultBufferSize, defaultBufferMultiplier) - g.Go(func() error { - err := p.RunPktMonServer(ctx) - if err != nil { - return errors.Wrapf(err, "pktmon server exited") - } - return nil - }) - - err := p.SetupStream() + err := p.pkt.Initialize() if err != nil { - return errors.Wrapf(err, "failed to setup initial pktmon stream") - } - - // run the getflows loop - g.Go(func() error { - for { - err := p.GetFlow(ctx) - if _, ok := status.FromError(err); ok { - p.l.Error("failed to get flow, retriable:", zap.Error(err)) - continue - } - return errors.Wrapf(err, "failed to get flow, unrecoverable") - } - }) - - return g.Wait() -} - -func (p *Plugin) SetupStream() error { - var err error - fn := func() error { - p.l.Info("creating pktmon client") - p.grpcClient, err = newGRPCClient() - if err != nil { - return errors.Wrapf(err, "failed to create pktmon client before getting flows") - } - - return nil - } - err = utils.Retry(fn, connectionRetryAttempts) - if err != nil { - return errors.Wrapf(err, "failed to create pktmon client") - } - - return nil -} - -func (p *Plugin) StartStream(ctx context.Context) error { - if p.grpcClient == nil { - return errors.Wrapf(ErrNilGrpcClient, "unable to start stream") - } - - var err error - fn := func() error { - p.stream, err = p.grpcClient.GetFlows(ctx, &observerv1.GetFlowsRequest{}) - if err != nil { - return errors.Wrapf(err, "failed to open pktmon stream") - } - return nil - } - err = utils.Retry(fn, connectionRetryAttempts) - if err != nil { - return errors.Wrapf(err, "failed to create pktmon client") - } - - return nil -} - -func (p *Plugin) GetFlow(ctx context.Context) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - err := p.StartStream(ctx) - if err != nil { - return errors.Wrapf(err, "failed to setup pktmon stream") + return errors.Wrapf(err, "Failed to initialize pktmon") } for { @@ -216,17 +73,26 @@ func (p *Plugin) GetFlow(ctx context.Context) error { case <-ctx.Done(): return errors.Wrapf(ctx.Err(), "pktmon plugin context done") default: - event, err := p.stream.Recv() + fl, meta, packet, err := p.pkt.GetNextPacket(ctx) + if fl == nil { + continue + } + if err != nil { - return errors.Wrapf(err, "failed to receive pktmon event") + p.l.Error("error getting packet", zap.Error(err)) + continue } - fl := event.GetFlow() - if fl == nil { - p.l.Error("received nil flow, flow proto mismatch from client/server?") - return nil + // do this here instead of GetNextPacket to keep higher level + // packet parsing out of L4 parsing + err = p.pkt.ParseDNS(fl, meta, packet) + if err != nil { + p.l.Error("failed to parse DNS", zap.Error(err)) + continue } + utils.AddRetinaMetadata(fl, meta) + ev := &v1.Event{ Event: fl, Timestamp: fl.GetTime(), @@ -264,13 +130,6 @@ func New(_ *kcfg.Config) api.Plugin { } func (p *Plugin) Stop() error { - if p.pktmonCmd != nil { - err := p.pktmonCmd.Process.Kill() - if err != nil { - return errors.Wrapf(err, "failed to kill pktmon server during stop") - } - } - return nil } diff --git a/pkg/plugin/windows/pktmon/stream/stream.go b/pkg/plugin/windows/pktmon/stream/stream.go new file mode 100644 index 0000000000..32867d5e05 --- /dev/null +++ b/pkg/plugin/windows/pktmon/stream/stream.go @@ -0,0 +1,49 @@ +package stream + +import ( + "context" + + "github.com/cilium/cilium/api/v1/flow" + "github.com/google/gopacket" + "github.com/microsoft/retina/pkg/log" + "github.com/microsoft/retina/pkg/utils" +) + +type WinPktMon struct { + l *log.ZapLogger +} + +func NewWinPktMonStreamer(l *log.ZapLogger, truncationSize, bufferSize, bufferMultiplier int) *WinPktMon { + return &WinPktMon{ + l: l, + } +} + +func (w *WinPktMon) Initialize() error { + return nil +} + +func (w *WinPktMon) GetNextPacket(ctx context.Context) (*flow.Flow, *utils.RetinaMetadata, gopacket.Packet, error) { + w.l.Info("pktmon plugin not implemented") + <-ctx.Done() + return nil, nil, nil, ctx.Err() +} + +func (w *WinPktMon) ParseDNS(fl *flow.Flow, metadata *utils.RetinaMetadata, packet gopacket.Packet) error { + return nil +} + +func (w *WinPktMon) IncMissedWrite(missed int) { +} + +func (w *WinPktMon) IncMissedRead(missed int) { +} + +func (w *WinPktMon) PrintAndResetMissedWrite(sessionID string) { +} + +func (w *WinPktMon) PrintAndResetMissedRead(sessionID string) { +} + +func AddTcpFlagsBool(f *flow.Flow, syn, ack, fin, rst, psh, urg bool) { +}