Commit b7d9948: add ppnet
yangxudong committed Nov 24, 2023
1 parent 62ddbc1
Showing 7 changed files with 251 additions and 6 deletions.
1 change: 1 addition & 0 deletions easy_rec/python/layers/keras/__init__.py
@@ -16,3 +16,4 @@
from .multi_task import MMoE
from .numerical_embedding import AutoDisEmbedding
from .numerical_embedding import PeriodicEmbedding
from .ppnet import PPNet
3 changes: 3 additions & 0 deletions easy_rec/python/layers/keras/mask_net.py
@@ -82,6 +82,9 @@ def call(self, inputs, **kwargs):
        mask, net.shape[-1], name='%s/mask' % self.name, reuse=self.reuse)
    masked_net = net * mask

    if not self.config.HasField('output_size'):
      return masked_net

    output_size = self.config.output_size
    hidden = tf.layers.dense(
        masked_net,
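
This change makes the final projection optional: when output_size is not configured (see the layer.proto change below), the MaskBlock now returns the masked features directly. A standalone sketch of that tail logic, using a hypothetical helper rather than the real class (tf.compat.v1.layers.dense stands in for the dense layer used above):

import tensorflow as tf


def mask_block_tail(net, mask, output_size=None):
  """Sketch of MaskBlock's tail behaviour after this change."""
  masked_net = net * mask  # instance-guided element-wise mask
  if output_size is None:
    # new: no projection layer when output_size is not configured
    return masked_net
  # unchanged path: project the masked features to output_size
  return tf.compat.v1.layers.dense(masked_net, output_size)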
16 changes: 11 additions & 5 deletions easy_rec/python/layers/keras/multi_task.py
@@ -25,15 +25,20 @@ class MMoE(tf.keras.layers.Layer):

def __init__(self, params, name='MMoE', reuse=None, **kwargs):
super(MMoE, self).__init__(name, **kwargs)
params.check_required(['num_expert', 'num_task', 'expert_mlp'])
params.check_required(['num_expert', 'num_task'])
self._reuse = reuse
self._num_expert = params.num_expert
self._num_task = params.num_task
expert_params = params.expert_mlp
self._experts = [
if params.has_field('expert_mlp'):
expert_params = params.expert_mlp
self._has_experts = True
self._experts = [
MLP(expert_params, 'expert_%d' % i, reuse=reuse)
for i in range(self._num_expert)
]
]
else:
self._has_experts = False
self._experts = [lambda x, i=i: x[i] for i in range(self._num_expert)]  # default arg binds i per expert
self._l2_reg = params.l2_regularizer

def __call__(self, inputs, **kwargs):
@@ -44,10 +49,11 @@ def __call__(self, inputs, **kwargs):
expert_fea_list = [expert(inputs) for expert in self._experts]
experts_fea = tf.stack(expert_fea_list, axis=1)

gate_input = inputs if self._has_experts else inputs[self._num_expert]
task_input_list = []
for task_id in range(self._num_task):
gate = gate_fn(
inputs,
gate_input,
self._num_expert,
name='gate_%d' % task_id,
l2_reg=self._l2_reg,
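
With this change MMoE no longer requires an expert_mlp: when it is absent, the layer expects pre-computed expert outputs plus a separate gate input. A small sketch of how the inputs are read in the two modes (a hypothetical helper written for illustration, not code from this commit):

def mmoe_inputs(inputs, has_experts, num_expert):
  """Mirrors how the updated MMoE interprets its inputs."""
  if has_experts:
    # expert_mlp configured: a single feature tensor feeds every expert MLP
    # and also drives the per-task gates.
    expert_feas = [inputs] * num_expert  # each entry is then passed through an MLP
    gate_input = inputs
  else:
    # no expert_mlp: inputs is a list whose first num_expert entries are
    # pre-computed expert outputs; the entry after them drives the gates.
    expert_feas = [inputs[i] for i in range(num_expert)]
    gate_input = inputs[num_expert]
  return expert_feas, gate_input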
185 changes: 185 additions & 0 deletions easy_rec/python/layers/keras/ppnet.py
@@ -0,0 +1,185 @@
# -*- encoding:utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
"""Convenience blocks for building models."""
import logging

import tensorflow as tf

from easy_rec.python.layers.keras.activation import activation_layer
from easy_rec.python.utils.tf_utils import add_elements_to_collection

if tf.__version__ >= '2.0':
  tf = tf.compat.v1


class GateNN(tf.keras.layers.Layer):

  def __init__(self,
               params,
               output_units=None,
               name='gate_nn',
               reuse=None,
               **kwargs):
    super(GateNN, self).__init__(name=name, **kwargs)
    output_dim = output_units if output_units is not None else params.output_dim
    hidden_dim = params.get_or_default('hidden_dim', output_dim)
    initializer = params.get_or_default('initializer', 'he_uniform')
    do_batch_norm = params.get_or_default('use_bn', False)
    activation = params.get_or_default('activation', 'relu')
    dropout_rate = params.get_or_default('dropout_rate', 0.0)

    self._sub_layers = []
    dense = tf.keras.layers.Dense(
        units=hidden_dim,
        use_bias=not do_batch_norm,
        kernel_initializer=initializer,
        name=name)
    self._sub_layers.append(dense)

    if do_batch_norm:
      bn = tf.keras.layers.BatchNormalization(
          name='%s/bn' % name, trainable=True)
      self._sub_layers.append(bn)

    act_layer = activation_layer(activation)
    self._sub_layers.append(act_layer)

    if 0.0 < dropout_rate < 1.0:
      dropout = tf.keras.layers.Dropout(dropout_rate, name='%s/dropout' % name)
      self._sub_layers.append(dropout)
    elif dropout_rate >= 1.0:
      raise ValueError('invalid dropout_ratio: %.3f' % dropout_rate)

    dense = tf.keras.layers.Dense(
        units=output_dim,
        activation='sigmoid',
        use_bias=not do_batch_norm,
        kernel_initializer=initializer,
        name=name)
    self._sub_layers.append(dense)
    self._sub_layers.append(lambda x: x * 2)
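    # the sigmoid output is doubled, so gate values lie in (0, 2) with a
    # neutral value of 1 (the Gate NU scaling used in the PEPNet paper)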

  def call(self, x, training=None, **kwargs):
    """Performs the forward computation of the block."""
    for layer in self._sub_layers:
      cls = layer.__class__.__name__
      if cls in ('Dropout', 'BatchNormalization', 'Dice'):
        x = layer(x, training=training)
        if cls in ('BatchNormalization', 'Dice'):
          add_elements_to_collection(layer.updates, tf.GraphKeys.UPDATE_OPS)
      else:
        x = layer(x)
    return x


class PPNet(tf.keras.layers.Layer):
  """PEPNet: Parameter and Embedding Personalized Network for Infusing with Personalized Prior Information.

  Attributes:
    units: Sequential list of layer sizes.
    use_bias: Whether to include a bias term.
    activation: Type of activation to use on all except the last layer.
    final_activation: Type of activation to use on last layer.
    **kwargs: Extra args passed to the Keras Layer base class.
  """

  def __init__(self, params, name='ppnet', reuse=None, **kwargs):
    super(PPNet, self).__init__(name=name, **kwargs)
    params.check_required('mlp')
    mode = params.get_or_default('mode', 'lazy')
    gate_params = params.gate_params
    params = params.mlp
    params.check_required('hidden_units')
    use_bn = params.get_or_default('use_bn', True)
    use_final_bn = params.get_or_default('use_final_bn', True)
    use_bias = params.get_or_default('use_bias', False)
    use_final_bias = params.get_or_default('use_final_bias', False)
    dropout_rate = list(params.get_or_default('dropout_ratio', []))
    activation = params.get_or_default('activation', 'relu')
    initializer = params.get_or_default('initializer', 'he_uniform')
    final_activation = params.get_or_default('final_activation', None)
    use_bn_after_act = params.get_or_default('use_bn_after_activation', False)
    units = list(params.hidden_units)
    logging.info(
        'MLP(%s) units: %s, dropout: %r, activate=%s, use_bn=%r, final_bn=%r,'
        ' final_activate=%s, bias=%r, initializer=%s, bn_after_activation=%r' %
        (name, units, dropout_rate, activation, use_bn, use_final_bn,
         final_activation, use_bias, initializer, use_bn_after_act))
    assert len(units) > 0, 'MLP(%s) takes at least one hidden units' % name
    self.reuse = reuse

    num_dropout = len(dropout_rate)
    self._sub_layers = []

    if mode != 'lazy':
      self._sub_layers.append(GateNN(gate_params, None, 'gate_0'))
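    # in 'eager' mode a gate scales the input before the first dense layer and
    # after every hidden layer; in 'lazy' mode the input gate is skipped and an
    # extra gate is applied after the final layer instead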
    for i, num_units in enumerate(units[:-1]):
      name = 'layer_%d' % i
      drop_rate = dropout_rate[i] if i < num_dropout else 0.0
      self.add_rich_layer(num_units, use_bn, drop_rate, activation, initializer,
                          use_bias, use_bn_after_act, name,
                          params.l2_regularizer)
      self._sub_layers.append(GateNN(gate_params, num_units, 'gate_%d' % (i + 1)))

    n = len(units) - 1
    drop_rate = dropout_rate[n] if num_dropout > n else 0.0
    name = 'layer_%d' % n
    self.add_rich_layer(units[-1], use_final_bn, drop_rate, final_activation,
                        initializer, use_final_bias, use_bn_after_act, name,
                        params.l2_regularizer)
    if mode == 'lazy':
      self._sub_layers.append(GateNN(gate_params, units[-1], 'gate_%d' % (n + 1)))

  def add_rich_layer(self,
                     num_units,
                     use_bn,
                     dropout_rate,
                     activation,
                     initializer,
                     use_bias,
                     use_bn_after_activation,
                     name,
                     l2_reg=None):
    act_layer = activation_layer(activation)
    if use_bn and not use_bn_after_activation:
      dense = tf.keras.layers.Dense(
          units=num_units,
          use_bias=use_bias,
          kernel_initializer=initializer,
          kernel_regularizer=l2_reg,
          name=name)
      self._sub_layers.append(dense)
      bn = tf.keras.layers.BatchNormalization(
          name='%s/bn' % name, trainable=True)
      self._sub_layers.append(bn)
      self._sub_layers.append(act_layer)
    else:
      dense = tf.keras.layers.Dense(
          num_units,
          use_bias=use_bias,
          kernel_initializer=initializer,
          kernel_regularizer=l2_reg,
          name=name)
      self._sub_layers.append(dense)
      self._sub_layers.append(act_layer)
      if use_bn and use_bn_after_activation:
        bn = tf.keras.layers.BatchNormalization(name='%s/bn' % name)
        self._sub_layers.append(bn)

    if 0.0 < dropout_rate < 1.0:
      dropout = tf.keras.layers.Dropout(dropout_rate, name='%s/dropout' % name)
      self._sub_layers.append(dropout)
    elif dropout_rate >= 1.0:
      raise ValueError('invalid dropout_ratio: %.3f' % dropout_rate)

  def call(self, inputs, training=None, **kwargs):
    """Performs the forward computation of the block."""
    x, gate_input = inputs
    gate_input = tf.concat([tf.stop_gradient(x), gate_input], axis=-1)
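    # x enters the gate path through stop_gradient, so gate gradients flow only
    # into gate_input (the personalized prior features), as in PEPNet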
    for layer in self._sub_layers:
      cls = layer.__class__.__name__
      if cls == 'GateNN':
        gate = layer(gate_input)
        x *= gate
      elif cls in ('Dropout', 'BatchNormalization', 'Dice'):
        x = layer(x, training=training)
        if cls in ('BatchNormalization', 'Dice'):
          add_elements_to_collection(layer.updates, tf.GraphKeys.UPDATE_OPS)
      else:
        x = layer(x)
    return x
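
A minimal usage sketch for the new layer. The Params class below is a stand-in for EasyRec's own parameter wrapper (only the methods ppnet.py calls are mocked); the (features, prior) call signature and the module path come from the code above, everything else is illustrative:

import tensorflow as tf

from easy_rec.python.layers.keras.ppnet import PPNet


class Params(object):
  """Toy stand-in for EasyRec's parameter wrapper (assumption, not the real class)."""

  def __init__(self, **kwargs):
    self._kv = kwargs

  def __getattr__(self, key):
    return self._kv[key]

  def has_field(self, key):
    return key in self._kv

  def get_or_default(self, key, default):
    return self._kv.get(key, default)

  def check_required(self, keys):
    keys = keys if isinstance(keys, (list, tuple)) else [keys]
    assert all(k in self._kv for k in keys)


gate_params = Params(output_dim=64)  # in 'eager' mode gate_0 must match the feature dim
mlp_params = Params(
    hidden_units=[128, 64], use_bn=False, use_final_bn=False, l2_regularizer=None)
ppnet = PPNet(Params(mlp=mlp_params, gate_params=gate_params, mode='eager'))

features = tf.zeros([8, 64])  # shared bottom features
prior = tf.zeros([8, 16])     # personalized prior features, e.g. user/author id embeddings
output = ppnet([features, prior])  # gates are driven by [stop_gradient(features), prior]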

1 change: 1 addition & 0 deletions easy_rec/python/protos/keras_layer.proto
@@ -24,5 +24,6 @@ message KerasLayer {
BSTEncoder bst = 13;
MMoELayer mmoe = 14;
SequenceAugment seq_aug = 15;
PPNet ppnet = 16;
}
}
19 changes: 18 additions & 1 deletion easy_rec/python/protos/layer.proto
@@ -49,7 +49,7 @@ message FiBiNet {

message MaskBlock {
optional float reduction_factor = 1;
required uint32 output_size = 2;
optional uint32 output_size = 2;
optional uint32 aggregation_size = 3;
optional bool input_layer_norm = 4 [default = true];
optional uint32 projection_dim = 5;
@@ -69,3 +69,20 @@ message MMoELayer {
// number of mmoe experts
optional uint32 num_expert = 3;
}

message GateNN {
  optional uint32 output_dim = 1;
  optional uint32 hidden_dim = 2;
  // activation function
  optional string activation = 3 [default = 'relu'];
  // use batch normalization
  optional bool use_bn = 4 [default = false];
  optional float dropout_rate = 5;
}

message PPNet {
  required MLP mlp = 1;
  required GateNN gate_params = 2;
  // run mode: eager, lazy
  required string mode = 3 [default = 'eager'];
}
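
For reference, a PPNet block config could be parsed from text-format protobuf roughly as follows; the generated module path (easy_rec.python.protos.layer_pb2) and the MLP message's hidden_units field are assumptions based on the rest of the repository, not part of this diff:

from google.protobuf import text_format

from easy_rec.python.protos import layer_pb2

ppnet_cfg = text_format.Parse(
    """
    mlp { hidden_units: [512, 256, 128] }
    gate_params { output_dim: 512 hidden_dim: 128 }
    mode: 'eager'
    """, layer_pb2.PPNet())

In 'eager' mode, gate_params.output_dim sizes gate_0 and therefore has to match the dimension of the block's input features.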
32 changes: 32 additions & 0 deletions easy_rec/python/tools/build_fg_model.py
@@ -0,0 +1,32 @@
import json
import logging
import os

import tensorflow as tf


curr_dir, _ = os.path.split(__file__)
parent_dir = os.path.dirname(curr_dir)
ops_idr = os.path.dirname(parent_dir)
ops_dir = os.path.join(ops_idr, 'ops')
if 'PAI' in tf.__version__:
  ops_dir = os.path.join(ops_dir, '1.12_pai')
elif tf.__version__.startswith('1.12'):
  ops_dir = os.path.join(ops_dir, '1.12')
elif tf.__version__.startswith('1.15'):
  if 'IS_ON_PAI' in os.environ:
    ops_dir = os.path.join(ops_dir, 'DeepRec')
  else:
    ops_dir = os.path.join(ops_dir, '1.15')
else:
  ops_dir = None


def load_fg_config(fg_json):
  with open(fg_json, 'r') as f:
    fg = json.load(f)
  features = fg['features']
  print(features[0])


if __name__ == '__main__':
  load_fg_config('fg.json')
