Fix upload artifacts (hcengineering#7776)
Signed-off-by: Andrey Sobolev <[email protected]>
haiodo authored Jan 24, 2025
1 parent d6b7a38 commit df1e2f0
Showing 7 changed files with 125 additions and 84 deletions.
15 changes: 13 additions & 2 deletions pods/backup/src/index.ts
@@ -27,9 +27,14 @@ import {
} from '@hcengineering/server-pipeline'
import { join } from 'path'

import { createMongoAdapter, createMongoDestroyAdapter, createMongoTxAdapter } from '@hcengineering/mongo'
import {
createPostgreeDestroyAdapter,
createPostgresAdapter,
createPostgresTxAdapter,
setDBExtraOptions
} from '@hcengineering/postgres'
import { readFileSync } from 'node:fs'
import { createMongoTxAdapter, createMongoAdapter, createMongoDestroyAdapter } from '@hcengineering/mongo'
import { createPostgresTxAdapter, createPostgresAdapter, createPostgreeDestroyAdapter } from '@hcengineering/postgres'
const model = JSON.parse(readFileSync(process.env.MODEL_JSON ?? 'model.json').toString()) as Tx[]

const metricsContext = initStatisticsContext('backup', {
@@ -51,6 +56,12 @@ const sentryDSN = process.env.SENTRY_DSN
configureAnalytics(sentryDSN, {})
Analytics.setTag('application', 'backup-service')

const usePrepare = process.env.DB_PREPARE === 'true'

setDBExtraOptions({
prepare: usePrepare // We override defaults
})

registerTxAdapterFactory('mongodb', createMongoTxAdapter)
registerAdapterFactory('mongodb', createMongoAdapter)
registerDestroyFactory('mongodb', createMongoDestroyAdapter)
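Note: the same startup pattern — reading a DB_PREPARE flag from the environment and forwarding it through setDBExtraOptions — is repeated in the fulltext and transactor pods below. A minimal sketch of the idea, assuming a module-level options store that a later getDBClient call merges into the driver configuration (the store and merge behavior are assumptions, not the package's documented API):

// Sketch only: an env-driven toggle for prepared statements.
type DBExtraOptions = Record<string, unknown>

let dbExtraOptions: DBExtraOptions = {}

// Hypothetical store: a later getDBClient() call would spread these options
// into the postgres.js connection config.
export function setDBExtraOptions (options: DBExtraOptions): void {
  dbExtraOptions = { ...dbExtraOptions, ...options }
}

export function getDBExtraOptions (): DBExtraOptions {
  return dbExtraOptions
}

// Each pod reads the flag once at startup. Prepared statements stay disabled
// unless DB_PREPARE is explicitly 'true' (helpful behind transaction-mode
// poolers, which generally cannot reuse server-side prepared statements).
const usePrepare = process.env.DB_PREPARE === 'true'
setDBExtraOptions({ prepare: usePrepare })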
13 changes: 12 additions & 1 deletion pods/fulltext/src/server.ts
@@ -32,7 +32,12 @@ import {
} from '@hcengineering/middleware'
import { createMongoAdapter, createMongoDestroyAdapter, createMongoTxAdapter } from '@hcengineering/mongo'
import { PlatformError, setMetadata, unknownError } from '@hcengineering/platform'
import { createPostgreeDestroyAdapter, createPostgresAdapter, createPostgresTxAdapter } from '@hcengineering/postgres'
import {
createPostgreeDestroyAdapter,
createPostgresAdapter,
createPostgresTxAdapter,
setDBExtraOptions
} from '@hcengineering/postgres'
import serverClientPlugin, { getTransactorEndpoint, getWorkspaceInfo } from '@hcengineering/server-client'
import serverCore, {
createContentAdapter,
@@ -215,6 +220,12 @@ export async function startIndexer (
): Promise<() => void> {
const closeTimeout = 5 * 60 * 1000

const usePrepare = process.env.DB_PREPARE === 'true'

setDBExtraOptions({
prepare: usePrepare // We override defaults
})

setMetadata(serverToken.metadata.Secret, opt.serverSecret)
setMetadata(serverCore.metadata.ElasticIndexName, opt.elasticIndexName)
setMetadata(serverClientPlugin.metadata.Endpoint, opt.accountsUrl)
7 changes: 7 additions & 0 deletions pods/server/src/__start.ts
@@ -28,6 +28,7 @@ import { startHttpServer } from '@hcengineering/server-ws'
import { join } from 'path'
import { start } from '.'
import { profileStart, profileStop } from './inspector'
import { setDBExtraOptions } from '@hcengineering/postgres'

configureAnalytics(process.env.SENTRY_DSN, {})
Analytics.setTag('application', 'transactor')
@@ -58,6 +59,12 @@ setOperationLogProfiling(process.env.OPERATION_PROFILING === 'true')
const config = serverConfigFromEnv()
const storageConfig: StorageConfiguration = storageConfigFromEnv()

const usePrepare = process.env.DB_PREPARE === 'true'

setDBExtraOptions({
prepare: usePrepare // We override defaults
})

const lastNameFirst = process.env.LAST_NAME_FIRST === 'true'
setMetadata(contactPlugin.metadata.LastNameFirst, lastNameFirst)
setMetadata(serverCore.metadata.FrontUrl, config.frontUrl)
121 changes: 57 additions & 64 deletions server/postgres/src/storage.ts
@@ -117,6 +117,7 @@ async function * createCursorGenerator (
}
} catch (err: any) {
console.error('failed to receive data', { err })
throw err // Rethrow the error after logging
}
}

@@ -156,7 +157,11 @@ class ConnectionInfo {
throw err
} finally {
if (this.released) {
reserved?.release()
try {
reserved?.release()
} catch (err: any) {
console.error('failed to release', err)
}
} else {
// after use we put into available
if (reserved !== undefined) {
@@ -168,15 +173,19 @@
const toRelease = this.available.splice(1, this.available.length - 1)

for (const r of toRelease) {
r.release()
try {
r.release()
} catch (err: any) {
console.error('failed to release', err)
}
}
}
}
}
}

release (): void {
for (const c of this.available) {
for (const c of [...this.available]) {
c.release()
}
this.available = []
@@ -302,7 +311,11 @@ class ConnectionMgr {
([, it]: [string, ConnectionInfo]) => it.mgrId === this.mgrId
)) {
connections.delete(k)
conn.release()
try {
conn.release()
} catch (err: any) {
console.error('failed to release connection')
}
}
}

@@ -1336,7 +1349,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
case '$options':
break
case '$all':
res.push(`${tkey} @> ARRAY[${value}]`)
res.push(`${tkey} @> ${vars.addArray(value, inferType(value))}`)
break
default:
res.push(`${tkey} @> '[${JSON.stringify(value)}]'`)
@@ -1542,64 +1555,39 @@ abstract class PostgresAdapterBase implements DbAdapter {
return ctx.with('upload', { domain }, async (ctx) => {
const schemaFields = getSchemaAndFields(domain)
const filedsWithData = [...schemaFields.fields, 'data']
const insertFields: string[] = []
const onConflict: string[] = []
for (const field of filedsWithData) {
insertFields.push(`"${field}"`)
if (handleConflicts) {
onConflict.push(`"${field}" = EXCLUDED."${field}"`)
}
}

const insertFields = filedsWithData.map((field) => `"${field}"`)
const onConflict = handleConflicts ? filedsWithData.map((field) => `"${field}" = EXCLUDED."${field}"`) : []

const insertStr = insertFields.join(', ')
const onConflictStr = onConflict.join(', ')

try {
const toUpload = [...docs]
const tdomain = translateDomain(domain)
while (toUpload.length > 0) {
const part = toUpload.splice(0, 200)
const batchSize = 200
for (let i = 0; i < docs.length; i += batchSize) {
const part = docs.slice(i, i + batchSize)
const values = new ValuesVariables()
const vars: string[] = []
const wsId = values.add(this.workspaceId.name, '::uuid')
for (let i = 0; i < part.length; i++) {
const doc = part[i]
const variables: string[] = []

for (const doc of part) {
if (!('%hash%' in doc) || doc['%hash%'] === '' || doc['%hash%'] == null) {
;(doc as any)['%hash%'] = this.curHash() // We need to set current hash
}
const d = convertDoc(domain, doc, this.workspaceId.name, schemaFields)
variables.push(wsId)
for (const field of schemaFields.fields) {
variables.push(values.add(d[field], `::${schemaFields.schema[field].type}`))
}
variables.push(values.add(d.data, '::json'))
const variables = [
wsId,
...schemaFields.fields.map((field) => values.add(d[field], `::${schemaFields.schema[field].type}`)),
values.add(d.data, '::json')
]
vars.push(`(${variables.join(', ')})`)
}

const vals = vars.join(',')
if (handleConflicts) {
await this.mgr.retry(
ctx.id,
async (client) =>
await client.unsafe(
`INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals}
ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`,
values.getValues(),
getPrepare()
)
)
} else {
await this.mgr.retry(
ctx.id,
async (client) =>
await client.unsafe(
`INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals};`,
values.getValues(),
getPrepare()
)
)
}
const query = `INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals} ${
handleConflicts ? `ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr}` : ''
};`
await this.mgr.retry(ctx.id, async (client) => await client.unsafe(query, values.getValues(), getPrepare()))
}
} catch (err: any) {
ctx.error('failed to upload', { err })
@@ -1610,17 +1598,14 @@ abstract class PostgresAdapterBase implements DbAdapter {

async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
const tdomain = translateDomain(domain)
const toClean = [...docs]
while (toClean.length > 0) {
const part = toClean.splice(0, 2500)
const batchSize = 2500
const query = `DELETE FROM ${tdomain} WHERE "workspaceId" = $1 AND _id = ANY($2::text[])`

for (let i = 0; i < docs.length; i += batchSize) {
const part = docs.slice(i, i + batchSize)
await ctx.with('clean', {}, () => {
return this.mgr.retry(ctx.id, (client) =>
client.unsafe(
`DELETE FROM ${tdomain} WHERE "workspaceId" = $1 AND _id = ANY($2::text[])`,
[this.workspaceId.name, part],
getPrepare()
)
)
const params = [this.workspaceId.name, part]
return this.mgr.retry(ctx.id, (client) => client.unsafe(query, params, getPrepare()))
})
}
}
@@ -1635,10 +1620,16 @@ abstract class PostgresAdapterBase implements DbAdapter {
return ctx.with('groupBy', { domain }, async (ctx) => {
try {
const vars = new ValuesVariables()
const finalSql = `SELECT DISTINCT ${key} as ${field}, Count(*) AS count FROM ${translateDomain(domain)} WHERE ${this.buildRawQuery(vars, domain, query ?? {})} GROUP BY ${key}`
const sqlChunks: string[] = [
`SELECT ${key} as ${field}, Count(*) AS count`,
`FROM ${translateDomain(domain)}`,
`WHERE ${this.buildRawQuery(vars, domain, query ?? {})}`,
`GROUP BY ${key}`
]
const finalSql = sqlChunks.join(' ')
return await this.mgr.retry(ctx.id, async (connection) => {
const result = await connection.unsafe(finalSql, vars.getValues(), getPrepare())
return new Map(result.map((r) => [r[field.toLocaleLowerCase()], parseInt(r.count)]))
return new Map(result.map((r) => [r[field.toLowerCase()], r.count]))
})
} catch (err) {
ctx.error('Error while grouping by', { domain, field })
@@ -1920,10 +1911,10 @@ class PostgresAdapter extends PostgresAdapterBase {
const result: TxResult[] = []
try {
const schema = getSchema(domain)
const updates = groupByArray(operations, (it) => it.fields.join(','))
for (const upds of updates.values()) {
while (upds.length > 0) {
const part = upds.splice(0, 200)
const groupedUpdates = groupByArray(operations, (it) => it.fields.join(','))
for (const groupedOps of groupedUpdates.values()) {
for (let i = 0; i < groupedOps.length; i += 200) {
const part = groupedOps.slice(i, i + 200)
let idx = 1
const indexes: string[] = []
const data: any[] = []
@@ -2021,7 +2012,9 @@ class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter {
async getModel (ctx: MeasureContext): Promise<Tx[]> {
const res: DBDoc[] = await this.mgr.retry(undefined, (client) => {
return client.unsafe(
`SELECT * FROM "${translateDomain(DOMAIN_MODEL_TX)}" WHERE "workspaceId" = '${this.workspaceId.name}'::uuid ORDER BY _id::text ASC, "modifiedOn"::bigint ASC`
`SELECT * FROM "${translateDomain(DOMAIN_MODEL_TX)}" WHERE "workspaceId" = '${this.workspaceId.name}'::uuid ORDER BY _id::text ASC, "modifiedOn"::bigint ASC`,
undefined,
getPrepare()
)
})

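Note on the upload and clean rewrites above: the documents are now walked with an index-based slice instead of a mutating splice over a copy, and the conflict and non-conflict branches share a single INSERT statement assembled from positional placeholders. A small sketch of that batching shape, with a hypothetical runBatch executor standing in for client.unsafe(sql, params) and an illustrative table name; this is not the adapter's actual API:

// Sketch: slice-based batching with positional parameters.
async function uploadInBatches (
  docs: Array<Record<string, any>>,
  fields: string[],
  handleConflicts: boolean,
  runBatch: (sql: string, params: any[]) => Promise<void>
): Promise<void> {
  const batchSize = 200
  const columns = fields.map((f) => `"${f}"`).join(', ')
  const updates = fields.map((f) => `"${f}" = EXCLUDED."${f}"`).join(', ')

  for (let i = 0; i < docs.length; i += batchSize) {
    const part = docs.slice(i, i + batchSize)
    const params: any[] = []
    // Each document becomes one "($1, $2, ...)" tuple over the shared params array.
    const tuples = part.map((doc) =>
      `(${fields
        .map((f) => {
          params.push(doc[f])
          return `$${params.length}`
        })
        .join(', ')})`
    )
    const sql =
      `INSERT INTO docs (${columns}) VALUES ${tuples.join(', ')}` +
      (handleConflicts ? ` ON CONFLICT (_id) DO UPDATE SET ${updates}` : '')
    await runBatch(sql, params)
  }
}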
5 changes: 5 additions & 0 deletions server/postgres/src/utils.ts
@@ -317,6 +317,11 @@ export function getDBClient (connectionString: string, database?: string): Postg
},
database,
max: 10,
min: 2,
connect_timeout: 10,
idle_timeout: 30,
max_lifetime: 300,
fetch_types: true,
transform: {
undefined: null
},
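The added options map onto postgres.js pool settings: max and min bound the pool size, connect_timeout, idle_timeout and max_lifetime are expressed in seconds, and fetch_types keeps automatic type lookup enabled. A standalone sketch of a client configured the same way (connection string and query are placeholders; whether the installed postgres.js version honors min is an assumption):

import postgres from 'postgres'

// Illustrative values mirroring the ones added to getDBClient above.
const sql = postgres('postgres://user:pass@localhost:5432/app', {
  max: 10, // at most 10 pooled connections
  min: 2, // keep a couple of warm connections (assumed supported by the driver)
  connect_timeout: 10, // seconds to wait while establishing a connection
  idle_timeout: 30, // close connections idle for 30 seconds
  max_lifetime: 300, // recycle connections after 5 minutes
  fetch_types: true, // look up custom Postgres types on connect
  transform: { undefined: null }
})

async function main (): Promise<void> {
  const rows = await sql`SELECT 1 AS ok`
  console.log(rows[0].ok)
  await sql.end()
}

void main()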
30 changes: 17 additions & 13 deletions server/workspace-service/src/index.ts
@@ -122,20 +122,24 @@ export function serveWorkspaceAccount (
brandings
)

void worker.start(
measureCtx,
{
errorHandler: async (ws, err) => {
Analytics.handleError(err)
void worker
.start(
measureCtx,
{
errorHandler: async (ws, err) => {
Analytics.handleError(err)
},
force: false,
console: false,
logs: 'upgrade-logs',
waitTimeout,
backup
},
force: false,
console: false,
logs: 'upgrade-logs',
waitTimeout,
backup
},
() => canceled
)
() => canceled
)
.catch((err) => {
measureCtx.error('failed to start', { err })
})

const close = (): void => {
canceled = true
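The change above replaces a void-discarded promise with an explicit .catch, so a failed worker startup is logged instead of surfacing as an unhandled rejection. A tiny sketch of the pattern with a hypothetical startWorker and logger:

// Sketch: fire-and-forget startup with an attached error handler.
async function startWorker (): Promise<void> {
  // ... long-running startup work would go here
}

const log = { error: (msg: string, data: unknown) => console.error(msg, data) }

void startWorker().catch((err) => {
  // The process keeps running, but the failure is recorded instead of lost.
  log.error('failed to start', { err })
})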
18 changes: 14 additions & 4 deletions server/workspace-service/src/service.ts
@@ -40,6 +40,8 @@ import { FileModelLogger, prepareTools } from '@hcengineering/server-tool'
import path from 'path'

import { Analytics } from '@hcengineering/analytics'
import { createMongoAdapter, createMongoDestroyAdapter, createMongoTxAdapter } from '@hcengineering/mongo'
import { createPostgreeDestroyAdapter, createPostgresAdapter, createPostgresTxAdapter } from '@hcengineering/postgres'
import { doBackupWorkspace, doRestoreWorkspace } from '@hcengineering/server-backup'
import type { PipelineFactory, StorageAdapter } from '@hcengineering/server-core'
import {
@@ -54,8 +56,6 @@
} from '@hcengineering/server-pipeline'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import { createWorkspace, upgradeWorkspace } from './ws-operations'
import { createMongoTxAdapter, createMongoAdapter, createMongoDestroyAdapter } from '@hcengineering/mongo'
import { createPostgresTxAdapter, createPostgresAdapter, createPostgreeDestroyAdapter } from '@hcengineering/postgres'

export interface WorkspaceOptions {
errorHandler: (workspace: BaseWorkspaceInfo, error: any) => Promise<void>
@@ -115,7 +115,14 @@ export class WorkspaceWorker {

ctx.info('Sending a handshake to the account service...')

await withRetryConnUntilSuccess(workerHandshake)(token, this.region, this.version, this.operation)
while (true) {
try {
await withRetryConnUntilSuccess(workerHandshake)(token, this.region, this.version, this.operation)
break
} catch (err: any) {
ctx.error('error', { err })
}
}

ctx.info('Successfully connected to the account service')

@@ -150,7 +157,10 @@
}),
workspace,
opt
)
).catch((err) => {
Analytics.handleError(err)
ctx.error('error', { err })
})
})
}
}
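The handshake above is now wrapped in an explicit loop that logs and retries until it succeeds, and each per-workspace operation gets its own .catch so one failure cannot crash the worker. A small sketch of such a retry-until-success helper; the backoff delay is an illustrative addition, not part of the diff:

// Sketch: retry an async operation until it succeeds, logging every failure.
async function retryUntilSuccess (
  op: () => Promise<void>,
  log: (msg: string, data: unknown) => void,
  delayMs = 1000
): Promise<void> {
  while (true) {
    try {
      await op()
      return
    } catch (err: any) {
      log('handshake failed, retrying', { err })
      await new Promise((resolve) => setTimeout(resolve, delayMs))
    }
  }
}

// Hypothetical usage mirroring the diff:
// await retryUntilSuccess(async () => { await workerHandshake(token, region, version, operation) }, (m, d) => ctx.error(m, d))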
