diff --git a/Makefile b/Makefile index c86cfbc..f6c3424 100644 --- a/Makefile +++ b/Makefile @@ -43,3 +43,6 @@ clean: rm -f samples/processInfo/processInfo rm -f samples/restApi/restApi rm -f samples/topology/topology + +lint: + golangci-lint run ./... \ No newline at end of file diff --git a/go.mod b/go.mod index c265656..6304f90 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,5 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/stretchr/objx v0.5.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 76f0f6c..6c28a99 100644 --- a/go.sum +++ b/go.sum @@ -1,28 +1,16 @@ github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= -github.com/bits-and-blooms/bitset v1.2.1 h1:M+/hrU9xlMp7t4TyTDQW97d3tRPVuKFC6zBEK16QnXY= -github.com/bits-and-blooms/bitset v1.2.1/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= github.com/bits-and-blooms/bitset v1.13.0 h1:bAQ9OPNFYbGHV6Nez0tmNI0RiEu7/hxlYJRUA0wFAVE= github.com/bits-and-blooms/bitset v1.13.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= -github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/dcgm/bcast.go b/pkg/dcgm/bcast.go deleted file mode 100644 index ced0a63..0000000 --- a/pkg/dcgm/bcast.go +++ /dev/null @@ -1,87 +0,0 @@ -package dcgm - -import ( - "fmt" - "sync" -) - -type publisher struct { - publish chan interface{} - close chan bool - subscribers []*subscriber - subscriberLock sync.Mutex -} - -type subscriber struct { - read chan interface{} - close chan bool -} - -func newPublisher() *publisher { - pub := &publisher{ - publish: make(chan interface{}), - close: make(chan bool), - } - return pub -} - -func (p *publisher) subscriberList() []*subscriber { - p.subscriberLock.Lock() - defer p.subscriberLock.Unlock() - return p.subscribers[:] -} - -func (p *publisher) add() *subscriber { - p.subscriberLock.Lock() - defer 
p.subscriberLock.Unlock() - newSub := &subscriber{ - read: make(chan interface{}), - close: make(chan bool), - } - p.subscribers = append(p.subscribers, newSub) - return newSub -} - -func (p *publisher) remove(leaving *subscriber) error { - p.subscriberLock.Lock() - defer p.subscriberLock.Unlock() - subscriberIndex := -1 - for i, sub := range p.subscribers { - if sub == leaving { - subscriberIndex = i - break - } - } - if subscriberIndex == -1 { - return fmt.Errorf("Could not find subscriber") - } - go func() { leaving.close <- true }() - p.subscribers = append(p.subscribers[:subscriberIndex], p.subscribers[subscriberIndex+1:]...) - return nil -} - -func (p *publisher) send(val interface{}) { - p.publish <- val -} - -func (p *publisher) broadcast() { - for { - select { - case publishing := <-p.publish: - for _, sub := range p.subscriberList() { - go func(s *subscriber, val interface{}) { - s.read <- val - }(sub, publishing) - } - case <-p.close: - return - } - } -} - -func (p *publisher) closePublisher() { - for _, s := range p.subscriberList() { - p.remove(s) - } - p.close <- true -} diff --git a/pkg/dcgm/field_values.go b/pkg/dcgm/field_values.go new file mode 100644 index 0000000..a116fcc --- /dev/null +++ b/pkg/dcgm/field_values.go @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package dcgm + +/* +#include "dcgm_agent.h" +#include "dcgm_structs.h" +#include "field_values_cb.h" +extern int go_dcgmFieldValueEntityEnumeration(dcgm_field_entity_group_t entityGroupId, + dcgm_field_eid_t entityId, + dcgmFieldValue_v1 *values, + int numValues, + void *userData); +*/ +import "C" +import ( + "fmt" + "sync" + "time" + "unsafe" +) + +type callback struct { + mu sync.Mutex + Values []FieldValue_v2 +} + +func (cb *callback) processValues(entityGroup Field_Entity_Group, entityID uint, cvalues []C.dcgmFieldValue_v1) { + values := dcgmFieldValue_v1ToFieldValue_v2(entityGroup, entityID, cvalues) + cb.mu.Lock() + defer cb.mu.Unlock() + cb.Values = append(cb.Values, values...) +} + +//export go_dcgmFieldValueEntityEnumeration +func go_dcgmFieldValueEntityEnumeration( + entityGroup C.dcgm_field_entity_group_t, + entityID C.dcgm_field_eid_t, + values *C.dcgmFieldValue_v1, + numValues C.int, + userData unsafe.Pointer) C.int { + ptrValues := unsafe.Pointer(values) + if ptrValues != nil { + valuesSlice := (*[1 << 30]C.dcgmFieldValue_v1)(ptrValues)[0:numValues] + if userData != nil { + processor := (*callback)(userData) + processor.processValues(Field_Entity_Group(entityGroup), uint(entityID), valuesSlice) + } + } + return 0 +} + +// GetValuesSince reads and returns field values for a specified group of entities, such as GPUs, +// that have been updated since a given timestamp. It allows for targeted data retrieval based on time criteria. +// +// GPUGroup is a GroupHandle that identifies the group of entities to operate on. It can be obtained from CreateGroup +// for a specific group of GPUs or use GroupAllGPUs() to target all GPUs. +// +// fieldGroup is a FieldHandle representing the group of fields for which data is requested. +// +// sinceTime is a time.Time value representing the timestamp from which to request updated values. +// A zero value (time.Time{}) requests all available data. 
+// +// Returns []FieldValue_v2 slice containing the requested field values, a time.Time indicating the time +// of the latest data retrieval, and an error if there is any issue during the operation. +func GetValuesSince(GPUGroup GroupHandle, fieldGroup FieldHandle, sinceTime time.Time) ([]FieldValue_v2, time.Time, error) { + var ( + nextSinceTimestamp C.longlong + ) + + cbResult := &callback{} + + result := C.dcgmGetValuesSince_v2(handle.handle, + GPUGroup.handle, + C.dcgmFieldGrp_t(fieldGroup.handle), + C.longlong(sinceTime.UnixMicro()), + &nextSinceTimestamp, + (C.dcgmFieldValueEnumeration_f)(unsafe.Pointer(C.fieldValueEntityCallback)), + unsafe.Pointer(cbResult)) + if result != C.DCGM_ST_OK { + return nil, time.Time{}, fmt.Errorf("dcgmGetValuesSince_v2 failed with error code %d", int(result)) + } + + return cbResult.Values, timestampUSECToTime(int64(nextSinceTimestamp)), nil +} + +func timestampUSECToTime(timestampUSEC int64) time.Time { + // Convert microseconds to seconds and nanoseconds + sec := timestampUSEC / 1000000 // Convert microseconds to seconds + nsec := (timestampUSEC % 1000000) * 1000 // Convert the remaining microseconds to nanoseconds + // Use time.Unix to get a time.Time object + return time.Unix(sec, nsec) +} diff --git a/pkg/dcgm/field_values_cb.c b/pkg/dcgm/field_values_cb.c new file mode 100644 index 0000000..e5d0b19 --- /dev/null +++ b/pkg/dcgm/field_values_cb.c @@ -0,0 +1,11 @@ +#include "dcgm_agent.h" +#include "dcgm_structs.h" +#include "_cgo_export.h" + +int fieldValueEntityCallback(dcgm_field_entity_group_t entityGroupId, + dcgm_field_eid_t entityId, + dcgmFieldValue_v1 *values, + int numValues, + void *userData) { + return go_dcgmFieldValueEntityEnumeration(entityGroupId, entityId, values, numValues, userData); +} diff --git a/pkg/dcgm/field_values_cb.h b/pkg/dcgm/field_values_cb.h new file mode 100644 index 0000000..9cf02ea --- /dev/null +++ b/pkg/dcgm/field_values_cb.h @@ -0,0 +1,13 @@ +#ifndef FIELD_VALUES +#define FIELD_VALUES + 
+#include "dcgm_agent.h" +#include "dcgm_structs.h" + +int fieldValueEntityCallback(dcgm_field_entity_group_t entityGroupId, + dcgm_field_eid_t entityId, + dcgmFieldValue_v1 *values, + int numValues, + void *userData); + +#endif \ No newline at end of file diff --git a/pkg/dcgm/field_values_test.go b/pkg/dcgm/field_values_test.go new file mode 100644 index 0000000..2d54a51 --- /dev/null +++ b/pkg/dcgm/field_values_test.go @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package dcgm + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGetValuesSince(t *testing.T) { + teardownTest := setupTest(t) + defer teardownTest(t) + runOnlyWithLiveGPUs(t) + + const gpu uint = 0 + + // Create a group of fields + const ( + xid int = iota + ) + + deviceFields := make([]Short, 1) + deviceFields[xid] = DCGM_FI_DEV_XID_ERRORS + + fieldGroupName := fmt.Sprintf("fieldGroupName%d", rand.Uint64()) + fieldsGroup, err := FieldGroupCreate(fieldGroupName, deviceFields) + assert.NoError(t, err) + defer func() { + _ = FieldGroupDestroy(fieldsGroup) + }() + + t.Run("When there is no data return error", func(t *testing.T) { + values, nextTime, err := GetValuesSince(GroupAllGPUs(), + fieldsGroup, time.Time{}) + require.Error(t, err) + assert.Empty(t, nextTime) + assert.Len(t, values, 0) + }) + + t.Run("When there are a few entries", func(t *testing.T) { + expectedNumberOfErrors := int64(43) + expectedInjectedValuesCount := 0 + t.Logf("injecting %s for gpuId %d", "DCGM_FI_DEV_XID_ERRORS", gpu) + err = InjectFieldValue(gpu, + DCGM_FI_DEV_XID_ERRORS, + DCGM_FT_INT64, + 0, + time.Now().Add(-time.Duration(5)*time.Second).UnixMicro(), + expectedNumberOfErrors, + ) + require.NoError(t, err) + expectedInjectedValuesCount++ + for i := 4; i > 0; i-- { + err = InjectFieldValue(gpu, + DCGM_FI_DEV_XID_ERRORS, + DCGM_FT_INT64, + 0, + time.Now().Add(-time.Duration(i)*time.Second).UnixMicro(), + int64(i), + ) + require.NoError(t, err) + expectedInjectedValuesCount++ + } + // Force an update of the fields so that we can fetch initial values. 
+ err = UpdateAllFields() + assert.NoError(t, err) + values, nextTime, err := GetValuesSince(GroupAllGPUs(), fieldsGroup, time.Time{}) + assert.NoError(t, err) + assert.Greater(t, nextTime, time.Time{}) + assert.Len(t, values, expectedInjectedValuesCount) + assert.Equal(t, FE_GPU, values[0].EntityGroupId) + assert.Equal(t, gpu, values[0].EntityId) + assert.Equal(t, uint(DCGM_FI_DEV_XID_ERRORS), values[0].FieldId) + assert.Equal(t, expectedNumberOfErrors, values[0].Int64()) + for i := 1; i < 5; i++ { + assert.Equal(t, FE_GPU, values[i].EntityGroupId) + assert.Equal(t, gpu, values[i].EntityId) + assert.Equal(t, uint(DCGM_FI_DEV_XID_ERRORS), values[i].FieldId) + assert.Equal(t, int64(5-i), values[i].Int64()) + } + }) +} diff --git a/pkg/dcgm/fields.go b/pkg/dcgm/fields.go index 5f809f9..d967502 100644 --- a/pkg/dcgm/fields.go +++ b/pkg/dcgm/fields.go @@ -215,10 +215,55 @@ func toFieldValue_v2(cfields []C.dcgmFieldValue_v2) []FieldValue_v2 { return fields } +func dcgmFieldValue_v1ToFieldValue_v2(fieldEntityGroup Field_Entity_Group, entityId uint, cfields []C.dcgmFieldValue_v1) []FieldValue_v2 { + fields := make([]FieldValue_v2, len(cfields)) + for i, f := range cfields { + fields[i] = FieldValue_v2{ + Version: C.dcgmFieldValue_version2, + EntityGroupId: fieldEntityGroup, + EntityId: entityId, + FieldId: uint(f.fieldId), + FieldType: uint(f.fieldType), + Status: int(f.status), + Ts: int64(f.ts), + Value: f.value, + StringValue: nil, + } + + if uint(f.fieldType) == DCGM_FT_STRING { + fields[i].StringValue = stringPtr((*C.char)(unsafe.Pointer(&f.value[0]))) + } + } + + return fields +} + +func (fv FieldValue_v2) Int64() int64 { + return *(*int64)(unsafe.Pointer(&fv.Value[0])) +} + +func (fv FieldValue_v2) Float64() float64 { + return *(*float64)(unsafe.Pointer(&fv.Value[0])) +} + +func (fv FieldValue_v2) String() string { + return C.GoString((*C.char)(unsafe.Pointer(&fv.Value[0]))) +} + +func (fv FieldValue_v2) Blob() [4096]byte { + return fv.Value +} + +// Deprecated: 
Fv2_Int64 exists for backward compatibility +// and should not be used. To access the int64 returned by a FieldValue_v2, +// use the FieldValue_v2.Int64 method. func Fv2_Int64(fv FieldValue_v2) int64 { return *(*int64)(unsafe.Pointer(&fv.Value[0])) } +// Deprecated: Fv2_Float64 exists for backward compatibility +// and should not be used. To access the float64 returned by a FieldValue_v2, +// use the FieldValue_v2.Float64 method. func Fv2_Float64(fv FieldValue_v2) float64 { return *(*float64)(unsafe.Pointer(&fv.Value[0])) } diff --git a/pkg/dcgm/test_utils.go b/pkg/dcgm/test_utils.go new file mode 100644 index 0000000..ae46bd2 --- /dev/null +++ b/pkg/dcgm/test_utils.go @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dcgm + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func setupTest(t *testing.T) func(t *testing.T) { + cleanup, err := Init(Embedded) + assert.NoError(t, err) + + return func(t *testing.T) { + defer cleanup() + } +} + +func runOnlyWithLiveGPUs(t *testing.T) { + t.Helper() + gpus, err := getSupportedDevices() + assert.NoError(t, err) + if len(gpus) < 1 { + t.Skip("Skipping test that requires live GPUs. None were found") + } +}