diff --git a/cmd/Open-IM-SDK-Core b/cmd/Open-IM-SDK-Core index 5e8d3f536..1667b0f4e 160000 --- a/cmd/Open-IM-SDK-Core +++ b/cmd/Open-IM-SDK-Core @@ -1 +1 @@ -Subproject commit 5e8d3f5366700f00db7db2905da27189b9353630 +Subproject commit 1667b0f4e205fc4ed7c690ab55b662087d61c277 diff --git a/config/config.yaml b/config/config.yaml index 156b5a94e..55a96f0b8 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -17,9 +17,9 @@ mysql: dbMysqlDatabaseName: openIM_v2 #默认即可 dbTableName: eMsg #默认即可 dbMsgTableNum: 1 - dbMaxOpenConns: 2000 - dbMaxIdleConns: 100 - dbMaxLifeTime: 3600 + dbMaxOpenConns: 100 + dbMaxIdleConns: 10 + dbMaxLifeTime: 5 mongo: dbUri: ""#当dbUri值不为空则直接使用该值 diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index a6f3a1974..2ee9be712 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -153,6 +153,7 @@ func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) { ws.pullMsgBySeqListResp(conn, m, nReply) } } + func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullMessageBySeqListResp) { log.NewInfo(m.OperationID, "pullMsgBySeqListResp come here ", pb.String()) c, _ := proto.Marshal(pb) @@ -166,10 +167,9 @@ func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullM } log.NewInfo(m.OperationID, "pullMsgBySeqListResp all data is ", mReply.ReqIdentifier, mReply.MsgIncr, mReply.ErrCode, mReply.ErrMsg, len(mReply.Data)) - ws.sendMsg(conn, mReply) - } + func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) { sendMsgAllCountLock.Lock() sendMsgAllCount++ diff --git a/internal/msg_gateway/gate/ws_server.go b/internal/msg_gateway/gate/ws_server.go index 29ec4f8ea..f631a6c33 100644 --- a/internal/msg_gateway/gate/ws_server.go +++ b/internal/msg_gateway/gate/ws_server.go @@ -138,7 +138,9 @@ func (ws *WServer) MultiTerminalLoginRemoteChecker(userID string, platformID int } if resp.ErrCode != 0 { log.Error(operationID, "MultiTerminalLoginCheck errCode, errMsg: ", resp.ErrCode, resp.ErrMsg) + continue } + log.Debug(operationID, "MultiTerminalLoginCheck resp ", resp.String()) } } diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index 6d6229dca..bfe5e1187 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -28,10 +28,10 @@ var ( ) func Init(rpcPort int) { - rpcServer.Init(rpcPort) pushCh.Init() pushTerminal = []int32{constant.IOSPlatformID, constant.AndroidPlatformID} + } func init() { producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 0e34ca5d9..b178d49ea 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -8,12 +8,12 @@ package logic import ( "Open_IM/internal/push" + utils2 "Open_IM/internal/utils" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" - pbCache "Open_IM/pkg/proto/cache" pbPush "Open_IM/pkg/proto/push" pbRelay "Open_IM/pkg/proto/relay" pbRtc "Open_IM/pkg/proto/rtc" @@ -21,9 +21,8 @@ import ( "Open_IM/pkg/utils" "context" "encoding/json" - "strings" - "github.com/golang/protobuf/proto" + "strings" ) type OpenIMContent struct { @@ -176,24 +175,30 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) { log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "callback userIDList Resp", pushToUserIDList) } if len(pushToUserIDList) == 0 { - getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: pushMsg.OperationID, GroupID: pushMsg.MsgData.GroupID} - etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, pushMsg.OperationID) - if etcdConn == nil { - errMsg := pushMsg.OperationID + "getcdv3.GetDefaultConn == nil" - log.NewError(pushMsg.OperationID, errMsg) - return - } - client := pbCache.NewCacheClient(etcdConn) - cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) + userIDList, err := utils2.GetGroupMemberUserIDList(pushMsg.MsgData.GroupID, pushMsg.OperationID) if err != nil { - log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) + log.Error(pushMsg.OperationID, "GetGroupMemberUserIDList failed ", err.Error(), pushMsg.MsgData.GroupID) return } - if cacheResp.CommonResp.ErrCode != 0 { - log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String()) - return - } - pushToUserIDList = cacheResp.UserIDList + pushToUserIDList = userIDList + //getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: pushMsg.OperationID, GroupID: pushMsg.MsgData.GroupID} + //etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, pushMsg.OperationID) + //if etcdConn == nil { + // errMsg := pushMsg.OperationID + "getcdv3.GetDefaultConn == nil" + // log.NewError(pushMsg.OperationID, errMsg) + // return + //} + //client := pbCache.NewCacheClient(etcdConn) + //cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) + //if err != nil { + // log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) + // return + //} + //if cacheResp.CommonResp.ErrCode != 0 { + // log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String()) + // return + //} + //pushToUserIDList = cacheResp.UserIDList } grpcCons := getcdv3.GetDefaultGatewayConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), pushMsg.OperationID) diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index ab39a3662..02d8a163e 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -1,6 +1,7 @@ package msg import ( + utils2 "Open_IM/internal/utils" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" @@ -9,7 +10,6 @@ import ( "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" cacheRpc "Open_IM/pkg/proto/cache" - pbCache "Open_IM/pkg/proto/cache" pbConversation "Open_IM/pkg/proto/conversation" pbChat "Open_IM/pkg/proto/msg" pbRelay "Open_IM/pkg/proto/relay" @@ -139,36 +139,45 @@ func messageVerification(data *pbChat.SendMsgReq) (bool, int32, string, []string if groupInfo.GroupType == constant.SuperGroup { return true, 0, "", nil } else { - getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: data.OperationID, GroupID: data.MsgData.GroupID} - etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, data.OperationID) - if etcdConn == nil { - errMsg := data.OperationID + "getcdv3.GetDefaultConn == nil" + userIDList, err := utils2.GetGroupMemberUserIDList(data.MsgData.GroupID, data.OperationID) + if err != nil { + errMsg := data.OperationID + err.Error() log.NewError(data.OperationID, errMsg) - //return returnMsg(&replay, pb, 201, errMsg, "", 0) return false, 201, errMsg, nil } - client := pbCache.NewCacheClient(etcdConn) - cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) - if err != nil { - log.NewError(data.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) - //return returnMsg(&replay, pb, 201, "GetGroupMemberIDListFromCache failed", "", 0) - return false, 201, err.Error(), nil - } - if cacheResp.CommonResp.ErrCode != 0 { - log.NewError(data.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String()) - //return returnMsg(&replay, pb, 201, "GetGroupMemberIDListFromCache logic failed", "", 0) - return false, cacheResp.CommonResp.ErrCode, cacheResp.CommonResp.ErrMsg, nil - } + + // + //getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: data.OperationID, GroupID: data.MsgData.GroupID} + //etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, data.OperationID) + //if etcdConn == nil { + // errMsg := data.OperationID + "getcdv3.GetDefaultConn == nil" + // log.NewError(data.OperationID, errMsg) + // return false, 201, errMsg, nil + //} + //client := pbCache.NewCacheClient(etcdConn) + // cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) + // + // + //if err != nil { + // log.NewError(data.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) + // //return returnMsg(&replay, pb, 201, "GetGroupMemberIDListFromCache failed", "", 0) + // return false, 201, err.Error(), nil + //} + //if cacheResp.CommonResp.ErrCode != 0 { + // log.NewError(data.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String()) + // //return returnMsg(&replay, pb, 201, "GetGroupMemberIDListFromCache logic failed", "", 0) + // return false, cacheResp.CommonResp.ErrCode, cacheResp.CommonResp.ErrMsg, nil + //} if !token_verify.IsManagerUserID(data.MsgData.SendID) { if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin { - return true, 0, "", cacheResp.UserIDList + return true, 0, "", userIDList } - if !utils.IsContain(data.MsgData.SendID, cacheResp.UserIDList) { + if !utils.IsContain(data.MsgData.SendID, userIDList) { //return returnMsg(&replay, pb, 202, "you are not in group", "", 0) return false, 202, "you are not in group", nil } } - return true, 0, "", cacheResp.UserIDList + return true, 0, "", userIDList } default: return true, 0, "", nil diff --git a/internal/utils/local_cache.go b/internal/utils/local_cache.go new file mode 100644 index 000000000..3be1226a1 --- /dev/null +++ b/internal/utils/local_cache.go @@ -0,0 +1,75 @@ +package utils + +import ( + "Open_IM/pkg/common/config" + rocksCache "Open_IM/pkg/common/db/rocks_cache" + "Open_IM/pkg/common/log" + "Open_IM/pkg/grpc-etcdv3/getcdv3" + pbCache "Open_IM/pkg/proto/cache" + "Open_IM/pkg/utils" + "context" + "errors" + "strings" + "sync" +) + +type GroupMemberUserIDListHash struct { + MemberListHash uint64 + UserIDList []string +} + +var CacheGroupMemberUserIDList map[string]*GroupMemberUserIDListHash = make(map[string]*GroupMemberUserIDListHash, 0) +var CacheGroupMtx sync.RWMutex + +func GetGroupMemberUserIDList(groupID string, operationID string) ([]string, error) { + groupHashRemote, err := GetGroupMemberUserIDListHashFromRemote(groupID) + if err != nil { + CacheGroupMtx.Lock() + defer CacheGroupMtx.Unlock() + delete(CacheGroupMemberUserIDList, groupID) + log.Error(operationID, "GetGroupMemberUserIDListHashFromRemote failed ", err.Error(), groupID) + return nil, utils.Wrap(err, groupID) + } + + CacheGroupMtx.Lock() + defer CacheGroupMtx.Unlock() + groupInLocalCache, ok := CacheGroupMemberUserIDList[groupID] + if ok && groupInLocalCache.MemberListHash == groupHashRemote { + log.Debug(operationID, "in local cache ", groupID) + return groupInLocalCache.UserIDList, nil + } + log.Debug(operationID, "not in local cache or hash changed", groupID, " remote hash ", groupHashRemote, " in cache ", ok) + memberUserIDListRemote, err := GetGroupMemberUserIDListFromRemote(groupID, operationID) + if err != nil { + log.Error(operationID, "GetGroupMemberUserIDListFromRemote failed ", err.Error(), groupID) + return nil, utils.Wrap(err, groupID) + } + CacheGroupMemberUserIDList[groupID] = &GroupMemberUserIDListHash{MemberListHash: groupHashRemote, UserIDList: memberUserIDListRemote} + return memberUserIDListRemote, nil +} + +func GetGroupMemberUserIDListHashFromRemote(groupID string) (uint64, error) { + return rocksCache.GetGroupMemberListHashFromCache(groupID) +} + +func GetGroupMemberUserIDListFromRemote(groupID string, operationID string) ([]string, error) { + getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: operationID, GroupID: groupID} + etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, operationID) + if etcdConn == nil { + errMsg := operationID + "getcdv3.GetDefaultConn == nil" + log.NewError(operationID, errMsg) + return nil, errors.New("errMsg") + } + client := pbCache.NewCacheClient(etcdConn) + cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) + if err != nil { + log.NewError(operationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) + return nil, utils.Wrap(err, "GetGroupMemberIDListFromCache rpc call failed") + } + if cacheResp.CommonResp.ErrCode != 0 { + errMsg := operationID + "GetGroupMemberIDListFromCache rpc logic call failed " + cacheResp.CommonResp.ErrMsg + log.NewError(operationID, errMsg) + return nil, errors.New("errMsg") + } + return cacheResp.UserIDList, nil +}