Skip to content

Commit

Permalink
chore: add experimental code useful for richer input
Browse files Browse the repository at this point in the history
  • Loading branch information
bassosimone committed Oct 19, 2023
1 parent 5c93d4c commit ae590be
Show file tree
Hide file tree
Showing 3 changed files with 304 additions and 0 deletions.
72 changes: 72 additions & 0 deletions internal/cmd/xo/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"context"
"os"

"github.com/apex/log"
"github.com/ooni/probe-cli/v3/internal/engine"
"github.com/ooni/probe-cli/v3/internal/enginerun"
"github.com/ooni/probe-cli/v3/internal/hujsonx"
"github.com/ooni/probe-cli/v3/internal/kvstore"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/runtimex"
"github.com/ooni/probe-cli/v3/internal/version"
)

func main() {
rawNettest := runtimex.Try1(os.ReadFile(os.Args[1]))
var nt enginerun.Nettest
runtimex.Try0(hujsonx.Unmarshal(rawNettest, &nt))

ctx := context.Background()

//log.SetLevel(log.DebugLevel)

config := engine.SessionConfig{
AvailableProbeServices: []model.OOAPIService{},
KVStore: &kvstore.Memory{},
Logger: log.Log,
ProxyURL: nil,
SoftwareName: "miniooni",
SoftwareVersion: version.Version,
TempDir: "/tmp",
TorArgs: []string{},
TorBinary: "",
SnowflakeRendezvous: "",
TunnelDir: "xo_tunnel_dir",
}
sess := runtimex.Try1(engine.NewSession(ctx, config))

// Note: we need to lookup backends and test helpers in this case
// because otherwise we cannot run web_connectivity
//
// XXX: ideally this would also call the check-in API
runtimex.Try0(sess.MaybeLookupBackends())
runtimex.Try0(sess.MaybeLookupLocation())

// while this API may be a bit weird, we have basically reimplemented miniooni in 50 LoC
submitter := runtimex.Try1(engine.NewSubmitter(ctx, engine.SubmitterConfig{
Enabled: true,
Session: sess,
Logger: log.Log,
}))

// run the nettest in a background goroutine and handle the generated events
events := runtimex.Try1(enginerun.Start(ctx, sess, &nt))
for {
select {
case <-events.Done():
return

case dataUsage := <-events.DataUsage():
log.Infof("data usage: %+v", dataUsage)

case runError := <-events.RunError():
log.Warnf("experiment failed: %s", runError.Err.Error())

case runSuccess := <-events.RunSuccess():
runtimex.Try0(submitter.Submit(ctx, runSuccess.Measurement))
}
}
}
226 changes: 226 additions & 0 deletions internal/enginerun/enginerun.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
// Package enginerun implements running a single nettest.
package enginerun

import (
"context"
"sync"

"github.com/ooni/probe-cli/v3/internal/model"
)

// Nettest describes a nettest to run. Make sure you fill MANDATORY fields.
type Nettest struct {
// Inputs contains MANDATORY inputs for the nettest. If the nettest does not take any input,
// you MUST fill this value using a single entry containing an empty string.
Inputs []string `json:"inputs"`

// Options contains the nettest options. Any option name starting with
// `Safe` will be available for the nettest run, but omitted from
// the serialized Measurement when we submit it to the OONI backend.
Options map[string]any `json:"options"`

// TestName contains the MANDATORY nettest name.
TestName string `json:"test_name"`
}

// Session is the measurement session.
//
// The engine.Session type implements this interface.
type Session interface {
// Logger returns the logger to use.
Logger() model.Logger

// NewExperimentBuilder creates a new model.ExperimentBuilder.
NewExperimentBuilder(name string) (model.ExperimentBuilder, error)
}

// config contains configuration for [Run].
type config struct {
// parallelism defines the number of goroutines that
// should run in parallel and measure.
parallelism int
}

// Option is an option for [Run].
type Option func(cfg *config)

// OptionParallelism configures the number of parallel goroutines
// that should perform concurrent measurements.
//
// Setting a value <= 1 is equivalent to setting 1 as the value.
//
// The default value of this option is 1.
func OptionParallelism(value int) Option {
return func(cfg *config) {
switch {
case value > 1:
cfg.parallelism = value
default:
cfg.parallelism = 1
}
}
}

