From b7d99481f84824925056e0020cb66fb851147b1d Mon Sep 17 00:00:00 2001
From: "weisu.yxd"
Date: Fri, 24 Nov 2023 17:57:28 +0800
Subject: [PATCH] add ppnet

---
 easy_rec/python/layers/keras/__init__.py   |   1 +
 easy_rec/python/layers/keras/mask_net.py   |   3 +
 easy_rec/python/layers/keras/multi_task.py |  16 +-
 easy_rec/python/layers/keras/ppnet.py      | 185 +++++++++++++++
 easy_rec/python/protos/keras_layer.proto   |   1 +
 easy_rec/python/protos/layer.proto         |  19 ++-
 easy_rec/python/tools/build_fg_model.py    |  32 ++++
 7 files changed, 251 insertions(+), 6 deletions(-)
 create mode 100644 easy_rec/python/layers/keras/ppnet.py
 create mode 100644 easy_rec/python/tools/build_fg_model.py

diff --git a/easy_rec/python/layers/keras/__init__.py b/easy_rec/python/layers/keras/__init__.py
index 3f22f511b..cbe36b5ca 100644
--- a/easy_rec/python/layers/keras/__init__.py
+++ b/easy_rec/python/layers/keras/__init__.py
@@ -16,3 +16,4 @@
 from .multi_task import MMoE
 from .numerical_embedding import AutoDisEmbedding
 from .numerical_embedding import PeriodicEmbedding
+from .ppnet import PPNet
diff --git a/easy_rec/python/layers/keras/mask_net.py b/easy_rec/python/layers/keras/mask_net.py
index 507a0020d..49318df3b 100644
--- a/easy_rec/python/layers/keras/mask_net.py
+++ b/easy_rec/python/layers/keras/mask_net.py
@@ -82,6 +82,9 @@ def call(self, inputs, **kwargs):
         mask, net.shape[-1], name='%s/mask' % self.name, reuse=self.reuse)
     masked_net = net * mask
 
+    if not self.config.HasField('output_size'):
+      return masked_net
+
     output_size = self.config.output_size
     hidden = tf.layers.dense(
         masked_net,
diff --git a/easy_rec/python/layers/keras/multi_task.py b/easy_rec/python/layers/keras/multi_task.py
index 35607834f..de6120da1 100644
--- a/easy_rec/python/layers/keras/multi_task.py
+++ b/easy_rec/python/layers/keras/multi_task.py
@@ -25,15 +25,20 @@ class MMoE(tf.keras.layers.Layer):
 
   def __init__(self, params, name='MMoE', reuse=None, **kwargs):
     super(MMoE, self).__init__(name, **kwargs)
-    params.check_required(['num_expert', 'num_task', 'expert_mlp'])
+    params.check_required(['num_expert', 'num_task'])
     self._reuse = reuse
     self._num_expert = params.num_expert
     self._num_task = params.num_task
-    expert_params = params.expert_mlp
-    self._experts = [
+    if params.has_field('expert_mlp'):
+      expert_params = params.expert_mlp
+      self._has_experts = True
+      self._experts = [
         MLP(expert_params, 'expert_%d' % i, reuse=reuse)
         for i in range(self._num_expert)
-    ]
+      ]
+    else:
+      self._has_experts = False
+      self._experts = [(lambda x, i=i: x[i]) for i in range(self._num_expert)]
     self._l2_reg = params.l2_regularizer
 
   def __call__(self, inputs, **kwargs):
@@ -44,10 +49,11 @@ def __call__(self, inputs, **kwargs):
 
     expert_fea_list = [expert(inputs) for expert in self._experts]
     experts_fea = tf.stack(expert_fea_list, axis=1)
+    gate_input = inputs if self._has_experts else inputs[self._num_expert]
     task_input_list = []
     for task_id in range(self._num_task):
       gate = gate_fn(
-          inputs,
+          gate_input,
           self._num_expert,
           name='gate_%d' % task_id,
           l2_reg=self._l2_reg,
diff --git a/easy_rec/python/layers/keras/ppnet.py b/easy_rec/python/layers/keras/ppnet.py
new file mode 100644
index 000000000..4da62380d
--- /dev/null
+++ b/easy_rec/python/layers/keras/ppnet.py
@@ -0,0 +1,185 @@
+# -*- encoding:utf-8 -*-
+# Copyright (c) Alibaba, Inc. and its affiliates.
+"""Convenience blocks for building models.""" +import logging + +import tensorflow as tf + +from easy_rec.python.layers.keras.activation import activation_layer +from easy_rec.python.utils.tf_utils import add_elements_to_collection + +if tf.__version__ >= '2.0': + tf = tf.compat.v1 + + +class GateNN(tf.keras.layers.Layer): + def __init__(self, params, output_units=None, name='gate_nn', reuse=None, **kwargs): + super(GateNN, self).__init__(name=name, **kwargs) + output_dim = output_units if output_units is not None else params.output_dim + hidden_dim = params.get_or_default('hidden_dim', output_dim) + initializer = params.get_or_default('initializer', 'he_uniform') + do_batch_norm = params.get_or_default('use_bn', False) + activation = params.get_or_default('activation', 'relu') + dropout_rate = params.get_or_default('dropout_rate', 0.0) + + self._sub_layers = [] + dense = tf.keras.layers.Dense( + units=hidden_dim, + use_bias=not do_batch_norm, + kernel_initializer=initializer, + name=name) + self._sub_layers.append(dense) + + if do_batch_norm: + bn = tf.keras.layers.BatchNormalization( + name='%s/bn' % name, trainable=True) + self._sub_layers.append(bn) + + act_layer = activation_layer(activation) + self._sub_layers.append(act_layer) + + if 0.0 < dropout_rate < 1.0: + dropout = tf.keras.layers.Dropout(dropout_rate, name='%s/dropout' % name) + self._sub_layers.append(dropout) + elif dropout_rate >= 1.0: + raise ValueError('invalid dropout_ratio: %.3f' % dropout_rate) + + dense = tf.keras.layers.Dense( + units=output_dim, + activation='sigmoid', + use_bias=not do_batch_norm, + kernel_initializer=initializer, + name=name) + self._sub_layers.append(dense) + self._sub_layers.append(lambda x: x * 2) + + def call(self, x, training=None, **kwargs): + """Performs the forward computation of the block.""" + for layer in self._sub_layers: + cls = layer.__class__.__name__ + if cls in ('Dropout', 'BatchNormalization', 'Dice'): + x = layer(x, training=training) + if cls in ('BatchNormalization', 'Dice'): + add_elements_to_collection(layer.updates, tf.GraphKeys.UPDATE_OPS) + else: + x = layer(x) + return x + + +class PPNet(tf.keras.layers.Layer): + """PEPNet: Parameter and Embedding Personalized Network for Infusing with Personalized Prior Information. + + Attributes: + units: Sequential list of layer sizes. + use_bias: Whether to include a bias term. + activation: Type of activation to use on all except the last layer. + final_activation: Type of activation to use on last layer. + **kwargs: Extra args passed to the Keras Layer base class. 
+ """ + + def __init__(self, params, name='ppnet', reuse=None, **kwargs): + super(PPNet, self).__init__(name=name, **kwargs) + params.check_required('mlp') + mode = params.get_or_default('mode', 'lazy') + gate_params = params.gate_params + params = params.mlp + params.check_required('hidden_units') + use_bn = params.get_or_default('use_bn', True) + use_final_bn = params.get_or_default('use_final_bn', True) + use_bias = params.get_or_default('use_bias', False) + use_final_bias = params.get_or_default('use_final_bias', False) + dropout_rate = list(params.get_or_default('dropout_ratio', [])) + activation = params.get_or_default('activation', 'relu') + initializer = params.get_or_default('initializer', 'he_uniform') + final_activation = params.get_or_default('final_activation', None) + use_bn_after_act = params.get_or_default('use_bn_after_activation', False) + units = list(params.hidden_units) + logging.info( + 'MLP(%s) units: %s, dropout: %r, activate=%s, use_bn=%r, final_bn=%r,' + ' final_activate=%s, bias=%r, initializer=%s, bn_after_activation=%r' % + (name, units, dropout_rate, activation, use_bn, use_final_bn, + final_activation, use_bias, initializer, use_bn_after_act)) + assert len(units) > 0, 'MLP(%s) takes at least one hidden units' % name + self.reuse = reuse + + num_dropout = len(dropout_rate) + self._sub_layers = [] + + if mode != 'lazy': + self._sub_layers.append(GateNN(gate_params, None, 'gate_0')) + for i, num_units in enumerate(units[:-1]): + name = 'layer_%d' % i + drop_rate = dropout_rate[i] if i < num_dropout else 0.0 + self.add_rich_layer(num_units, use_bn, drop_rate, activation, initializer, + use_bias, use_bn_after_act, name, + params.l2_regularizer) + self._sub_layers.append(GateNN(gate_params, num_units, 'gate_%d' % (i + 1))) + + n = len(units) - 1 + drop_rate = dropout_rate[n] if num_dropout > n else 0.0 + name = 'layer_%d' % n + self.add_rich_layer(units[-1], use_final_bn, drop_rate, final_activation, + initializer, use_final_bias, use_bn_after_act, name, + params.l2_regularizer) + if mode == 'lazy': + self._sub_layers.append(GateNN(gate_params, units[-1], 'gate_%d' % (n + 1))) + + def add_rich_layer(self, + num_units, + use_bn, + dropout_rate, + activation, + initializer, + use_bias, + use_bn_after_activation, + name, + l2_reg=None): + act_layer = activation_layer(activation) + if use_bn and not use_bn_after_activation: + dense = tf.keras.layers.Dense( + units=num_units, + use_bias=use_bias, + kernel_initializer=initializer, + kernel_regularizer=l2_reg, + name=name) + self._sub_layers.append(dense) + bn = tf.keras.layers.BatchNormalization( + name='%s/bn' % name, trainable=True) + self._sub_layers.append(bn) + self._sub_layers.append(act_layer) + else: + dense = tf.keras.layers.Dense( + num_units, + use_bias=use_bias, + kernel_initializer=initializer, + kernel_regularizer=l2_reg, + name=name) + self._sub_layers.append(dense) + self._sub_layers.append(act_layer) + if use_bn and use_bn_after_activation: + bn = tf.keras.layers.BatchNormalization(name='%s/bn' % name) + self._sub_layers.append(bn) + + if 0.0 < dropout_rate < 1.0: + dropout = tf.keras.layers.Dropout(dropout_rate, name='%s/dropout' % name) + self._sub_layers.append(dropout) + elif dropout_rate >= 1.0: + raise ValueError('invalid dropout_ratio: %.3f' % dropout_rate) + + def call(self, inputs, training=None, **kwargs): + """Performs the forward computation of the block.""" + x, gate_input = inputs + gate_input = tf.concat([tf.stop_gradient(x), gate_input], axis=-1) + for layer in self._sub_layers: + cls = 
+      cls = layer.__class__.__name__
+      if cls == 'GateNN':
+        gate = layer(gate_input)
+        x *= gate
+      elif cls in ('Dropout', 'BatchNormalization', 'Dice'):
+        x = layer(x, training=training)
+        if cls in ('BatchNormalization', 'Dice'):
+          add_elements_to_collection(layer.updates, tf.GraphKeys.UPDATE_OPS)
+      else:
+        x = layer(x)
+    return x
+
diff --git a/easy_rec/python/protos/keras_layer.proto b/easy_rec/python/protos/keras_layer.proto
index 5f09f4515..4d2cf9213 100644
--- a/easy_rec/python/protos/keras_layer.proto
+++ b/easy_rec/python/protos/keras_layer.proto
@@ -24,5 +24,6 @@ message KerasLayer {
     BSTEncoder bst = 13;
     MMoELayer mmoe = 14;
     SequenceAugment seq_aug = 15;
+    PPNet ppnet = 16;
   }
 }
diff --git a/easy_rec/python/protos/layer.proto b/easy_rec/python/protos/layer.proto
index 52a1cbf30..e278b9f5c 100644
--- a/easy_rec/python/protos/layer.proto
+++ b/easy_rec/python/protos/layer.proto
@@ -49,7 +49,7 @@ message FiBiNet {
 
 message MaskBlock {
   optional float reduction_factor = 1;
-  required uint32 output_size = 2;
+  optional uint32 output_size = 2;
   optional uint32 aggregation_size = 3;
   optional bool input_layer_norm = 4 [default = true];
   optional uint32 projection_dim = 5;
@@ -69,3 +69,20 @@ message MMoELayer {
   // number of mmoe experts
   optional uint32 num_expert = 3;
 }
+
+message GateNN {
+  optional uint32 output_dim = 1;
+  optional uint32 hidden_dim = 2;
+  // activation function
+  optional string activation = 3 [default = 'relu'];
+  // use batch normalization
+  optional bool use_bn = 4 [default = false];
+  optional float dropout_rate = 5;
+}
+
+message PPNet {
+  required MLP mlp = 1;
+  required GateNN gate_params = 2;
+  // run mode: eager, lazy
+  required string mode = 3 [default = 'eager'];
+}
\ No newline at end of file
diff --git a/easy_rec/python/tools/build_fg_model.py b/easy_rec/python/tools/build_fg_model.py
new file mode 100644
index 000000000..825465eb0
--- /dev/null
+++ b/easy_rec/python/tools/build_fg_model.py
@@ -0,0 +1,32 @@
+import logging
+import os
+import json
+import tensorflow as tf
+
+
+curr_dir, _ = os.path.split(__file__)
+parent_dir = os.path.dirname(curr_dir)
+ops_idr = os.path.dirname(parent_dir)
+ops_dir = os.path.join(ops_idr, 'ops')
+if 'PAI' in tf.__version__:
+  ops_dir = os.path.join(ops_dir, '1.12_pai')
+elif tf.__version__.startswith('1.12'):
+  ops_dir = os.path.join(ops_dir, '1.12')
+elif tf.__version__.startswith('1.15'):
+  if 'IS_ON_PAI' in os.environ:
+    ops_dir = os.path.join(ops_dir, 'DeepRec')
+  else:
+    ops_dir = os.path.join(ops_dir, '1.15')
+else:
+  ops_dir = None
+
+
+def load_fg_config(fg_json):
+  with open(fg_json, 'r') as f:
+    fg = json.load(f)
+  features = fg['features']
+  print(features[0])
+
+
+if __name__ == '__main__':
+  load_fg_config("fg.json")
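
Usage sketch (illustrative, not applied by this patch): with these changes a PPNet block
can be selected through the new keras_layer.ppnet oneof field. Only the ppnet, mlp,
gate_params and mode fields below come from the messages added above; the surrounding
blocks/inputs/feature_group_name wiring is assumed to follow the usual EasyRec backbone
config conventions, and the feature group names are made up. PPNet.call expects two
inputs, the main feature tensor and the personalized (gate) feature tensor; in 'eager'
mode gate_params.output_dim should match the last dimension of the main input, because
the first gate is multiplied into it element-wise.

  blocks {
    name: 'ppnet'
    inputs { feature_group_name: 'general' }
    inputs { feature_group_name: 'user_author_id' }
    keras_layer {
      class_name: 'PPNet'
      ppnet {
        mlp { hidden_units: [512, 256, 128] }
        gate_params { output_dim: 512 hidden_dim: 512 }
        mode: 'eager'
      }
    }
  }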