Skip to content

Commit

Permalink
Fixes and improvements in handling new event type in client code. (#399)
Browse files Browse the repository at this point in the history
- .Net client will ignore any uknown events.
- go client watch context now handles utilisation events correctly
- go client will not wait after receinig uknown events and will process next nessage imediately 
- armadactl prints utilisation correctly
  • Loading branch information
jankaspar authored Jul 1, 2020
1 parent 072ba00 commit 687c66e
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 27 deletions.
34 changes: 28 additions & 6 deletions client/DotNet/Armada.Client.Test/Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading.Tasks;
using NUnit.Framework;
using GResearch.Armada.Client;
using RichardSzalay.MockHttp;

namespace GResearch.Armada.Client.Test
{
Expand All @@ -18,26 +19,27 @@ public class Tests
public async Task TestWatchingEvents()
{
var client = new ArmadaClient("http://localhost:8080", new HttpClient());

var queue = "test";
var jobSet = $"set-{Guid.NewGuid()}";

// produce some events
await client.CreateQueueAsync(queue, new ApiQueue {PriorityFactor = 200});
var request = CreateJobRequest(jobSet);
var response = await client.SubmitJobsAsync(request);
var cancelResponse = await client.CancelJobsAsync(new ApiJobCancelRequest {Queue = "test", JobSetId = jobSet});

var cancelResponse =
await client.CancelJobsAsync(new ApiJobCancelRequest {Queue = "test", JobSetId = jobSet});

using (var cts = new CancellationTokenSource())
{
var eventCount = 0;
Task.Run(() => client.WatchEvents(queue, jobSet, null, cts.Token, m => eventCount++, e => throw e));
Task.Run(() => client.WatchEvents(queue, jobSet, null, cts.Token, m => eventCount++, e => throw e));
await Task.Delay(TimeSpan.FromMinutes(2));
cts.Cancel();
Assert.That(eventCount, Is.EqualTo(4));
}
}

[Test]
public async Task TestSimpleJobSubmitFlow()
{
Expand All @@ -59,6 +61,26 @@ public async Task TestSimpleJobSubmitFlow()
Assert.That(allEvents[0].Result.Message.Submitted, Is.Not.Null);
}

[Test]
public async Task TestProcessingUnknownEvents()
{
var mockHttp = new MockHttpMessageHandler();
mockHttp.When("http://localhost:8080/*")
.Respond("application/json",
@"{""result"":{""Id"":""1593611590122-0"",""message"":{""Queued"":{""JobId"":""01ec5ae6f9wvya6cr6stzwty7v"",""JobSetId"":""set-bae48cc8-9f70-465f-ae5c-c92713b5f24f"",""Queue"":""test"",""Created"":""2020-07-01T13:53:10.122263955Z""}}}}
{""result"":{""Id"":""1593611590122-0"",""message"":{""UnknownEvent"":""test""}}}
{""error"": ""test error""}
{}
{""a"":""b""}");

IArmadaClient client = new ArmadaClient("http://localhost:8080", new HttpClient(mockHttp));
var events = (await client.GetJobEventsStream("queue", "jobSet", watch: false)).ToList();
Assert.That(events.Count(), Is.EqualTo(2));
Assert.That(events[0].Result.Message.Event, Is.Not.Null);
Assert.That(events[1].Error, Is.EqualTo("test error"));
}

private static ApiJobSubmitRequest CreateJobRequest(string jobSet)
{
var pod = new V1PodSpec
Expand Down
51 changes: 40 additions & 11 deletions client/DotNet/Armada.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public partial class ApiEventMessage
{
public IEvent Event => Cancelled ?? Submitted ?? Queued ?? Leased ?? LeaseReturned ??
LeaseExpired ?? Pending ?? Running ?? UnableToSchedule ??
Failed ?? Succeeded ?? Reprioritized ?? Cancelling ?? Cancelled ?? Terminated as IEvent;
Failed ?? Succeeded ?? Reprioritized ?? Cancelling ?? Cancelled ?? Terminated ??
Utilisation as IEvent;
}

public partial class ApiJobSubmittedEvent : IEvent {}
Expand Down Expand Up @@ -76,10 +77,12 @@ private IEnumerable<StreamResponse<ApiEventStreamMessage>> ReadEventStream(Strea
while (!reader.EndOfStream)
{
var line = reader.ReadLine();
var eventMessage =
JsonConvert.DeserializeObject<StreamResponse<ApiEventStreamMessage>>(line,
this.JsonSerializerSettings);
yield return eventMessage;

var (_, eventMessage) = ProcessEventLine(null, line);
if (eventMessage != null)
{
yield return eventMessage;
}
}
}
}
Expand Down Expand Up @@ -107,12 +110,12 @@ public async Task WatchEvents(
while (!ct.IsCancellationRequested && !reader.EndOfStream)
{
var line = await reader.ReadLineAsync();
var eventMessage =
JsonConvert.DeserializeObject<StreamResponse<ApiEventStreamMessage>>(line,
this.JsonSerializerSettings);

onMessage(eventMessage);
fromMessageId = eventMessage.Result?.Id ?? fromMessageId;
var (newMessageId, eventMessage) = ProcessEventLine(fromMessageId, line);
fromMessageId = newMessageId;
if (eventMessage != null)
{
onMessage(eventMessage);
}
}
}
catch (IOException)
Expand All @@ -134,5 +137,31 @@ public async Task WatchEvents(
}
}
}

private (string, StreamResponse<ApiEventStreamMessage>) ProcessEventLine(string fromMessageId, string line)
{
try
{
var eventMessage =
JsonConvert.DeserializeObject<StreamResponse<ApiEventStreamMessage>>(line,
this.JsonSerializerSettings);

fromMessageId = eventMessage?.Result?.Id ?? fromMessageId;

// Ignore unknown event types
if (String.IsNullOrEmpty(eventMessage?.Error) &&
eventMessage?.Result?.Message?.Event == null)
{
eventMessage = null;
}
return (fromMessageId, eventMessage);
}
catch(Exception)
{
// Ignore messages which can't be deserialized
}

return (fromMessageId, null);
}
}
}
2 changes: 1 addition & 1 deletion cmd/armadactl/cmd/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var resourcesCmd = &cobra.Command{
state := client.GetJobSetState(eventsClient, queue, jobSetId, context.Background())

for _, j := range state.GetCurrentState() {
log.Info("job id: %v, maximum used resources: %v", j.Job.Id, j.MaxUsedResources)
log.Infof("job id: %v, maximum used resources: %v", j.Job.Id, j.MaxUsedResources)
}
})
},
Expand Down
16 changes: 9 additions & 7 deletions internal/executor/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,6 @@ func StartUpWithContext(config configuration.ExecutorConfiguration, clusterConte
jobLeaseService,
clusterUtilisationService)

