This commit is contained in:
withchao 2023-02-13 18:14:26 +08:00
parent a9dec0c832
commit d7ea41fa1a
10 changed files with 82 additions and 71 deletions

View File

@ -2,16 +2,20 @@ package check
import ( import (
sdkws "Open_IM/pkg/proto/sdkws" sdkws "Open_IM/pkg/proto/sdkws"
"context"
"errors" "errors"
) )
type GroupChecker struct { type GroupChecker struct{}
func NewGroupChecker() GroupChecker {
return GroupChecker{}
} }
func NewGroupChecker() *GroupChecker { func (GroupChecker) GetGroupInfo(ctx context.Context, groupID string) (*sdkws.GroupInfo, error) {
return &GroupChecker{} return nil, errors.New("TODO:GetUserInfo")
} }
func (g *GroupChecker) GetGroupInfo(groupID string) (*sdkws.GroupInfo, error) { func (GroupChecker) GetGroupMemberInfo(ctx context.Context, groupID string, userID string) (*sdkws.GroupMemberFullInfo, error) {
return nil, errors.New("TODO:GetUserInfo") return nil, errors.New("TODO:GetUserInfo")
} }

View File

@ -10,28 +10,28 @@ import (
// return nil, errors.New("TODO:GetUserInfo") // return nil, errors.New("TODO:GetUserInfo")
//} //}
func NewUserCheck() *UserCheck { func NewUserCheck() UserCheck {
return &UserCheck{} return UserCheck{}
} }
type UserCheck struct{} type UserCheck struct{}
func (u *UserCheck) GetUsersInfos(ctx context.Context, userIDs []string, complete bool) ([]*sdkws.UserInfo, error) { func (UserCheck) GetUsersInfos(ctx context.Context, userIDs []string, complete bool) ([]*sdkws.UserInfo, error) {
return nil, errors.New("todo") return nil, errors.New("todo")
} }
func (u *UserCheck) GetUsersInfoMap(ctx context.Context, userIDs []string, complete bool) (map[string]*sdkws.UserInfo, error) { func (UserCheck) GetUsersInfoMap(ctx context.Context, userIDs []string, complete bool) (map[string]*sdkws.UserInfo, error) {
return nil, errors.New("todo") return nil, errors.New("todo")
} }
func (u *UserCheck) GetPublicUserInfo(ctx context.Context, userID string) (*sdkws.PublicUserInfo, error) { func (UserCheck) GetPublicUserInfo(ctx context.Context, userID string) (*sdkws.PublicUserInfo, error) {
return nil, errors.New("todo") return nil, errors.New("todo")
} }
func (u *UserCheck) GetPublicUserInfos(ctx context.Context, userIDs []string, complete bool) ([]*sdkws.PublicUserInfo, error) { func (UserCheck) GetPublicUserInfos(ctx context.Context, userIDs []string, complete bool) ([]*sdkws.PublicUserInfo, error) {
return nil, errors.New("todo") return nil, errors.New("todo")
} }
func (u *UserCheck) GetPublicUserInfoMap(ctx context.Context, userIDs []string, complete bool) (map[string]*sdkws.PublicUserInfo, error) { func (UserCheck) GetPublicUserInfoMap(ctx context.Context, userIDs []string, complete bool) (map[string]*sdkws.PublicUserInfo, error) {
return nil, errors.New("todo") return nil, errors.New("todo")
} }

View File

@ -1,4 +1,4 @@
package msg package notification
import ( import (
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
@ -70,7 +70,7 @@ func ConversationChangeNotification(ctx context.Context, userID string) {
SetConversationNotification(operationID, userID, userID, constant.ConversationOptChangeNotification, ConversationChangedTips, tips) SetConversationNotification(operationID, userID, userID, constant.ConversationOptChangeNotification, ConversationChangedTips, tips)
} }
//会话未读数同步 // 会话未读数同步
func ConversationUnreadChangeNotification(context context.Context, userID, conversationID string, updateUnreadCountTime int64) { func ConversationUnreadChangeNotification(context context.Context, userID, conversationID string, updateUnreadCountTime int64) {
log.NewInfo(operationID, utils.GetSelfFuncName()) log.NewInfo(operationID, utils.GetSelfFuncName())
ConversationChangedTips := &sdkws.ConversationUpdateTips{ ConversationChangedTips := &sdkws.ConversationUpdateTips{

View File

@ -7,36 +7,36 @@ import (
"context" "context"
) )
func (s *msgServer) DelMsgList(ctx context.Context, req *common.DelMsgListReq) (*common.DelMsgListResp, error) { func (m *msgServer) DelMsgList(ctx context.Context, req *common.DelMsgListReq) (*common.DelMsgListResp, error) {
resp := &common.DelMsgListResp{} resp := &common.DelMsgListResp{}
if err := s.MsgInterface.DelMsgFromCache(ctx, req.UserID, req.SeqList); err != nil { if err := m.MsgInterface.DelMsgFromCache(ctx, req.UserID, req.SeqList); err != nil {
return nil, err return nil, err
} }
DeleteMessageNotification(ctx, req.UserID, req.SeqList) DeleteMessageNotification(ctx, req.UserID, req.SeqList)
return resp, nil return resp, nil
} }
func (s *msgServer) DelSuperGroupMsg(ctx context.Context, req *msg.DelSuperGroupMsgReq) (*msg.DelSuperGroupMsgResp, error) { func (m *msgServer) DelSuperGroupMsg(ctx context.Context, req *msg.DelSuperGroupMsgReq) (*msg.DelSuperGroupMsgResp, error) {
resp := &msg.DelSuperGroupMsgResp{} resp := &msg.DelSuperGroupMsgResp{}
if err := tokenverify.CheckAdmin(ctx); err != nil { if err := tokenverify.CheckAdmin(ctx); err != nil {
return nil, err return nil, err
} }
maxSeq, err := s.MsgInterface.GetGroupMaxSeq(ctx, req.GroupID) maxSeq, err := m.MsgInterface.GetGroupMaxSeq(ctx, req.GroupID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := s.MsgInterface.SetGroupUserMinSeq(ctx, req.GroupID, maxSeq); err != nil { if err := m.MsgInterface.SetGroupUserMinSeq(ctx, req.GroupID, maxSeq); err != nil {
return nil, err return nil, err
} }
return resp, nil return resp, nil
} }
func (s *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.ClearMsgResp, error) { func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.ClearMsgResp, error) {
resp := &msg.ClearMsgResp{} resp := &msg.ClearMsgResp{}
if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil { if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil {
return nil, err return nil, err
} }
if err := s.MsgInterface.DelUserAllSeq(ctx, req.UserID); err != nil { if err := m.MsgInterface.DelUserAllSeq(ctx, req.UserID); err != nil {
return nil, err return nil, err
} }
return resp, nil return resp, nil

View File

@ -14,8 +14,8 @@ import (
"time" "time"
) )
func (rpc *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.SetMessageReactionExtensionsReq) (resp *msg.SetMessageReactionExtensionsResp, err error) { func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.SetMessageReactionExtensionsReq) (resp *msg.SetMessageReactionExtensionsResp, err error) {
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String()) log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String())
var rResp msg.SetMessageReactionExtensionsResp var rResp msg.SetMessageReactionExtensionsResp
rResp.ClientMsgID = req.ClientMsgID rResp.ClientMsgID = req.ClientMsgID
rResp.MsgFirstModifyTime = req.MsgFirstModifyTime rResp.MsgFirstModifyTime = req.MsgFirstModifyTime
@ -72,7 +72,7 @@ func (rpc *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg
log.Debug(req.OperationID, "redis handle firstly", req.String()) log.Debug(req.OperationID, "redis handle firstly", req.String())
rResp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill() rResp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill()
for k, v := range req.ReactionExtensionList { for k, v := range req.ReactionExtensionList {
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k) err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
if err != nil { if err != nil {
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v) setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
continue continue
@ -91,7 +91,7 @@ func (rpc *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg
log.Error(req.OperationID, "SetMessageReactionExpire err:", err.Error(), req.String()) log.Error(req.OperationID, "SetMessageReactionExpire err:", err.Error(), req.String())
} }
} else { } else {
err := rpc.dMessageLocker.LockGlobalMessage(req.ClientMsgID) err := m.dMessageLocker.LockGlobalMessage(req.ClientMsgID)
if err != nil { if err != nil {
rResp.ErrCode = 100 rResp.ErrCode = 100
rResp.ErrMsg = err.Error() rResp.ErrMsg = err.Error()
@ -149,7 +149,7 @@ func (rpc *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg
rResp.Result = append(rResp.Result, temp) rResp.Result = append(rResp.Result, temp)
} }
} }
lockErr := rpc.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID) lockErr := m.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
if lockErr != nil { if lockErr != nil {
log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error()) log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error())
} }
@ -159,7 +159,7 @@ func (rpc *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg
log.Debug(req.OperationID, "redis handle secondly", req.String()) log.Debug(req.OperationID, "redis handle secondly", req.String())
for k, v := range req.ReactionExtensionList { for k, v := range req.ReactionExtensionList {
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k) err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
if err != nil { if err != nil {
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v) setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
continue continue
@ -195,7 +195,7 @@ func (rpc *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg
} else { } else {
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, true) notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, true)
} }
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String()) log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", rResp.String())
return &rResp, nil return &rResp, nil
} }
@ -216,8 +216,8 @@ func setDeleteKeyResultInfo(r *msg.DeleteMessageListReactionExtensionsResp, errC
_ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey) _ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
} }
func (rpc *msgServer) GetMessageListReactionExtensions(ctx context.Context, req *msg.GetMessageListReactionExtensionsReq) (resp *msg.GetMessageListReactionExtensionsResp, err error) { func (m *msgServer) GetMessageListReactionExtensions(ctx context.Context, req *msg.GetMessageListReactionExtensionsReq) (resp *msg.GetMessageListReactionExtensionsResp, err error) {
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String()) log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String())
var rResp msg.GetMessageListReactionExtensionsResp var rResp msg.GetMessageListReactionExtensionsResp
for _, messageValue := range req.MessageReactionKeyList { for _, messageValue := range req.MessageReactionKeyList {
var oneMessage msg.SingleMessageExtensionResult var oneMessage msg.SingleMessageExtensionResult
@ -267,17 +267,17 @@ func (rpc *msgServer) GetMessageListReactionExtensions(ctx context.Context, req
} }
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage) rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
} }
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String()) log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", rResp.String())
return &rResp, nil return &rResp, nil
} }
func (rpc *msgServer) AddMessageReactionExtensions(ctx context.Context, req *msg.ModifyMessageReactionExtensionsReq) (resp *msg.ModifyMessageReactionExtensionsResp, err error) { func (m *msgServer) AddMessageReactionExtensions(ctx context.Context, req *msg.ModifyMessageReactionExtensionsReq) (resp *msg.ModifyMessageReactionExtensionsResp, err error) {
return return
} }
func (rpc *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *msg.DeleteMessageListReactionExtensionsReq) (resp *msg.DeleteMessageListReactionExtensionsResp, err error) { func (m *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *msg.DeleteMessageListReactionExtensionsReq) (resp *msg.DeleteMessageListReactionExtensionsResp, err error) {
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String()) log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String())
var rResp msg.DeleteMessageListReactionExtensionsResp var rResp msg.DeleteMessageListReactionExtensionsResp
callbackResp := notification.callbackDeleteMessageReactionExtensions(req) callbackResp := notification.callbackDeleteMessageReactionExtensions(req)
if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 { if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 {
@ -328,7 +328,7 @@ func (rpc *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *
if isExists { if isExists {
log.Debug(req.OperationID, "redis handle this delete", req.String()) log.Debug(req.OperationID, "redis handle this delete", req.String())
for _, v := range req.ReactionExtensionList { for _, v := range req.ReactionExtensionList {
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, v.TypeKey) err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, v.TypeKey)
if err != nil { if err != nil {
setDeleteKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, v.TypeKey, v) setDeleteKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, v.TypeKey, v)
continue continue
@ -354,7 +354,7 @@ func (rpc *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *
} }
} }
} else { } else {
err := rpc.dMessageLocker.LockGlobalMessage(req.ClientMsgID) err := m.dMessageLocker.LockGlobalMessage(req.ClientMsgID)
if err != nil { if err != nil {
rResp.ErrCode = 100 rResp.ErrCode = 100
rResp.ErrMsg = err.Error() rResp.ErrMsg = err.Error()
@ -413,13 +413,13 @@ func (rpc *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *
rResp.Result = append(rResp.Result, temp) rResp.Result = append(rResp.Result, temp)
} }
} }
lockErr := rpc.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID) lockErr := m.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
if lockErr != nil { if lockErr != nil {
log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error()) log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error())
} }
} }
notification.ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, isExists) notification.ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, isExists)
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String()) log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", rResp.String())
return &rResp, nil return &rResp, nil
} }

