Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Store tokens in local storage paths" #29294

Merged
merged 1 commit into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions vault/activity_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,20 @@ import (
const (
// activitySubPath is the directory under the system view where
// the log will be stored.
activitySubPath = "counters/activity/"
activityEntityBasePath = "log/entity/"
activityTokenBasePath = "log/directtokens/"
activityTokenLocalBasePath = "local/" + activityTokenBasePath
activityQueryBasePath = "queries/"
activityConfigKey = "config"
activityIntentLogKey = "endofmonth"
activitySubPath = "counters/activity/"
activityEntityBasePath = "log/entity/"
activityTokenBasePath = "log/directtokens/"
activityQueryBasePath = "queries/"
activityConfigKey = "config"
activityIntentLogKey = "endofmonth"

activityACMERegenerationKey = "acme-regeneration"
// sketch for each month that stores hash of client ids
distinctClientsBasePath = "log/distinctclients/"

// for testing purposes (public as needed)
ActivityLogPrefix = "sys/counters/activity/log/"
ActivityLogLocalPrefix = "sys/counters/activity/local/log/"
ActivityPrefix = "sys/counters/activity/"
ActivityLogPrefix = "sys/counters/activity/log/"
ActivityPrefix = "sys/counters/activity/"

// Time to wait on perf standby before sending fragment
activityFragmentStandbyTime = 10 * time.Minute
Expand Down Expand Up @@ -506,7 +504,7 @@ func (a *ActivityLog) saveSegmentTokensInternal(ctx context.Context, currentSegm
return "", nil
}
// RFC (VLT-120) defines this as 1-indexed, but it should be 0-indexed
tokenPath := fmt.Sprintf("%s%d/0", activityTokenLocalBasePath, currentSegment.startTimestamp)
tokenPath := fmt.Sprintf("%s%d/0", activityTokenBasePath, currentSegment.startTimestamp)
// We must still allow for the tokenCount of the current segment to
// be written to storage, since if we remove this code we will incur
// data loss for one segment's worth of TWEs.
Expand Down Expand Up @@ -588,7 +586,7 @@ func parseSegmentNumberFromPath(path string) (int, bool) {
// sorted last to first
func (a *ActivityLog) availableLogs(ctx context.Context, upTo time.Time) ([]time.Time, error) {
paths := make([]string, 0)
for _, basePath := range []string{activityEntityBasePath, activityTokenLocalBasePath} {
for _, basePath := range []string{activityEntityBasePath, activityTokenBasePath} {
p, err := a.view.List(ctx, basePath)
if err != nil {
return nil, err
Expand Down Expand Up @@ -696,7 +694,7 @@ func (a *ActivityLog) WalkTokenSegments(ctx context.Context,
startTime time.Time,
walkFn func(*activity.TokenCount),
) error {
basePath := activityTokenLocalBasePath + fmt.Sprint(startTime.Unix()) + "/"
basePath := activityTokenBasePath + fmt.Sprint(startTime.Unix()) + "/"
pathList, err := a.view.List(ctx, basePath)
if err != nil {
return err
Expand Down Expand Up @@ -797,7 +795,7 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti
// tokenCountExists checks if there's a token log for :startTime:
// this function should be called with the lock held
func (a *ActivityLog) tokenCountExists(ctx context.Context, startTime time.Time) (bool, error) {
p, err := a.view.List(ctx, activityTokenLocalBasePath+fmt.Sprint(startTime.Unix())+"/")
p, err := a.view.List(ctx, activityTokenBasePath+fmt.Sprint(startTime.Unix())+"/")
if err != nil {
return false, err
}
Expand All @@ -822,7 +820,7 @@ func (a *ActivityLog) loadTokenCount(ctx context.Context, startTime time.Time) e
return nil
}

path := activityTokenLocalBasePath + fmt.Sprint(startTime.Unix()) + "/0"
path := activityTokenBasePath + fmt.Sprint(startTime.Unix()) + "/0"
data, err := a.view.Get(ctx, path)
if err != nil {
return err
Expand Down Expand Up @@ -918,7 +916,7 @@ func (a *ActivityLog) resetCurrentLog() {

func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, whenDone chan struct{}) {
entityPath := fmt.Sprintf("%v%v/", activityEntityBasePath, startTimestamp)
tokenPath := fmt.Sprintf("%v%v/", activityTokenLocalBasePath, startTimestamp)
tokenPath := fmt.Sprintf("%v%v/", activityTokenBasePath, startTimestamp)

entitySegments, err := a.view.List(ctx, entityPath)
if err != nil {
Expand Down
39 changes: 14 additions & 25 deletions vault/activity_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func TestActivityLog_SaveTokensToStorage(t *testing.T) {
a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment

nsIDs := [...]string{"ns1_id", "ns2_id", "ns3_id"}
path := fmt.Sprintf("%sdirecttokens/%d/0", ActivityLogLocalPrefix, a.GetStartTimestamp())
path := fmt.Sprintf("%sdirecttokens/%d/0", ActivityLogPrefix, a.GetStartTimestamp())

for i := 0; i < 3; i++ {
a.AddTokenToFragment(nsIDs[0])
Expand Down Expand Up @@ -380,7 +380,7 @@ func TestActivityLog_SaveTokensToStorageDoesNotUpdateTokenCount(t *testing.T) {
a.SetStandbyEnable(ctx, true)
a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment

tokenPath := fmt.Sprintf("%sdirecttokens/%d/0", ActivityLogLocalPrefix, a.GetStartTimestamp())
tokenPath := fmt.Sprintf("%sdirecttokens/%d/0", ActivityLogPrefix, a.GetStartTimestamp())
clientPath := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", a.GetStartTimestamp())
// Create some entries without entityIDs
tokenEntryOne := logical.TokenEntry{NamespaceID: namespace.RootNamespaceID, Policies: []string{"hi"}}
Expand Down Expand Up @@ -637,18 +637,13 @@ func TestActivityLog_availableLogs(t *testing.T) {
// set up a few files in storage
core, _, _ := TestCoreUnsealed(t)
a := core.activityLog
paths := [...]string{"entity/1111/1", "entity/992/3"}
tokenPaths := [...]string{"directtokens/1111/1", "directtokens/1000000/1", "directtokens/992/1"}
paths := [...]string{"entity/1111/1", "directtokens/1111/1", "directtokens/1000000/1", "entity/992/3", "directtokens/992/1"}
expectedTimes := [...]time.Time{time.Unix(1000000, 0), time.Unix(1111, 0), time.Unix(992, 0)}

for _, path := range paths {
WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test"))
}

for _, path := range tokenPaths {
WriteToStorage(t, core, ActivityLogLocalPrefix+path, []byte("test"))
}

// verify above files are there, and dates in correct order
times, err := a.availableLogs(context.Background(), time.Now())
if err != nil {
Expand Down Expand Up @@ -783,7 +778,7 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
path0 := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", startTimestamp)
path1 := fmt.Sprintf("sys/counters/activity/log/entity/%d/1", startTimestamp)
path2 := fmt.Sprintf("sys/counters/activity/log/entity/%d/2", startTimestamp)
tokenPath := fmt.Sprintf("sys/counters/activity/local/log/directtokens/%d/0", startTimestamp)
tokenPath := fmt.Sprintf("sys/counters/activity/log/directtokens/%d/0", startTimestamp)

genID := func(i int) string {
return fmt.Sprintf("11111111-1111-1111-1111-%012d", i)
Expand Down Expand Up @@ -1145,7 +1140,7 @@ func TestActivityLog_tokenCountExists(t *testing.T) {
a := core.activityLog
paths := [...]string{"directtokens/992/0", "directtokens/1001/foo", "directtokens/1111/0", "directtokens/2222/1"}
for _, path := range paths {
WriteToStorage(t, core, ActivityLogLocalPrefix+path, []byte("test"))
WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test"))
}

testCases := []struct {
Expand Down Expand Up @@ -1512,7 +1507,7 @@ func TestActivityLog_loadTokenCount(t *testing.T) {

ctx := context.Background()
for _, tc := range testCases {
WriteToStorage(t, core, ActivityLogLocalPrefix+tc.path, data)
WriteToStorage(t, core, ActivityLogPrefix+tc.path, data)
}

for _, tc := range testCases {
Expand Down Expand Up @@ -1656,7 +1651,7 @@ func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities
t.Fatalf(err.Error())
}

WriteToStorage(t, core, ActivityLogLocalPrefix+"directtokens/"+fmt.Sprint(base.Unix())+"/0", tokenData)
WriteToStorage(t, core, ActivityLogPrefix+"directtokens/"+fmt.Sprint(base.Unix())+"/0", tokenData)
}

return a, entityRecords, tokenRecords
Expand Down Expand Up @@ -1983,17 +1978,11 @@ func TestActivityLog_DeleteWorker(t *testing.T) {
"entity/1111/2",
"entity/1111/3",
"entity/1112/1",
}
for _, path := range paths {
WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test"))
}

tokenPaths := []string{
"directtokens/1111/1",
"directtokens/1112/1",
}
for _, path := range tokenPaths {
WriteToStorage(t, core, ActivityLogLocalPrefix+path, []byte("test"))
for _, path := range paths {
WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test"))
}

doneCh := make(chan struct{})
Expand All @@ -2009,13 +1998,13 @@ func TestActivityLog_DeleteWorker(t *testing.T) {

// Check segments still present
readSegmentFromStorage(t, core, ActivityLogPrefix+"entity/1112/1")
readSegmentFromStorage(t, core, ActivityLogLocalPrefix+"directtokens/1112/1")
readSegmentFromStorage(t, core, ActivityLogPrefix+"directtokens/1112/1")

// Check other segments not present
expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/1")
expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/2")
expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/3")
expectMissingSegment(t, core, ActivityLogLocalPrefix+"directtokens/1111/1")
expectMissingSegment(t, core, ActivityLogPrefix+"directtokens/1111/1")
}

// checkAPIWarnings ensures there is a warning if switching from enabled -> disabled,
Expand Down Expand Up @@ -2134,7 +2123,7 @@ func TestActivityLog_EnableDisable(t *testing.T) {
path = fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, seg2)
readSegmentFromStorage(t, core, path)

path = fmt.Sprintf("%vdirecttokens/%v/0", ActivityLogLocalPrefix, seg2)
path = fmt.Sprintf("%vdirecttokens/%v/0", ActivityLogPrefix, seg2)
}
readSegmentFromStorage(t, core, path)
}
Expand Down Expand Up @@ -2382,7 +2371,7 @@ func TestActivityLog_CalculatePrecomputedQueriesWithMixedTWEs(t *testing.T) {
if err != nil {
t.Fatal(err)
}
tokenPath := fmt.Sprintf("%vdirecttokens/%v/%v", ActivityLogLocalPrefix, segment.StartTime, segment.Segment)
tokenPath := fmt.Sprintf("%vdirecttokens/%v/%v", ActivityLogPrefix, segment.StartTime, segment.Segment)
WriteToStorage(t, core, tokenPath, data)
}

Expand Down Expand Up @@ -3705,7 +3694,7 @@ func TestActivityLog_Deletion(t *testing.T) {
paths[i] = append(paths[i], entityPath)
WriteToStorage(t, core, entityPath, []byte("test"))
}
tokenPath := fmt.Sprintf("%vdirecttokens/%v/0", ActivityLogLocalPrefix, start.Unix())
tokenPath := fmt.Sprintf("%vdirecttokens/%v/0", ActivityLogPrefix, start.Unix())
paths[i] = append(paths[i], tokenPath)
WriteToStorage(t, core, tokenPath, []byte("test"))

Expand Down
2 changes: 1 addition & 1 deletion vault/activity_log_util_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ func (a *ActivityLog) NewSegmentFileReader(ctx context.Context, startTime time.T
if err != nil {
return nil, err
}
tokens, err := a.newSingleTypeSegmentReader(ctx, startTime, activityTokenLocalBasePath)
tokens, err := a.newSingleTypeSegmentReader(ctx, startTime, activityTokenBasePath)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions vault/activity_log_util_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ func writeTokenSegment(t *testing.T, core *Core, ts time.Time, index int, item *
t.Helper()
protoItem, err := proto.Marshal(item)
require.NoError(t, err)
WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, ts, index), protoItem)
WriteToStorage(t, core, makeSegmentPath(t, activityTokenBasePath, ts, index), protoItem)
}

// makeSegmentPath formats the path for a segment at a particular time and index
Expand All @@ -1020,7 +1020,7 @@ func TestSegmentFileReader_BadData(t *testing.T) {
now := time.Now()

// write bad data that won't be able to be unmarshaled at index 0
WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, now, 0), []byte("fake data"))
WriteToStorage(t, core, makeSegmentPath(t, activityTokenBasePath, now, 0), []byte("fake data"))
WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, now, 0), []byte("fake data"))

// write entity at index 1
Expand Down Expand Up @@ -1063,7 +1063,7 @@ func TestSegmentFileReader_MissingData(t *testing.T) {
now := time.Now()
// write entities and tokens at indexes 0, 1, 2
for i := 0; i < 3; i++ {
WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, now, i), []byte("fake data"))
WriteToStorage(t, core, makeSegmentPath(t, activityTokenBasePath, now, i), []byte("fake data"))
WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, now, i), []byte("fake data"))

}
Expand All @@ -1084,7 +1084,7 @@ func TestSegmentFileReader_MissingData(t *testing.T) {

// delete the indexes 0, 1, 2
for i := 0; i < 3; i++ {
require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityTokenLocalBasePath, now, i)))
require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityTokenBasePath, now, i)))
require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityEntityBasePath, now, i)))
}

Expand Down
Loading