podUtilisationReporter := service.NewUtilisationEventReporter(
clusterContext,
queueUtilisationService,
eventReporter,
config.Task.UtilisationEventReportingInterval)

contextMetrics := pod_metrics.NewClusterContextMetrics(clusterContext, clusterUtilisationService, queueUtilisationService)

taskManager.Register(clusterUtilisationService.ReportClusterUtilisation, config.Task.UtilisationReportingInterval, "utilisation_reporting")
Expand All @@ -116,7 +110,15 @@ func StartUpWithContext(config configuration.ExecutorConfiguration, clusterConte

if config.Metric.ExposeQueueUsageMetrics {
taskManager.Register(queueUtilisationService.RefreshUtilisationData, config.Task.QueueUsageDataRefreshInterval, "pod_usage_data_refresh")
taskManager.Register(podUtilisationReporter.ReportUtilisationEvents, config.Task.UtilisationEventProcessingInterval, "pod_utilisation_event_reporting")

if config.Task.UtilisationEventReportingInterval > 0 {
podUtilisationReporter := service.NewUtilisationEventReporter(
clusterContext,
queueUtilisationService,
eventReporter,
config.Task.UtilisationEventReportingInterval)
taskManager.Register(podUtilisationReporter.ReportUtilisationEvents, config.Task.UtilisationEventProcessingInterval, "pod_utilisation_event_reporting")
}
}

return func() {
Expand Down
4 changes: 3 additions & 1 deletion pkg/client/domain/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func NewWatchContext() *WatchContext {
func (context *WatchContext) ProcessEvent(event api.Event) {
info, exists := context.state[event.GetJobId()]
if !exists {
info = &JobInfo{}
info = &JobInfo{
MaxUsedResources: common.ComputeResources{},
}
context.state[event.GetJobId()] = info
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/client/domain/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/G-Research/armada/internal/common"
"github.com/G-Research/armada/pkg/api"
)

Expand Down Expand Up @@ -191,3 +193,19 @@ func TestWatchContext_EventsOutOfOrder(t *testing.T) {
assert.Equal(t, &JobInfo{Status: Succeeded, Job: &job, LastUpdate: now}, watchContext.GetJobInfo("1"))
assert.Equal(t, map[JobStatus]int{Succeeded: 1}, watchContext.stateSummary)
}

func TestWatchContext_UtilisationEvent(t *testing.T) {
watchContext := NewWatchContext()
watchContext.ProcessEvent(&api.JobUtilisationEvent{
JobId: "job1",
JobSetId: "",
Queue: "",
Created: time.Now(),
ClusterId: "",
KubernetesId: "",
MaxResourcesForPeriod: common.ComputeResources{
"cpu": resource.MustParse("1"),
},
})
assert.Equal(t, resource.MustParse("1"), watchContext.GetJobInfo("job1").MaxUsedResources["cpu"])
}
2 changes: 1 addition & 1 deletion pkg/client/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func WatchJobSetWithJobIdsFilter(client api.EventClient, queue, jobSetId string,

event, e := api.UnwrapEvent(msg.Message)
if e != nil {
// This can mean that the event type reported from server is unknown to the client
log.Error(e)
time.Sleep(5 * time.Second)
continue
}

Expand Down

0 comments on commit 687c66e

Please sign in to comment.