Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize compute #270

Merged
merged 73 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
9a88634
sketch command and handler.
mariacarmina Feb 19, 2024
22ec659
change file.
mariacarmina Feb 19, 2024
44cfd46
continued building initializeCompute.
mariacarmina Feb 19, 2024
5333802
finished sketching initialize compute function.
mariacarmina Feb 19, 2024
5acfc03
Merge branch 'develop' into initialize-compute
mariacarmina Feb 19, 2024
6ce2466
created test for initialize compute.
mariacarmina Feb 19, 2024
0835025
improvements.
mariacarmina Feb 19, 2024
cde960c
fix conflicts
mariacarmina Feb 26, 2024
65d469c
fix verifying order.
mariacarmina Feb 26, 2024
2dfe511
fix review
mariacarmina Feb 26, 2024
42e098e
fix review 2.
mariacarmina Feb 26, 2024
4029b4c
remove comment
mariacarmina Feb 26, 2024
2cb2b56
console log response
mariacarmina Feb 26, 2024
7e8bdad
added the suite back.
mariacarmina Feb 26, 2024
a0f177a
run compute tests.
mariacarmina Feb 26, 2024
8741ed6
added debug logs.
mariacarmina Feb 26, 2024
3f20ff1
debug
mariacarmina Feb 26, 2024
0b14453
more debug
mariacarmina Feb 26, 2024
baaac3e
fix test.
mariacarmina Feb 26, 2024
2b96899
tweak
mariacarmina Feb 26, 2024
3d9c221
fix test.
mariacarmina Feb 26, 2024
4fe4ef2
console log
mariacarmina Feb 26, 2024
4391ed3
stringify
mariacarmina Feb 26, 2024
35d62e4
debugging.
mariacarmina Feb 26, 2024
3ab0d04
more debug logs
mariacarmina Feb 26, 2024
4e547b1
add logs.
mariacarmina Feb 26, 2024
c55d7b5
improve logs.
mariacarmina Feb 26, 2024
a65dd2f
Merge branch 'develop' into initialize-compute
mariacarmina Feb 27, 2024
8f780e4
comment continue loop.
mariacarmina Feb 27, 2024
dafecb0
added more logs
mariacarmina Feb 27, 2024
de0bbd9
add regex
mariacarmina Feb 27, 2024
ee5af87
updated regex
mariacarmina Feb 27, 2024
f458993
tweaks.
mariacarmina Feb 27, 2024
d70c989
remove logs.
mariacarmina Feb 27, 2024
f1b9cdd
update key value
mariacarmina Feb 27, 2024
5599762
stringify big int
mariacarmina Feb 27, 2024
467221c
add log
mariacarmina Feb 27, 2024
1716dd7
remove indent
mariacarmina Feb 27, 2024
7571c31
print chunks
mariacarmina Feb 27, 2024
738ad34
print end and err
mariacarmina Feb 27, 2024
f8e139e
updated logs
mariacarmina Feb 27, 2024
1dbc24a
improve tests and remove logs.
mariacarmina Feb 27, 2024
4754234
fix.
mariacarmina Feb 27, 2024
32edda5
expose http interface
mariacarmina Feb 27, 2024
e1d2e66
added validation for payload.
mariacarmina Feb 27, 2024
b1341cf
create a separate function.
mariacarmina Feb 27, 2024
3d0f615
updated message.
mariacarmina Feb 27, 2024
81aea8e
added node
mariacarmina Feb 27, 2024
ef8ee51
modify the request to match backwards compatibility
mariacarmina Feb 28, 2024
74ff2f0
add console.logs
mariacarmina Feb 28, 2024
5f977c1
added logs
mariacarmina Feb 28, 2024
e16d89e
fix key
mariacarmina Feb 28, 2024
ed0bb3f
try and catch
mariacarmina Feb 28, 2024
f46dbd2
fix key.
mariacarmina Feb 28, 2024
687e30c
print algo provider fees
mariacarmina Feb 28, 2024
daf2eba
changed util function
mariacarmina Feb 28, 2024
4fde5fb
log
mariacarmina Feb 28, 2024
5745383
fix assert
mariacarmina Feb 28, 2024
2612fb3
add validation
mariacarmina Feb 28, 2024
90f39e2
convert to string
mariacarmina Feb 28, 2024
ce5711c
fix test
mariacarmina Feb 28, 2024
9df773b
fix
mariacarmina Feb 28, 2024
3464b8b
fix
mariacarmina Feb 28, 2024
e3039a9
logs
mariacarmina Feb 28, 2024
bef3794
try
mariacarmina Feb 28, 2024
8426a25
json parsing.
mariacarmina Feb 28, 2024
2c5e979
remove logs
mariacarmina Feb 28, 2024
ec8ca96
added suite back.
mariacarmina Feb 28, 2024
e7dd1d8
fix conflicts
mariacarmina Feb 29, 2024
ed4984d
fix lint
mariacarmina Feb 29, 2024
fd8e8cc
fix review
mariacarmina Feb 29, 2024
9342ab2
add db back
mariacarmina Feb 29, 2024
69a2268
fix conflicts
mariacarmina Feb 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/@types/C2D.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type { MetadataAlgorithm } from './DDO/Metadata.js'
import type { Command } from './commands.js'
export interface Compute {
env: string // with hash
validUntil: number
}

