记第一次推荐开发实践遇到的问题

type
Post
status
Published
summary
自从入行以来,从来没有重视过内存管理这块的东西,但这次真的是该遭的罪一点没落下的全体验了一遍。试问苍天饶过谁。由于预测时的全量用户为 1.5 亿,数据量实在太大,而且服务器上没有 GPU 资源,所以只能通过开启多进程的方式来进行预测。在 python 中开启多进程无果后,只能想办法曲线救国,最终通过 shell 脚本将数据拆分然后循环调用 python 脚本,这个过程中可以将拆分后的数据的起始行数和数据量通过参数传递给python,最终以这样的方式成功开启了多进程。但随后遇到了另外一个问题:内存溢出导致的部分进程被 kill。
slug
machine-learning-recommender-problem-recode
date
May 24, 2024
tags
机器学习
推荐系统
双塔模型
问题记录
category
机器学习
password
icon
URL
Property
May 24, 2024 10:23 AM
本次推荐算法实践,只开发了召回阶段的代码,使用的算法是经典的双塔模型(DSSM)。众所周知,理论知识和实际开发完全是两个不同的东西。在粗略的学习了相关的理论基础知识之后,我就开始投入到开发中了,因此也遇到了很多第一次遇到并且很懵的问题。不过也正是经历过这些问题之后才能感觉到自己的进步。所以将一些问题记录下来,加深记忆和回顾。
本次开发使用的是开源的三方库: Torch-RecHub

算法实践问题

问题一:在处理数据时,需要将用户数据,物品数据与交互数据相连,得到一个最终的数据,但是由于大部分用户是没有交互数据的,所以连接之后大部分用户的交互相关字段和产品相关的字段会为空。这应该怎么处理?
答:数据源有三个,用户特征、产品特征、用户订购记录,连接的时候并不是将三份数据外链接。订购记录表中的数据可以理解为全是正样本(我们可以给它们新增一个label字段,值全为 1),因此可以直接在这部分数据上拼接上用户特征和产品特征;而负样本需要我们进行筛选和创造(具体的方法后续会介绍),得到负样本之后再拼接负样本的用户特征和产品特征(同样新增一个 label 字段,值全为 0)。 这样我们就得到了完整的训练数据。
 
问题二:交互数据中每条数据有用户 ID,产品 ID,是否订购;此时对负样本进行采样,是只针对正样本的中的用户 ID 随机选择负样本产品还是需要对其他非正样本用户选择负样本产品?
答:只针对正样本中的用户 ID 选择负样本产品。如果为正样本中没有出现的用户选择负样本,那么可能会引入一些噪声,因为可能会为那些实际上可能对某些产品感兴趣的用户分配负样本。
只针对正样本中的用户 ID 选择负样本产品的好处:
  • 数据一致性:负样本与正样本来自相同的用户,保持了用户特征的一致性。
  • 减少噪声:为已知用户选择负样本可以减少数据中的噪声,提高模型的预测准确性。
  • 效率:针对已有正样本的用户生成负样本通常比对所有用户进行操作更加高效,因为它减少了需要处理的数据量。
 