// RunError is the event emitted when we cannot run a nettest.
type RunError struct {
// Err is the error that occurred.
Err error

// Index is the input index.
Index int

// Input is the input value.
Input string
}

// RunSuccess is the event emitted after we successfully ran a nettest.
type RunSuccess struct {
// Index is the input index.
Index int

// Input is the input value.
Input string

// Measurement is the measurement.
Measurement *model.Measurement
}

// DataUsage contains information about the data consumed by running a nettest.
type DataUsage struct {
KibiBytesReceived float64
KibiBytesSent float64
}

// Events allows to access the channels where the goroutines created by [Start] emit events.
type Events struct {
dataUsage chan *DataUsage
done chan any
runError chan *RunError
runSuccess chan *RunSuccess
}

// DataUsage returns the channel where we return overall data usage information.
func (ev *Events) DataUsage() <-chan *DataUsage {
return ev.dataUsage
}

// Done returns the channel closed when done measuring.
func (ev *Events) Done() <-chan any {
return ev.done
}

// RunError returns the channel where we post cases where a measurement failed.
func (ev *Events) RunError() <-chan *RunError {
return ev.runError
}

// RunSuccess returns the channel where we post successful measurements.
func (ev *Events) RunSuccess() <-chan *RunSuccess {
return ev.runSuccess
}

// inputIdx contains input and its index.
type inputIdx struct {
idx int
input string
}

// Start starts running the given [Nettest] using the given options using background
// goroutines. This function returns an error if it cannot create the nettest. If the
// error is nil, the returned struct contains channels where we emit events.
func Start(ctx context.Context, sess Session, nt *Nettest, options ...Option) (*Events, error) {
// 1. create experiment builder
builder, err := sess.NewExperimentBuilder(nt.TestName)
if err != nil {
return nil, err
}

// 2. configure experiment options
if err := builder.SetOptionsAny(nt.Options); err != nil {
return nil, err
}

// 3. construct the experiment instance
experiment := builder.NewExperiment()

// 4. create a generator that produces input
inputs := produce(nt)

// 5. initialize the options
cfg := &config{
parallelism: 1,
}
for _, opt := range options {
opt(cfg)
}

// 6. create the output structure
events := &Events{
dataUsage: make(chan *DataUsage),
done: make(chan any),
runError: make(chan *RunError),
runSuccess: make(chan *RunSuccess),
}

// 7. start the required number of runners
wg := &sync.WaitGroup{}
for idx := 0; idx <= cfg.parallelism; idx++ {
wg.Add(1)
go func() {
defer wg.Done()
consume(ctx, experiment, inputs, events)
}()
}

// 8. make sure we close events when done
go func() {
defer close(events.done)
wg.Wait()
events.dataUsage <- &DataUsage{
KibiBytesReceived: experiment.KibiBytesReceived(),
KibiBytesSent: experiment.KibiBytesSent(),
}
}()

// 9. return to the caller
return events, nil
}

// produce generates a stream of the inputs along with their index.
func produce(nt *Nettest) <-chan *inputIdx {
inputs := make(chan *inputIdx)
go func() {
defer close(inputs)
for idx, input := range nt.Inputs {
inputs <- &inputIdx{
idx: idx,
input: input,
}
}
}()
return inputs
}

// consume transforms inputs into events.
func consume(ctx context.Context, experiment model.Experiment, inputs <-chan *inputIdx, events *Events) {
for input := range inputs {
// TODO(bassosimone): are experiments safe to run concurrently? Maybe
// we should double check this optimistic assumption!
meas, err := experiment.MeasureWithContext(ctx, input.input)

if err != nil {
events.runError <- &RunError{
Err: err,
Index: input.idx,
Input: input.input,
}
continue
}

events.runSuccess <- &RunSuccess{
Index: input.idx,
Input: input.input,
Measurement: meas,
}
}
}
6 changes: 6 additions & 0 deletions xoinput.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"inputs": [
"https://www.example.com/"
],
"test_name": "web_connectivity"
}

0 comments on commit ae590be

Please sign in to comment.