export interface ComputeAsset {
url?: string
documentId: string
serviceId: string
transferTxId?: string
userdata?: { [key: string]: any }
}

export interface ComputeAlgorithm {
documentId?: string
serviceId?: string
url?: string
meta?: MetadataAlgorithm
transferTxId?: string
algocustomdata?: { [key: string]: any }
userdata?: { [key: string]: any }
}

export interface InitializeComputeCommand extends Command {
datasets: [ComputeAsset]
algorithm: ComputeAlgorithm
compute: Compute
consumerAddress: string
chainId: number
}
2 changes: 1 addition & 1 deletion src/@types/DDO/Metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export interface MetadataAlgorithm {
* Object describing the Docker container image.
* @type {Object}
*/
container: {
container?: {
/**
* The command to execute, or script to run inside the Docker image.
* @type {string}
Expand Down
88 changes: 87 additions & 1 deletion src/components/core/compute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ import { P2PCommandResponse } from '../../@types'
import { CORE_LOGGER } from '../../utils/logging/common.js'
import { Handler } from './handler.js'
import { GetEnvironmentsCommand } from '../../@types/commands.js'
import { InitializeComputeCommand } from '../../@types/C2D'
import { getConfiguration } from '../../utils/config.js'
import { PROTOCOL_COMMANDS } from '../../utils/constants.js'
import { streamToString } from '../../utils/util.js'
import axios from 'axios'
import { validateProviderFeesForDatasets } from './utils/initializeCompute.js'

export class GetEnvironmentsHandler extends Handler {
async handle(task: GetEnvironmentsCommand): Promise<P2PCommandResponse> {
try {
CORE_LOGGER.logMessage(
'File Info Request recieved with arguments: ' + JSON.stringify(task, null, 2),
'Get C2D Envs Request recieved with arguments: ' + JSON.stringify(task, null, 2),
true
)
const response: any[] = []
Expand Down Expand Up @@ -53,3 +57,85 @@ export class GetEnvironmentsHandler extends Handler {
}
}
}

export class InitializeComputeHandler extends Handler {
validateTimestamp(value: number) {
// in miliseconds
const timestampNow = new Date().getTime() / 1000
const validUntil = new Date(value).getTime() / 1000

return validUntil > timestampNow
}

checksC2DEnv(computeEnv: string, c2dEnvsWithHash: any[]): boolean {
for (const c of c2dEnvsWithHash) {
if (c.id === computeEnv) {
return true
}
}
return false
}

async handle(task: InitializeComputeCommand): Promise<P2PCommandResponse> {
try {
CORE_LOGGER.logMessage(
'Initialize Compute Request recieved with arguments: ' +
JSON.stringify(task, null, 2),
true
)

const { validUntil } = task.compute
if (!this.validateTimestamp(validUntil)) {
const errorMsg = `Error validating validUntil ${validUntil}. It is not in the future.`
CORE_LOGGER.error(errorMsg)
return {
stream: null,
status: {
httpStatus: 400,
error: errorMsg
}
}
}

const c2dEnvTask: GetEnvironmentsCommand = {
chainId: task.chainId,
command: PROTOCOL_COMMANDS.GET_COMPUTE_ENVIRONMENTS
}

const req = await new GetEnvironmentsHandler(this.getOceanNode()).handle(c2dEnvTask)

const resp = await streamToString(req.stream as Readable)
const c2dEnvs = JSON.parse(resp)

if (!this.checksC2DEnv(task.compute.env, c2dEnvs)) {
const errorMsg = `Compute env was not found.`
CORE_LOGGER.error(errorMsg)
return {
stream: null,
status: {
httpStatus: 400,
error: errorMsg
}
}
}
return await validateProviderFeesForDatasets(
this.getOceanNode(),
task.datasets,
task.algorithm,
task.chainId,
task.compute.env,
task.compute.validUntil,
task.consumerAddress
)
} catch (error) {
CORE_LOGGER.error(error.message)
return {
stream: null,
status: {
httpStatus: 500,
error: error.message
}
}
}
}
}
6 changes: 5 additions & 1 deletion src/components/core/coreHandlersRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { StatusHandler } from './statusHandler.js'
import { ReindexHandler } from './reindexHandler.js'
import { OceanNode } from '../../OceanNode.js'
import { Command } from '../../@types/commands.js'
import { GetEnvironmentsHandler } from './compute.js'
import { GetEnvironmentsHandler, InitializeComputeHandler } from './compute.js'

export type HandlerRegistry = {
handlerName: string // name of the handler
Expand Down Expand Up @@ -74,6 +74,10 @@ export class CoreHandlersRegistry {
PROTOCOL_COMMANDS.GET_COMPUTE_ENVIRONMENTS,
new GetEnvironmentsHandler(node)
)
this.registerCoreHandler(
PROTOCOL_COMMANDS.INITIALIZE_COMPUTE,
new InitializeComputeHandler(node)
)
}

public static getInstance(node: OceanNode): CoreHandlersRegistry {
Expand Down
77 changes: 34 additions & 43 deletions src/components/core/utils/feesHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import { verifyMessage } from '../../../utils/blockchain.js'
import { getConfiguration } from '../../../utils/config.js'
import { CORE_LOGGER } from '../../../utils/logging/common.js'
import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js'
import { findEventByKey } from '../../Indexer/utils.js'
import axios from 'axios'
import { getOceanArtifactsAdresses } from '../../../utils/address.js'
import ERC20Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC20TemplateEnterprise.sol/ERC20TemplateEnterprise.json' assert { type: 'json' }
import { C2DClusterInfo } from '../../../@types'
import { verifyComputeProviderFees } from '../validateTransaction.js'

export async function getC2DEnvs(asset: DDO): Promise<Array<any>> {
try {
Expand Down Expand Up @@ -159,56 +159,47 @@ export async function calculateComputeProviderFee(
return providerFee
}

async function getEventData(
provider: JsonRpcApiProvider,
transactionHash: string,
abi: any
): Promise<ethers.LogDescription> {
const iface = new ethers.Interface(abi)
const receipt = await provider.getTransactionReceipt(transactionHash)
const eventObj = {
topics: receipt.logs[0].topics as string[],
data: receipt.logs[0].data
}
return iface.parseLog(eventObj)
}
// It will be used in #230 for initializeCompute
export async function validateComputeProviderFee(
provider: JsonRpcApiProvider,
tx: string,
computeEnv: string, // with hash
asset: DDO,
service: Service,
validUntil: number
validUntil: number,
userAddress: string
): Promise<[boolean, ProviderFeeData | {}]> {
const txReceipt = await provider.getTransactionReceipt(tx)
const { logs } = txReceipt
const timestampNow = new Date().getTime() / 1000
for (const log of logs) {
const event = findEventByKey(log.topics[0])
if (event && event.type === 'ProviderFee') {
const decodedEventData = await getEventData(provider, tx, ERC20Template.abi)
const validUntilContract = parseInt(decodedEventData.args[7].toString())
if (timestampNow >= validUntilContract) {
// provider fee expired -> reuse order
CORE_LOGGER.log(
LOG_LEVELS_STR.LEVEL_INFO,
`Provider fees for this env have expired -> reuse order.`,
true
)
const envId = computeEnv.split('-')[1]
const newProviderFee = await calculateComputeProviderFee(
asset,
validUntil,
envId,
service,
provider
)
return [false, newProviderFee]
} else {
return [true, {}]
}
try {
const timestampNow = new Date().getTime() / 1000
const validationResult = await verifyComputeProviderFees(
tx,
userAddress,
provider,
timestampNow,
service.timeout
)

if (validationResult.isValid === false) {
// provider fee expired or tx id is not provided -> reuse order
CORE_LOGGER.log(
LOG_LEVELS_STR.LEVEL_INFO,
`${validationResult.message} -> create new provider fees.`,
true
)
const regex = /[^-]*-(ocean-[^-]*)/
const envId = computeEnv.match(regex)[1]
const newProviderFee = await calculateComputeProviderFee(
asset,
validUntil,
envId,
service,
provider
)
return [false, newProviderFee]
} else {
return [true, validationResult.message]
}
} catch (err) {
CORE_LOGGER.logMessage(`Validation for compute provider fees failed due to: ${err}`)
mariacarmina marked this conversation as resolved.
Show resolved Hide resolved
}
}
// equiv to get_provider_fees
Expand Down
92 changes: 92 additions & 0 deletions src/components/core/utils/initializeCompute.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { P2PCommandResponse } from '../../../@types'
import { ComputeAlgorithm, ComputeAsset } from '../../../@types/C2D.js'
import { CORE_LOGGER } from '../../../utils/logging/common.js'
import { DDO } from '../../../@types/DDO/DDO.js'
import { getJsonRpcProvider } from '../../../utils/blockchain.js'
import { validateComputeProviderFee } from './feesHandler.js'
import { Readable } from 'stream'
import { OceanNode } from '../../../OceanNode'
import { Service } from '../../../@types/DDO/Service.js'

export function getServiceById(ddo: DDO, serviceId: string): Service {
try {
return ddo.services.filter((service) => service.id === serviceId)[0]
} catch (err) {
CORE_LOGGER.error(`Service was not found: ${err}`)
}
}

export async function validateProviderFeesForDatasets(
node: OceanNode,
datasets: [ComputeAsset],
algorithm: ComputeAlgorithm,
chainId: number,
env: string,
validUntil: number,
consumerAddress: string
): Promise<P2PCommandResponse> {
const listOfAssest = [...datasets, ...[algorithm]]
const approvedParams: any = {
algorithm: {},
datasets: []
}
const provider = await getJsonRpcProvider(chainId)

for (const asset of listOfAssest) {
try {
const ddo = (await node.getDatabase().ddo.retrieve(asset.documentId)) as DDO
if (ddo.id === algorithm.documentId) {
if (ddo.metadata.type !== 'algorithm') {
const errorMsg = `DID is not a valid algorithm`
CORE_LOGGER.error(errorMsg)
return {
stream: null,
status: {
httpStatus: 400,
error: errorMsg
}
}
}
}
const service = getServiceById(ddo, asset.serviceId)

const resultValidation = await validateComputeProviderFee(
provider,
asset.transferTxId,
env,
ddo,
service,
validUntil,
consumerAddress
)
if (ddo.metadata.type === 'algorithm') {
approvedParams.algorithm = {
datatoken: service.datatokenAddress,
providerFee: resultValidation[1],
validOrder: resultValidation[0]
}
} else {
approvedParams.datasets.push({
datatoken: service.datatokenAddress,
providerFee: resultValidation[1],
validOrder: resultValidation[0]
})
}
} catch (error) {
CORE_LOGGER.error(`Unable to get compute provider fees: ${error}`)
}
}

const result = JSON.stringify(approvedParams, (key, value) => {
if (typeof value === 'bigint') {
return value.toString()
}
return value
})
return {
stream: Readable.from(result),
status: {
httpStatus: 200
}
}
}
Loading
Loading