mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 11:22:10 +08:00 
			
		
		
		
	online cache
This commit is contained in:
		
							parent
							
								
									d06c323f11
								
							
						
					
					
						commit
						45b07bc3cc
					
				@ -102,13 +102,11 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
 | 
				
			|||||||
		var pushUserIDList []string
 | 
							var pushUserIDList []string
 | 
				
			||||||
		isSenderSync := datautil.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync)
 | 
							isSenderSync := datautil.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync)
 | 
				
			||||||
		if !isSenderSync || pbData.MsgData.SendID == pbData.MsgData.RecvID {
 | 
							if !isSenderSync || pbData.MsgData.SendID == pbData.MsgData.RecvID {
 | 
				
			||||||
			pushUserIDList, err = c.onlineCache.GetUsersOnline(ctx, []string{pbData.MsgData.RecvID})
 | 
								pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID)
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			pushUserIDList, err = c.onlineCache.GetUsersOnline(ctx, []string{pbData.MsgData.RecvID, pbData.MsgData.SendID})
 | 
								pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID)
 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if err == nil {
 | 
					 | 
				
			||||||
			err = c.Push2User(ctx, pushUserIDList, pbData.MsgData)
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							err = c.Push2User(ctx, pushUserIDList, pbData.MsgData)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		log.ZWarn(ctx, "push failed", err, "msg", pbData.String())
 | 
							log.ZWarn(ctx, "push failed", err, "msg", pbData.String())
 | 
				
			||||||
@ -129,7 +127,12 @@ func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim s
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Push2User Suitable for two types of conversations, one is SingleChatType and the other is NotificationChatType.
 | 
					// Push2User Suitable for two types of conversations, one is SingleChatType and the other is NotificationChatType.
 | 
				
			||||||
func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error {
 | 
					func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) (err error) {
 | 
				
			||||||
 | 
						userIDs, err = c.onlineCache.GetUsersOnline(ctx, userIDs)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
 | 
						log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
 | 
				
			||||||
	if err := c.webhookBeforeOnlinePush(ctx, &c.config.WebhooksConfig.BeforeOnlinePush, userIDs, msg); err != nil {
 | 
						if err := c.webhookBeforeOnlinePush(ctx, &c.config.WebhooksConfig.BeforeOnlinePush, userIDs, msg); err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
 | 
				
			|||||||
@ -48,9 +48,10 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re
 | 
				
			|||||||
		for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() {
 | 
							for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() {
 | 
				
			||||||
			userID, platformIDs, err := parseUserOnlineStatus(message.Payload)
 | 
								userID, platformIDs, err := parseUserOnlineStatus(message.Payload)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				log.ZError(ctx, "redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
 | 
									log.ZError(ctx, "OnlineCache redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
 | 
				
			||||||
				continue
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
								log.ZDebug(ctx, "OnlineCache setUserOnline", "userID", userID, "platformIDs", platformIDs, "payload", message.Payload)
 | 
				
			||||||
			x.setUserOnline(userID, platformIDs)
 | 
								x.setUserOnline(userID, platformIDs)
 | 
				
			||||||
			//if err := x.setUserOnline(ctx, userID, platformIDs); err != nil {
 | 
								//if err := x.setUserOnline(ctx, userID, platformIDs); err != nil {
 | 
				
			||||||
			//	log.ZError(ctx, "redis subscribe setUserOnline", err, "payload", message.Payload, "channel", message.Channel)
 | 
								//	log.ZError(ctx, "redis subscribe setUserOnline", err, "payload", message.Payload, "channel", message.Channel)
 | 
				
			||||||
@ -95,6 +96,7 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]s
 | 
				
			|||||||
			onlineUserIDs = append(onlineUserIDs, userID)
 | 
								onlineUserIDs = append(onlineUserIDs, userID)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						log.ZDebug(ctx, "OnlineCache GetUsersOnline", "userIDs", userIDs, "onlineUserIDs", onlineUserIDs)
 | 
				
			||||||
	return onlineUserIDs, nil
 | 
						return onlineUserIDs, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -113,6 +115,7 @@ func (o *OnlineCache) GetGroupOnline(ctx context.Context, groupID string) ([]str
 | 
				
			|||||||
			onlineUserIDs = append(onlineUserIDs, userID)
 | 
								onlineUserIDs = append(onlineUserIDs, userID)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						log.ZDebug(ctx, "OnlineCache GetGroupOnline", "groupID", groupID, "onlineUserIDs", onlineUserIDs)
 | 
				
			||||||
	return onlineUserIDs, nil
 | 
						return onlineUserIDs, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user