View File

@ -7,17 +7,17 @@ import (
"context" "context"
) )
func (s *msgServer) SetSendMsgStatus(ctx context.Context, req *pbMsg.SetSendMsgStatusReq) (*pbMsg.SetSendMsgStatusResp, error) { func (m *msgServer) SetSendMsgStatus(ctx context.Context, req *pbMsg.SetSendMsgStatusReq) (*pbMsg.SetSendMsgStatusResp, error) {
resp := &pbMsg.SetSendMsgStatusResp{} resp := &pbMsg.SetSendMsgStatusResp{}
if err := s.MsgInterface.SetSendMsgStatus(ctx, tracelog.GetOperationID(ctx), req.Status); err != nil { if err := m.MsgInterface.SetSendMsgStatus(ctx, tracelog.GetOperationID(ctx), req.Status); err != nil {
return nil, err return nil, err
} }
return resp, nil return resp, nil
} }
func (s *msgServer) GetSendMsgStatus(ctx context.Context, req *pbMsg.GetSendMsgStatusReq) (*pbMsg.GetSendMsgStatusResp, error) { func (m *msgServer) GetSendMsgStatus(ctx context.Context, req *pbMsg.GetSendMsgStatusReq) (*pbMsg.GetSendMsgStatusResp, error) {
resp := &pbMsg.GetSendMsgStatusResp{} resp := &pbMsg.GetSendMsgStatusResp{}
status, err := s.MsgInterface.GetSendMsgStatus(ctx, tracelog.GetOperationID(ctx)) status, err := m.MsgInterface.GetSendMsgStatus(ctx, tracelog.GetOperationID(ctx))
if IsNotFound(err) { if IsNotFound(err) {
resp.Status = constant.MsgStatusNotExist resp.Status = constant.MsgStatusNotExist
return resp, nil return resp, nil

View File

@ -1,11 +1,13 @@
package msg package msg
import ( import (
"Open_IM/internal/common/check"
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/localcache"
"Open_IM/pkg/common/tracelog" "Open_IM/pkg/common/tracelog"
"Open_IM/pkg/proto/msg" "Open_IM/pkg/proto/msg"
sdkws "Open_IM/pkg/proto/sdkws" "Open_IM/pkg/proto/sdkws"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
"math/rand" "math/rand"
@ -57,7 +59,7 @@ type MsgCallBackResp struct {
} }
func userIsMuteAndIsAdminInGroup(ctx context.Context, groupID, userID string) (isMute bool, err error) { func userIsMuteAndIsAdminInGroup(ctx context.Context, groupID, userID string) (isMute bool, err error) {
groupMemberInfo, err := GetGroupMemberInfo(ctx, groupID, userID) groupMemberInfo, err := check.NewGroupChecker().GetGroupMemberInfo(ctx, groupID, userID)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -69,13 +71,13 @@ func userIsMuteAndIsAdminInGroup(ctx context.Context, groupID, userID string) (i
// 如果禁言了,再看下是否群管理员 // 如果禁言了,再看下是否群管理员
func groupIsMuted(ctx context.Context, groupID string, userID string) (bool, bool, error) { func groupIsMuted(ctx context.Context, groupID string, userID string) (bool, bool, error) {
groupInfo, err := GetGroupInfo(ctx, groupID) groupInfo, err := check.NewGroupChecker().GetGroupInfo(ctx, groupID)
if err != nil { if err != nil {
return false, false, err return false, false, err
} }
if groupInfo.Status == constant.GroupStatusMuted { if groupInfo.Status == constant.GroupStatusMuted {
groupMemberInfo, err := GetGroupMemberInfo(ctx, groupID, userID) groupMemberInfo, err := check.NewGroupChecker().GetGroupMemberInfo(ctx, groupID, userID)
if err != nil { if err != nil {
return false, false, err return false, false, err
} }
@ -85,20 +87,24 @@ func groupIsMuted(ctx context.Context, groupID string, userID string) (bool, boo
} }
func GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) { func GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
return localcache.NewGroupMemberIDsLocalCache().GetGroupMemberIDs(ctx, groupID)
} }
func GetGroupInfo(ctx context.Context, groupID string) (sdkws.GroupInfo, error) { //func GetGroupInfo(ctx context.Context, groupID string) (*sdkws.GroupInfo, error) {
// return check.NewGroupChecker().GetGroupInfo(ctx, groupID)
//
//
//}
} //func GetGroupMemberInfo(ctx context.Context, groupID string, userID string) (*sdkws.GroupMemberFullInfo, error) {
// check.NewGroupChecker().GetGroupMemberInfo
//}
func GetGroupMemberInfo(ctx context.Context, groupID string, userID string) (*sdkws.GroupMemberFullInfo, error) { //func (m *msgServer)GetSuperGroupMsg(ctx context.Context, groupID string, seq uint32) (*sdkws.MsgData, error) {
// return m.MsgInterface.GetSuperGroupMsg(ctx, groupID, seq)
//}
} func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgReq) ([]string, error) {
func GetSuperGroupMsg(ctx context.Context, groupID string, seq uint32) (*sdkws.MsgData, error) {
}
func (rpc *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgReq) ([]string, error) {
switch data.MsgData.SessionType { switch data.MsgData.SessionType {
case constant.SingleChatType: case constant.SingleChatType:
if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) { if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) {
@ -163,7 +169,7 @@ func (rpc *msgServer) messageVerification(ctx context.Context, data *msg.SendMsg
} }
return userIDList, nil return userIDList, nil
case constant.SuperGroupChatType: case constant.SuperGroupChatType:
groupInfo, err := GetGroupInfo(ctx, data.MsgData.GroupID) groupInfo, err := check.NewGroupChecker().GetGroupInfo(ctx, data.MsgData.GroupID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -175,7 +181,7 @@ func (rpc *msgServer) messageVerification(ctx context.Context, data *msg.SendMsg
} }
if revokeMessage.RevokerID != revokeMessage.SourceMessageSendID { if revokeMessage.RevokerID != revokeMessage.SourceMessageSendID {
resp, err := GetSuperGroupMsg(ctx, data.MsgData.GroupID, revokeMessage.Seq) resp, err := m.MsgInterface.GetSuperGroupMsg(ctx, data.MsgData.GroupID, revokeMessage.Seq)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -231,7 +237,7 @@ func (rpc *msgServer) messageVerification(ctx context.Context, data *msg.SendMsg
return nil, nil return nil, nil
} }
} }
func (rpc *msgServer) encapsulateMsgData(msg *sdkws.MsgData) { func (m *msgServer) encapsulateMsgData(msg *sdkws.MsgData) {
msg.ServerMsgID = GetMsgID(msg.SendID) msg.ServerMsgID = GetMsgID(msg.SendID)
msg.SendTime = utils.GetCurrentTimestampByMill() msg.SendTime = utils.GetCurrentTimestampByMill()
switch msg.ContentType { switch msg.ContentType {

View File

@ -75,7 +75,7 @@ func (rpc *rpcChat) initPrometheus() {
promePkg.NewWorkSuperGroupChatMsgProcessFailedCounter() promePkg.NewWorkSuperGroupChatMsgProcessFailedCounter()
} }
func (rpc *msgServer) Run() { func (m *msgServer) Run() {
log.Info("", "rpcChat init...") log.Info("", "rpcChat init...")
listenIP := "" listenIP := ""
if config.Config.ListenIP == "" { if config.Config.ListenIP == "" {
@ -83,10 +83,10 @@ func (rpc *msgServer) Run() {
} else { } else {
listenIP = config.Config.ListenIP listenIP = config.Config.ListenIP
} }
address := listenIP + ":" + strconv.Itoa(rpc.rpcPort) address := listenIP + ":" + strconv.Itoa(m.rpcPort)
listener, err := net.Listen("tcp", address) listener, err := net.Listen("tcp", address)
if err != nil { if err != nil {
panic("listening err:" + err.Error() + rpc.rpcRegisterName) panic("listening err:" + err.Error() + m.rpcRegisterName)
} }
log.Info("", "listen network success, address ", address) log.Info("", "listen network success, address ", address)
recvSize := 1024 * 1024 * 30 recvSize := 1024 * 1024 * 30
@ -109,26 +109,26 @@ func (rpc *msgServer) Run() {
defer srv.GracefulStop() defer srv.GracefulStop()
rpcRegisterIP := config.Config.RpcRegisterIP rpcRegisterIP := config.Config.RpcRegisterIP
msg.RegisterMsgServer(srv, rpc) msg.RegisterMsgServer(srv, m)
if config.Config.RpcRegisterIP == "" { if config.Config.RpcRegisterIP == "" {
rpcRegisterIP, err = utils.GetLocalIP() rpcRegisterIP, err = utils.GetLocalIP()
if err != nil { if err != nil {
log.Error("", "GetLocalIP failed ", err.Error()) log.Error("", "GetLocalIP failed ", err.Error())
} }
} }
err = getcdv3.RegisterEtcd(rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName, 10, "") err = getcdv3.RegisterEtcd(m.etcdSchema, strings.Join(m.etcdAddr, ","), rpcRegisterIP, m.rpcPort, m.rpcRegisterName, 10, "")
if err != nil { if err != nil {
log.Error("", "register rpcChat to etcd failed ", err.Error()) log.Error("", "register rpcChat to etcd failed ", err.Error())
panic(utils.Wrap(err, "register chat module rpc to etcd err")) panic(utils.Wrap(err, "register chat module m to etcd err"))
} }
go rpc.runCh() go m.runCh()
rpc.initPrometheus() m.initPrometheus()
err = srv.Serve(listener) err = srv.Serve(listener)
if err != nil { if err != nil {
log.Error("", "rpc rpcChat failed ", err.Error()) log.Error("", "m rpcChat failed ", err.Error())
return return
} }
log.Info("", "rpc rpcChat init success") log.Info("", "m rpcChat init success")
} }
func (rpc *rpcChat) runCh() { func (rpc *rpcChat) runCh() {

View File

@ -33,6 +33,7 @@ type MsgInterface interface {
GetUserMinSeq(ctx context.Context, userID string) (uint32, error) GetUserMinSeq(ctx context.Context, userID string) (uint32, error)
GetMessageListBySeq(ctx context.Context, userID string, seqs []uint32) ([]*sdkws.MsgData, error) GetMessageListBySeq(ctx context.Context, userID string, seqs []uint32) ([]*sdkws.MsgData, error)
GetSuperGroupMsg(ctx context.Context, groupID string, seq uint32) (*sdkws.MsgData, error)
} }
type MsgDatabaseInterface interface { type MsgDatabaseInterface interface {

View File

@ -24,8 +24,8 @@ type GroupMemberIDsHash struct {
userIDs []string userIDs []string
} }
func NewGroupMemberIDsLocalCache(client discoveryRegistry.SvcDiscoveryRegistry) GroupLocalCache { func NewGroupMemberIDsLocalCache(client discoveryRegistry.SvcDiscoveryRegistry) *GroupLocalCache {
return GroupLocalCache{ return &GroupLocalCache{
cache: make(map[string]GroupMemberIDsHash, 0), cache: make(map[string]GroupMemberIDsHash, 0),
client: client, client: client,
} }