diff --git a/cloud/blockstore/libs/storage/disk_agent/actors/session_cache_actor.cpp b/cloud/blockstore/libs/storage/disk_agent/actors/session_cache_actor.cpp index c1fb2bbbb94..cab97b14a81 100644 --- a/cloud/blockstore/libs/storage/disk_agent/actors/session_cache_actor.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/actors/session_cache_actor.cpp @@ -2,7 +2,7 @@ #include #include - +#include #include #include @@ -21,87 +21,131 @@ namespace { //////////////////////////////////////////////////////////////////////////////// -class TSessionCacheActor - : public TActorBootstrapped +NProto::TError SaveSessionCache( + const TString& path, + const TVector& sessions, + TInstant deadline) +{ + try { + NProto::TDiskAgentDeviceSessionCache proto; + proto.MutableSessions()->Reserve(static_cast(sessions.size())); + + // saving only active sessions + for (const auto& session: sessions) { + if (session.GetLastActivityTs() > deadline.MicroSeconds()) { + *proto.MutableSessions()->Add() = session; + } + } + + const TString tmpPath{path + ".tmp"}; + + SerializeToTextFormat(proto, tmpPath); + + if (!NFs::Rename(tmpPath, path)) { + char buf[64] = {}; + const auto ec = errno; + + return MakeError( + MAKE_SYSTEM_ERROR(ec), + strerror_r(ec, buf, sizeof(buf))); + } + } catch (...) { + return MakeError(E_FAIL, CurrentExceptionMessage()); + } + + return {}; +} + +//////////////////////////////////////////////////////////////////////////////// + +class TSessionCacheActor: public TActorBootstrapped { private: const TString CachePath; - - TVector Sessions; - TRequestInfoPtr RequestInfo; - NActors::IEventBasePtr Response; + const TDuration ReleaseInactiveSessionsTimeout; public: TSessionCacheActor( - TVector sessions, - TString cachePath, - TRequestInfoPtr requestInfo, - NActors::IEventBasePtr response) - : CachePath {std::move(cachePath)} - , Sessions {std::move(sessions)} - , RequestInfo {std::move(requestInfo)} - , Response {std::move(response)} + TString cachePath, + TDuration releaseInactiveSessionsTimeout) + : CachePath{std::move(cachePath)} + , ReleaseInactiveSessionsTimeout{releaseInactiveSessionsTimeout} { ActivityType = TBlockStoreComponents::DISK_AGENT_WORKER; } - void Bootstrap(const TActorContext& ctx); -}; - -//////////////////////////////////////////////////////////////////////////////// + void Bootstrap(const TActorContext& ctx) + { + Become(&TThis::StateWork); -void TSessionCacheActor::Bootstrap(const TActorContext& ctx) -{ - try { - NProto::TDiskAgentDeviceSessionCache proto; - proto.MutableSessions()->Assign( - std::make_move_iterator(Sessions.begin()), - std::make_move_iterator(Sessions.end()) - ); + LOG_INFO( + ctx, + TBlockStoreComponents::DISK_AGENT_WORKER, + "Session Cache Actor started"); + } - const TString tmpPath {CachePath + ".tmp"}; +private: + STFUNC(StateWork) + { + switch (ev->GetTypeRewrite()) { + HFunc(NActors::TEvents::TEvPoisonPill, HandlePoisonPill); + + HFunc( + TEvDiskAgentPrivate::TEvUpdateSessionCacheRequest, + HandleUpdateSessionCache); + + default: + HandleUnexpectedEvent( + ev, + TBlockStoreComponents::DISK_AGENT_WORKER); + break; + } + } - SerializeToTextFormat(proto, tmpPath); + void HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx) + { + Y_UNUSED(ev); - if (!NFs::Rename(tmpPath, CachePath)) { - char buf[64] = {}; + Die(ctx); + } - const auto ec = errno; - ythrow TServiceError {MAKE_SYSTEM_ERROR(ec)} - << strerror_r(ec, buf, sizeof(buf)); - } - } catch (...) { - LOG_ERROR_S( + void HandleUpdateSessionCache( + const TEvDiskAgentPrivate::TEvUpdateSessionCacheRequest::TPtr& ev, + const TActorContext& ctx) + { + LOG_INFO( ctx, - ActivityType, - "Can't update session cache: " << CurrentExceptionMessage()); + TBlockStoreComponents::DISK_AGENT_WORKER, + "Update the session cache"); - ReportDiskAgentSessionCacheUpdateError(); - } + auto* msg = ev->Get(); - NCloud::Reply( - ctx, - *RequestInfo, - std::move(Response)); + const auto deadline = ctx.Now() - ReleaseInactiveSessionsTimeout; + Y_DEBUG_ABORT_UNLESS(deadline); - Die(ctx); -} + SaveSessionCache(CachePath, msg->Sessions, deadline); + + NCloud::Reply( + ctx, + *ev, + std::make_unique< + TEvDiskAgentPrivate::TEvUpdateSessionCacheResponse>()); + } +}; } // namespace //////////////////////////////////////////////////////////////////////////////// std::unique_ptr CreateSessionCacheActor( - TVector sessions, TString cachePath, - TRequestInfoPtr requestInfo, - NActors::IEventBasePtr response) + TDuration releaseInactiveSessionsTimeout) { return std::make_unique( - std::move(sessions), std::move(cachePath), - std::move(requestInfo), - std::move(response)); + releaseInactiveSessionsTimeout); } } // namespace NCloud::NBlockStore::NStorage::NDiskAgent diff --git a/cloud/blockstore/libs/storage/disk_agent/actors/session_cache_actor.h b/cloud/blockstore/libs/storage/disk_agent/actors/session_cache_actor.h index 370716b9791..af990f54bea 100644 --- a/cloud/blockstore/libs/storage/disk_agent/actors/session_cache_actor.h +++ b/cloud/blockstore/libs/storage/disk_agent/actors/session_cache_actor.h @@ -1,7 +1,6 @@ #pragma once #include - #include #include @@ -13,9 +12,7 @@ namespace NCloud::NBlockStore::NStorage::NDiskAgent { //////////////////////////////////////////////////////////////////////////////// std::unique_ptr CreateSessionCacheActor( - TVector sessions, TString cachePath, - TRequestInfoPtr requestInfo, - NActors::IEventBasePtr response); + TDuration releaseInactiveSessionsTimeout); } // namespace NCloud::NBlockStore::NStorage::NDiskAgent diff --git a/cloud/blockstore/libs/storage/disk_agent/actors/session_cache_actor_ut.cpp b/cloud/blockstore/libs/storage/disk_agent/actors/session_cache_actor_ut.cpp new file mode 100644 index 00000000000..b7eec939173 --- /dev/null +++ b/cloud/blockstore/libs/storage/disk_agent/actors/session_cache_actor_ut.cpp @@ -0,0 +1,185 @@ +#include "session_cache_actor.h" + +#include +#include + +#include + +#include +#include + +#include + +#include + +namespace NCloud::NBlockStore::NStorage::NDiskAgent { + +using namespace NActors; +using namespace std::chrono_literals; + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +struct TActorSystem: NActors::TTestActorRuntimeBase +{ + void Start() + { + SetDispatchTimeout(5s); + InitNodes(); + AppendToLogSettings( + TBlockStoreComponents::START, + TBlockStoreComponents::END, + GetComponentName); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TFixture: public NUnitTest::TBaseFixture +{ + const TTempDir TempDir; + const TString CachedSessionsPath = + TempDir.Path() / "nbs-disk-agent-sessions.txt"; + const TDuration ReleaseInactiveSessionsTimeout = 10s; + + TActorSystem ActorSystem; + TActorId SessionCacheActor; + TActorId EdgeActor; + + void SetUp(NUnitTest::TTestContext& /*context*/) override + { + ActorSystem.Start(); + + EdgeActor = ActorSystem.AllocateEdgeActor(); + + SessionCacheActor = + ActorSystem.Register(CreateSessionCacheActor( + CachedSessionsPath, + ReleaseInactiveSessionsTimeout) + .release()); + + ActorSystem.DispatchEvents( + {.FinalEvents = {{TEvents::TSystem::Bootstrap}}}, + 10ms); + } + + void UpdateSessionCache(TVector sessions) + { + ActorSystem.Send( + SessionCacheActor, + EdgeActor, + std::make_unique( + std::move(sessions)) + .release()); + + auto response = ActorSystem.GrabEdgeEvent< + TEvDiskAgentPrivate::TEvUpdateSessionCacheResponse>(); + + UNIT_ASSERT(response); + UNIT_ASSERT_VALUES_EQUAL_C( + S_OK, + response->GetStatus(), + response->GetError()); + } + + auto LoadSessionCache() + { + NProto::TDiskAgentDeviceSessionCache proto; + + ParseProtoTextFromFileRobust(CachedSessionsPath, proto); + + return TVector( + std::make_move_iterator(proto.MutableSessions()->begin()), + std::make_move_iterator(proto.MutableSessions()->end())); + } +}; + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +Y_UNIT_TEST_SUITE(TSessionCacheActorTest) +{ + Y_UNIT_TEST_F(ShouldUpdateSessionCache, TFixture) + { + UNIT_ASSERT(!NFs::Exists(CachedSessionsPath)); + + ActorSystem.AdvanceCurrentTime(1h); + + UpdateSessionCache({}); + + UNIT_ASSERT(NFs::Exists(CachedSessionsPath)); + UNIT_ASSERT(LoadSessionCache().empty()); + + const auto updateTs1 = ActorSystem.GetCurrentTime(); + + { + NProto::TDiskAgentDeviceSession writer; + writer.SetClientId("client-1"); + writer.SetDiskId("vol0"); + writer.SetLastActivityTs( + (updateTs1 - ReleaseInactiveSessionsTimeout).MicroSeconds()); + + NProto::TDiskAgentDeviceSession reader; + reader.SetClientId("client-2"); + reader.SetReadOnly(true); + reader.SetDiskId("vol0"); + reader.SetLastActivityTs(updateTs1.MicroSeconds()); + + UpdateSessionCache({writer, reader}); + } + + { + auto sessions = LoadSessionCache(); + + // writer session was dropped because it was stale + UNIT_ASSERT_VALUES_EQUAL(1, sessions.size()); + UNIT_ASSERT_VALUES_EQUAL("client-2", sessions[0].GetClientId()); + UNIT_ASSERT_VALUES_EQUAL( + updateTs1.MicroSeconds(), + sessions[0].GetLastActivityTs()); + } + + ActorSystem.AdvanceCurrentTime(3s); + + const auto updateTs2 = ActorSystem.GetCurrentTime(); + + { + NProto::TDiskAgentDeviceSession writer; + writer.SetClientId("client-1"); + writer.SetDiskId("vol0"); + writer.SetLastActivityTs(updateTs2.MicroSeconds()); + + NProto::TDiskAgentDeviceSession reader; + reader.SetClientId("client-2"); + reader.SetReadOnly(true); + reader.SetDiskId("vol0"); + reader.SetLastActivityTs(updateTs2.MicroSeconds()); + + UpdateSessionCache({writer, reader}); + } + + { + auto sessions = LoadSessionCache(); + + UNIT_ASSERT_VALUES_EQUAL(2, sessions.size()); + SortBy(sessions, [](auto& s) { return s.GetClientId(); }); + + UNIT_ASSERT_VALUES_EQUAL("client-1", sessions[0].GetClientId()); + UNIT_ASSERT_VALUES_EQUAL( + updateTs2.MicroSeconds(), + sessions[0].GetLastActivityTs()); + + UNIT_ASSERT_VALUES_EQUAL("client-2", sessions[1].GetClientId()); + UNIT_ASSERT_VALUES_EQUAL( + updateTs2.MicroSeconds(), + sessions[1].GetLastActivityTs()); + } + + UpdateSessionCache({}); + UNIT_ASSERT(LoadSessionCache().empty()); + } +} + +} // namespace NCloud::NBlockStore::NStorage::NDiskAgent diff --git a/cloud/blockstore/libs/storage/disk_agent/actors/ut/ya.make b/cloud/blockstore/libs/storage/disk_agent/actors/ut/ya.make new file mode 100644 index 00000000000..726914ef1df --- /dev/null +++ b/cloud/blockstore/libs/storage/disk_agent/actors/ut/ya.make @@ -0,0 +1,15 @@ +UNITTEST_FOR(cloud/blockstore/libs/storage/disk_agent/actors) + +INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/small.inc) + +SRCS( + session_cache_actor_ut.cpp +) + +PEERDIR( + cloud/blockstore/libs/kikimr + + contrib/ydb/library/actors/testlib +) + +END() diff --git a/cloud/blockstore/libs/storage/disk_agent/actors/ya.make b/cloud/blockstore/libs/storage/disk_agent/actors/ya.make index 55b7a0a87b7..3a98da000db 100644 --- a/cloud/blockstore/libs/storage/disk_agent/actors/ya.make +++ b/cloud/blockstore/libs/storage/disk_agent/actors/ya.make @@ -6,6 +6,7 @@ SRCS( PEERDIR( cloud/blockstore/libs/storage/disk_agent/model + cloud/blockstore/libs/storage/protos cloud/storage/core/libs/actors cloud/storage/core/protos @@ -17,4 +18,5 @@ PEERDIR( END() RECURSE_FOR_TESTS( + ut ) diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.cpp index 49247f3a41e..be3dbc3ea53 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.cpp @@ -113,20 +113,33 @@ void TDiskAgentActor::UpdateActorStats() } } -void TDiskAgentActor::UpdateSessionCacheAndRespond( - const NActors::TActorContext& ctx, - TRequestInfoPtr requestInfo, - NActors::IEventBasePtr response) +void TDiskAgentActor::UpdateSessionCache(const TActorContext& ctx) { - LOG_INFO(ctx, TBlockStoreComponents::DISK_AGENT, "Update the session cache"); + if (!SessionCacheActor) { + return; + } + + NCloud::Send( + ctx, + SessionCacheActor, + 0, // cookie + State->GetSessions()); +} + +void TDiskAgentActor::RunSessionCacheActor(const TActorContext& ctx) +{ + auto path = GetCachedSessionsPath(); + if (path.empty()) { + return; + } auto actor = NDiskAgent::CreateSessionCacheActor( - State->GetSessions(), - GetCachedSessionsPath(), - std::move(requestInfo), - std::move(response)); + std::move(path), + AgentConfig->GetReleaseInactiveSessionsTimeout()); - ctx.Register( + // Starting SessionCacheActor on the IO pool to avoid file operations in the + // User pool + SessionCacheActor = ctx.Register( actor.release(), TMailboxType::HTSwap, NKikimr::AppData()->IOPoolId); @@ -144,7 +157,7 @@ TString TDiskAgentActor::GetCachedSessionsPath() const void TDiskAgentActor::HandleReportDelayedDiskAgentConfigMismatch( const TEvDiskAgentPrivate::TEvReportDelayedDiskAgentConfigMismatch::TPtr& ev, - const NActors::TActorContext& ctx) + const TActorContext& ctx) { Y_UNUSED(ctx); const auto* msg = ev->Get(); @@ -165,6 +178,11 @@ void TDiskAgentActor::HandlePoisonPill( StatsActor = {}; } + if (SessionCacheActor) { + NCloud::Send(ctx, SessionCacheActor); + SessionCacheActor = {}; + } + State->StopTarget(); for (auto& [uuid, pendingRequests]: SecureErasePendingRequests) { @@ -253,6 +271,8 @@ STFUNC(TDiskAgentActor::StateWork) TEvDiskAgentPrivate::TEvReportDelayedDiskAgentConfigMismatch, HandleReportDelayedDiskAgentConfigMismatch); + IgnoreFunc(TEvDiskAgentPrivate::TEvUpdateSessionCacheResponse); + default: if (!HandleRequests(ev)) { HandleUnexpectedEvent(ev, TBlockStoreComponents::DISK_AGENT); diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.h b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.h index 79c09a3a5ec..385b0b9e069 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.h +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.h @@ -75,6 +75,8 @@ class TDiskAgentActor final THashMap RecentBlocksTrackers; TList PostponedRequests; + NActors::TActorId SessionCacheActor; + public: TDiskAgentActor( TStorageConfigPtr config, @@ -137,10 +139,8 @@ class TDiskAgentActor final TString GetCachedSessionsPath() const; - void UpdateSessionCacheAndRespond( - const NActors::TActorContext& ctx, - TRequestInfoPtr requestInfo, - NActors::IEventBasePtr response); + void UpdateSessionCache(const NActors::TActorContext& ctx); + void RunSessionCacheActor(const NActors::TActorContext& ctx); private: STFUNC(StateInit); diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_acquire.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_acquire.cpp index dff3e508e8d..6025da6d479 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_acquire.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_acquire.cpp @@ -67,18 +67,11 @@ void TDiskAgentActor::HandleAcquireDevices( record.GetDiskId(), record.GetVolumeGeneration()); - if (!Spdk || !record.HasRateLimits()) { - // If something has changed in sessions we should update the session - // cache (if it was configured). To do this, we spawn a special actor - // that updates the session cache and responds to the acquire request. - if (updated && GetCachedSessionsPath()) { - UpdateSessionCacheAndRespond( - ctx, - std::move(requestInfo), - std::make_unique()); - return; - } + if (updated) { + UpdateSessionCache(ctx); + } + if (!Spdk || !record.HasRateLimits()) { reply(NProto::TError()); return; } diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_init.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_init.cpp index 7298c4bdf5f..312cb89e92b 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_init.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_init.cpp @@ -133,6 +133,8 @@ void TDiskAgentActor::HandleInitAgentCompleted( SendRegisterRequest(ctx); ScheduleUpdateStats(ctx); + + RunSessionCacheActor(ctx); } } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_release.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_release.cpp index b846000c09a..a195f55430c 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_release.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_release.cpp @@ -28,19 +28,8 @@ void TDiskAgentActor::HandleReleaseDevices( record.GetDiskId(), record.GetVolumeGeneration()); - // We should update the session cache (if it was configured) with every - // release request. - if (GetCachedSessionsPath()) { - UpdateSessionCacheAndRespond( - ctx, - CreateRequestInfo( - ev->Sender, - ev->Cookie, - ev->Get()->CallContext), - std::move(response)); - - return; - } + UpdateSessionCache(ctx); + } catch (const TServiceError& e) { *response->Record.MutableError() = MakeError(e.GetCode(), e.what()); } diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_ut.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_ut.cpp index ebdd01c5e2a..49b2b028956 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_ut.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_ut.cpp @@ -19,6 +19,8 @@ #include +#include + namespace NCloud::NBlockStore::NStorage { using namespace NActors; @@ -28,6 +30,8 @@ using namespace NThreading; using namespace NDiskAgentTest; +using namespace std::chrono_literals; + namespace { //////////////////////////////////////////////////////////////////////////////// @@ -107,7 +111,9 @@ struct TFixture : public NUnitTest::TBaseFixture { const TTempDir TempDir; - const TString CachedSessionsPath = TempDir.Path() / "nbs-disk-agent-sessions.txt"; + const TString CachedSessionsPath = + TempDir.Path() / "nbs-disk-agent-sessions.txt"; + const TDuration ReleaseInactiveSessionsTimeout = 10s; TTestBasicRuntime Runtime; NMonitoring::TDynamicCountersPtr Counters = MakeIntrusive(); @@ -137,6 +143,8 @@ struct TFixture config.SetCachedSessionsPath(CachedSessionsPath); config.SetBackend(NProto::DISK_AGENT_BACKEND_AIO); config.SetAcquireRequired(true); + config.SetReleaseInactiveSessionsTimeout( + ReleaseInactiveSessionsTimeout.MilliSeconds()); return config; } @@ -176,6 +184,27 @@ struct TFixture return response->Record.GetError(); } + auto LoadSessionCache() + { + NProto::TDiskAgentDeviceSessionCache cache; + ParseProtoTextFromFileRobust(CachedSessionsPath, cache); + + TVector sessions( + std::make_move_iterator(cache.MutableSessions()->begin()), + std::make_move_iterator(cache.MutableSessions()->end()) + ); + + SortBy(sessions, [] (auto& session) { + return session.GetClientId(); + }); + + for (auto& session: sessions) { + Sort(*session.MutableDeviceIds()); + } + + return sessions; + } + void SetUp(NUnitTest::TTestContext& /*context*/) override { InitCriticalEventsCounter(Counters); @@ -3764,19 +3793,25 @@ Y_UNIT_TEST_SUITE(TDiskAgentTest) NProto::TAgentConfig agentConfig; - Runtime.SetEventFilter([&] (auto&, TAutoPtr& event) { - if (event->GetTypeRewrite() == TEvDiskRegistry::EvRegisterAgentRequest) { - auto& msg = *event->Get(); + Runtime.SetEventFilter( + [&](auto&, TAutoPtr& event) + { + if (event->GetTypeRewrite() == + TEvDiskRegistry::EvRegisterAgentRequest) + { + auto& msg = + *event->Get(); + + agentConfig = msg.Record.GetAgentConfig(); + } - agentConfig = msg.Record.GetAgentConfig(); - } + return false; + }); - return false; - }); + auto env = + TTestEnvBuilder(Runtime).With(CreateDiskAgentConfig()).Build(); - auto env = TTestEnvBuilder(Runtime) - .With(CreateDiskAgentConfig()) - .Build(); + Runtime.AdvanceCurrentTime(1h); TDiskAgentClient diskAgent(Runtime); diskAgent.WaitReady(); @@ -3786,52 +3821,48 @@ Y_UNIT_TEST_SUITE(TDiskAgentTest) UNIT_ASSERT_VALUES_EQUAL(4, agentConfig.DevicesSize()); TVector devices; - for (auto& d: agentConfig.GetDevices()) { + for (const auto& d: agentConfig.GetDevices()) { devices.push_back(d.GetDeviceUUID()); } Sort(devices); // acquire a bunch of devices + const auto acquireTs = Runtime.GetCurrentTime(); + diskAgent.AcquireDevices( - TVector { devices[0], devices[1] }, + TVector{devices[0], devices[1]}, "writer-1", NProto::VOLUME_ACCESS_READ_WRITE, - 42, // MountSeqNumber + 42, // MountSeqNumber "vol0", - 1000); // VolumeGeneration + 1000); // VolumeGeneration diskAgent.AcquireDevices( - TVector { devices[0], devices[1] }, + TVector{devices[0], devices[1]}, "reader-1", NProto::VOLUME_ACCESS_READ_ONLY, - -1, // MountSeqNumber + -1, // MountSeqNumber "vol0", - 1001); // VolumeGeneration + 1001); // VolumeGeneration diskAgent.AcquireDevices( - TVector { devices[2], devices[3] }, + TVector{devices[2], devices[3]}, "reader-2", NProto::VOLUME_ACCESS_READ_ONLY, - -1, // MountSeqNumber + -1, // MountSeqNumber "vol1", - 2000); // VolumeGeneration + 2000); // VolumeGeneration // validate the cache file { - NProto::TDiskAgentDeviceSessionCache cache; - ParseProtoTextFromFileRobust(CachedSessionsPath, cache); + auto sessions = LoadSessionCache(); - UNIT_ASSERT_VALUES_EQUAL(3, cache.SessionsSize()); - - SortBy(*cache.MutableSessions(), [] (auto& session) { - return session.GetClientId(); - }); + UNIT_ASSERT_VALUES_EQUAL(3, sessions.size()); { - auto& session = *cache.MutableSessions(0); - Sort(*session.MutableDeviceIds()); + auto& session = sessions[0]; UNIT_ASSERT_VALUES_EQUAL("reader-1", session.GetClientId()); UNIT_ASSERT_VALUES_EQUAL(2, session.DeviceIdsSize()); @@ -3843,12 +3874,14 @@ Y_UNIT_TEST_SUITE(TDiskAgentTest) UNIT_ASSERT_VALUES_EQUAL(0, session.GetMountSeqNumber()); UNIT_ASSERT_VALUES_EQUAL("vol0", session.GetDiskId()); UNIT_ASSERT_VALUES_EQUAL(1001, session.GetVolumeGeneration()); - UNIT_ASSERT_VALUES_EQUAL(0, session.GetLastActivityTs()); + UNIT_ASSERT_VALUES_EQUAL( + acquireTs.MicroSeconds(), + session.GetLastActivityTs()); UNIT_ASSERT(session.GetReadOnly()); } { - auto& session = *cache.MutableSessions(1); + auto& session = sessions[1]; UNIT_ASSERT_VALUES_EQUAL("reader-2", session.GetClientId()); UNIT_ASSERT_VALUES_EQUAL(2, session.DeviceIdsSize()); @@ -3860,13 +3893,14 @@ Y_UNIT_TEST_SUITE(TDiskAgentTest) UNIT_ASSERT_VALUES_EQUAL(0, session.GetMountSeqNumber()); UNIT_ASSERT_VALUES_EQUAL("vol1", session.GetDiskId()); UNIT_ASSERT_VALUES_EQUAL(2000, session.GetVolumeGeneration()); - UNIT_ASSERT_VALUES_EQUAL(0, session.GetLastActivityTs()); + UNIT_ASSERT_VALUES_EQUAL( + acquireTs.MicroSeconds(), + session.GetLastActivityTs()); UNIT_ASSERT(session.GetReadOnly()); } { - auto& session = *cache.MutableSessions(2); - Sort(*session.MutableDeviceIds()); + auto& session = sessions[2]; UNIT_ASSERT_VALUES_EQUAL("writer-1", session.GetClientId()); UNIT_ASSERT_VALUES_EQUAL(2, session.DeviceIdsSize()); @@ -3879,33 +3913,35 @@ Y_UNIT_TEST_SUITE(TDiskAgentTest) // VolumeGeneration was updated by reader-1 UNIT_ASSERT_VALUES_EQUAL(1001, session.GetVolumeGeneration()); - UNIT_ASSERT_VALUES_EQUAL(0, session.GetLastActivityTs()); + UNIT_ASSERT_VALUES_EQUAL( + acquireTs.MicroSeconds(), + session.GetLastActivityTs()); UNIT_ASSERT(!session.GetReadOnly()); } } + Runtime.AdvanceCurrentTime(5s); + diskAgent.ReleaseDevices( - TVector { devices[0], devices[1] }, + TVector{devices[0], devices[1]}, "writer-1", "vol0", 1001); diskAgent.ReleaseDevices( - TVector { devices[2], devices[3] }, + TVector{devices[2], devices[3]}, "reader-2", "vol1", 2000); // check the session cache again { - NProto::TDiskAgentDeviceSessionCache cache; - ParseProtoTextFromFileRobust(CachedSessionsPath, cache); + auto sessions = LoadSessionCache(); // now we have only one session - UNIT_ASSERT_VALUES_EQUAL(1, cache.SessionsSize()); + UNIT_ASSERT_VALUES_EQUAL(1, sessions.size()); - auto& session = *cache.MutableSessions(0); - Sort(*session.MutableDeviceIds()); + auto& session = sessions[0]; UNIT_ASSERT_VALUES_EQUAL("reader-1", session.GetClientId()); UNIT_ASSERT_VALUES_EQUAL(2, session.DeviceIdsSize()); @@ -3917,7 +3953,9 @@ Y_UNIT_TEST_SUITE(TDiskAgentTest) UNIT_ASSERT_VALUES_EQUAL(0, session.GetMountSeqNumber()); UNIT_ASSERT_VALUES_EQUAL("vol0", session.GetDiskId()); UNIT_ASSERT_VALUES_EQUAL(1001, session.GetVolumeGeneration()); - UNIT_ASSERT_VALUES_EQUAL(0, session.GetLastActivityTs()); + UNIT_ASSERT_VALUES_EQUAL( + acquireTs.MicroSeconds(), + session.GetLastActivityTs()); UNIT_ASSERT(session.GetReadOnly()); } } @@ -3931,6 +3969,8 @@ Y_UNIT_TEST_SUITE(TDiskAgentTest) "e85cd1d217c3239507fc0cd180a075fd" }; + const auto initialTs = Now(); + { NProto::TDiskAgentDeviceSessionCache cache; auto& writeSession = *cache.AddSessions(); @@ -3941,6 +3981,7 @@ Y_UNIT_TEST_SUITE(TDiskAgentTest) writeSession.SetMountSeqNumber(42); writeSession.SetDiskId("vol0"); writeSession.SetVolumeGeneration(1000); + writeSession.SetLastActivityTs(initialTs.MicroSeconds()); auto& reader1 = *cache.AddSessions(); reader1.SetClientId("reader-1"); @@ -3949,6 +3990,7 @@ Y_UNIT_TEST_SUITE(TDiskAgentTest) reader1.SetReadOnly(true); reader1.SetDiskId("vol0"); reader1.SetVolumeGeneration(1000); + reader1.SetLastActivityTs(initialTs.MicroSeconds()); auto& reader2 = *cache.AddSessions(); reader2.SetClientId("reader-2"); @@ -3957,6 +3999,7 @@ Y_UNIT_TEST_SUITE(TDiskAgentTest) reader2.SetReadOnly(true); reader2.SetDiskId("vol1"); reader2.SetVolumeGeneration(2000); + reader2.SetLastActivityTs(initialTs.MicroSeconds()); SerializeToTextFormat(cache, CachedSessionsPath); } @@ -3971,6 +4014,8 @@ Y_UNIT_TEST_SUITE(TDiskAgentTest) .With(CreateDiskAgentConfig()) .Build(); + Runtime.UpdateCurrentTime(initialTs + 3s); + TDiskAgentClient diskAgent(Runtime); diskAgent.WaitReady(); @@ -4030,6 +4075,41 @@ Y_UNIT_TEST_SUITE(TDiskAgentTest) auto error = Read(diskAgent, "reader-2", devices[3]); UNIT_ASSERT_EQUAL_C(S_OK, error.GetCode(), error); } + + // make all sessions stale + Runtime.AdvanceCurrentTime(ReleaseInactiveSessionsTimeout); + + const auto acquireTs = Runtime.GetCurrentTime(); + + diskAgent.AcquireDevices( + TVector{devices[0], devices[1]}, + "writer-1", + NProto::VOLUME_ACCESS_READ_WRITE, + 42, // MountSeqNumber + "vol0", + 1000); // VolumeGeneration + + { + auto sessions = LoadSessionCache(); + + // now we have only one session + UNIT_ASSERT_VALUES_EQUAL(1, sessions.size()); + + auto& session = sessions[0]; + + UNIT_ASSERT_VALUES_EQUAL("writer-1", session.GetClientId()); + UNIT_ASSERT_VALUES_EQUAL(2, session.DeviceIdsSize()); + ASSERT_VECTORS_EQUAL( + TVector({devices[0], devices[1]}), + session.GetDeviceIds()); + + UNIT_ASSERT_VALUES_EQUAL(42, session.GetMountSeqNumber()); + UNIT_ASSERT_VALUES_EQUAL("vol0", session.GetDiskId()); + UNIT_ASSERT_VALUES_EQUAL(1000, session.GetVolumeGeneration()); + UNIT_ASSERT_VALUES_EQUAL( + acquireTs.MicroSeconds(), + session.GetLastActivityTs()); + } } Y_UNIT_TEST_F(ShouldHandleBrokenCacheSessions, TFixture) diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h b/cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h index 9bdf4c6228f..4d8058d5a4a 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h @@ -4,6 +4,7 @@ #include "storage_with_stats.h" +#include #include #include #include @@ -129,6 +130,24 @@ struct TEvDiskAgentPrivate {} }; + // + // UpdateSessionCache + // + + struct TUpdateSessionCacheRequest + { + TVector Sessions; + + TUpdateSessionCacheRequest() = default; + explicit TUpdateSessionCacheRequest( + TVector sessions) + : Sessions(std::move(sessions)) + {} + }; + + struct TUpdateSessionCacheResponse + {}; + // // Events declaration // @@ -144,6 +163,8 @@ struct TEvDiskAgentPrivate EvWriteOrZeroCompleted, EvReportDelayedDiskAgentConfigMismatch, + BLOCKSTORE_DECLARE_EVENT_IDS(UpdateSessionCache) + EvEnd }; @@ -167,6 +188,8 @@ struct TEvDiskAgentPrivate using TEvReportDelayedDiskAgentConfigMismatch = TResponseEvent< TReportDelayedDiskAgentConfigMismatch, EvReportDelayedDiskAgentConfigMismatch>; + + BLOCKSTORE_DECLARE_EVENTS(UpdateSessionCache) }; } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp index ef50f7d51e2..9537d6acd96 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp @@ -869,9 +869,8 @@ void TDiskAgentState::RestoreSessions(TDeviceClient& client) const uuids, session.GetClientId(), TInstant::MicroSeconds(session.GetLastActivityTs()), - session.GetReadOnly() - ? NProto::VOLUME_ACCESS_READ_ONLY - : NProto::VOLUME_ACCESS_READ_WRITE, + session.GetReadOnly() ? NProto::VOLUME_ACCESS_READ_ONLY + : NProto::VOLUME_ACCESS_READ_WRITE, session.GetMountSeqNumber(), session.GetDiskId(), session.GetVolumeGeneration()); diff --git a/cloud/blockstore/libs/storage/disk_agent/model/device_client.cpp b/cloud/blockstore/libs/storage/disk_agent/model/device_client.cpp index f2a460fa7f0..33a96993599 100644 --- a/cloud/blockstore/libs/storage/disk_agent/model/device_client.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/model/device_client.cpp @@ -39,6 +39,7 @@ TVector TDeviceClient::GetSessions() const session.SetDiskId(state->DiskId); session.SetVolumeGeneration(state->VolumeGeneration); session.SetMountSeqNumber(ws.MountSeqNumber); + session.SetLastActivityTs(ws.LastActivityTs.MicroSeconds()); } *session.AddDeviceIds() = id; } @@ -51,6 +52,7 @@ TVector TDeviceClient::GetSessions() const session.SetDiskId(state->DiskId); session.SetVolumeGeneration(state->VolumeGeneration); session.SetMountSeqNumber(rs.MountSeqNumber); + session.SetLastActivityTs(rs.LastActivityTs.MicroSeconds()); } *session.AddDeviceIds() = id; } @@ -154,12 +156,17 @@ TResultOrError TDeviceClient::AcquireDevices( somethingHasChanged = true; } + // a new session or activation of a stale session + if (s == ds.ReaderSessions.end() || + s->LastActivityTs + ReleaseInactiveSessionsTimeout <= now) + { + somethingHasChanged = true; + } + if (s == ds.ReaderSessions.end()) { ds.ReaderSessions.push_back({clientId, now}); STORAGE_INFO("Device %s was acquired by client %s for reading.", uuid.Quote().c_str(), clientId.c_str()); - - somethingHasChanged = true; } else if (now > s->LastActivityTs) { s->LastActivityTs = now; } @@ -179,7 +186,11 @@ TResultOrError TDeviceClient::AcquireDevices( somethingHasChanged = true; } - if (ds.WriterSession.MountSeqNumber != mountSeqNumber) { + if (ds.WriterSession.MountSeqNumber != mountSeqNumber || + ds.WriterSession.LastActivityTs + + ReleaseInactiveSessionsTimeout <= + now) + { somethingHasChanged = true; } diff --git a/cloud/blockstore/libs/storage/disk_agent/model/device_client_ut.cpp b/cloud/blockstore/libs/storage/disk_agent/model/device_client_ut.cpp index 07645498fcd..05be345c90a 100644 --- a/cloud/blockstore/libs/storage/disk_agent/model/device_client_ut.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/model/device_client_ut.cpp @@ -8,8 +8,12 @@ #include +#include + namespace NCloud::NBlockStore::NStorage { +using namespace std::chrono_literals; + namespace { //////////////////////////////////////////////////////////////////////////////// @@ -130,13 +134,15 @@ struct TDeviceClientParams struct TFixture : public NUnitTest::TBaseFixture { + const TDuration ReleaseInactiveSessionsTimeout = 10s; + ILoggingServicePtr Logging = CreateLoggingService("console"); TDeviceClient CreateClient(TDeviceClientParams params = {}) { return TDeviceClient( - TDuration::Seconds(10), - params.Devices, + ReleaseInactiveSessionsTimeout, + std::move(params.Devices), Logging->CreateLog("BLOCKSTORE_DISK_AGENT")); } }; @@ -601,13 +607,15 @@ Y_UNIT_TEST_SUITE(TDeviceClientTest) UNIT_ASSERT_VALUES_EQUAL(0, client.GetSessions().size()); + const auto acquireWriterTs = TInstant::Seconds(42); + AcquireDevices( client, TAcquireParamsBuilder() .SetUuids({"uuid2", "uuid1"}) .SetClientId("writer") .SetDiskId("vol1") - .SetNow(TInstant::Seconds(42)) + .SetNow(acquireWriterTs) .SetVolumeGeneration(1)); { @@ -618,12 +626,16 @@ Y_UNIT_TEST_SUITE(TDeviceClientTest) UNIT_ASSERT_VALUES_EQUAL("writer", session.GetClientId()); UNIT_ASSERT_VALUES_EQUAL(1, session.GetVolumeGeneration()); UNIT_ASSERT(!session.GetReadOnly()); - UNIT_ASSERT_VALUES_EQUAL(0, session.GetLastActivityTs()); + UNIT_ASSERT_VALUES_EQUAL( + acquireWriterTs.MicroSeconds(), + session.GetLastActivityTs()); UNIT_ASSERT_VALUES_EQUAL(2, session.DeviceIdsSize()); UNIT_ASSERT_VALUES_EQUAL("uuid1", session.GetDeviceIds(0)); UNIT_ASSERT_VALUES_EQUAL("uuid2", session.GetDeviceIds(1)); } + const auto acquireReaderTs = TInstant::Seconds(42); + AcquireDevices( client, TAcquireParamsBuilder() @@ -631,7 +643,7 @@ Y_UNIT_TEST_SUITE(TDeviceClientTest) .SetClientId("reader") .SetDiskId("vol1") .SetAccessMode(NProto::VOLUME_ACCESS_READ_ONLY) - .SetNow(TInstant::Seconds(100)) + .SetNow(acquireReaderTs) .SetVolumeGeneration(2)); { @@ -642,7 +654,9 @@ Y_UNIT_TEST_SUITE(TDeviceClientTest) UNIT_ASSERT_VALUES_EQUAL("reader", session.GetClientId()); UNIT_ASSERT_VALUES_EQUAL(2, session.GetVolumeGeneration()); UNIT_ASSERT(session.GetReadOnly()); - UNIT_ASSERT_VALUES_EQUAL(0, session.GetLastActivityTs()); + UNIT_ASSERT_VALUES_EQUAL( + acquireReaderTs.MicroSeconds(), + session.GetLastActivityTs()); UNIT_ASSERT_VALUES_EQUAL(2, session.DeviceIdsSize()); UNIT_ASSERT_VALUES_EQUAL("uuid1", session.GetDeviceIds(0)); UNIT_ASSERT_VALUES_EQUAL("uuid2", session.GetDeviceIds(1)); @@ -653,7 +667,9 @@ Y_UNIT_TEST_SUITE(TDeviceClientTest) UNIT_ASSERT_VALUES_EQUAL("writer", session.GetClientId()); UNIT_ASSERT_VALUES_EQUAL(2, session.GetVolumeGeneration()); UNIT_ASSERT(!session.GetReadOnly()); - UNIT_ASSERT_VALUES_EQUAL(0, session.GetLastActivityTs()); + UNIT_ASSERT_VALUES_EQUAL( + acquireWriterTs.MicroSeconds(), + session.GetLastActivityTs()); UNIT_ASSERT_VALUES_EQUAL(2, session.DeviceIdsSize()); UNIT_ASSERT_VALUES_EQUAL("uuid1", session.GetDeviceIds(0)); UNIT_ASSERT_VALUES_EQUAL("uuid2", session.GetDeviceIds(1)); @@ -669,11 +685,13 @@ Y_UNIT_TEST_SUITE(TDeviceClientTest) UNIT_ASSERT_VALUES_EQUAL(0, client.GetSessions().size()); + TInstant now = Now(); + { auto [updated, error] = client.AcquireDevices( {"uuid2", "uuid1"}, // uuids "writer", // ClientId - TInstant::Seconds(10), // now + now, NProto::VOLUME_ACCESS_READ_WRITE, 1, // MountSeqNumber "vol0", // DiskId @@ -684,11 +702,33 @@ Y_UNIT_TEST_SUITE(TDeviceClientTest) UNIT_ASSERT(updated); // new write session } + // make the writer session stale + now += ReleaseInactiveSessionsTimeout; + + { + auto [updated, error] = client.AcquireDevices( + {"uuid2", "uuid1"}, // uuids + "writer", // ClientId + now, + NProto::VOLUME_ACCESS_READ_WRITE, + 1, // MountSeqNumber + "vol0", // DiskId + 1 // VolumeGeneration + ); + + UNIT_ASSERT_C(!HasError(error), error); + // writer session was activated + UNIT_ASSERT(updated); + } + + // writer session still active + now += 5s; + { auto [updated, error] = client.AcquireDevices( {"uuid2", "uuid1"}, // uuids "writer", // ClientId - TInstant::Seconds(100), // now + now, NProto::VOLUME_ACCESS_READ_WRITE, 1, // MountSeqNumber "vol0", // DiskId @@ -696,14 +736,17 @@ Y_UNIT_TEST_SUITE(TDeviceClientTest) ); UNIT_ASSERT_C(!HasError(error), error); + // nothing was changed UNIT_ASSERT(!updated); } + now += 5s; + { auto [updated, error] = client.AcquireDevices( {"uuid2", "uuid1"}, // uuids "writer", // ClientId - TInstant::Seconds(200), // now + now, NProto::VOLUME_ACCESS_READ_WRITE, 1, // MountSeqNumber "vol0", // DiskId @@ -714,11 +757,13 @@ Y_UNIT_TEST_SUITE(TDeviceClientTest) UNIT_ASSERT(updated); // new volumeGeneration } + now += 5s; + { auto [updated, error] = client.AcquireDevices( {"uuid2", "uuid1"}, // uuids "reader", // ClientId - TInstant::Seconds(300), // now + now, NProto::VOLUME_ACCESS_READ_ONLY, 1, // MountSeqNumber "vol0", // DiskId @@ -729,11 +774,13 @@ Y_UNIT_TEST_SUITE(TDeviceClientTest) UNIT_ASSERT(updated); // new read session } + now += 5s; + { auto [updated, error] = client.AcquireDevices( {"uuid2", "uuid1"}, // uuids "reader2", // ClientId - TInstant::Seconds(300), // now + now, NProto::VOLUME_ACCESS_READ_ONLY, 1, // MountSeqNumber "vol0", // DiskId diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp index 7bac6c95ad0..254c81cd825 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp @@ -754,6 +754,9 @@ STFUNC(TDiskRegistryActor::StateReadOnly) TEvDiskRegistryPrivate::TEvDiskRegistryAgentListExpiredParamsCleanup, TDiskRegistryActor::HandleDiskRegistryAgentListExpiredParamsCleanupReadOnly); + HFunc(TEvDiskRegistryPrivate::TEvCleanupDisksResponse, + HandleCleanupDisksResponse); + default: if (!RejectRequests(ev)) { LogUnexpectedEvent(