diff --git a/package.yaml b/package.yaml index 5517b9e..04d0afe 100644 --- a/package.yaml +++ b/package.yaml @@ -44,7 +44,9 @@ dependencies: - bytestring - bytes - case-insensitive +- cborg - clock +- conduit - containers - cookie - cryptonite @@ -94,7 +96,10 @@ dependencies: - servant-auth - servant-client - servant-server +- servant-conduit +- serialise - stm +- stm-chans - text - these - time diff --git a/share-api.cabal b/share-api.cabal index 4ecdde1..9b41547 100644 --- a/share-api.cabal +++ b/share-api.cabal @@ -94,6 +94,7 @@ library Share.Utils.Logging.Types Share.Utils.Postgres Share.Utils.Servant + Share.Utils.Servant.CBOR Share.Utils.Servant.Client Share.Utils.Servant.PathInfo Share.Utils.Servant.RawRequest @@ -152,6 +153,9 @@ library Share.Web.UCM.Sync.HashJWT Share.Web.UCM.Sync.Impl Share.Web.UCM.Sync.Types + Share.Web.UCM.SyncV2.API + Share.Web.UCM.SyncV2.Impl + Share.Web.UCM.SyncV2.Queries Unison.PrettyPrintEnvDecl.Postgres Unison.Server.NameSearch.Postgres Unison.Server.Share.Definitions @@ -208,7 +212,9 @@ library , bytes , bytestring , case-insensitive + , cborg , clock + , conduit , containers , cookie , cryptonite @@ -252,13 +258,16 @@ library , raven-haskell , safe , semialign + , serialise , servant , servant-auth , servant-client + , servant-conduit , servant-server , share-auth , share-utils , stm + , stm-chans , text , these , time @@ -351,7 +360,9 @@ executable share-api , bytes , bytestring , case-insensitive + , cborg , clock + , conduit , containers , cookie , cryptonite @@ -395,14 +406,17 @@ executable share-api , raven-haskell , safe , semialign + , serialise , servant , servant-auth , servant-client + , servant-conduit , servant-server , share-api , share-auth , share-utils , stm + , stm-chans , text , these , time diff --git a/sql/2024-09-00-00_sync_v2.sql b/sql/2024-09-00-00_sync_v2.sql new file mode 100644 index 0000000..9c4e7fb --- /dev/null +++ b/sql/2024-09-00-00_sync_v2.sql @@ -0,0 +1,36 @@ +CREATE TABLE serialized_components ( + -- The user the term is sandboxed to. + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + component_hash_id INTEGER NOT NULL REFERENCES component_hashes(id) ON DELETE CASCADE, + + -- The serialized component + bytes_id INTEGER NOT NULL REFERENCES bytes(id) ON DELETE NO ACTION, + + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + + PRIMARY KEY (user_id, component_hash_id) INCLUDE (bytes_id) +); + +-- namespaces don't need to be sandboxed to user. +CREATE TABLE serialized_namespaces ( + namespace_hash_id INTEGER NOT NULL REFERENCES branch_hashes(id) ON DELETE NO ACTION, + + -- The serialized namespace + bytes_id INTEGER NOT NULL REFERENCES bytes(id) ON DELETE NO ACTION, + + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + + PRIMARY KEY (namespace_hash_id) INCLUDE (bytes_id) +); + +CREATE TABLE serialized_patches ( + patch_id INTEGER NOT NULL REFERENCES patches(id) ON DELETE CASCADE, + bytes_id INTEGER NOT NULL REFERENCES bytes(id) ON DELETE NO ACTION, + created_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +CREATE TABLE serialized_causals ( + causal_id INTEGER NOT NULL REFERENCES causals(id) ON DELETE CASCADE, + bytes_id INTEGER NOT NULL REFERENCES bytes(id) ON DELETE NO ACTION, + created_at TIMESTAMP NOT NULL DEFAULT NOW() +); diff --git a/src/Share/Postgres.hs b/src/Share/Postgres.hs index e301a66..c99b017 100644 --- a/src/Share/Postgres.hs +++ b/src/Share/Postgres.hs @@ -19,7 +19,6 @@ module Share.Postgres Interp.DecodeValue (..), Interp.DecodeRow (..), Interp.DecodeField, - RawBytes (..), Only (..), QueryA (..), QueryM (..), @@ -101,6 +100,7 @@ import Share.Utils.Logging qualified as Logging import Share.Web.App import Share.Web.Errors (ErrorID (..), SomeServerError, ToServerError (..), internalServerError, respondError, someServerError) import System.CPUTime (getCPUTime) +import Unison.Debug qualified as UDebug data TransactionError e = Unrecoverable SomeServerError @@ -334,7 +334,9 @@ instance QueryA (Transaction e) where statement q s = do transactionStatement q s - unrecoverableError e = Transaction (pure (Left (Unrecoverable (someServerError e)))) + unrecoverableError e = do + UDebug.debugM UDebug.Temp "Unrecoverable error in transaction: " e + Transaction (pure (Left (Unrecoverable (someServerError e)))) instance QueryM (Transaction e) where transactionUnsafeIO io = Transaction (Right <$> liftIO io) @@ -343,7 +345,9 @@ instance QueryA (Session e) where statement q s = do lift $ Session.statement q s - unrecoverableError e = throwError (Unrecoverable (someServerError e)) + unrecoverableError e = do + UDebug.debugM UDebug.Temp "Unrecoverable error in transaction: " e + throwError (Unrecoverable (someServerError e)) instance QueryM (Session e) where transactionUnsafeIO io = lift $ liftIO io @@ -356,7 +360,8 @@ instance QueryA (Pipeline e) where instance (QueryM m) => QueryA (ReaderT e m) where statement q s = lift $ statement q s - unrecoverableError e = lift $ unrecoverableError e + unrecoverableError e = do + lift $ unrecoverableError e instance (QueryM m) => QueryM (ReaderT e m) where transactionUnsafeIO io = lift $ transactionUnsafeIO io @@ -469,17 +474,6 @@ cachedForOf trav s f = do cachedFor :: (Traversable t, Monad m, Ord a) => t a -> (a -> m b) -> m (t b) cachedFor = cachedForOf traversed --- | Preferably you should use custom newtypes for your bytes, but you can use this with --- deriving via to get the encoding/decoding instances. -newtype RawBytes = RawBytes {unRawBytes :: ByteString} - deriving stock (Show, Eq, Ord) - -instance Interp.EncodeValue RawBytes where - encodeValue = contramap unRawBytes Encoders.bytea - -instance Interp.DecodeValue RawBytes where - decodeValue = RawBytes <$> Decoders.bytea - -- | Useful when running queries using a join over `toTable` which may be empty. -- Without explicitly handling the empty case we'll waste time sending a query to PG -- that we know can't return any results. diff --git a/src/Share/Postgres/Definitions/Queries.hs b/src/Share/Postgres/Definitions/Queries.hs index 3bce6ad..d6b2574 100644 --- a/src/Share/Postgres/Definitions/Queries.hs +++ b/src/Share/Postgres/Definitions/Queries.hs @@ -39,6 +39,7 @@ import Data.Set qualified as Set import Data.Text qualified as Text import Data.Vector (Vector) import Data.Vector qualified as Vector +import Servant (err500) import Share.Codebase.Types (CodebaseEnv (..), CodebaseM) import Share.IDs import Share.Postgres @@ -49,9 +50,8 @@ import Share.Postgres.Hashes.Queries qualified as HashQ import Share.Postgres.IDs import Share.Prelude import Share.Utils.Logging qualified as Logging -import Share.Utils.Postgres (OrdBy) +import Share.Utils.Postgres (OrdBy, RawBytes (..)) import Share.Web.Errors (ErrorID (..), InternalServerError (InternalServerError), ToServerError (..)) -import Servant (err500) import U.Codebase.Decl qualified as Decl import U.Codebase.Decl qualified as V2 hiding (Type) import U.Codebase.Decl qualified as V2Decl @@ -270,7 +270,7 @@ expectTypeComponent componentRef = do -- | This isn't in CodebaseM so that we can run it in a normal transaction to build the Code -- Lookup. -loadTermById :: QueryM m => UserId -> TermId -> m (Maybe (V2.Term Symbol, V2.Type Symbol)) +loadTermById :: (QueryM m) => UserId -> TermId -> m (Maybe (V2.Term Symbol, V2.Type Symbol)) loadTermById codebaseUser termId = runMaybeT $ do (TermComponentElement trm typ) <- MaybeT $ @@ -288,7 +288,7 @@ loadTermById codebaseUser termId = runMaybeT $ do localIds = LocalIds.LocalIds {textLookup = Vector.fromList textLookup, defnLookup = Vector.fromList defnLookup} pure $ s2cTermWithType (localIds, trm, typ) -termLocalTextReferences :: QueryM m => TermId -> m [Text] +termLocalTextReferences :: (QueryM m) => TermId -> m [Text] termLocalTextReferences termId = queryListCol [sql| @@ -299,7 +299,7 @@ termLocalTextReferences termId = ORDER BY local_index ASC |] -termLocalComponentReferences :: QueryM m => TermId -> m [ComponentHash] +termLocalComponentReferences :: (QueryM m) => TermId -> m [ComponentHash] termLocalComponentReferences termId = queryListCol [sql| @@ -342,10 +342,10 @@ resolveConstructorTypeLocalIds (LocalIds.LocalIds {textLookup, defnLookup}) = substText i = textLookup ^?! ix (fromIntegral i) substHash i = unComponentHash $ (defnLookup ^?! ix (fromIntegral i)) -loadDeclKind :: PG.QueryM m => Reference.Id -> m (Maybe CT.ConstructorType) +loadDeclKind :: (PG.QueryM m) => Reference.Id -> m (Maybe CT.ConstructorType) loadDeclKind = loadDeclKindsOf id -loadDeclKindsOf :: PG.QueryM m => Traversal s t Reference.Id (Maybe CT.ConstructorType) -> s -> m t +loadDeclKindsOf :: (PG.QueryM m) => Traversal s t Reference.Id (Maybe CT.ConstructorType) -> s -> m t loadDeclKindsOf trav s = s & unsafePartsOf trav %%~ \refIds -> do @@ -517,7 +517,7 @@ constructorReferentsByPrefix prefix mayComponentIndex mayConstructorIndex = do -- -- This is intentionally not in CodebaseM because this method is used to build the -- CodebaseEnv. -loadCachedEvalResult :: QueryM m => UserId -> Reference.Id -> m (Maybe (V2.Term Symbol)) +loadCachedEvalResult :: (QueryM m) => UserId -> Reference.Id -> m (Maybe (V2.Term Symbol)) loadCachedEvalResult codebaseOwnerUserId (Reference.Id hash compIndex) = runMaybeT do let compIndex' = pgComponentIndex compIndex (evalResultId :: EvalResultId, EvalResultTerm term) <- @@ -557,12 +557,12 @@ loadCachedEvalResult codebaseOwnerUserId (Reference.Id hash compIndex) = runMayb pure $ resolveTermLocalIds localIds term -- | Get text ids for all provided texts, inserting any that don't already exist. -ensureTextIds :: QueryM m => Traversable t => t Text -> m (t TextId) +ensureTextIds :: (QueryM m) => (Traversable t) => t Text -> m (t TextId) ensureTextIds = ensureTextIdsOf traversed -- | Efficiently saves all Text's focused by the provided traversal into the database and -- replaces them with their corresponding Ids. -ensureTextIdsOf :: QueryM m => Traversal s t Text TextId -> s -> m t +ensureTextIdsOf :: (QueryM m) => Traversal s t Text TextId -> s -> m t ensureTextIdsOf trav s = do s & unsafePartsOf trav %%~ \texts -> do @@ -589,12 +589,12 @@ ensureTextIdsOf trav s = do else pure results -- | Get text ids for all provided texts, inserting any that don't already exist. -ensureBytesIds :: QueryM m => Traversable t => t BS.ByteString -> m (t BytesId) +ensureBytesIds :: (QueryM m) => (Traversable t) => t BS.ByteString -> m (t BytesId) ensureBytesIds = ensureBytesIdsOf traversed -- | Efficiently saves all Text's focused by the provided traversal into the database and -- replaces them with their corresponding Ids. -ensureBytesIdsOf :: QueryM m => Traversal s t BS.ByteString BytesId -> s -> m t +ensureBytesIdsOf :: (QueryM m) => Traversal s t BS.ByteString BytesId -> s -> m t ensureBytesIdsOf trav s = do s & unsafePartsOf trav %%~ \bytestrings -> do @@ -621,7 +621,7 @@ ensureBytesIdsOf trav s = do else pure results -- | Efficiently loads Texts for all TextIds focused by the provided traversal. -expectTextsOf :: QueryM m => Traversal s t TextId Text -> s -> m t +expectTextsOf :: (QueryM m) => Traversal s t TextId Text -> s -> m t expectTextsOf trav = unsafePartsOf trav %%~ \textIds -> do let numberedTextIds = zip [0 :: Int32 ..] textIds @@ -649,7 +649,7 @@ localizeTerm tm = do -- | Replace all references in a term with local references. _localizeTermAndType :: - HasCallStack => + (HasCallStack) => V2.Term Symbol -> V2.Type Symbol -> Transaction e (PgLocalIds, TermFormat.Term, TermFormat.Type) @@ -997,7 +997,7 @@ resolveLocalIdsOf trav s = do >>= HashQ.expectComponentHashesOf (traversed . LocalIds.h_) -- | Fetch term tags for all the provided Referents. -termTagsByReferentsOf :: HasCallStack => Traversal s t Referent.Referent Tags.TermTag -> s -> Transaction e t +termTagsByReferentsOf :: (HasCallStack) => Traversal s t Referent.Referent Tags.TermTag -> s -> Transaction e t termTagsByReferentsOf trav s = do s & unsafePartsOf trav %%~ \refs -> do @@ -1080,7 +1080,7 @@ termTagsByReferentsOf trav s = do (refTagRow Tags.Test Decls.testResultListRef) ] -typeTagsByReferencesOf :: HasCallStack => Traversal s t TypeReference Tags.TypeTag -> s -> Transaction e t +typeTagsByReferencesOf :: (HasCallStack) => Traversal s t TypeReference Tags.TypeTag -> s -> Transaction e t typeTagsByReferencesOf trav s = do s & unsafePartsOf trav %%~ \refs -> do diff --git a/src/Share/Postgres/Definitions/Types.hs b/src/Share/Postgres/Definitions/Types.hs index 5cb6895..78522cc 100644 --- a/src/Share/Postgres/Definitions/Types.hs +++ b/src/Share/Postgres/Definitions/Types.hs @@ -24,13 +24,13 @@ module Share.Postgres.Definitions.Types ) where -import Share.Postgres qualified as PG -import Share.Postgres.Serialization qualified as S -import Share.Prelude import Hasql.Decoders qualified as Decoders import Hasql.Decoders qualified as Hasql import Hasql.Encoders qualified as Encoders import Hasql.Interpolate (DecodeValue (..), EncodeValue (..)) +import Share.Postgres.Serialization qualified as S +import Share.Prelude +import Share.Utils.Postgres qualified as PG import U.Codebase.Decl qualified as DD import U.Codebase.Decl qualified as Decl import U.Codebase.Reference qualified as Reference diff --git a/src/Share/Postgres/Orphans.hs b/src/Share/Postgres/Orphans.hs index 461d415..381214f 100644 --- a/src/Share/Postgres/Orphans.hs +++ b/src/Share/Postgres/Orphans.hs @@ -18,6 +18,7 @@ import Servant (err500) import Servant.API import Share.Prelude import Share.Utils.Logging qualified as Logging +import Share.Utils.Postgres (RawLazyBytes (..)) import Share.Web.Errors (ErrorID (..), ToServerError (..)) import U.Codebase.HashTags (BranchHash (..), CausalHash (..), ComponentHash (..), PatchHash (..)) import U.Codebase.Reference (Id' (Id), Reference' (..)) @@ -34,6 +35,7 @@ import Unison.Hash32 (Hash32) import Unison.Hash32 qualified as Hash32 import Unison.Name (Name) import Unison.NameSegment.Internal (NameSegment (..)) +import Unison.SyncV2.Types (CBORBytes (..)) import Unison.Syntax.Name qualified as Name -- Orphans for 'Hash' @@ -215,6 +217,8 @@ instance Hasql.DecodeValue SqliteTermEdit.Typing where _ -> Nothing ) +deriving via RawLazyBytes instance Hasql.DecodeValue (CBORBytes t) + instance ToServerError Hasql.SessionError where toServerError _ = (ErrorID "query-error", err500) diff --git a/src/Share/Postgres/Search/DefinitionSearch/Queries.hs b/src/Share/Postgres/Search/DefinitionSearch/Queries.hs index 875ee88..6acb98f 100644 --- a/src/Share/Postgres/Search/DefinitionSearch/Queries.hs +++ b/src/Share/Postgres/Search/DefinitionSearch/Queries.hs @@ -36,6 +36,7 @@ import Unison.Name (Name) import Unison.Name qualified as Name import Unison.Server.Types (TermTag (..), TypeTag (..)) import Unison.ShortHash (ShortHash) +import Unison.ShortHash qualified as SH import Unison.Syntax.Name qualified as Name import Unison.Syntax.NameSegment qualified as NameSegment @@ -191,11 +192,11 @@ searchTokenToText shouldAddWildcards = \case makeSearchToken typeMentionTypeByNameType (Text.toLower (reversedNameText name)) (Just occ) & addWildCard TypeMentionToken (Right sh) occ -> - makeSearchToken typeMentionTypeByHashType (into @Text @ShortHash sh) (Just occ) + makeSearchToken typeMentionTypeByHashType (SH.toText sh) (Just occ) & addWildCard TypeVarToken varId occ -> makeSearchToken typeVarType (varIdText varId) (Just occ) HashToken sh -> - makeSearchToken hashType (into @Text sh) Nothing + makeSearchToken hashType (SH.toText sh) Nothing & addWildCard TermTagToken termTag -> makeSearchToken tagType (termTagText termTag) Nothing TypeTagToken typTag -> makeSearchToken tagType (typeTagText typTag) Nothing diff --git a/src/Share/Postgres/Sync/Queries.hs b/src/Share/Postgres/Sync/Queries.hs index da10bf7..d093ff5 100644 --- a/src/Share/Postgres/Sync/Queries.hs +++ b/src/Share/Postgres/Sync/Queries.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE StandaloneDeriving #-} + -- | Queries related to sync and temp entities. module Share.Postgres.Sync.Queries ( expectEntity, @@ -21,6 +23,7 @@ import Data.Set qualified as Set import Data.Set.NonEmpty (NESet) import Data.Set.NonEmpty qualified as NESet import Data.Vector qualified as Vector +import Servant (ServerError (..), err500) import Share.Codebase.Types (CodebaseM) import Share.Codebase.Types qualified as Codebase import Share.IDs @@ -39,7 +42,6 @@ import Share.Prelude import Share.Utils.Logging qualified as Logging import Share.Web.Errors (InternalServerError (..), ToServerError (..), Unimplemented (Unimplemented)) import Share.Web.UCM.Sync.Types -import Servant (ServerError (..), err500) import U.Codebase.Branch qualified as V2 import U.Codebase.Causal qualified as U import U.Codebase.Sqlite.Branch.Format (LocalBranchBytes (LocalBranchBytes)) @@ -51,6 +53,7 @@ import U.Codebase.Sqlite.Decode qualified as Decoders import U.Codebase.Sqlite.Entity qualified as Entity import U.Codebase.Sqlite.LocalIds qualified as LocalIds import U.Codebase.Sqlite.LocalizeObject qualified as Localize +import U.Codebase.Sqlite.Patch.Format (PatchLocalIds' (patchDefnLookup)) import U.Codebase.Sqlite.Patch.Format qualified as PatchFormat import U.Codebase.Sqlite.Patch.Full qualified as PatchFull import U.Codebase.Sqlite.Queries qualified as Share @@ -61,6 +64,8 @@ import Unison.Hash32 import Unison.Hash32 qualified as Hash32 import Unison.Sync.Common qualified as Share import Unison.Sync.Types qualified as Share +import Unison.SyncV2.Types (CBORBytes (..), EntityKind (..)) +import Unison.SyncV2.Types qualified as SyncV2 data SyncQError = InvalidNamespaceBytes @@ -82,7 +87,7 @@ instance Logging.Loggable SyncQError where toLog = Logging.withSeverity Logging.Error . Logging.showLog -- | Read an entity out of the database that we know is in main storage. -expectEntity :: HasCallStack => Hash32 -> CodebaseM e (Share.Entity Text Hash32 Hash32) +expectEntity :: (HasCallStack) => Hash32 -> CodebaseM e (Share.Entity Text Hash32 Hash32) expectEntity hash = do expectEntityKindForHash hash >>= \case CausalEntity -> Share.C <$> expectCausalEntity (CausalHash . Hash32.toHash $ hash) @@ -92,7 +97,7 @@ expectEntity hash = do PatchEntity -> Share.P <$> expectPatchEntity (PatchHash . Hash32.toHash $ hash) where -expectCausalEntity :: HasCallStack => CausalHash -> CodebaseM e (Share.Causal Hash32) +expectCausalEntity :: (HasCallStack) => CausalHash -> CodebaseM e (Share.Causal Hash32) expectCausalEntity hash = do causalId <- CausalQ.expectCausalIdByHash hash U.Causal {valueHash, parents} <- CausalQ.expectCausalNamespace causalId @@ -103,7 +108,7 @@ expectCausalEntity hash = do } ) -expectNamespaceEntity :: HasCallStack => BranchHash -> CodebaseM e (Share.Namespace Text Hash32) +expectNamespaceEntity :: (HasCallStack) => BranchHash -> CodebaseM e (Share.Namespace Text Hash32) expectNamespaceEntity bh = do bhId <- HashQ.expectBranchHashId bh v2Branch <- CausalQ.expectNamespace bhId @@ -122,17 +127,17 @@ expectNamespaceEntity bh = do bytes = bytes } -expectTermComponentEntity :: HasCallStack => ComponentHash -> CodebaseM e (Share.TermComponent Text Hash32) +expectTermComponentEntity :: (HasCallStack) => ComponentHash -> CodebaseM e (Share.TermComponent Text Hash32) expectTermComponentEntity hash = do chId <- HashQ.expectComponentHashId hash DefnQ.expectShareTermComponent chId -expectTypeComponentEntity :: HasCallStack => ComponentHash -> CodebaseM e (Share.DeclComponent Text Hash32) +expectTypeComponentEntity :: (HasCallStack) => ComponentHash -> CodebaseM e (Share.DeclComponent Text Hash32) expectTypeComponentEntity hash = do chId <- HashQ.expectComponentHashId hash DefnQ.expectShareTypeComponent chId -expectPatchEntity :: HasCallStack => PatchHash -> CodebaseM e (Share.Patch Text Hash32 Hash32) +expectPatchEntity :: (HasCallStack) => PatchHash -> CodebaseM e (Share.Patch Text Hash32 Hash32) expectPatchEntity patchHash = do patchId <- HashQ.expectPatchIdsOf id patchHash v2Patch <- PatchQ.expectPatch patchId @@ -154,7 +159,7 @@ expectPatchEntity patchHash = do & pure -- | Determine the kind of an arbitrary hash. -expectEntityKindForHash :: HasCallStack => Hash32 -> CodebaseM e EntityKind +expectEntityKindForHash :: (HasCallStack) => Hash32 -> CodebaseM e EntityKind expectEntityKindForHash h = do queryExpect1Row @@ -317,7 +322,7 @@ entityLocations sortedEntities = do -- | Save a temp entity to the temp entities table, also tracking its missing dependencies. -- You can pass ALL the dependencies of the temp entity, the query will determine which ones -- are missing. -saveTempEntities :: Foldable f => f (Hash32, Share.Entity Text Hash32 Hash32) -> CodebaseM e () +saveTempEntities :: (Foldable f) => f (Hash32, Share.Entity Text Hash32 Hash32) -> CodebaseM e () saveTempEntities entities = do codebaseOwnerUserId <- asks Codebase.codebaseOwner let tempEntities = @@ -401,10 +406,12 @@ clearTempDependencies hash = do -- | Save a temp entity to main storage, and clear any missing dependency rows for it, and -- return the set of hashes which dependended on it and _might_ now be ready to flush. -saveTempEntityInMain :: forall e. HasCallStack => Hash32 -> TempEntity -> CodebaseM e (Set Hash32) +saveTempEntityInMain :: forall e. (HasCallStack) => Hash32 -> TempEntity -> CodebaseM e (Set Hash32) saveTempEntityInMain hash entity = do saveEntity entity - clearTempDependencies hash + dependencies <- clearTempDependencies hash + saveSerializedEntities [(hash, entity)] + pure dependencies where saveEntity :: TempEntity -> CodebaseM e () saveEntity = \case @@ -573,3 +580,55 @@ getEntitiesReadyToFlush = do AND missing_dep.user_id = #{codebaseOwnerUserId} ) |] + +saveSerializedEntities :: (Foldable f) => f (Hash32, TempEntity) -> CodebaseM e () +saveSerializedEntities entities = do + for_ entities \(hash, entity) -> do + let serialised = SyncV2.serialiseCBORBytes entity + case entity of + Entity.TC {} -> saveSerializedComponent hash serialised + Entity.DC {} -> saveSerializedComponent hash serialised + Entity.P {} -> saveSerializedPatch hash serialised + Entity.C {} -> saveSerializedCausal hash serialised + Entity.N {} -> saveSerializedNamespace hash serialised + +saveSerializedComponent :: Hash32 -> CBORBytes TempEntity -> CodebaseM e () +saveSerializedComponent hash (CBORBytes bytes) = do + codebaseOwnerUserId <- asks Codebase.codebaseOwner + bytesId <- DefnQ.ensureBytesIdsOf id (BL.toStrict bytes) + execute_ + [sql| + INSERT INTO serialized_components (user_id, component_hash_id, bytes_id) + VALUES (#{codebaseOwnerUserId}, (SELECT ch.id FROM component_hashes ch where ch.base32 = #{hash}), #{bytesId}) + ON CONFLICT DO NOTHING + |] + +saveSerializedPatch :: Hash32 -> CBORBytes TempEntity -> CodebaseM e () +saveSerializedPatch hash (CBORBytes bytes) = do + bytesId <- DefnQ.ensureBytesIdsOf id (BL.toStrict bytes) + execute_ + [sql| + INSERT INTO serialized_patches (patch_id, bytes_id) + VALUES ((SELECT p.id FROM patches p where p.hash = #{hash}), #{bytesId}) + ON CONFLICT DO NOTHING + |] + +saveSerializedCausal :: Hash32 -> CBORBytes TempEntity -> CodebaseM e () +saveSerializedCausal hash (CBORBytes bytes) = do + bytesId <- DefnQ.ensureBytesIdsOf id (BL.toStrict bytes) + execute_ + [sql| + INSERT INTO serialized_causals (causal_id, bytes_id) + VALUES ((SELECT c.id FROM causals c where c.hash = #{hash}), #{bytesId}) + ON CONFLICT DO NOTHING + |] + +saveSerializedNamespace :: Hash32 -> CBORBytes TempEntity -> CodebaseM e () +saveSerializedNamespace hash (CBORBytes bytes) = do + bytesId <- DefnQ.ensureBytesIdsOf id (BL.toStrict bytes) + execute_ + [sql| + INSERT INTO serialized_namespaces (namespace_hash_id, bytes_id) + VALUES ((SELECT bh.id FROM branch_hashes bh where bh.base32 = #{hash}), #{bytesId}) + ON CONFLICT DO NOTHING + |] diff --git a/src/Share/Postgres/Sync/Types.hs b/src/Share/Postgres/Sync/Types.hs index fc23e04..077fa51 100644 --- a/src/Share/Postgres/Sync/Types.hs +++ b/src/Share/Postgres/Sync/Types.hs @@ -1,9 +1,9 @@ module Share.Postgres.Sync.Types (TypedTempEntity (..)) where +import Hasql.Interpolate qualified as Hasql import Share.Postgres (decodeField) -import Share.Postgres qualified as PG import Share.Postgres.Serialization qualified as S -import Hasql.Interpolate qualified as Hasql +import Share.Utils.Postgres (RawBytes (..)) import U.Codebase.Sqlite.TempEntity (TempEntity) -- | Helper for deserializing typed temp entities. @@ -14,7 +14,7 @@ newtype TypedTempEntity = TypedTempEntity {unTypedTempEntity :: TempEntity} instance Hasql.DecodeRow TypedTempEntity where decodeRow = do entityType <- decodeField - PG.RawBytes entityBytes <- decodeField + RawBytes entityBytes <- decodeField case S.decodeTypedTempEntity entityType entityBytes of Left err -> fail (show err) Right tempEntity -> pure (TypedTempEntity tempEntity) diff --git a/src/Share/Utils/Postgres.hs b/src/Share/Utils/Postgres.hs index 227f7d3..0830552 100644 --- a/src/Share/Utils/Postgres.hs +++ b/src/Share/Utils/Postgres.hs @@ -1,17 +1,22 @@ module Share.Utils.Postgres ( OrdBy (..), ordered, + RawBytes (..), + RawLazyBytes (..), ) where -import Share.Postgres qualified as PG +import Data.ByteString.Lazy qualified as BL +import Hasql.Decoders qualified as Decoders +import Hasql.Encoders qualified as Encoders +import Hasql.Interpolate qualified as Hasql import Share.Prelude -- | A type for propagating an application-code ordering through a database query. -- We can't trust the order returned by PG, so we make sure to order things explicitly. newtype OrdBy = OrdBy {unOrdBy :: Int32} deriving stock (Eq, Ord, Show) - deriving (PG.DecodeValue, PG.EncodeValue) via Int32 + deriving (Hasql.DecodeValue, Hasql.EncodeValue) via Int32 instance From Int OrdBy where from = OrdBy . fromIntegral @@ -21,3 +26,25 @@ instance From Int32 OrdBy where ordered :: [a] -> [(OrdBy, a)] ordered = zip (OrdBy <$> [0 ..]) + +-- | Preferably you should use custom newtypes for your bytes, but you can use this with +-- deriving via to get the encoding/decoding instances. +newtype RawBytes = RawBytes {unRawBytes :: ByteString} + deriving stock (Show, Eq, Ord) + +instance Hasql.EncodeValue RawBytes where + encodeValue = contramap unRawBytes Encoders.bytea + +instance Hasql.DecodeValue RawBytes where + decodeValue = RawBytes <$> Decoders.bytea + +-- | Preferably you should use custom newtypes for your bytes, but you can use this with +-- deriving via to get the encoding/decoding instances. +newtype RawLazyBytes = RawLazyBytes {unLazyRawBytes :: BL.ByteString} + deriving stock (Show, Eq, Ord) + +instance Hasql.EncodeValue RawLazyBytes where + encodeValue = contramap (BL.toStrict . unLazyRawBytes) Encoders.bytea + +instance Hasql.DecodeValue RawLazyBytes where + decodeValue = RawLazyBytes . BL.fromStrict <$> Decoders.bytea diff --git a/src/Share/Utils/Servant/CBOR.hs b/src/Share/Utils/Servant/CBOR.hs new file mode 100644 index 0000000..85fa517 --- /dev/null +++ b/src/Share/Utils/Servant/CBOR.hs @@ -0,0 +1,43 @@ +-- | Servant configuration for the CBOR media type +-- +-- Adapted from https://hackage.haskell.org/package/servant-serialization-0.3/docs/Servant-API-ContentTypes-SerialiseCBOR.html via MIT license +module Share.Utils.Servant.CBOR (CBOR) where + +import Codec.CBOR.Read (DeserialiseFailure (..)) +import Codec.Serialise (Serialise, deserialiseOrFail, serialise) +import Data.List.NonEmpty qualified as NonEmpty +import Network.HTTP.Media.MediaType qualified as MediaType +import Servant + +-- | Content-type for encoding and decoding objects as their CBOR representations +data CBOR + +-- | Mime-type for CBOR +instance Accept CBOR where + contentTypes Proxy = + NonEmpty.singleton ("application" MediaType.// "cbor") + +-- | +-- +-- >>> mimeRender (Proxy :: Proxy CBOR) ("Hello" :: String) +-- "eHello" +instance (Serialise a) => MimeRender CBOR a where + mimeRender Proxy = serialise + +-- | +-- +-- >>> let bsl = mimeRender (Proxy :: Proxy CBOR) (3.14 :: Float) +-- >>> mimeUnrender (Proxy :: Proxy CBOR) bsl :: Either String Float +-- Right 3.14 +-- +-- >>> mimeUnrender (Proxy :: Proxy CBOR) (bsl <> "trailing garbage") :: Either String Float +-- Right 3.14 +-- +-- >>> mimeUnrender (Proxy :: Proxy CBOR) ("preceding garbage" <> bsl) :: Either String Float +-- Left "Codec.Serialise.deserialiseOrFail: expected float at byte-offset 0" +instance (Serialise a) => MimeUnrender CBOR a where + mimeUnrender Proxy = mapLeft prettyErr . deserialiseOrFail + where + mapLeft f = either (Left . f) Right + prettyErr (DeserialiseFailure offset err) = + "Codec.Serialise.deserialiseOrFail: " ++ err ++ " at byte-offset " ++ show offset diff --git a/src/Share/Web/API.hs b/src/Share/Web/API.hs index d188e3e..b1fbace 100644 --- a/src/Share/Web/API.hs +++ b/src/Share/Web/API.hs @@ -13,6 +13,7 @@ import Share.Web.Share.API qualified as Share import Share.Web.Share.Projects.API qualified as Projects import Share.Web.Support.API qualified as Support import Share.Web.Types +import Share.Web.UCM.SyncV2.API qualified as SyncV2 import Unison.Share.API.Projects qualified as UCMProjects import Unison.Sync.API qualified as Unison.Sync @@ -36,6 +37,7 @@ type API = :<|> ("sync" :> MaybeAuthenticatedSession :> Unison.Sync.API) :<|> ("ucm" :> "v1" :> "sync" :> MaybeAuthenticatedSession :> Unison.Sync.API) :<|> ("ucm" :> "v1" :> "projects" :> MaybeAuthenticatedSession :> UCMProjects.ProjectsAPI) + :<|> ("ucm" :> "v2" :> "sync" :> SyncV2.API) :<|> ("admin" :> Admin.API) api :: Proxy API diff --git a/src/Share/Web/App.hs b/src/Share/Web/App.hs index bccf764..f3aa70e 100644 --- a/src/Share/Web/App.hs +++ b/src/Share/Web/App.hs @@ -3,6 +3,7 @@ module Share.Web.App ( RequestCtx (..), WebApp, + WebAppServer, ReqTagsVar, localRequestCtx, withLocalTag, @@ -23,6 +24,7 @@ import Control.Monad.Reader import Data.Map qualified as Map import Network.URI import Servant +import Servant.Server.Generic (AsServerT) import Share.App import Share.Env import Share.Env qualified as Env @@ -34,6 +36,8 @@ import UnliftIO.STM type WebApp = AppM RequestCtx +type WebAppServer = AsServerT WebApp + type ReqTagsVar = TVar (Map Text Text) -- | Context which is local to a single request. diff --git a/src/Share/Web/Impl.hs b/src/Share/Web/Impl.hs index 7228424..04b576f 100644 --- a/src/Share/Web/Impl.hs +++ b/src/Share/Web/Impl.hs @@ -24,6 +24,7 @@ import Share.Web.Support.Impl qualified as Support import Share.Web.Types import Share.Web.UCM.Projects.Impl qualified as UCMProjects import Share.Web.UCM.Sync.Impl qualified as Sync +import Share.Web.UCM.SyncV2.Impl qualified as SyncV2 discoveryEndpoint :: WebApp DiscoveryDocument discoveryEndpoint = do @@ -72,4 +73,5 @@ server = :<|> Sync.server -- Deprecated path :<|> Sync.server :<|> UCMProjects.server + :<|> SyncV2.server :<|> Admin.server diff --git a/src/Share/Web/UCM/Sync/Impl.hs b/src/Share/Web/UCM/Sync/Impl.hs index 2bd7131..c03cd0f 100644 --- a/src/Share/Web/UCM/Sync/Impl.hs +++ b/src/Share/Web/UCM/Sync/Impl.hs @@ -9,6 +9,7 @@ module Share.Web.UCM.Sync.Impl -- This export can be removed once we've migrated away from sqlite. insertEntitiesToCodebase, ensureCausalIsFlushed, + repoInfoKind, ) where @@ -48,7 +49,7 @@ import Share.Web.Authentication qualified as AuthN import Share.Web.Authorization qualified as AuthZ import Share.Web.Errors import Share.Web.UCM.Sync.HashJWT qualified as HashJWT -import Share.Web.UCM.Sync.Types (EntityBunch (..), EntityKind (..), entityKind) +import Share.Web.UCM.Sync.Types (EntityBunch (..), RepoInfoKind (..), entityKind) import U.Codebase.Causal qualified as Causal import U.Codebase.Sqlite.Orphans () import Unison.Codebase.Path qualified as Path @@ -63,14 +64,9 @@ import Unison.Sync.EntityValidation qualified as Sync import Unison.Sync.Types (DownloadEntitiesError (..), DownloadEntitiesRequest (..), DownloadEntitiesResponse (..), GetCausalHashByPathRequest (..), GetCausalHashByPathResponse (..), NeedDependencies (..), RepoInfo (..), UploadEntitiesError (..), UploadEntitiesRequest (..), UploadEntitiesResponse (..)) import Unison.Sync.Types qualified as Share import Unison.Sync.Types qualified as Sync +import Unison.SyncV2.Types (EntityKind (..)) import UnliftIO qualified -data RepoInfoKind - = RepoInfoUser UserHandle - | RepoInfoProjectBranch ProjectBranchShortHand - | RepoInfoProjectRelease ProjectReleaseShortHand - deriving stock (Show) - -- | Parse a `RepoInfo` into the correct codebase view, e.g. -- -- >>> repoInfoKind (RepoInfo "@unison") diff --git a/src/Share/Web/UCM/Sync/Types.hs b/src/Share/Web/UCM/Sync/Types.hs index 69b919e..26d5e38 100644 --- a/src/Share/Web/UCM/Sync/Types.hs +++ b/src/Share/Web/UCM/Sync/Types.hs @@ -1,12 +1,14 @@ module Share.Web.UCM.Sync.Types ( EntityBunch (..), - EntityKind (..), entityKind, + RepoInfoKind (..), ) where +import Share.IDs (ProjectBranchShortHand, ProjectReleaseShortHand, UserHandle) import Share.Prelude import Unison.Sync.Types qualified as Share +import Unison.SyncV2.Types qualified as SyncV2 -- | Helper type for handling entities of different types. data EntityBunch a = EntityBunch @@ -25,20 +27,18 @@ instance Semigroup (EntityBunch a) where instance Monoid (EntityBunch a) where mempty = EntityBunch [] [] [] [] [] -data EntityKind - = CausalEntity - | NamespaceEntity - | TermEntity - | TypeEntity - | PatchEntity - deriving (Show, Eq, Ord) - -entityKind :: HasCallStack => Share.Entity text hash hash' -> EntityKind +entityKind :: (HasCallStack) => Share.Entity text hash hash' -> SyncV2.EntityKind entityKind = \case - Share.C _ -> CausalEntity - Share.N _ -> NamespaceEntity + Share.C _ -> SyncV2.CausalEntity + Share.N _ -> SyncV2.NamespaceEntity Share.ND _ -> error "entityKind: Unsupported Entity Kind: NamespaceDiff" - Share.TC _ -> TermEntity - Share.DC _ -> TypeEntity - Share.P _ -> PatchEntity + Share.TC _ -> SyncV2.TermEntity + Share.DC _ -> SyncV2.TypeEntity + Share.P _ -> SyncV2.PatchEntity Share.PD _ -> error "entityKind: Unsupported Entity Kind: PatchDiff" + +data RepoInfoKind + = RepoInfoUser UserHandle + | RepoInfoProjectBranch ProjectBranchShortHand + | RepoInfoProjectRelease ProjectReleaseShortHand + deriving stock (Show) diff --git a/src/Share/Web/UCM/SyncV2/API.hs b/src/Share/Web/UCM/SyncV2/API.hs new file mode 100644 index 0000000..5f689c4 --- /dev/null +++ b/src/Share/Web/UCM/SyncV2/API.hs @@ -0,0 +1,10 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE TypeOperators #-} + +module Share.Web.UCM.SyncV2.API (API) where + +import Servant +import Share.OAuth.Session (MaybeAuthenticatedUserId) +import Unison.SyncV2.API qualified as SyncV2 + +type API = MaybeAuthenticatedUserId :> NamedRoutes SyncV2.Routes diff --git a/src/Share/Web/UCM/SyncV2/Impl.hs b/src/Share/Web/UCM/SyncV2/Impl.hs new file mode 100644 index 0000000..096753a --- /dev/null +++ b/src/Share/Web/UCM/SyncV2/Impl.hs @@ -0,0 +1,150 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE TypeOperators #-} + +module Share.Web.UCM.SyncV2.Impl (server) where + +import Conduit qualified as C +import Control.Concurrent.STM qualified as STM +import Control.Concurrent.STM.TBMQueue qualified as STM +import Control.Monad.Except (ExceptT (ExceptT)) +import Control.Monad.Trans.Except (runExceptT) +import Data.Conduit.Combinators qualified as C +import Data.List.NonEmpty qualified as NEL +import Servant +import Servant.Conduit (ConduitToSourceIO (..)) +import Servant.Types.SourceT (SourceT (..)) +import Servant.Types.SourceT qualified as SourceT +import Share.Codebase qualified as Codebase +import Share.IDs (ProjectBranchShortHand (..), ProjectReleaseShortHand (..), ProjectShortHand (..), UserHandle, UserId) +import Share.IDs qualified as IDs +import Share.Postgres qualified as PG +import Share.Postgres.Causal.Queries qualified as CausalQ +import Share.Postgres.Cursors qualified as Cursor +import Share.Postgres.Queries qualified as PGQ +import Share.Prelude +import Share.Project (Project (..)) +import Share.User (User (..)) +import Share.Utils.Logging qualified as Logging +import Share.Utils.Unison (hash32ToCausalHash) +import Share.Web.App +import Share.Web.Authorization qualified as AuthZ +import Share.Web.Errors +import Share.Web.UCM.Sync.HashJWT qualified as HashJWT +import Share.Web.UCM.SyncV2.Queries qualified as SSQ +import U.Codebase.Sqlite.Orphans () +import Unison.Debug qualified as Debug +import Unison.Hash32 (Hash32) +import Unison.Share.API.Hash (HashJWTClaims (..)) +import Unison.SyncV2.API qualified as SyncV2 +import Unison.SyncV2.Types (DownloadEntitiesChunk (..), EntityChunk (..), ErrorChunk (..), StreamInitInfo (..)) +import Unison.SyncV2.Types qualified as SyncV2 +import UnliftIO qualified +import UnliftIO.Async qualified as Async + +batchSize :: Int32 +batchSize = 1000 + +streamSettings :: Hash32 -> Maybe SyncV2.BranchRef -> StreamInitInfo +streamSettings rootCausalHash rootBranchRef = StreamInitInfo {version = SyncV2.Version 1, entitySorting = SyncV2.Unsorted, numEntities = Nothing, rootCausalHash, rootBranchRef} + +server :: Maybe UserId -> SyncV2.Routes WebAppServer +server mayUserId = + SyncV2.Routes + { downloadEntitiesStream = downloadEntitiesStreamImpl mayUserId + } + +parseBranchRef :: SyncV2.BranchRef -> Either Text (Either ProjectReleaseShortHand ProjectBranchShortHand) +parseBranchRef (SyncV2.BranchRef branchRef) = + case parseRelease <|> parseBranch of + Just a -> Right a + Nothing -> Left $ "Invalid repo info: " <> branchRef + where + parseBranch :: Maybe (Either ProjectReleaseShortHand ProjectBranchShortHand) + parseBranch = fmap Right . eitherToMaybe $ IDs.fromText @ProjectBranchShortHand branchRef + parseRelease :: Maybe (Either ProjectReleaseShortHand ProjectBranchShortHand) + parseRelease = fmap Left . eitherToMaybe $ IDs.fromText @ProjectReleaseShortHand branchRef + +downloadEntitiesStreamImpl :: Maybe UserId -> SyncV2.DownloadEntitiesRequest -> WebApp (SourceIO SyncV2.DownloadEntitiesChunk) +downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {causalHash = causalHashJWT, branchRef, knownHashes = _todo}) = do + either emitErr id <$> runExceptT do + addRequestTag "branch-ref" (SyncV2.unBranchRef branchRef) + HashJWTClaims {hash = causalHash} <- lift (HashJWT.verifyHashJWT mayCallerUserId causalHashJWT >>= either respondError pure) + codebase <- + case parseBranchRef branchRef of + Left err -> throwError (SyncV2.DownloadEntitiesInvalidBranchRef err branchRef) + Right (Left (ProjectReleaseShortHand {userHandle, projectSlug})) -> do + let projectShortHand = ProjectShortHand {userHandle, projectSlug} + (Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . PG.tryRunTransaction $ do + project <- PGQ.projectByShortHand projectShortHand `whenNothingM` throwError (SyncV2.DownloadEntitiesProjectNotFound $ IDs.toText @ProjectShortHand projectShortHand) + pure (project, Nothing) + authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (SyncV2.DownloadEntitiesNoReadPermission branchRef) + let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId + pure $ Codebase.codebaseEnv authZToken codebaseLoc + Right (Right (ProjectBranchShortHand {userHandle, projectSlug, contributorHandle})) -> do + let projectShortHand = ProjectShortHand {userHandle, projectSlug} + (Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . PG.tryRunTransaction $ do + project <- (PGQ.projectByShortHand projectShortHand) `whenNothingM` throwError (SyncV2.DownloadEntitiesProjectNotFound $ IDs.toText @ProjectShortHand projectShortHand) + mayContributorUserId <- for contributorHandle \ch -> fmap user_id $ (PGQ.userByHandle ch) `whenNothingM` throwError (SyncV2.DownloadEntitiesUserNotFound $ IDs.toText @UserHandle ch) + pure (project, mayContributorUserId) + authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (SyncV2.DownloadEntitiesNoReadPermission branchRef) + let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId + pure $ Codebase.codebaseEnv authZToken codebaseLoc + q <- UnliftIO.atomically $ do + q <- STM.newTBMQueue 10 + STM.writeTBMQueue q (NEL.singleton $ InitialC $ streamSettings causalHash (Just branchRef)) + pure q + streamResults <- lift $ UnliftIO.toIO do + Logging.logInfoText "Starting download entities stream" + Codebase.runCodebaseTransaction codebase $ do + Debug.debugM Debug.Temp "Getting IDs for:" causalHash + (_bhId, causalId) <- CausalQ.expectCausalIdsOf id (hash32ToCausalHash causalHash) + Debug.debugM Debug.Temp "Getting deps of" causalId + cursor <- SSQ.allSerializedDependenciesOfCausalCursor causalId + Debug.debugLogM Debug.Temp "Got cursor" + Cursor.foldBatched cursor batchSize \batch -> do + Debug.debugLogM Debug.Temp "Emitting batch" + let entityChunkBatch = batch <&> \(entityCBOR, hash) -> EntityC (EntityChunk {hash, entityCBOR}) + PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBMQueue q entityChunkBatch + PG.transactionUnsafeIO $ STM.atomically $ STM.closeTBMQueue q + pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do + stream q + where + stream :: STM.TBMQueue (NonEmpty DownloadEntitiesChunk) -> C.ConduitT () DownloadEntitiesChunk IO () + stream q = do + let loop :: C.ConduitT () DownloadEntitiesChunk IO () + loop = do + Debug.debugLogM Debug.Temp "Waiting for batch..." + liftIO (STM.atomically (STM.readTBMQueue q)) >>= \case + -- The queue is closed. + Nothing -> do + Debug.debugLogM Debug.Temp "Queue closed. finishing up!" + pure () + Just batch -> do + Debug.debugLogM Debug.Temp $ "Emitting chunk of " <> show (length batch) <> " entities" + C.yieldMany batch + loop + + loop + Debug.debugLogM Debug.Temp "Done!" + + emitErr :: SyncV2.DownloadEntitiesError -> SourceIO SyncV2.DownloadEntitiesChunk + emitErr err = SourceT.source [ErrorC (ErrorChunk err)] + +-- | Run an IO action in the background while streaming the results. +-- +-- Servant doesn't provide any easier way to do bracketing like this, all the IO must be +-- inside the SourceIO somehow. +sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r +sourceIOWithAsync action (SourceT k) = + SourceT \k' -> + Async.withAsync action \_ -> k k' + +-- debug the output pipe. +_tap :: (Monad m) => (C.ConduitT a DownloadEntitiesChunk m ()) -> (C.ConduitT a DownloadEntitiesChunk m ()) +_tap s = + s + C..| ( C.iterM \case + InitialC init -> Debug.debugM Debug.Temp "Initial " init + EntityC ec -> Debug.debugM Debug.Temp "Chunk " ec + ErrorC err -> Debug.debugM Debug.Temp "Error " err + ) diff --git a/src/Share/Web/UCM/SyncV2/Queries.hs b/src/Share/Web/UCM/SyncV2/Queries.hs new file mode 100644 index 0000000..60ae871 --- /dev/null +++ b/src/Share/Web/UCM/SyncV2/Queries.hs @@ -0,0 +1,348 @@ +module Share.Web.UCM.SyncV2.Queries + ( allHashDependenciesOfCausalCursor, + allSerializedDependenciesOfCausalCursor, + ) +where + +import Control.Monad.Reader +import Share.Codebase (CodebaseM, codebaseOwner) +import Share.Postgres +import Share.Postgres.Cursors (PGCursor) +import Share.Postgres.Cursors qualified as PGCursor +import Share.Postgres.IDs +import Share.Prelude +import U.Codebase.Sqlite.TempEntity (TempEntity) +import Unison.Hash32 (Hash32) +import Unison.SyncV2.Types (CBORBytes) + +allHashDependenciesOfCausalCursor :: CausalId -> CodebaseM e (PGCursor Text) +allHashDependenciesOfCausalCursor cid = do + ownerUserId <- asks codebaseOwner + PGCursor.newColCursor + "causal_dependencies" + [sql| + WITH RECURSIVE transitive_causals(causal_id, causal_namespace_hash_id) AS ( + SELECT causal.id, causal.namespace_hash_id + FROM causals causal + WHERE causal.id = #{cid} + -- AND NOT EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = to_codebase_user_id AND co.causal_id = causal.id) + AND EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = #{ownerUserId} AND co.causal_id = causal.id) + UNION + -- This nested CTE is required because RECURSIVE CTEs can't refer + -- to the recursive table more than once. + ( WITH rec AS ( + SELECT causal_id, causal_namespace_hash_id + FROM transitive_causals tc + ) + SELECT ancestor_causal.id, ancestor_causal.namespace_hash_id + FROM causal_ancestors ca + JOIN rec tc ON ca.causal_id = tc.causal_id + JOIN causals ancestor_causal ON ca.ancestor_id = ancestor_causal.id + -- WHERE NOT EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = to_codebase_user_id AND co.causal_id = ancestor_causal.id) + UNION + SELECT child_causal.id, child_causal.namespace_hash_id + FROM rec tc + JOIN namespace_children nc ON tc.causal_namespace_hash_id = nc.parent_namespace_hash_id + JOIN causals child_causal ON nc.child_causal_id = child_causal.id + -- WHERE NOT EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = to_codebase_user_id AND co.causal_id = child_causal.id) + ) + ), all_namespaces(namespace_hash_id) AS ( + SELECT DISTINCT causal_namespace_hash_id AS namespace_hash_id + FROM transitive_causals + -- WHERE NOT EXISTS (SELECT FROM namespace_ownership no WHERE no.user_id = to_codebase_user_id AND no.namespace_hash_id = causal_namespace_hash_id) + ), all_patches(patch_id) AS ( + SELECT DISTINCT patch.id + FROM all_namespaces an + JOIN namespace_patches np ON an.namespace_hash_id = np.namespace_hash_id + JOIN patches patch ON np.patch_id = patch.id + -- WHERE NOT EXISTS (SELECT FROM patch_ownership po WHERE po.user_id = to_codebase_user_id AND po.patch_id = patch.id) + ), + -- term components to start transitively joining dependencies to + base_term_components(component_hash_id) AS ( + SELECT DISTINCT term.component_hash_id + FROM all_namespaces an + JOIN namespace_terms nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN terms term ON nt.term_id = term.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_terms st WHERE st.user_id = to_codebase_user_id AND st.term_id = term.id) + UNION + SELECT DISTINCT term.component_hash_id + FROM all_patches ap + JOIN patch_term_mappings ptm ON ap.patch_id = ptm.patch_id + JOIN terms term ON ptm.to_term_id = term.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_terms st WHERE st.user_id = to_codebase_user_id AND st.term_id = term.id) + UNION + -- term metadata + SELECT DISTINCT term.component_hash_id + FROM all_namespaces an + JOIN namespace_terms nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN namespace_term_metadata meta ON nt.id = meta.named_term + JOIN terms term ON meta.metadata_term_id = term.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_terms st WHERE st.user_id = to_codebase_user_id AND st.term_id = term.id) + UNION + -- type metadata + SELECT DISTINCT term.component_hash_id + FROM all_namespaces an + JOIN namespace_types nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN namespace_type_metadata meta ON nt.id = meta.named_type + JOIN terms term ON meta.metadata_term_id = term.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_terms st WHERE st.user_id = to_codebase_user_id AND st.term_id = term.id) + ), + -- type components to start transitively joining dependencies to + base_type_components(component_hash_id) AS ( + SELECT DISTINCT typ.component_hash_id + FROM all_namespaces an + JOIN namespace_types nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN types typ ON nt.type_id = typ.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_types st WHERE st.user_id = to_codebase_user_id AND st.type_id = typ.id) + UNION + SELECT DISTINCT typ.component_hash_id + FROM all_namespaces an + JOIN namespace_terms nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN constructors con ON nt.constructor_id = con.id + JOIN types typ ON con.type_id = typ.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_types st WHERE st.user_id = to_codebase_user_id AND st.type_id = typ.id) + UNION + SELECT DISTINCT typ.component_hash_id + FROM all_patches ap + JOIN patch_type_mappings ptm ON ap.patch_id = ptm.patch_id + JOIN types typ ON ptm.to_type_id = typ.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_types st WHERE st.user_id = to_codebase_user_id AND st.type_id = typ.id) + UNION + SELECT DISTINCT typ.component_hash_id + FROM all_patches ap + JOIN patch_constructor_mappings pcm ON ap.patch_id = pcm.patch_id + JOIN constructors con ON pcm.to_constructor_id = con.id + JOIN types typ ON con.type_id = typ.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_types st WHERE st.user_id = to_codebase_user_id AND st.type_id = typ.id) + ), + -- All the dependencies we join in transitively from the known term & type components we depend on. + -- Unfortunately it's not possible to know which hashes are terms vs types :'( + transitive_components(component_hash_id) AS ( + SELECT DISTINCT btc.component_hash_id + FROM base_term_components btc + UNION + SELECT DISTINCT btc.component_hash_id + FROM base_type_components btc + UNION + ( WITH rec AS ( + SELECT component_hash_id + FROM transitive_components tc + ) + -- recursively union in term dependencies + SELECT DISTINCT ref.component_hash_id + FROM rec atc + -- This joins in ALL the terms from the component, not just the one that caused the dependency on the + -- component + JOIN terms term ON atc.component_hash_id = term.component_hash_id + JOIN term_local_component_references ref ON term.id = ref.term_id + UNION + -- recursively union in type dependencies + SELECT DISTINCT ref.component_hash_id + FROM rec atc + -- This joins in ALL the types from the component, not just the one that caused the dependency on the + -- component + JOIN types typ ON atc.component_hash_id = typ.component_hash_id + JOIN type_local_component_references ref ON typ.id = ref.type_id + ) + ), copied_causals(causal_id) AS ( + SELECT DISTINCT tc.causal_id + FROM transitive_causals tc + ), copied_namespaces(namespace_hash_id) AS ( + SELECT DISTINCT an.namespace_hash_id + FROM all_namespaces an + ), copied_patches(patch_id) AS ( + SELECT DISTINCT ap.patch_id + FROM all_patches ap + ), copied_term_components AS ( + SELECT DISTINCT term.id, copy.bytes_id + FROM transitive_components tc + JOIN terms term ON tc.component_hash_id = term.component_hash_id + JOIN sandboxed_terms copy ON term.id = copy.term_id + WHERE copy.user_id = #{ownerUserId} + ), copied_type_components AS ( + SELECT DISTINCT typ.id, copy.bytes_id + FROM transitive_components tc + JOIN types typ ON tc.component_hash_id = typ.component_hash_id + JOIN sandboxed_types copy ON typ.id = copy.type_id + WHERE copy.user_id = #{ownerUserId} + ) SELECT causal.hash + FROM copied_causals cc + JOIN causals causal ON cc.causal_id = causal.id + UNION ALL + SELECT branch_hashes.base32 + FROM copied_namespaces cn + JOIN branch_hashes ON cn.namespace_hash_id = branch_hashes.id + UNION ALL + SELECT patch.hash + FROM copied_patches cp + JOIN patches patch ON cp.patch_id = patch.id + UNION ALL + SELECT component_hashes.base32 + FROM transitive_components tc + JOIN component_hashes ON tc.component_hash_id = component_hashes.id + |] + +allSerializedDependenciesOfCausalCursor :: CausalId -> CodebaseM e (PGCursor (CBORBytes TempEntity, Hash32)) +allSerializedDependenciesOfCausalCursor cid = do + ownerUserId <- asks codebaseOwner + PGCursor.newRowCursor + "causal_dependencies" + [sql| + WITH RECURSIVE transitive_causals(causal_id, causal_hash, causal_namespace_hash_id) AS ( + SELECT causal.id, causal.hash, causal.namespace_hash_id + FROM causals causal + WHERE causal.id = #{cid} + -- AND NOT EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = to_codebase_user_id AND co.causal_id = causal.id) + AND EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = #{ownerUserId} AND co.causal_id = causal.id) + UNION + -- This nested CTE is required because RECURSIVE CTEs can't refer + -- to the recursive table more than once. + ( WITH rec AS ( + SELECT causal_id, causal_namespace_hash_id + FROM transitive_causals tc + ) + SELECT ancestor_causal.id, ancestor_causal.hash, ancestor_causal.namespace_hash_id + FROM causal_ancestors ca + JOIN rec tc ON ca.causal_id = tc.causal_id + JOIN causals ancestor_causal ON ca.ancestor_id = ancestor_causal.id + -- WHERE NOT EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = to_codebase_user_id AND co.causal_id = ancestor_causal.id) + UNION + SELECT child_causal.id, child_causal.hash, child_causal.namespace_hash_id + FROM rec tc + JOIN namespace_children nc ON tc.causal_namespace_hash_id = nc.parent_namespace_hash_id + JOIN causals child_causal ON nc.child_causal_id = child_causal.id + -- WHERE NOT EXISTS (SELECT FROM causal_ownership co WHERE co.user_id = to_codebase_user_id AND co.causal_id = child_causal.id) + ) + ), all_namespaces(namespace_hash_id, namespace_hash) AS ( + SELECT DISTINCT tc.causal_namespace_hash_id AS namespace_hash_id, bh.base32 as namespace_hash + FROM transitive_causals tc + JOIN branch_hashes bh ON tc.causal_namespace_hash_id = bh.id + -- WHERE NOT EXISTS (SELECT FROM namespace_ownership no WHERE no.user_id = to_codebase_user_id AND no.namespace_hash_id = tc.causal_namespace_hash_id) + ), all_patches(patch_id, patch_hash) AS ( + SELECT DISTINCT patch.id, patch.hash + FROM all_namespaces an + JOIN namespace_patches np ON an.namespace_hash_id = np.namespace_hash_id + JOIN patches patch ON np.patch_id = patch.id + -- WHERE NOT EXISTS (SELECT FROM patch_ownership po WHERE po.user_id = to_codebase_user_id AND po.patch_id = patch.id) + ), + -- term components to start transitively joining dependencies to + base_term_components(component_hash_id) AS ( + SELECT DISTINCT term.component_hash_id + FROM all_namespaces an + JOIN namespace_terms nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN terms term ON nt.term_id = term.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_terms st WHERE st.user_id = to_codebase_user_id AND st.term_id = term.id) + UNION + SELECT DISTINCT term.component_hash_id + FROM all_patches ap + JOIN patch_term_mappings ptm ON ap.patch_id = ptm.patch_id + JOIN terms term ON ptm.to_term_id = term.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_terms st WHERE st.user_id = to_codebase_user_id AND st.term_id = term.id) + UNION + -- term metadata + SELECT DISTINCT term.component_hash_id + FROM all_namespaces an + JOIN namespace_terms nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN namespace_term_metadata meta ON nt.id = meta.named_term + JOIN terms term ON meta.metadata_term_id = term.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_terms st WHERE st.user_id = to_codebase_user_id AND st.term_id = term.id) + UNION + -- type metadata + SELECT DISTINCT term.component_hash_id + FROM all_namespaces an + JOIN namespace_types nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN namespace_type_metadata meta ON nt.id = meta.named_type + JOIN terms term ON meta.metadata_term_id = term.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_terms st WHERE st.user_id = to_codebase_user_id AND st.term_id = term.id) + ), + -- type components to start transitively joining dependencies to + base_type_components(component_hash_id) AS ( + SELECT DISTINCT typ.component_hash_id + FROM all_namespaces an + JOIN namespace_types nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN types typ ON nt.type_id = typ.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_types st WHERE st.user_id = to_codebase_user_id AND st.type_id = typ.id) + UNION + SELECT DISTINCT typ.component_hash_id + FROM all_namespaces an + JOIN namespace_terms nt ON an.namespace_hash_id = nt.namespace_hash_id + JOIN constructors con ON nt.constructor_id = con.id + JOIN types typ ON con.type_id = typ.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_types st WHERE st.user_id = to_codebase_user_id AND st.type_id = typ.id) + UNION + SELECT DISTINCT typ.component_hash_id + FROM all_patches ap + JOIN patch_type_mappings ptm ON ap.patch_id = ptm.patch_id + JOIN types typ ON ptm.to_type_id = typ.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_types st WHERE st.user_id = to_codebase_user_id AND st.type_id = typ.id) + UNION + SELECT DISTINCT typ.component_hash_id + FROM all_patches ap + JOIN patch_constructor_mappings pcm ON ap.patch_id = pcm.patch_id + JOIN constructors con ON pcm.to_constructor_id = con.id + JOIN types typ ON con.type_id = typ.id + -- WHERE NOT EXISTS (SELECT FROM sandboxed_types st WHERE st.user_id = to_codebase_user_id AND st.type_id = typ.id) + ), + -- All the dependencies we join in transitively from the known term & type components we depend on. + -- Unfortunately it's not possible to know which hashes are terms vs types :'( + transitive_components(component_hash_id) AS ( + SELECT DISTINCT btc.component_hash_id + FROM base_term_components btc + UNION + SELECT DISTINCT btc.component_hash_id + FROM base_type_components btc + UNION + ( WITH rec AS ( + SELECT component_hash_id + FROM transitive_components tc + ) + -- recursively union in term dependencies + SELECT DISTINCT ref.component_hash_id + FROM rec atc + -- This joins in ALL the terms from the component, not just the one that caused the dependency on the + -- component + JOIN terms term ON atc.component_hash_id = term.component_hash_id + JOIN term_local_component_references ref ON term.id = ref.term_id + UNION + -- recursively union in type dependencies + SELECT DISTINCT ref.component_hash_id + FROM rec atc + -- This joins in ALL the types from the component, not just the one that caused the dependency on the + -- component + JOIN types typ ON atc.component_hash_id = typ.component_hash_id + JOIN type_local_component_references ref ON typ.id = ref.type_id + ) + ) + (SELECT bytes.bytes, ch.base32 + FROM transitive_components tc + JOIN serialized_components sc ON tc.component_hash_id = sc.component_hash_id + JOIN bytes ON sc.bytes_id = bytes.id + JOIN component_hashes ch ON tc.component_hash_id = ch.id + -- Reverse the ordering so deeper components come first + ORDER BY row_number() OVER () DESC + ) + UNION ALL + (SELECT bytes.bytes, ap.patch_hash + FROM all_patches ap + JOIN serialized_patches sp ON ap.patch_id = sp.patch_id + JOIN bytes ON sp.bytes_id = bytes.id + -- Reverse the ordering so deeper components come first + ORDER BY row_number() OVER () DESC + ) + UNION ALL + (SELECT bytes.bytes, an.namespace_hash + FROM all_namespaces an + JOIN serialized_namespaces sn ON an.namespace_hash_id = sn.namespace_hash_id + JOIN bytes ON sn.bytes_id = bytes.id + -- Reverse the ordering so deeper components come first + ORDER BY row_number() OVER () DESC + ) + UNION ALL + (SELECT bytes.bytes, tc.causal_hash + FROM transitive_causals tc + JOIN serialized_causals sc ON tc.causal_id = sc.causal_id + JOIN bytes ON sc.bytes_id = bytes.id + -- Reverse the ordering so deeper components come first + ORDER BY row_number() OVER () DESC + ) + |] diff --git a/unison b/unison index 40eac6a..439360b 160000 --- a/unison +++ b/unison @@ -1 +1 @@ -Subproject commit 40eac6a121a5edb8e083523b769aa656df0fa54e +Subproject commit 439360b37d568afb521c6a497599b5d9c672b773