mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-07-05 02:31:10 +08:00
添加日志
This commit is contained in:
parent
171d3365ad
commit
a5fcb4ed23
@ -257,9 +257,12 @@ func (och *OnlineHistoryRedisConsumerHandler) categorizeMessageLists(totalMsgs [
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*ContextMsg) {
|
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*ContextMsg) {
|
||||||
log.ZInfo(ctx, "handle storage msg")
|
log.ZDebug(ctx, "handleMsg", "conversationID", conversationID, "storageList", len(storageList), "notStorageList", len(notStorageList))
|
||||||
for _, storageMsg := range storageList {
|
for _, storageMsg := range storageList {
|
||||||
log.ZDebug(ctx, "handle storage msg", "msg", storageMsg.message.String())
|
log.ZDebug(ctx, "handleMsg", "storage msg", storageMsg.message.String())
|
||||||
|
}
|
||||||
|
for _, notStorageMsg := range notStorageList {
|
||||||
|
log.ZDebug(ctx, "handleMsg", "not storage msg", notStorageMsg.message.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
och.toPushTopic(ctx, key, conversationID, notStorageList)
|
och.toPushTopic(ctx, key, conversationID, notStorageList)
|
||||||
@ -274,7 +277,9 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
|
|||||||
log.ZWarn(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
|
log.ZWarn(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.ZInfo(ctx, "BatchInsertChat2Cache end")
|
|
||||||
|
log.ZDebug(ctx, "handleMsg", "BatchInsertChat2Cache", "lastSeq", lastSeq, "isNewConversation", isNewConversation, "userSeqMap", userSeqMap)
|
||||||
|
|
||||||
err = och.msgTransferDatabase.SetHasReadSeqs(ctx, conversationID, userSeqMap)
|
err = och.msgTransferDatabase.SetHasReadSeqs(ctx, conversationID, userSeqMap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZWarn(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
|
log.ZWarn(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
|
||||||
@ -297,7 +302,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
|
|||||||
log.ZWarn(ctx, "get group member ids error", err, "conversationID",
|
log.ZWarn(ctx, "get group member ids error", err, "conversationID",
|
||||||
conversationID)
|
conversationID)
|
||||||
} else {
|
} else {
|
||||||
log.ZInfo(ctx, "GetGroupMemberIDs end")
|
log.ZDebug(ctx, "handleMsg", "GetGroupMemberIDs", "userIDs", userIDs)
|
||||||
|
|
||||||
if err := och.conversationClient.CreateGroupChatConversations(ctx, msg.GroupID, userIDs); err != nil {
|
if err := och.conversationClient.CreateGroupChatConversations(ctx, msg.GroupID, userIDs); err != nil {
|
||||||
log.ZWarn(ctx, "single chat first create conversation error", err,
|
log.ZWarn(ctx, "single chat first create conversation error", err,
|
||||||
@ -315,22 +320,22 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
|
|||||||
log.ZWarn(ctx, "single chat or notification first create conversation error", err,
|
log.ZWarn(ctx, "single chat or notification first create conversation error", err,
|
||||||
"conversationID", conversationID, "sessionType", msg.SessionType)
|
"conversationID", conversationID, "sessionType", msg.SessionType)
|
||||||
}
|
}
|
||||||
|
log.ZDebug(ctx, "handleMsg", "CreateSingleChatConversations", "conversationID", conversationID, "sessionType", msg.SessionType, "recv", msg.RecvID, "send", msg.SendID)
|
||||||
default:
|
default:
|
||||||
log.ZWarn(ctx, "unknown session type", nil, "sessionType",
|
log.ZWarn(ctx, "unknown session type", nil, "sessionType",
|
||||||
msg.SessionType)
|
msg.SessionType)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.ZInfo(ctx, "success incr to next topic")
|
|
||||||
err = och.msgTransferDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq)
|
err = och.msgTransferDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID",
|
log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID",
|
||||||
conversationID, "storageList", storageMessageList, "lastSeq", lastSeq)
|
conversationID, "storageList", storageMessageList, "lastSeq", lastSeq)
|
||||||
}
|
}
|
||||||
log.ZInfo(ctx, "MsgToMongoMQ end")
|
log.ZDebug(ctx, "handleMsg", "MsgToMongoMQ", "conversationID", conversationID, "storageList", len(storageList), "lastSeq", lastSeq, "key", key)
|
||||||
|
|
||||||
och.toPushTopic(ctx, key, conversationID, storageList)
|
och.toPushTopic(ctx, key, conversationID, storageList)
|
||||||
log.ZInfo(ctx, "toPushTopic end")
|
log.ZDebug(ctx, "handleMsg", "toPushTopic", "conversationID", conversationID, "storageList", len(storageList), "key", key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -138,7 +138,7 @@ func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim s
|
|||||||
}
|
}
|
||||||
c.onlineCache.Lock.Unlock()
|
c.onlineCache.Lock.Unlock()
|
||||||
ctx := mcontext.SetOperationID(context.TODO(), strconv.FormatInt(time.Now().UnixNano()+int64(rand.Uint32()), 10))
|
ctx := mcontext.SetOperationID(context.TODO(), strconv.FormatInt(time.Now().UnixNano()+int64(rand.Uint32()), 10))
|
||||||
log.ZInfo(ctx, "begin consume messages")
|
log.ZDebug(ctx, "ConsumeClaim", "begin consume messages")
|
||||||
|
|
||||||
for msg := range claim.Messages() {
|
for msg := range claim.Messages() {
|
||||||
ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
|
ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
|
||||||
|
|||||||
@ -251,6 +251,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
|
|||||||
return nil, errs.ErrArgs.WrapMsg("conversation must not be nil")
|
return nil, errs.ErrArgs.WrapMsg("conversation must not be nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.ZDebug(ctx, "SetConversations", "conversation", req.Conversation)
|
||||||
if req.Conversation.ConversationType == constant.WriteGroupChatType {
|
if req.Conversation.ConversationType == constant.WriteGroupChatType {
|
||||||
groupInfo, err := c.groupClient.GetGroupInfo(ctx, req.Conversation.GroupID)
|
groupInfo, err := c.groupClient.GetGroupInfo(ctx, req.Conversation.GroupID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -376,6 +377,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
|
|||||||
needUpdateUsersList = append(needUpdateUsersList, userID)
|
needUpdateUsersList = append(needUpdateUsersList, userID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
log.ZDebug(ctx, "SetConversations", "m", m, "conversation", conversation, "needUpdateUsersList", needUpdateUsersList)
|
||||||
if len(m) != 0 && len(needUpdateUsersList) != 0 {
|
if len(m) != 0 && len(needUpdateUsersList) != 0 {
|
||||||
if err := c.conversationDatabase.SetUsersConversationFieldTx(ctx, needUpdateUsersList, &conversation, m); err != nil {
|
if err := c.conversationDatabase.SetUsersConversationFieldTx(ctx, needUpdateUsersList, &conversation, m); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user