mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-24 10:22:36 +08:00
msg
This commit is contained in:
parent
6530bb65e4
commit
fc3c38ab65
@ -93,8 +93,8 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
|||||||
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList)
|
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList)
|
||||||
log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList),
|
log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList),
|
||||||
"storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList))
|
"storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList))
|
||||||
och.handleMsg(ctx, storageMsgList, notStorageMsgList)
|
och.handleMsg(ctx, utils.GetChatConversationIDByMsg(ctxMsgList[0].message), storageMsgList, notStorageMsgList)
|
||||||
och.handleNotification(ctx, storageNotificationList, notStorageNotificationList)
|
och.handleNotification(ctx, utils.GetNotificationConversationID(ctxMsgList[0].message), storageNotificationList, notStorageNotificationList)
|
||||||
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.conversationID, modifyMsgList); err != nil {
|
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.conversationID, modifyMsgList); err != nil {
|
||||||
log.ZError(ctx, "msg to modify mq error", err, "conversationID", msgChannelValue.conversationID, "modifyMsgList", modifyMsgList)
|
log.ZError(ctx, "msg to modify mq error", err, "conversationID", msgChannelValue.conversationID, "modifyMsgList", modifyMsgList)
|
||||||
}
|
}
|
||||||
@ -147,8 +147,7 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(totalMsgs []
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, storageList, notStorageList []*sdkws.MsgData) {
|
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, conversationID string, storageList, notStorageList []*sdkws.MsgData) {
|
||||||
conversationID := utils.GetConversationIDByMsg(storageList[0])
|
|
||||||
och.toPushTopic(ctx, conversationID, notStorageList)
|
och.toPushTopic(ctx, conversationID, notStorageList)
|
||||||
if len(storageList) > 0 {
|
if len(storageList) > 0 {
|
||||||
lastSeq, _, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
|
lastSeq, _, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
|
||||||
@ -168,8 +167,7 @@ func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, c
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, storageList, notStorageList []*sdkws.MsgData) {
|
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, conversationID string, storageList, notStorageList []*sdkws.MsgData) {
|
||||||
conversationID := utils.GetConversationIDByMsg(storageList[0])
|
|
||||||
och.toPushTopic(ctx, conversationID, notStorageList)
|
och.toPushTopic(ctx, conversationID, notStorageList)
|
||||||
if len(storageList) > 0 {
|
if len(storageList) > 0 {
|
||||||
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
|
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
|
||||||
|
@ -175,6 +175,38 @@ func MsgIsNotification(msg *sdkws.MsgData) bool {
|
|||||||
return !options.IsNotNotification()
|
return !options.IsNotNotification()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetNotificationConversationID(msg *sdkws.MsgData) string {
|
||||||
|
switch msg.SessionType {
|
||||||
|
case constant.SingleChatType:
|
||||||
|
l := []string{msg.SendID, msg.RecvID}
|
||||||
|
sort.Strings(l)
|
||||||
|
return "n_" + strings.Join(l, "_")
|
||||||
|
case constant.GroupChatType:
|
||||||
|
return "n_" + msg.GroupID
|
||||||
|
case constant.SuperGroupChatType:
|
||||||
|
return "n_" + msg.GroupID
|
||||||
|
case constant.NotificationChatType:
|
||||||
|
return "n_" + msg.SendID + "_" + msg.RecvID
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetChatConversationIDByMsg(msg *sdkws.MsgData) string {
|
||||||
|
switch msg.SessionType {
|
||||||
|
case constant.SingleChatType:
|
||||||
|
l := []string{msg.SendID, msg.RecvID}
|
||||||
|
sort.Strings(l)
|
||||||
|
return "si_" + strings.Join(l, "_")
|
||||||
|
case constant.GroupChatType:
|
||||||
|
return "g_" + msg.GroupID
|
||||||
|
case constant.SuperGroupChatType:
|
||||||
|
return "sg_" + msg.GroupID
|
||||||
|
case constant.NotificationChatType:
|
||||||
|
return "sn_" + msg.SendID + "_" + msg.RecvID
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
func GetConversationIDByMsg(msg *sdkws.MsgData) string {
|
func GetConversationIDByMsg(msg *sdkws.MsgData) string {
|
||||||
options := Options(msg.Options)
|
options := Options(msg.Options)
|
||||||
switch msg.SessionType {
|
switch msg.SessionType {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user