问题三:召回的测试样本中,需不需要负样本?
  • 如果需要评估召回质量则需要负样本:可以帮助你评估模型是否能有效地区分用户感兴趣和不感兴趣的项目。通过计算如精确率、召回率、F1分数等指标,可以更全面地了解召回模型的性能。
  • 如果只关注召回覆盖率则不一定需要负样本:如果主要目标是测试召回模型是否能覆盖到用户实际感兴趣的项目,那么负样本的存在不是必须的。通过看评分指标的公式即可得知,评估指标在计算时用不到负样本(负样本没什么用,并且只会增大评分指标的分母)(本次采用的方法
  • 实践建议:在实际应用中,包含负样本在测试集中可以提供更全面的评估,尤其是在召回和排序(精排)阶段结合使用的场景下。负样本可以帮助你评估模型是否只是随机召回项目,还是真的学到了用户的偏好。此外,包含负样本也有助于模拟真实场景,因为在实际的推荐系统中,用户会遇到他们不感兴趣的项目。
 
问题四:torch-rechub 训练之后如何预测的问题
答:训练过程中torch-rechub库的MatchTrainer.fit 方法会自动将模型保存到指定文件夹,在调用MatchTrainer.inference_embedding方法时,会自动加载模型,只需要将用户特征的数据处理好,喂给模型的MatchTrainer.inference_embedding方法,就会生成对应的用户 embedding,然后直接进行检索就好了。
 
问题五:数据变动引发的报错(某个类别特征的类别数量变动,编码超过嵌入维度,预测时配置特征维度与训练时不统一)
  • 问题背景:模型在输入数据时,会分为稀疏特征和稠密特征,稀疏特征会进行编码并嵌入成一个张量,张量的大小为:(特征词典大小:嵌入维度),特征词典大小的意思是这个特征有多少个值,嵌入维度是认为指定的参数,16/32 等。
  • 报错 1:在特征嵌入时会提示:IndexError: index out of range in self
    • 报错信息
      IndexError Traceback (most recent call last) File /Users/ayd/Nutstore Files/坚果云/工作/亚信/项目管理/广东/智能推荐/5-模型开发/dssm_recall_infer_V4.py:305 303 print('\n\n产品和用户开始嵌入成向量! >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>') 304 # 产品嵌入 --> 305 item_embedding = trainer.inference_embedding(model=model, mode="item", data_loader=x_prdct_dl, model_path=save_dir) 307 # # -------------单进程用户嵌入 308 # x_user_ = df_to_dict(user_features_df) 309 # x_user_dl_set_ = MatchDataGenerator(x=x_user_).dataset (...) 312 # print('=========单进程 >',user_embedding.shape) 313 # -------------多进程用户嵌入 314 start_time_ = time.time() File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch_rechub/trainers/match_trainer.py:169, in MatchTrainer.inference_embedding(self, model, mode, data_loader, model_path) 167 for i, x_dict in enumerate(tk0): 168 x_dict = {k: v.to(self.device) for k, v in x_dict.items()} --> 169 y_pred = model(x_dict) 170 predicts.append(y_pred.data) 171 return torch.cat(predicts, dim=0) File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/modules/module.py:1511, in Module._wrapped_call_impl(self, *args, **kwargs) 1509 return self._compiled_call_impl(*args, **kwargs) # type: ignore[misc] 1510 else: -> 1511 return self._call_impl(*args, **kwargs) File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/modules/module.py:1520, in Module._call_impl(self, *args, **kwargs) 1515 # If we don't have any hooks, we want to skip the rest of the logic in 1516 # this function, and just call forward. 1517 if not (self._backward_hooks or self._backward_pre_hooks or self._forward_hooks or self._forward_pre_hooks 1518 or _global_backward_pre_hooks or _global_backward_hooks 1519 or _global_forward_hooks or _global_forward_pre_hooks): -> 1520 return forward_call(*args, **kwargs) 1522 try: 1523 result = None File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch_rechub/models/matching/dssm.py:41, in DSSM.forward(self, x) 39 def forward(self, x): 40 user_embedding = self.user_tower(x) ---> 41 item_embedding = self.item_tower(x) 42 if self.mode == "user": 43 return user_embedding File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch_rechub/models/matching/dssm.py:63, in DSSM.item_tower(self, x) 61 if self.mode == "user": 62 return None ---> 63 input_item = self.embedding(x, self.item_features, squeeze_dim=True) #[batch_size, num_features*embed_dim] 64 item_embedding = self.item_mlp(input_item) #[batch_size, item_params["dims"][-1]] 65 item_embedding = F.normalize(item_embedding, p=2, dim=1) File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/modules/module.py:1511, in Module._wrapped_call_impl(self, *args, **kwargs) 1509 return self._compiled_call_impl(*args, **kwargs) # type: ignore[misc] 1510 else: -> 1511 return self._call_impl(*args, **kwargs) File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/modules/module.py:1520, in Module._call_impl(self, *args, **kwargs) 1515 # If we don't have any hooks, we want to skip the rest of the logic in 1516 # this function, and just call forward. 1517 if not (self._backward_hooks or self._backward_pre_hooks or self._forward_hooks or self._forward_pre_hooks 1518 or _global_backward_pre_hooks or _global_backward_hooks 1519 or _global_forward_hooks or _global_forward_pre_hooks): -> 1520 return forward_call(*args, **kwargs) 1522 try: 1523 result = None File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch_rechub/basic/layers.py:70, in EmbeddingLayer.forward(self, x, features, squeeze_dim) 68 if isinstance(fea, SparseFeature): 69 if fea.shared_with == None: ---> 70 sparse_emb.append(self.embed_dict[fea.name](x[fea.name].long()).unsqueeze(1)) 71 else: 72 sparse_emb.append(self.embed_dict[fea.shared_with](x[fea.name].long()).unsqueeze(1)) File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/modules/module.py:1511, in Module._wrapped_call_impl(self, *args, **kwargs) 1509 return self._compiled_call_impl(*args, **kwargs) # type: ignore[misc] 1510 else: -> 1511 return self._call_impl(*args, **kwargs) File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/modules/module.py:1520, in Module._call_impl(self, *args, **kwargs) 1515 # If we don't have any hooks, we want to skip the rest of the logic in 1516 # this function, and just call forward. 1517 if not (self._backward_hooks or self._backward_pre_hooks or self._forward_hooks or self._forward_pre_hooks 1518 or _global_backward_pre_hooks or _global_backward_hooks 1519 or _global_forward_hooks or _global_forward_pre_hooks): -> 1520 return forward_call(*args, **kwargs) 1522 try: 1523 result = None File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/modules/sparse.py:163, in Embedding.forward(self, input) 162 def forward(self, input: Tensor) -> Tensor: --> 163 return F.embedding( 164 input, self.weight, self.padding_idx, self.max_norm, 165 self.norm_type, self.scale_grad_by_freq, self.sparse) File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/functional.py:2237, in embedding(input, weight, padding_idx, max_norm, norm_type, scale_grad_by_freq, sparse) 2231 # Note [embedding_renorm set_grad_enabled] 2232 # XXX: equivalent to 2233 # with torch.no_grad(): 2234 # torch.embedding_renorm_ 2235 # remove once script supports set_grad_enabled 2236 _no_grad_embedding_renorm_(weight, input, max_norm, norm_type) -> 2237 return torch.embedding(weight, input, padding_idx, scale_grad_by_freq, sparse) IndexError: index out of range in self
       
    • 报错原因1:如果某一个特征 A 的值的个数比训练时的值要多,那么在嵌入这个特征的时候,多出来的编码号码拿去模型对应的嵌入层去检索就会导致检索下标越界
    • 报错原因2:如果某一个特征 A 的值的个数正常(没有超过词典大小),但是其中的某些数值比嵌入维度的最大值要大,也可能会导致这个报错。比如嵌入维度是(n*16),n 是词表大小,如果预测的时候对 A 进行编码时,编码后的部分数值大于 n(即使个数没有超过),则也会导致报此错误。这个时候可能是编码阶段出了问题。
  • 报错 2:在加载模型时会提示:size mismatch for embedding.embed_dict.u_user_star_level.weight: copying a param with shape torch.Size([6, 16]) from checkpoint, the shape in current model is torch.Size([7, 16]).
    • 报错信息
      RuntimeError Traceback (most recent call last) Cell In[23], line 2 1 print("inference embedding") ----> 2 item_embedding = trainer.inference_embedding(model=model, mode="item", data_loader=x_prdct_dl, model_path=save_dir) 3 user_embedding2 = trainer.inference_embedding(model=model, mode="user", data_loader=x_user_dl, model_path=save_dir) 4 # # 召回 File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch_rechub/trainers/match_trainer.py:161, in MatchTrainer.inference_embedding(self, model, mode, data_loader, model_path) 159 assert mode in ["user", "item"], "Invalid mode={}.".format(mode) 160 model.mode = mode --> 161 model.load_state_dict(torch.load(os.path.join(model_path, "model.pth"))) 162 model = model.to(self.device) 163 model.eval() File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/modules/module.py:2153, in Module.load_state_dict(self, state_dict, strict, assign) 2148 error_msgs.insert( 2149 0, 'Missing key(s) in state_dict: {}. '.format( 2150 ', '.join(f'"{k}"' for k in missing_keys))) 2152 if len(error_msgs) > 0: -> 2153 raise RuntimeError('Error(s) in loading state_dict for {}:\n\t{}'.format( 2154 self.__class__.__name__, "\n\t".join(error_msgs))) 2155 return _IncompatibleKeys(missing_keys, unexpected_keys) RuntimeError: Error(s) in loading state_dict for DSSM: size mismatch for embedding.embed_dict.u_user_star_level.weight: copying a param with shape torch.Size([6, 16]) from checkpoint, the shape in current model is torch.Size([7, 16]). size mismatch for embedding.embed_dict.user_id.weight: copying a param with shape torch.Size([42, 16]) from checkpoint, the shape in current model is torch.Size([198, 16]). size mismatch for embedding.embed_dict.u_main_plan_id.weight: copying a param with shape torch.Size([21, 16]) from checkpoint, the shape in current model is torch.Size([41, 16]). size mismatch for embedding.embed_dict.u_prodlist_in_use.weight: copying a param with shape torch.Size([21, 16]) from checkpoint, the shape in current model is torch.Size([41, 16]). size mismatch for embedding.embed_dict.p_product_name.weight: copying a param with shape torch.Size([50, 16]) from checkpoint, the shape in current model is torch.Size([55, 16]). size mismatch for embedding.embed_dict.product_id.weight: copying a param with shape torch.Size([50, 16]) from checkpoint, the shape in current model is torch.Size([55, 16]).
       
    • 报错原因:如果某一个特征 A 的值比训练时的值要少,那么在嵌入这个特征的时候不会报错,但是此时的特征词典大小就与之前训练的时候不一致了,所以嵌入后的特征张量在维度上就与之前训练好的不一致,就会导致尺寸匹配不上
  • 解决方法:这个问题本质上就是训练时特征值与预测数据的特征值有差异(预测时某些特征遇到了新的值或者新数据中少了一些值)最本质的方法就是(在全量数据上)人工处理稀疏特征编码将其保存,在预测阶段使用同样的特征编码。并且特征词典大小也保持一致。
    • 代码如下
      ############ 对类别特征进行编码处理,并保存 ########### feature_max_idx = {} # 记录每个字段的最大索引 feature_encoder = {} # 记录每个字段的编码 feature_decoder = {} # 记录每个字段的编码 sparse_features_original = user_multiClass_cols_original+prod_multiClass_cols_original+['u_is_unlimited_gprs','u_gprs_restrain','p_stackable'] # 後面這三個理論上是布爾值,但是被填充了字符串,歸一化處理不了,會導致後面報錯,所以當作多類別處理 sparse_features_original = [i for i in sparse_features_original if i in list(user_info.columns)+list(prdct_info.columns)] # 过滤一遍,避免错误 # sparse_features_original = [i for i in sparse_features_original if i not in ['user_id','product_id']] # 去除用户和产品 id for feat in sparse_features_original: if feat in list(user_info.columns): user_info[feat] = user_info[feat].fillna('999') # 先填充再编码 feat_encoder = {v:k+1 for k,v in enumerate(list(user_info[feat].unique()))} feat_encoder['unknown'] = len(list(user_info[feat].unique())) + 1 feature_max_idx[feat] = len(list(user_info[feat].unique())) + 2 else: prdct_info[feat] = prdct_info[feat].fillna('999') feat_encoder = {v:k+1 for k,v in enumerate(list(prdct_info[feat].unique()))} feat_encoder['unknown'] = len(list(prdct_info[feat].unique())) + 1 feature_max_idx[feat] = len(list(prdct_info[feat].unique())) + 2 feature_encoder[feat] = feat_encoder feat_decoder = {v:k for k,v in feat_encoder.items()} feature_decoder[feat] = feat_decoder with open('./result/feature_encoder.pkl', 'wb') as file: pickle.dump(feature_encoder, file) with open('./result/feature_decoder.pkl', 'wb') as file: pickle.dump(feature_decoder, file) with open('./result/feature_max_idx.pkl', 'wb') as file: pickle.dump(feature_max_idx, file) ########### 预测时加载并使用 ########### # 加载文件 with open('./result/feature_max_idx.pkl', 'rb') as file: feature_max_idx = pickle.load(file) with open('./result/feature_encoder.pkl', 'rb') as file: feature_encoder = pickle.load(file) with open('./result/feature_decoder.pkl', 'rb') as file: feature_decoder = pickle.load(file) # 稀疏字段处理 sparse_features = user_multiClass_cols+prod_multiClass_cols+['u_is_unlimited_gprs','u_gprs_restrain','p_stackable'] # 後面這三個理論上是布爾值,但是被填充了字符串,歸一化處理不了,會導致後面報錯,所以當作多類別處理 sparse_features = [i for i in sparse_features if i in samples_features_df_columns] # 过滤一遍,避免错误 # print('稀疏特征:',sparse_features) for feat in sparse_features: if feat == 'user_id' or feat == 'product_id': # lbe.classes_的值会随着 lbe.fit_transform 处理的数据而变化,有对应关系;leb.classes_是类属性 lbe = LabelEncoder() samples_features_df[feat] = lbe.fit_transform(samples_features_df[feat]) id_map = {encode_id: raw_id for encode_id, raw_id in enumerate(lbe.classes_)} #encode user id: raw user id else: feat_encoder = feature_encoder[feat] samples_features_df[feat] = samples_features_df[feat].map(lambda x: feat_encoder.get(x, feat_encoder['unknown'])) # 用字典映射 feature_max_idx[feat] = samples_features_df[feat].max() + 1 # 这里会作为词典大小,加1是为未知数据做保留
      此外,由于用户 id 和物品 id 时随时变化的,所以只要求保持单次预测的数量在训练数据的特征字典之内就行,而且需要在预测完之后对其进行还原,所以对这两个字段单独使用LabelEncoder方法来编码
 
问题六:Dataloader引发死锁
  • 问题现象:并没有实际报错,但是运行的程序会卡住,一个进程开了几个子进程,子进程在开着但是很快就不工作了,也不占用 cpu 资源
  • 发现问题:通过日志发现,运行输出是卡在训练那一步,结果一直以为是训练那里出了问题;还以为是服务器会限制进程开启子进程的数量,一顿乱找结果毫无进展。后面发现训练并不会在cpu 上开启多进程;发现训练调用的函数里使用了pytorch 的数据加载 Dataloader,其中有个 num_workers 参数会指定加载数据的进程数,最终确定是这里的问题,
  • 解决方法:
    • possible deadlock in dataloader #1355(【pytorch】由dataloader引发的死锁)
    • 将num_workers设置为 0 或者 2 解决问题,我是设置成了 2(设置成 2 可以运行也是需要运气,成功率比较随机)。
    • 或者设置参数persistent_workers=True;persistent_workers (bool, optional) – If True, the data loader will not shutdown the worker processes after a dataset has been consumed once. This allows to maintain the workers Dataset instances alive. (default: False)(貌似也不行(dataloader在循环里,不知道在没有循环的情况下行不行))
 
问题七:在 python 中无法开启多进程(是否成功比较随机),即使开启了多进程之间也是串行关系
  • 暂未解决,不过怀疑是服务器系统的原因(华为欧拉)
 
问题八:使用向量数据库annoy.query(v=user_emb, n=topk)召回时,返回的是余弦距离而不是余弦相似度(torch_rechub库源码中是这样的),所以最终的结果是在 0 到 2 之间,且值越小越相似;0 表示两个向量在同一方向上完全相同;1 表示两个向量在正交(垂直)方向上无关。;2 表示两个向量在相反方向上完全相反。
而余弦相似度是在 -1 到 1 之间,且值越大越相似。1 表示两个向量在同一方向上完全相同;0 表示两个向量在正交(垂直)方向上无关;-1 表示两个向量在相反方向上完全相反。

内存管理问题

由于预测时的全量用户为 1.5 亿,数据量实在太大,而且服务器上没有 GPU 资源,所以只能通过开启多进程的方式来进行预测。在 python 中开启多进程无果后,只能想办法曲线救国,最终通过 shell 脚本将数据拆分然后循环调用 python 脚本,这个过程中可以将拆分后的数据的起始行数和数据量通过参数传递给python,最终以这样的方式成功开启了多进程。但随后遇到了另外一个问题:内存溢出导致的部分进程被 kill。
经过一系列的尝试:一开始开了 10 个进程,有两个或三个进程被 kill;一开始我还以为是进程开太多了,多个进程一起跑起来内存压力很大所有系统会有 kill 进程的行为。但是后来减少到 9 个,8 个都有不同数量的进程被 kill,最后减少到 5 个进程依然会被 kill;而且通过日志可以发现这些被 kill 的进程都是代码运行到同一个地方的时候被停止了,这就很好定位问题了。
找到这行代码发现,这行代码是在使用 pd.concat 方法合并一个列表中的 dataframe 数据;这列表中有 8 个 df,使用concat方法一起合并的话,需要的瞬时内存峰值会很大,而此时还有其他进程也在运行且需要内存的,所以很有可能是在这里瞬间占用大量内存的时候被系统大大给 kill 了。有了这样的猜想之后就有方向了,既然瞬时内存峰值太高,那我就两两合并试试看,并且在循环中不新增变量来额外增加内存负担。不管是不是先改了运行再说。
# 原来的处理 match_res_df = pd.concat(match_res_df_l, ignore_index=True) # 改进后的处理 match_res_df = match_res_df_l[0] for df in match_res_df_l[1:]: match_res_df = pd.concat([match_res_df, df], ignore_index=True)
此外,还对代码中的其他使用完的中间变量进行了删除,并在合并数据之前调用了python 的垃圾回收机制,来强行回收内存。
del user_info_ del user_info_df_l # 删除中间数据,减少内存占用 import gc gc.collect() # 强制进行垃圾回收
经过以上的改动,多进程成功运行到寿终正寝,终于再没有短命的进程了,它真的我哭死(ㄒoㄒ)。
自从入行以来,从来没有重视过内存管理这块的东西,但这次真的是该遭的罪一点没落下的全体验了一遍。试问苍天饶过谁。
If you have any questions, please contact me.