diff --git a/cmd/Open-IM-SDK-Core b/cmd/Open-IM-SDK-Core index 992f76df0..1c6c7af53 160000 --- a/cmd/Open-IM-SDK-Core +++ b/cmd/Open-IM-SDK-Core @@ -1 +1 @@ -Subproject commit 992f76df0ee500a0377523b0780d3a85f2275755 +Subproject commit 1c6c7af5393b3e9eefbaf16b673519ca863a6c2c diff --git a/config/config.yaml b/config/config.yaml index 8eaec096c..7d025c863 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -132,7 +132,7 @@ log: storageLocation: ../logs/ rotationTime: 24 remainRotationCount: 5 #日志数量 - remainLogLevel: 6 #日志级别 6表示全都打印,测试阶段建议设置为6 + remainLogLevel: 4 #日志级别 6表示全都打印,测试阶段建议设置为6 elasticSearchSwitch: false elasticSearchAddr: [ 127.0.0.1:9201 ] elasticSearchUser: "" @@ -178,10 +178,13 @@ tokenpolicy: # Token effective time day as a unit accessExpire: 3650 #token过期时间(天) 默认即可 -messagecallback: #暂时不要使用 还需完善 +messagecallback: + callbackSwitch: false callbackUrl: "http://www.xxx.com/msg/judge" #TimeOut use second as unit callbackTimeOut: 10 +messagejudge: + isJudgeFriend: true # c2c: # callbackBeforeSendMsg: # switch: false @@ -255,7 +258,7 @@ notification: desc: "groupApplicationAccepted desc" ext: "groupApplicationAccepted ext" defaultTips: - tips: "allowed to join the group" # group info changed by xx + tips: "was allowed to join the group" # group info changed by xx groupApplicationRejected: conversation: @@ -267,7 +270,7 @@ notification: desc: " desc" ext: " ext" defaultTips: - tips: "rejected into the group" # group info changed by xx + tips: "was rejected into the group" # group info changed by xx groupOwnerTransferred: conversation: @@ -291,7 +294,7 @@ notification: desc: "memberKicked desc" ext: "memberKicked ext" defaultTips: - tips: "kicked out of the group" # group info changed by xx + tips: "was kicked out of the group" # group info changed by xx memberInvited: conversation: @@ -303,7 +306,7 @@ notification: desc: "memberInvited desc" ext: "memberInvited ext" defaultTips: - tips: "invited into the group" # group info changed by xx + tips: "was invited into the group" # group info changed by xx memberEnter: conversation: @@ -459,19 +462,20 @@ notification: #是否启动demo,如果自身没有账号体系,设置为true demoswitch: true demo: - openImDemoPort: [ 42233 ] #demo对外服务端口,默认即可,需要开放此端口或做nginx转发 + #demo对外服务端口,默认即可,需要开放此端口或做nginx转发 + openImDemoPort: [ 42233 ] alismsverify: #阿里云短信配置,在阿里云申请成功后修改以下四项,必须修改 - accessKeyId: LTAI5tJPkn4HuuePdiLdGqe71 - accessKeySecret: 4n9OJ7ZCVN1U6KeHDAtOyNeVZcjOuV1 - signName: OpenIM Corporation - verificationCodeTemplateCode: SMS_2268101641 + accessKeyId: LTAI5tJPkn4HuuePdiLdGqe7 + accessKeySecret: 4n9OJ7ZCVN1U6KeHDAtOyNeVZcjOuV + signName: 托云信息技术 + verificationCodeTemplateCode: SMS_226810164 superCode: 666666 #超级验证码,建议修改掉,收不到短信验证码时可以用此替代 # second - codeTTL: 60 + codeTTL: 300 mail: #仅支持qq邮箱,具体操作参考 https://service.mail.qq.com/cgi-bin/help?subtype=1&id=28&no=1001256 必须修改 title: "openIM" - senderMail: "1765567899@qq.com" - senderAuthorizationCode: "1gxyausfoevlzbfag" + senderMail: "765567899@qq.com" + senderAuthorizationCode: "gxyausfoevlzbfag" smtpAddr: "smtp.qq.com" smtpPort: 25 #需开放此端口 出口方向 diff --git a/docker-compose.yaml b/docker-compose.yaml index b8d99cc7d..5497faa43 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -89,7 +89,7 @@ services: command: /usr/local/bin/etcd --name etcd0 --data-dir /etcd-data --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2379 --listen-peer-urls http://0.0.0.0:2380 --initial-advertise-peer-urls http://0.0.0.0:2380 --initial-cluster etcd0=http://0.0.0.0:2380 --initial-cluster-token tkn --initial-cluster-state new open_im_server: - image: openim/open_im_server:v2.0.0 + image: openim/open_im_server:v2.0.1 container_name: open_im_server volumes: - ./logs:/Open-IM-Server/logs diff --git a/internal/api/manage/management_chat.go b/internal/api/manage/management_chat.go index f09b9acd7..8ff3e5906 100644 --- a/internal/api/manage/management_chat.go +++ b/internal/api/manage/management_chat.go @@ -7,12 +7,14 @@ package manage import ( + api "Open_IM/pkg/base_info" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbChat "Open_IM/pkg/proto/chat" + "Open_IM/pkg/proto/sdk_ws" open_im_sdk "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "context" @@ -40,11 +42,13 @@ func newUserSendMsgReq(params *ManagementSendMsgReq) *pbChat.SendMsgReq { newContent = utils.StructToJsonString(params.Content) default: } - options := make(map[string]bool, 2) + var options map[string]bool if params.IsOnlineOnly { + options = make(map[string]bool, 5) utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false) utils.SetSwitchFromOptions(options, constant.IsHistory, false) utils.SetSwitchFromOptions(options, constant.IsPersistent, false) + utils.SetSwitchFromOptions(options, constant.IsSenderSync, false) } pbData := pbChat.SendMsgReq{ OperationID: params.OperationID, @@ -152,20 +156,16 @@ func ManagementSendMsg(c *gin.Context) { log.Info("", "", "api ManagementSendMsg call, api call rpc...") - reply, err := client.SendMsg(context.Background(), pbData) + RpcResp, err := client.SendMsg(context.Background(), pbData) if err != nil { log.NewError(params.OperationID, "call delete UserSendMsg rpc server failed", err.Error()) c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call UserSendMsg rpc server failed"}) return } - log.Info("", "", "api ManagementSendMsg call end..., [data: %s] [reply: %s]", pbData.String(), reply.String()) - - c.JSON(http.StatusOK, gin.H{ - "errCode": reply.ErrCode, - "errMsg": reply.ErrMsg, - "sendTime": reply.SendTime, - "msgID": reply.ClientMsgID, - }) + log.Info("", "", "api ManagementSendMsg call end..., [data: %s] [reply: %s]", pbData.String(), RpcResp.String()) + resp := api.ManagementSendMsgResp{CommResp: api.CommResp{ErrCode: RpcResp.ErrCode, ErrMsg: RpcResp.ErrMsg}, ResultList: server_api_params.UserSendMsgResp{ServerMsgID: RpcResp.ServerMsgID, ClientMsgID: RpcResp.ClientMsgID, SendTime: RpcResp.SendTime}} + log.Info(params.OperationID, "ManagementSendMsg return", resp) + c.JSON(http.StatusOK, resp) } diff --git a/internal/msg_gateway/gate/init.go b/internal/msg_gateway/gate/init.go index 03f82f3ad..47d1dcedb 100644 --- a/internal/msg_gateway/gate/init.go +++ b/internal/msg_gateway/gate/init.go @@ -3,6 +3,8 @@ package gate import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/log" + "Open_IM/pkg/statistics" + "fmt" "github.com/go-playground/validator/v10" "sync" ) @@ -12,6 +14,7 @@ var ( validate *validator.Validate ws WServer rpcSvr RPCServer + count uint64 ) func Init(rpcPort, wsPort int) { @@ -19,6 +22,7 @@ func Init(rpcPort, wsPort int) { log.NewPrivateLog(config.Config.ModuleName.LongConnSvrName) rwLock = new(sync.RWMutex) validate = validator.New() + statistics.NewStatistics(&count, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway count", count), 10) ws.onInit(wsPort) rpcSvr.onInit(rpcPort) } diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index d1756093c..2aa19f609 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -142,6 +142,7 @@ func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullM } func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) { + count++ log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID) nReply := new(pbChat.SendMsgResp) isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg) diff --git a/internal/msg_transfer/logic/history_msg_handler.go b/internal/msg_transfer/logic/history_msg_handler.go index b78695c11..294b5ed70 100644 --- a/internal/msg_transfer/logic/history_msg_handler.go +++ b/internal/msg_transfer/logic/history_msg_handler.go @@ -8,6 +8,7 @@ import ( "Open_IM/pkg/grpc-etcdv3/getcdv3" pbMsg "Open_IM/pkg/proto/chat" pbPush "Open_IM/pkg/proto/push" + "Open_IM/pkg/statistics" "Open_IM/pkg/utils" "context" "github.com/Shopify/sarama" @@ -20,9 +21,14 @@ type fcb func(msg []byte, msgKey string) type HistoryConsumerHandler struct { msgHandle map[string]fcb historyConsumerGroup *kfk.MConsumerGroup + singleMsgCount uint64 + groupMsgCount uint64 } func (mc *HistoryConsumerHandler) Init() { + statistics.NewStatistics(&mc.singleMsgCount, config.Config.ModuleName.MsgTransferName, "singleMsgCount insert to mongo ", 10) + statistics.NewStatistics(&mc.groupMsgCount, config.Config.ModuleName.MsgTransferName, "groupMsgCount insert to mongo ", 10) + mc.msgHandle = make(map[string]fcb) mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, @@ -55,6 +61,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) return } + mc.singleMsgCount++ log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampByNano()-time) } if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { @@ -70,6 +77,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error()) return } + mc.groupMsgCount++ } go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) default: diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index cce434d99..7d1cad671 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -20,6 +20,6 @@ func Init() { } func Run() { //register mysqlConsumerHandler to - go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH) + //go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH) go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) } diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index b8941cc6f..02d96fda4 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -11,6 +11,8 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" + "Open_IM/pkg/statistics" + "fmt" ) var ( @@ -18,6 +20,7 @@ var ( pushCh PushConsumerHandler pushTerminal []int32 producer *kafka.Producer + count uint64 ) func Init(rpcPort int) { @@ -28,6 +31,7 @@ func Init(rpcPort int) { } func init() { producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) + statistics.NewStatistics(&count, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", 10), 10) } func Run() { diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 41c943c4b..b6a624f91 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -7,16 +7,12 @@ package logic import ( - push "Open_IM/internal/push/jpush" "Open_IM/pkg/common/config" - "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbPush "Open_IM/pkg/proto/push" pbRelay "Open_IM/pkg/proto/relay" - "Open_IM/pkg/utils" "context" - "encoding/json" "strings" ) @@ -34,7 +30,7 @@ type AtContent struct { func MsgToUser(pushMsg *pbPush.PushMsgReq) { var wsResult []*pbRelay.SingleMsgToUser - isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) + //isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) log.InfoByKv("Get msg from msg_transfer And push msg", pushMsg.OperationID, "PushData", pushMsg.String()) grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName) //Online push message @@ -51,66 +47,67 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { } } log.InfoByKv("push_result", pushMsg.OperationID, "result", wsResult, "sendData", pushMsg.MsgData) - if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID { - for _, v := range wsResult { - if v.ResultCode == 0 { - continue - } - //supported terminal - for _, t := range pushTerminal { - if v.RecvPlatFormID == t { - //Use offline push messaging - var UIDList []string - UIDList = append(UIDList, v.RecvID) - customContent := OpenIMContent{ - SessionType: int(pushMsg.MsgData.SessionType), - From: pushMsg.MsgData.SendID, - To: pushMsg.MsgData.RecvID, - Seq: pushMsg.MsgData.Seq, - } - bCustomContent, _ := json.Marshal(customContent) - jsonCustomContent := string(bCustomContent) - var content string - if pushMsg.MsgData.OfflinePushInfo != nil { - content = pushMsg.MsgData.OfflinePushInfo.Title - - } else { - switch pushMsg.MsgData.ContentType { - case constant.Text: - content = constant.ContentType2PushContent[constant.Text] - case constant.Picture: - content = constant.ContentType2PushContent[constant.Picture] - case constant.Voice: - content = constant.ContentType2PushContent[constant.Voice] - case constant.Video: - content = constant.ContentType2PushContent[constant.Video] - case constant.File: - content = constant.ContentType2PushContent[constant.File] - case constant.AtText: - a := AtContent{} - _ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a) - if utils.IsContain(v.RecvID, a.AtUserList) { - content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] - } else { - content = constant.ContentType2PushContent[constant.GroupMsg] - } - default: - content = constant.ContentType2PushContent[constant.Common] - } - } - - pushResult, err := push.JGAccountListPush(UIDList, content, jsonCustomContent, constant.PlatformIDToName(t)) - if err != nil { - log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error(), constant.PlatformIDToName(t)) - } else { - log.NewDebug(pushMsg.OperationID, "offline push return result is ", string(pushResult), pushMsg.MsgData, constant.PlatformIDToName(t)) - } - - } - } - } - - } + count++ + //if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID { + // for _, v := range wsResult { + // if v.ResultCode == 0 { + // continue + // } + // //supported terminal + // for _, t := range pushTerminal { + // if v.RecvPlatFormID == t { + // //Use offline push messaging + // var UIDList []string + // UIDList = append(UIDList, v.RecvID) + // customContent := OpenIMContent{ + // SessionType: int(pushMsg.MsgData.SessionType), + // From: pushMsg.MsgData.SendID, + // To: pushMsg.MsgData.RecvID, + // Seq: pushMsg.MsgData.Seq, + // } + // bCustomContent, _ := json.Marshal(customContent) + // jsonCustomContent := string(bCustomContent) + // var content string + // if pushMsg.MsgData.OfflinePushInfo != nil { + // content = pushMsg.MsgData.OfflinePushInfo.Title + // + // } else { + // switch pushMsg.MsgData.ContentType { + // case constant.Text: + // content = constant.ContentType2PushContent[constant.Text] + // case constant.Picture: + // content = constant.ContentType2PushContent[constant.Picture] + // case constant.Voice: + // content = constant.ContentType2PushContent[constant.Voice] + // case constant.Video: + // content = constant.ContentType2PushContent[constant.Video] + // case constant.File: + // content = constant.ContentType2PushContent[constant.File] + // case constant.AtText: + // a := AtContent{} + // _ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a) + // if utils.IsContain(v.RecvID, a.AtUserList) { + // content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] + // } else { + // content = constant.ContentType2PushContent[constant.GroupMsg] + // } + // default: + // content = constant.ContentType2PushContent[constant.Common] + // } + // } + // + // pushResult, err := push.JGAccountListPush(UIDList, content, jsonCustomContent, constant.PlatformIDToName(t)) + // if err != nil { + // log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error(), constant.PlatformIDToName(t)) + // } else { + // log.NewDebug(pushMsg.OperationID, "offline push return result is ", string(pushResult), pushMsg.MsgData, constant.PlatformIDToName(t)) + // } + // + // } + // } + // } + // + //} } //func SendMsgByWS(m *pbChat.WSToMsgSvrChatMsg) { diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 114746da2..a57a6e892 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -245,7 +245,7 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string) error { } func GetMsgID(sendID string) string { t := time.Now().Format("2006-01-02 15:04:05") - return t + "-" + sendID + "-" + strconv.Itoa(rand.Int()) + return utils.Md5(t + "-" + sendID + "-" + strconv.Itoa(rand.Int())) } func returnMsg(replay *pbChat.SendMsgResp, pb *pbChat.SendMsgReq, errCode int32, errMsg, serverMsgID string, sendTime int64) (*pbChat.SendMsgResp, error) { diff --git a/pkg/base_info/manage_api_struct.go b/pkg/base_info/manage_api_struct.go index 61e2bbba4..4b2061b57 100644 --- a/pkg/base_info/manage_api_struct.go +++ b/pkg/base_info/manage_api_struct.go @@ -2,6 +2,7 @@ package base_info import ( pbRelay "Open_IM/pkg/proto/relay" + "Open_IM/pkg/proto/sdk_ws" pbUser "Open_IM/pkg/proto/user" ) @@ -36,3 +37,8 @@ type AccountCheckResp struct { CommResp ResultList []*pbUser.AccountCheckResp_SingleUserStatus `json:"data"` } + +type ManagementSendMsgResp struct { + CommResp + ResultList server_api_params.UserSendMsgResp `json:"data"` +} diff --git a/pkg/base_info/user_api_struct.go b/pkg/base_info/user_api_struct.go index f4be785f2..10f20376c 100644 --- a/pkg/base_info/user_api_struct.go +++ b/pkg/base_info/user_api_struct.go @@ -10,8 +10,8 @@ type GetUsersInfoReq struct { } type GetUsersInfoResp struct { CommResp - UserInfoList []*open_im_sdk.PublicUserInfo - Data []map[string]interface{} `json:"data"` + UserInfoList []*open_im_sdk.PublicUserInfo `json:"-"` + Data []map[string]interface{} `json:"data"` } type UpdateSelfUserInfoReq struct { diff --git a/pkg/common/db/mysql_model/im_mysql_model/demo_model.go b/pkg/common/db/mysql_model/im_mysql_model/demo_model.go index a10218828..897e5e212 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/demo_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/demo_model.go @@ -11,7 +11,7 @@ func GetRegister(account string) (*db.Register, error) { return nil, err } var r db.Register - return &r, dbConn.Debug().Table("registers").Where("account = ?", + return &r, dbConn.Table("registers").Where("account = ?", account).Take(&r).Error } diff --git a/pkg/common/db/mysql_model/im_mysql_model/friend_model.go b/pkg/common/db/mysql_model/im_mysql_model/friend_model.go index 45650357b..700082353 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/friend_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/friend_model.go @@ -26,7 +26,7 @@ func GetFriendRelationshipFromFriend(OwnerUserID, FriendUserID string) (*db.Frie return nil, err } var friend db.Friend - err = dbConn.Table("friends").Where("owner_user_id=? and friend_user_id=?", OwnerUserID, FriendUserID).Find(&friend).Error + err = dbConn.Table("friends").Where("owner_user_id=? and friend_user_id=?", OwnerUserID, FriendUserID).Take(&friend).Error if err != nil { return nil, err } diff --git a/pkg/common/db/mysql_model/im_mysql_model/friend_request_model.go b/pkg/common/db/mysql_model/im_mysql_model/friend_request_model.go index 609be8687..171a6aedf 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/friend_request_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/friend_request_model.go @@ -53,7 +53,7 @@ func GetFriendApplicationByBothUserID(FromUserID, ToUserID string) (*db.FriendRe return nil, err } var friendRequest db.FriendRequest - err = dbConn.Table("friend_requests").Where("from_user_id=? and to_user_id=?", FromUserID, ToUserID).Find(&friendRequest).Error + err = dbConn.Table("friend_requests").Where("from_user_id=? and to_user_id=?", FromUserID, ToUserID).Take(&friendRequest).Error if err != nil { return nil, err } diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_member_model.go b/pkg/common/db/mysql_model/im_mysql_model/group_member_model.go index 4dafa30a6..63f12c115 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_member_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/group_member_model.go @@ -4,6 +4,7 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" "Open_IM/pkg/utils" + "errors" "fmt" "time" ) @@ -43,6 +44,7 @@ func GetGroupMemberListByUserID(userID string) ([]db.GroupMember, error) { } var groupMemberList []db.GroupMember err = dbConn.Table("group_members").Where("user_id=?", userID).Find(&groupMemberList).Error + //err = dbConn.Table("group_members").Where("user_id=?", userID).Take(&groupMemberList).Error if err != nil { return nil, err } @@ -82,7 +84,7 @@ func GetGroupMemberInfoByGroupIDAndUserID(groupID, userID string) (*db.GroupMemb return nil, err } var groupMember db.GroupMember - err = dbConn.Table("group_members").Where("group_id=? and user_id=? ", groupID, userID).Limit(1).Find(&groupMember).Error + err = dbConn.Table("group_members").Where("group_id=? and user_id=? ", groupID, userID).Limit(1).Take(&groupMember).Error if err != nil { return nil, err } @@ -149,7 +151,8 @@ func GetGroupOwnerInfoByGroupID(groupID string) (*db.GroupMember, error) { return &v, nil } } - return nil, nil + + return nil, utils.Wrap(errors.New("no owner"), "") } func IsExistGroupMember(groupID, userID string) bool { diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_model.go b/pkg/common/db/mysql_model/im_mysql_model/group_model.go index a9fe13236..587e3672e 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/group_model.go @@ -45,7 +45,7 @@ func GetGroupInfoByGroupID(groupId string) (*db.Group, error) { return nil, utils.Wrap(err, "") } var groupInfo db.Group - err = dbConn.Table("groups").Where("group_id=?", groupId).Find(&groupInfo).Error + err = dbConn.Table("groups").Where("group_id=?", groupId).Take(&groupInfo).Error if err != nil { return nil, err } @@ -86,11 +86,10 @@ func GetGroups(pageNumber, showNumber int) ([]db.Group, error) { return groups, nil } - func OperateGroupStatus(groupId string, groupStatus int32) error { group := db.Group{ GroupID: groupId, - Status: groupStatus, + Status: groupStatus, } if err := SetGroupInfo(group); err != nil { return err @@ -98,7 +97,6 @@ func OperateGroupStatus(groupId string, groupStatus int32) error { return nil } - func DeleteGroup(groupId string) error { dbConn, err := db.DB.MysqlDB.DefaultGormDB() if err != nil { @@ -129,15 +127,14 @@ func OperateGroupRole(userId, groupId string, roleLevel int32) (string, string, updateInfo := db.GroupMember{ RoleLevel: roleLevel, } - groupMaster := db.GroupMember{ - } + groupMaster := db.GroupMember{} switch roleLevel { case constant.GroupOwner: err = dbConn.Transaction(func(tx *gorm.DB) error { result := dbConn.Table("group_members").Where("group_id = ? and role_level = ?", groupId, constant.GroupOwner).First(&groupMaster).Update(&db.GroupMember{ RoleLevel: constant.GroupOrdinaryUsers, }) - if result.Error != nil { + if result.Error != nil { return result.Error } if result.RowsAffected == 0 { @@ -145,7 +142,7 @@ func OperateGroupRole(userId, groupId string, roleLevel int32) (string, string, } result = dbConn.Table("group_members").First(&groupMember).Update(updateInfo) - if result.Error != nil { + if result.Error != nil { return result.Error } if result.RowsAffected == 0 { @@ -161,7 +158,7 @@ func OperateGroupRole(userId, groupId string, roleLevel int32) (string, string, return result.Error } if result.RowsAffected == 0 { - return errors.New(fmt.Sprintf("user %s not exist in group %s or already operate", userId, groupId)) + return errors.New(fmt.Sprintf("user %s not exist in group %s or already operate", userId, groupId)) } if groupMaster.UserID == userId { return errors.New(fmt.Sprintf("user %s is master of %s, cant set to ordinary user", userId, groupId)) @@ -171,7 +168,7 @@ func OperateGroupRole(userId, groupId string, roleLevel int32) (string, string, return result.Error } if result.RowsAffected == 0 { - return errors.New(fmt.Sprintf("user %s not exist in group %s or already operate", userId, groupId)) + return errors.New(fmt.Sprintf("user %s not exist in group %s or already operate", userId, groupId)) } } return nil @@ -219,4 +216,4 @@ func GetGroupMaster(groupId string) (db.GroupMember, error) { return groupMember, err } return groupMember, nil -} \ No newline at end of file +} diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go b/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go index 745ad0034..836f3f0bf 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go @@ -63,7 +63,7 @@ func GetGroupRequestByGroupIDAndUserID(groupID, userID string) (*db.GroupRequest return nil, err } var groupRequest db.GroupRequest - err = dbConn.Table("group_requests").Where("user_id=? and group_id=?", userID, groupID).Find(&groupRequest).Error + err = dbConn.Table("group_requests").Where("user_id=? and group_id=?", userID, groupID).Take(&groupRequest).Error if err != nil { return nil, err } @@ -128,7 +128,6 @@ func GetUserReqGroupByUserID(userID string) ([]db.GroupRequest, error) { return groupRequestList, err } - // //func GroupApplicationResponse(pb *group.GroupApplicationResponseReq) (*group.CommonResp, error) { // diff --git a/pkg/common/db/mysql_model/im_mysql_model/user_model.go b/pkg/common/db/mysql_model/im_mysql_model/user_model.go index de5045855..7492d6a38 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/user_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/user_model.go @@ -51,18 +51,6 @@ func UserRegister(user db.User) error { return nil } -type User struct { - UserID string `gorm:"column:user_id;primaryKey;"` - Nickname string `gorm:"column:name"` - FaceUrl string `gorm:"column:icon"` - Gender int32 `gorm:"column:gender"` - PhoneNumber string `gorm:"column:phone_number"` - Birth string `gorm:"column:birth"` - Email string `gorm:"column:email"` - Ex string `gorm:"column:ex"` - CreateTime time.Time `gorm:"column:create_time"` -} - func DeleteUser(userID string) (i int64) { dbConn, err := db.DB.MysqlDB.DefaultGormDB() if err != nil { @@ -78,7 +66,7 @@ func GetUserByUserID(userID string) (*db.User, error) { return nil, err } var user db.User - err = dbConn.Table("users").Where("user_id=?", userID).First(&user).Error + err = dbConn.Table("users").Where("user_id=?", userID).Take(&user).Error if err != nil { return nil, err } diff --git a/pkg/grpc-etcdv3/getcdv3/register.go b/pkg/grpc-etcdv3/getcdv3/register.go index 4fd2cd5a7..d1c7957bf 100644 --- a/pkg/grpc-etcdv3/getcdv3/register.go +++ b/pkg/grpc-etcdv3/getcdv3/register.go @@ -1,6 +1,7 @@ package getcdv3 import ( + "Open_IM/pkg/common/log" "context" "fmt" "go.etcd.io/etcd/clientv3" @@ -39,9 +40,9 @@ func RegisterEtcd4Unique(schema, etcdAddr, myHost string, myPort int, serviceNam func RegisterEtcd(schema, etcdAddr, myHost string, myPort int, serviceName string, ttl int) error { cli, err := clientv3.New(clientv3.Config{ Endpoints: strings.Split(etcdAddr, ","), DialTimeout: 5 * time.Second}) - fmt.Println("RegisterEtcd") + + log.Info("", "RegisterEtcd, ", schema, etcdAddr, myHost, myPort, serviceName, ttl) if err != nil { - // return fmt.Errorf("grpclb: create clientv3 client failed: %v", err) return fmt.Errorf("create etcd clientv3 client failed, errmsg:%v, etcd addr:%s", err, etcdAddr) } @@ -66,15 +67,16 @@ func RegisterEtcd(schema, etcdAddr, myHost string, myPort int, serviceName strin if err != nil { return fmt.Errorf("keepalive failed, errmsg:%v, lease id:%d", err, resp.ID) } - fmt.Println("RegisterEtcd ok") + log.Info("", "RegisterEtcd ok ") + go func() { for { select { case v, ok := <-kresp: if ok == true { - // fmt.Println(" kresp ok ", v) + log.Debug("", "KeepAlive kresp ok", v, schema, etcdAddr, myHost, myPort, serviceName, ttl) } else { - fmt.Println(" kresp failed ", v) + log.Error("", "KeepAlive kresp failed", schema, etcdAddr, myHost, myPort, serviceName, ttl) } } } diff --git a/pkg/grpc-etcdv3/getcdv3/resolver.go b/pkg/grpc-etcdv3/getcdv3/resolver.go index aa41f86c4..45f47a717 100644 --- a/pkg/grpc-etcdv3/getcdv3/resolver.go +++ b/pkg/grpc-etcdv3/getcdv3/resolver.go @@ -1,6 +1,7 @@ package getcdv3 import ( + "Open_IM/pkg/common/log" "context" "fmt" "go.etcd.io/etcd/clientv3" @@ -102,7 +103,7 @@ func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts re if err == nil { var addrList []resolver.Address for i := range resp.Kvs { - fmt.Println("init addr: ", string(resp.Kvs[i].Value)) + log.Debug("", "init addr: ", string(resp.Kvs[i].Value)) addrList = append(addrList, resolver.Address{Addr: string(resp.Kvs[i].Value)}) } r.cc.UpdateState(resolver.State{Addresses: addrList}) @@ -148,27 +149,27 @@ func (r *Resolver) watch(prefix string, addrList []resolver.Address) { if !exists(addrList, string(ev.Kv.Value)) { flag = 1 addrList = append(addrList, resolver.Address{Addr: string(ev.Kv.Value)}) - fmt.Println("after add, new list: ", addrList) + log.Debug("", "after add, new list: ", addrList) } case mvccpb.DELETE: - fmt.Println("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value)) + log.Debug("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value)) i := strings.LastIndexAny(string(ev.Kv.Key), "/") if i < 0 { return } t := string(ev.Kv.Key)[i+1:] - fmt.Println("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value), "addr:", t) + log.Debug("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value), "addr:", t) if s, ok := remove(addrList, t); ok { flag = 1 addrList = s - fmt.Println("after remove, new list: ", addrList) + log.Debug("after remove, new list: ", addrList) } } } if flag == 1 { r.cc.UpdateState(resolver.State{Addresses: addrList}) - fmt.Println("update: ", addrList) + log.Debug("update: ", addrList) } } } @@ -176,7 +177,7 @@ func (r *Resolver) watch(prefix string, addrList []resolver.Address) { func GetConn4Unique(schema, etcdaddr, servicename string) []*grpc.ClientConn { gEtcdCli, err := clientv3.New(clientv3.Config{Endpoints: strings.Split(etcdaddr, ",")}) if err != nil { - fmt.Println("eeeeeeeeeeeee", err.Error()) + log.Error("clientv3.New failed", err.Error()) return nil } @@ -200,7 +201,7 @@ func GetConn4Unique(schema, etcdaddr, servicename string) []*grpc.ClientConn { } } else { gEtcdCli.Close() - fmt.Println("rrrrrrrrrrr", err.Error()) + log.Error("gEtcdCli.Get failed", err.Error()) return nil } gEtcdCli.Close() @@ -236,7 +237,7 @@ func GetConnPool(schema, etcdaddr, servicename string) (*ClientConn, error) { ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(1000*time.Millisecond)) c, err := p.Get(ctx) - fmt.Println(err) + log.Info("", "Get ", err) return c, err } diff --git a/pkg/statistics/statistics.go b/pkg/statistics/statistics.go new file mode 100644 index 000000000..bda8ded85 --- /dev/null +++ b/pkg/statistics/statistics.go @@ -0,0 +1,33 @@ +package statistics + +import ( + "Open_IM/pkg/common/log" + "time" +) + +type Statistics struct { + Count *uint64 + ModuleName string + PrintArgs string + SleepTime int +} + +func (s *Statistics) output() { + t := time.NewTicker(time.Duration(s.SleepTime) * time.Second) + defer t.Stop() + var sum uint64 + for { + sum = *s.Count + select { + case <-t.C: + } + log.NewWarn(s.ModuleName, s.PrintArgs, *s.Count-sum) + + } +} + +func NewStatistics(count *uint64, moduleName, printArgs string, sleepTime int) *Statistics { + p := &Statistics{Count: count, ModuleName: moduleName, SleepTime: sleepTime, PrintArgs: printArgs} + go p.output() + return p +}