diff --git a/cmd/Open-IM-SDK-Core b/cmd/Open-IM-SDK-Core index e43ec7d42..855e89361 160000 --- a/cmd/Open-IM-SDK-Core +++ b/cmd/Open-IM-SDK-Core @@ -1 +1 @@ -Subproject commit e43ec7d427a84702eea7a6aaa358a7a0a809019d +Subproject commit 855e893610c905e3105484c3519613b993301bd2 diff --git a/cmd/test/main.go b/cmd/test/main.go new file mode 100644 index 000000000..87d5165f7 --- /dev/null +++ b/cmd/test/main.go @@ -0,0 +1,69 @@ +package main + +import ( + "Open_IM/pkg/utils" + "context" + "fmt" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "time" +) + +type MongoMsg struct { + UID string + Msg []string +} + + +func main() { + //"mongodb://%s:%s@%s/%s/?maxPoolSize=%d" + uri := "mongodb://user:pass@sample.host:27017/?maxPoolSize=20&w=majority" + DBAddress := "127.0.0.1:37017" + DBDatabase := "new-test-db" + Collection := "new-test-collection" + DBMaxPoolSize := 100 + uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d", + DBAddress,DBDatabase, + DBMaxPoolSize) + + mongoClient, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri)) + if err != nil { + panic(err) + } + filter := bson.M{"uid":"my_uid"} + ctx, _ := context.WithTimeout(context.Background(), 30*time.Second) + for i:=0; i < 2; i++{ + + if err = mongoClient.Database(DBDatabase).Collection(Collection).FindOneAndUpdate(ctx, filter, + bson.M{"$push": bson.M{"msg": utils.Int32ToString(int32(i))}}).Err(); err != nil{ + fmt.Println("FindOneAndUpdate failed ", i, ) + var mmsg MongoMsg + mmsg.UID = "my_uid" + mmsg.Msg = append(mmsg.Msg, utils.Int32ToString(int32(i))) + _, err := mongoClient.Database(DBDatabase).Collection(Collection).InsertOne(ctx, &mmsg) + if err != nil { + fmt.Println("insertone failed ", err.Error(), i) + } else{ + fmt.Println("insertone ok ", i) + } + + }else { + fmt.Println("FindOneAndUpdate ok ", i) + } + + } + + var mmsg MongoMsg + + if err = mongoClient.Database(DBDatabase).Collection(Collection).FindOne(ctx, filter).Decode(&mmsg); err != nil { + fmt.Println("findone failed ", err.Error()) + }else{ + fmt.Println("findone ok ", mmsg.UID) + for i, v:=range mmsg.Msg{ + fmt.Println("find value: ", i, v) + } + } + + +} diff --git a/config/config.yaml b/config/config.yaml index c9b800b0f..08fb9d7e4 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,47 +1,47 @@ # The class cannot be named by Pascal or camel case. # If it is not used, the corresponding structure will not be set, # and it will not be read naturally. -serverversion: 1.0.3 +serverversion: 3.0.0 #---------------Infrastructure configuration---------------------# etcd: - etcdSchema: openIM - etcdAddr: [ 127.0.0.1:2379 ] + etcdSchema: openIM #默认即可 + etcdAddr: [ 127.0.0.1:2379 ] #单机部署时,默认即可 mysql: - dbMysqlAddress: [ 127.0.0.1:13306 ] - dbMysqlUserName: root - dbMysqlPassword: openIM - dbMysqlDatabaseName: openIM - dbTableName: eMsg + dbMysqlAddress: [ 127.0.0.1:13306 ] #mysql地址 目前仅支持单机,默认即可 + dbMysqlUserName: root #mysql用户名,建议修改 + dbMysqlPassword: openIM # mysql密码,建议修改 + dbMysqlDatabaseName: openIM #默认即可 + dbTableName: eMsg #默认即可 dbMsgTableNum: 1 dbMaxOpenConns: 20 dbMaxIdleConns: 10 dbMaxLifeTime: 120 mongo: - dbAddress: [ 127.0.0.1:37017 ] + dbAddress: [ 127.0.0.1:37017 ] #redis地址 目前仅支持单机,默认即可 dbDirect: false dbTimeout: 10 - dbDatabase: openIM + dbDatabase: openIM #mongo db 默认即可 dbSource: admin - dbUserName: - dbPassword: + dbUserName: #mongo用户名,建议修改 + dbPassword: #mongo密码,建议修改 dbMaxPoolSize: 20 - dbRetainChatRecords: 7 + dbRetainChatRecords: 3650 #mongo保存离线消息时间(天) redis: - dbAddress: 127.0.0.1:16379 + dbAddress: 127.0.0.1:16379 #redis地址 目前仅支持单机,默认即可 dbMaxIdle: 128 dbMaxActive: 0 dbIdleTimeout: 120 - dbPassWord: openIM + dbPassWord: openIM #redis密码 建议修改 kafka: ws2mschat: - addr: [ 127.0.0.1:9092 ] + addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 topic: "ws2ms_chat" ms2pschat: - addr: [ 127.0.0.1:9092 ] + addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 topic: "ms2ps_chat" consumergroupid: msgToMongo: mongo @@ -55,6 +55,7 @@ kafka: # The service ip default is empty, # automatically obtain the machine's valid network card ip as the service ip, # otherwise the configuration ip is preferred +#如果是单机模式,用0.0.0.0或者不填,默认即可 serverip: 0.0.0.0 # endpoints 内部组件间访问的端点host名称,访问时,可以内部直接访问 host:port 来访问 @@ -73,27 +74,22 @@ endpoints: rpc_message_cms: openim_rpc_admin_cms api: - openImApiPort: [ 10000 ] + openImApiPort: [ 10000 ] #api服务端口,默认即可,注意开放此端口或做nginx转发 cmsapi: - openImCmsApiPort: [ 8000 ] + openImCmsApiPort: [ 8000 ] #管理后台api服务端口,默认即可,注意开放此端口或做nginx转发 sdk: - openImSdkWsPort: [ 30000 ] + openImSdkWsPort: [ 30000 ] #jssdk服务端口,默认即可,项目中使用jssdk才需开放此端口或做nginx转发 -credential: +credential: #腾讯cos,发送图片、视频、文件时需要,请自行申请后替换,必须修改 tencent: appID: 1302656840 region: ap-chengdu bucket: echat-1302656840 secretID: AKIDGNYVChzIQinu7QEgtNp0hnNgqcV8vZTC secretKey: kz15vW83qM6dBUWIq681eBZA0c0vlIbe - minio: - bucket: openim - location: us-east-1 - endpoint: http://127.0.0.1:9000 - accessKeyID: minioadmin - secretAccessKey: minioadmin -rpcport: + +rpcport: #rpc服务端口 默认即可 openImUserPort: [ 10100 ] openImFriendPort: [ 10200 ] openImOfflineMessagePort: [ 10300 ] @@ -114,7 +110,7 @@ rpcport: stateChange: switch: false -rpcregistername: +rpcregistername: #rpc注册服务名,默认即可 openImUserName: User openImFriendName: Friend openImOfflineMessageName: OfflineMessage @@ -129,57 +125,57 @@ rpcregistername: log: storageLocation: ../logs/ rotationTime: 24 - remainRotationCount: 5 - remainLogLevel: 6 + remainRotationCount: 5 #日志数量 + remainLogLevel: 6 #日志级别 6表示全都打印,测试阶段建议设置为6 elasticSearchSwitch: false elasticSearchAddr: [ 127.0.0.1:9201 ] elasticSearchUser: "" elasticSearchPassword: "" -modulename: +modulename: #日志文件按模块命名,默认即可 longConnSvrName: msg_gateway msgTransferName: msg_transfer pushName: push longconnsvr: - openImWsPort: [ 17778 ] + openImWsPort: [ 17778 ] # ws服务端口,默认即可,要开放此端口或做nginx转发 websocketMaxConnNum: 10000 websocketMaxMsgLen: 4096 websocketTimeOut: 10 push: - tpns: + tpns: #腾讯推送,暂未测试 暂不要使用 ios: accessID: 1600018281 secretKey: 3cd68a77a95b89e5089a1aca523f318f android: accessID: 111 secretKey: 111 - jpns: + jpns: #极光推送 在极光后台申请后,修改以下四项,必须修改 appKey: cf47465a368f24c659608e7e masterSecret: 02204efe3f3832947a236ee5 pushUrl: "https://api.jpush.cn/v3/push" pushIntent: "intent:#Intent;component=io.openim.app.enterprisechat/io.openim.app.enterprisechat.MainActivity;end" manager: + #app管理员userID和对应的secret 建议修改。 用于管理后台登录,也可以用户管理后台对应的api appManagerUid: [ "openIM123456","openIM654321", "openIM333", "openIMAdmin"] secrets: [ "openIM1","openIM2", "openIM333", "openIMAdmin"] secret: tuoyun - +# 多端互踢策略 +# 1:多平台登录:Android、iOS、Windows、Mac 每种平台只能一个在线,web端可以多个同时在线 multiloginpolicy: 1 #token config tokenpolicy: - accessSecret: "open_im_server" + accessSecret: "open_im_server" #token生成相关,默认即可 # Token effective time day as a unit - accessExpire: 7 + accessExpire: 3650 #token过期时间(天) -messagecallback: +messagecallback: #暂时不要使用 还需完善 callbackUrl: "http://www.xxx.com/msg/judge" #TimeOut use second as unit callbackTimeOut: 10 -messagejudge: - isJudgeFriend: true # c2c: # callbackBeforeSendMsg: # switch: false @@ -189,7 +185,7 @@ messagejudge: # state: # stateChange: # switch: false - +#ios系统推送声音以及标记计数 iospush: pushSound: "xxx" badgeCount: true @@ -253,7 +249,7 @@ notification: desc: "groupApplicationAccepted desc" ext: "groupApplicationAccepted ext" defaultTips: - tips: "was allowed to join the group" # group info changed by xx + tips: "allowed to join the group" # group info changed by xx groupApplicationRejected: conversation: @@ -265,7 +261,7 @@ notification: desc: " desc" ext: " ext" defaultTips: - tips: "was rejected into the group" # group info changed by xx + tips: "rejected into the group" # group info changed by xx groupOwnerTransferred: conversation: @@ -289,7 +285,7 @@ notification: desc: "memberKicked desc" ext: "memberKicked ext" defaultTips: - tips: "was kicked out of the group" # group info changed by xx + tips: "kicked out of the group" # group info changed by xx memberInvited: conversation: @@ -301,7 +297,7 @@ notification: desc: "memberInvited desc" ext: "memberInvited ext" defaultTips: - tips: "was invited into the group" # group info changed by xx + tips: "invited into the group" # group info changed by xx memberEnter: conversation: @@ -378,11 +374,11 @@ notification: unreadCount: false offlinePush: switch: true - title: "Deleted a friend" - desc: "Deleted a friend" - ext: "Deleted a friend" + title: "deleted a friend" + desc: "deleted a friend" + ext: "deleted a friend" defaultTips: - tips: "Deleted a friend" # + tips: "deleted a friend" # friendRemarkSet: @@ -405,11 +401,11 @@ notification: unreadCount: false offlinePush: switch: true - title: "Blocked a user" - desc: "Blocked a user" - ext: "Blocked a user" + title: "blocked a user" + desc: "blocked a user" + ext: "blocked a user" defaultTips: - tips: "Blocked a user" # + tips: "blocked a user" # blackDeleted: @@ -435,7 +431,7 @@ notification: desc: "Remove a blocked user" ext: "Remove a blocked user" defaultTips: - tips: "Remove a blocked user" + tips: "remove a blocked user" #####################conversation######################### conversationOptUpdate: @@ -454,23 +450,24 @@ notification: #---------------demo configuration---------------------# #The following configuration items are applied to openIM Demo configuration +#是否启动demo,如果自身没有账号体系,设置为true demoswitch: true demo: - openImDemoPort: [ 42233 ] - alismsverify: + openImDemoPort: [ 42233 ] #demo对外服务端口,默认即可,需要开放此端口或做nginx转发 + alismsverify: #阿里云短信配置,在阿里云申请成功后修改以下四项,必须修改 accessKeyId: LTAI5tJPkn4HuuePdiLdGqe71 accessKeySecret: 4n9OJ7ZCVN1U6KeHDAtOyNeVZcjOuV1 signName: OpenIM Corporation verificationCodeTemplateCode: SMS_2268101641 - superCode: 666666 + superCode: 666666 #超级验证码,建议修改掉,收不到短信验证码时可以用此替代 # second codeTTL: 60 - mail: + mail: #仅支持qq邮箱,具体操作参考 https://service.mail.qq.com/cgi-bin/help?subtype=1&id=28&no=1001256 必须修改 title: "openIM" senderMail: "1765567899@qq.com" senderAuthorizationCode: "1gxyausfoevlzbfag" smtpAddr: "smtp.qq.com" - smtpPort: 25 + smtpPort: 25 #需开放此端口 出口方向 diff --git a/go.mod b/go.mod index 75e85fb3f..9fbbf033a 100644 --- a/go.mod +++ b/go.mod @@ -45,6 +45,7 @@ require ( github.com/tencentyun/qcloud-cos-sts-sdk v0.0.0-20210325043845-84a0811633ca github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect go.etcd.io/etcd v0.0.0-20200402134248-51bdeb39e698 + go.mongodb.org/mongo-driver v1.8.3 golang.org/x/image v0.0.0-20210220032944-ac19c3e999fb golang.org/x/net v0.0.0-20210917221730-978cfadd31cf google.golang.org/grpc v1.40.0 diff --git a/internal/demo/register/login.go b/internal/demo/register/login.go index 944a9d039..b6ec445f6 100644 --- a/internal/demo/register/login.go +++ b/internal/demo/register/login.go @@ -43,7 +43,7 @@ func Login(c *gin.Context) { } if r.Password != params.Password { log.NewError(params.OperationID, "password err", params.Password, account, r.Password, r.Account) - c.JSON(http.StatusOK, gin.H{"errCode": constant.PasswordErr, "errMsg": "Mobile phone number is not registered"}) + c.JSON(http.StatusOK, gin.H{"errCode": constant.PasswordErr, "errMsg": "password err"}) return } url := fmt.Sprintf("http://%s:10000/auth/user_token", utils.ServerIP) diff --git a/internal/msg_transfer/logic/db.go b/internal/msg_transfer/logic/db.go index b9ce8b589..edc269f9f 100644 --- a/internal/msg_transfer/logic/db.go +++ b/internal/msg_transfer/logic/db.go @@ -19,4 +19,5 @@ func saveUserChat(uid string, msg *pbMsg.MsgDataToMQ) error { pbSaveData.MsgData = msg.MsgData log.NewInfo(msg.OperationID, "IncrUserSeq cost time", utils.GetCurrentTimestampByMill()-time) return db.DB.SaveUserChat(uid, pbSaveData.MsgData.SendTime, &pbSaveData) +// return db.DB.SaveUserChatMongo2(uid, pbSaveData.MsgData.SendTime, &pbSaveData) } diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 721405a78..41c943c4b 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -71,32 +71,34 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { bCustomContent, _ := json.Marshal(customContent) jsonCustomContent := string(bCustomContent) var content string - switch pushMsg.MsgData.ContentType { - case constant.Text: - content = constant.ContentType2PushContent[constant.Text] - case constant.Picture: - content = constant.ContentType2PushContent[constant.Picture] - case constant.Voice: - content = constant.ContentType2PushContent[constant.Voice] - case constant.Video: - content = constant.ContentType2PushContent[constant.Video] - case constant.File: - content = constant.ContentType2PushContent[constant.File] - case constant.AtText: - a := AtContent{} - _ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a) - if utils.IsContain(v.RecvID, a.AtUserList) { - content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] - } else { - content = constant.ContentType2PushContent[constant.GroupMsg] - } - default: - content = constant.ContentType2PushContent[constant.Common] - } if pushMsg.MsgData.OfflinePushInfo != nil { content = pushMsg.MsgData.OfflinePushInfo.Title + } else { + switch pushMsg.MsgData.ContentType { + case constant.Text: + content = constant.ContentType2PushContent[constant.Text] + case constant.Picture: + content = constant.ContentType2PushContent[constant.Picture] + case constant.Voice: + content = constant.ContentType2PushContent[constant.Voice] + case constant.Video: + content = constant.ContentType2PushContent[constant.Video] + case constant.File: + content = constant.ContentType2PushContent[constant.File] + case constant.AtText: + a := AtContent{} + _ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a) + if utils.IsContain(v.RecvID, a.AtUserList) { + content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] + } else { + content = constant.ContentType2PushContent[constant.GroupMsg] + } + default: + content = constant.ContentType2PushContent[constant.Common] + } } + pushResult, err := push.JGAccountListPush(UIDList, content, jsonCustomContent, constant.PlatformIDToName(t)) if err != nil { log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error(), constant.PlatformIDToName(t)) diff --git a/internal/rpc/msg/pull_message.go b/internal/rpc/msg/pull_message.go index 05f1dad39..126c6eb29 100644 --- a/internal/rpc/msg/pull_message.go +++ b/internal/rpc/msg/pull_message.go @@ -40,6 +40,7 @@ func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *open_im_sdk.Pull log.NewInfo(in.OperationID, "rpc PullMessageBySeqList is arriving", in.String()) resp := new(open_im_sdk.PullMessageBySeqListResp) msgList, err := commonDB.DB.GetMsgBySeqList(in.UserID, in.SeqList, in.OperationID) +// msgList, err := commonDB.DB.GetMsgBySeqListMongo2(in.UserID, in.SeqList, in.OperationID) if err != nil { log.ErrorByKv("PullMessageBySeqList data error", in.OperationID, in.String()) resp.ErrCode = 201 diff --git a/pkg/base_info/conversation_api_struct.go b/pkg/base_info/conversation_api_struct.go index bd54bafbf..95aa8527f 100644 --- a/pkg/base_info/conversation_api_struct.go +++ b/pkg/base_info/conversation_api_struct.go @@ -31,3 +31,18 @@ type SetReceiveMessageOptResp struct { CommResp ConversationOptResultList []*OptResult `json:"data"` } + +//type Conversation struct { +// OwnerUserID string `gorm:"column:owner_user_id;primary_key;type:char(128)" json:"OwnerUserID"` +// ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"` +// ConversationType int32 `gorm:"column:conversation_type" json:"conversationType"` +// UserID string `gorm:"column:user_id;type:char(64)" json:"userID"` +// GroupID string `gorm:"column:group_id;type:char(128)" json:"groupID"` +// RecvMsgOpt int32 `gorm:"column:recv_msg_opt" json:"recvMsgOpt"` +// UnreadCount int32 `gorm:"column:unread_count" json:"unreadCount"` +// DraftTextTime int64 `gorm:"column:draft_text_time" json:"draftTextTime"` +// IsPinned bool `gorm:"column:is_pinned" json:"isPinned"` +// AttachedInfo string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"` +// Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"` +//} + diff --git a/pkg/base_info/public_struct.go b/pkg/base_info/public_struct.go index 1e60ae294..2950dbea8 100644 --- a/pkg/base_info/public_struct.go +++ b/pkg/base_info/public_struct.go @@ -16,6 +16,20 @@ type ApiUserInfo struct { Ex string `json:"ex" binding:"omitempty,max=1024"` } +//type Conversation struct { +// OwnerUserID string `gorm:"column:owner_user_id;primary_key;type:char(128)" json:"OwnerUserID"` +// ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"` +// ConversationType int32 `gorm:"column:conversation_type" json:"conversationType"` +// UserID string `gorm:"column:user_id;type:char(64)" json:"userID"` +// GroupID string `gorm:"column:group_id;type:char(128)" json:"groupID"` +// RecvMsgOpt int32 `gorm:"column:recv_msg_opt" json:"recvMsgOpt"` +// UnreadCount int32 `gorm:"column:unread_count" json:"unreadCount"` +// DraftTextTime int64 `gorm:"column:draft_text_time" json:"draftTextTime"` +// IsPinned bool `gorm:"column:is_pinned" json:"isPinned"` +// AttachedInfo string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"` +// Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"` +//} + type GroupAddMemberInfo struct { UserID string `json:"userID" binding:"required"` RoleLevel int32 `json:"roleLevel" binding:"required"` diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 5b90f5132..50c0e544f 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -182,3 +182,5 @@ func GroupIsBanPrivateChat(status int32) bool { } return true } + + const BigVersion = "v3" \ No newline at end of file diff --git a/pkg/common/db/model.go b/pkg/common/db/model.go index e2a1fb511..f990d95a3 100644 --- a/pkg/common/db/model.go +++ b/pkg/common/db/model.go @@ -3,9 +3,16 @@ package db import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/log" +// "context" +// "fmt" "github.com/garyburd/redigo/redis" "gopkg.in/mgo.v2" "time" + + //"go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +// "go.mongodb.org/mongo-driver/mongo/options" + ) var DB DataBases @@ -14,6 +21,7 @@ type DataBases struct { MysqlDB mysqlDB mgoSession *mgo.Session redisPool *redis.Pool + mongoClient *mongo.Client } func key(dbAddress, dbName string) string { @@ -22,10 +30,31 @@ func key(dbAddress, dbName string) string { func init() { var mgoSession *mgo.Session + var mongoClient *mongo.Client var err1 error //mysql init initMysqlDB() // mongo init + // "mongodb://sysop:moon@localhost/records" + // uri := "mongodb://user:pass@sample.host:27017/?maxPoolSize=20&w=majority" + //uri := fmt.Sprintf("mongodb://%s:%s@%s/%s/?maxPoolSize=%d", + // config.Config.Mongo.DBUserName, config.Config.Mongo.DBPassword, + // config.Config.Mongo.DBAddress[0],config.Config.Mongo.DBDatabase, + // config.Config.Mongo.DBMaxPoolSize) + // + //mongoClient, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri)) + //if err != nil{ + // log.NewError(" mongo.Connect failed, try ", err.Error(), uri) + // time.Sleep(time.Duration(30) * time.Second) + // mongoClient, err1 = mongo.Connect(context.TODO(), options.Client().ApplyURI(uri)) + // if err1 != nil { + // log.NewError(" mongo.Connect failed, panic", err.Error(), uri) + // panic(err1.Error()) + // } + //} + + + mgoDailInfo := &mgo.DialInfo{ Addrs: config.Config.Mongo.DBAddress, Direct: config.Config.Mongo.DBDirect, @@ -37,16 +66,16 @@ func init() { PoolLimit: config.Config.Mongo.DBMaxPoolSize, } mgoSession, err := mgo.DialWithInfo(mgoDailInfo) + if err != nil { - log.NewError("mgo init err", err.Error(), mgoDailInfo) - } - if err != nil { - time.Sleep(time.Duration(30) * time.Second) + mgoSession, err1 = mgo.DialWithInfo(mgoDailInfo) if err1 != nil { + log.NewError(" mongo.Connect failed, panic", err.Error()) panic(err1.Error()) } } + DB.mongoClient = mongoClient DB.mgoSession = mgoSession DB.mgoSession.SetMode(mgo.Monotonic, true) c := DB.mgoSession.DB(config.Config.Mongo.DBDatabase).C(cChat) @@ -55,6 +84,7 @@ func init() { panic(err.Error()) } + // redis pool init DB.redisPool = &redis.Pool{ MaxIdle: config.Config.Redis.DBMaxIdle, diff --git a/pkg/common/db/model_struct.go b/pkg/common/db/model_struct.go index 51aa112b4..184f51e9d 100644 --- a/pkg/common/db/model_struct.go +++ b/pkg/common/db/model_struct.go @@ -187,7 +187,7 @@ type ChatLog struct { SessionType int32 `gorm:"column:session_type" json:"sessionType"` MsgFrom int32 `gorm:"column:msg_from" json:"msgFrom"` ContentType int32 `gorm:"column:content_type" json:"contentType"` - Content string `gorm:"column:content;type:varchar(1000)" json:"content"` + Content string `gorm:"column:content;type:varchar(3000)" json:"content"` Status int32 `gorm:"column:status" json:"status"` SendTime time.Time `gorm:"column:send_time" json:"sendTime"` CreateTime time.Time `gorm:"column:create_time" json:"createTime"` diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index 98318dd65..7a9c4b302 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -6,10 +6,12 @@ import ( pbMsg "Open_IM/pkg/proto/chat" open_im_sdk "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" + "context" "errors" - "github.com/garyburd/redigo/redis" + //"github.com/garyburd/redigo/redis" "github.com/golang/protobuf/proto" "gopkg.in/mgo.v2/bson" + "strconv" "time" ) @@ -34,33 +36,35 @@ type GroupMember_x struct { } func (d *DataBases) GetMinSeqFromMongo(uid string) (MinSeq uint32, err error) { - var i, NB uint32 - var seqUid string - session := d.mgoSession.Clone() - if session == nil { - return MinSeq, errors.New("session == nil") - } - defer session.Close() - c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) - MaxSeq, err := d.GetUserMaxSeq(uid) - if err != nil && err != redis.ErrNil { - return MinSeq, err - } - NB = uint32(MaxSeq / singleGocMsgNum) - for i = 0; i <= NB; i++ { - seqUid = indexGen(uid, i) - n, err := c.Find(bson.M{"uid": seqUid}).Count() - if err == nil && n != 0 { - if i == 0 { - MinSeq = 1 - } else { - MinSeq = uint32(i * singleGocMsgNum) - } - break - } - } - return MinSeq, nil + return 1, nil + //var i, NB uint32 + //var seqUid string + //session := d.mgoSession.Clone() + //if session == nil { + // return MinSeq, errors.New("session == nil") + //} + //defer session.Close() + //c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) + //MaxSeq, err := d.GetUserMaxSeq(uid) + //if err != nil && err != redis.ErrNil { + // return MinSeq, err + //} + //NB = uint32(MaxSeq / singleGocMsgNum) + //for i = 0; i <= NB; i++ { + // seqUid = indexGen(uid, i) + // n, err := c.Find(bson.M{"uid": seqUid}).Count() + // if err == nil && n != 0 { + // if i == 0 { + // MinSeq = 1 + // } else { + // MinSeq = uint32(i * singleGocMsgNum) + // } + // break + // } + //} + //return MinSeq, nil } + func (d *DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { var hasSeqList []uint32 singleCount := 0 @@ -115,6 +119,61 @@ func (d *DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID st } return seqMsg, nil } + + +func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { + var hasSeqList []uint32 + singleCount := 0 + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + + m := func(uid string, seqList []uint32) map[string][]uint32 { + t := make(map[string][]uint32) + for i := 0; i < len(seqList); i++ { + seqUid := getSeqUid(uid, seqList[i]) + if value, ok := t[seqUid]; !ok { + var temp []uint32 + t[seqUid] = append(temp, seqList[i]) + } else { + t[seqUid] = append(value, seqList[i]) + } + } + return t + }(uid, seqList) + sChat := UserChat{} + for seqUid, value := range m { + if err = c.FindOne(ctx, bson.M{"uid": seqUid}).Decode(&sChat); err != nil { + log.NewError(operationID, "not find seqUid", seqUid, value, uid, seqList, err.Error()) + continue + } + singleCount = 0 + for i := 0; i < len(sChat.Msg); i++ { + msg := new(open_im_sdk.MsgData) + if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil { + log.NewError(operationID, "Unmarshal err", seqUid, value, uid, seqList, err.Error()) + return nil, err + } + if isContainInt32(msg.Seq, value) { + seqMsg = append(seqMsg, msg) + hasSeqList = append(hasSeqList, msg.Seq) + singleCount++ + if singleCount == len(value) { + break + } + } + } + } + if len(hasSeqList) != len(seqList) { + var diff []uint32 + diff = utils.Difference(hasSeqList, seqList) + exceptionMSg := genExceptionMessageBySeqList(diff) + seqMsg = append(seqMsg, exceptionMSg...) + + } + return seqMsg, nil +} + + func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*open_im_sdk.MsgData) { for _, v := range seqList { msg := new(open_im_sdk.MsgData) @@ -124,6 +183,37 @@ func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*open_im_sdk return exceptionMsg } +func (d *DataBases) SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + newTime := getCurrentTimestampByMill() + operationID := "" + seqUid := getSeqUid(uid, m.MsgData.Seq) + filter := bson.M{"uid": seqUid} + var err error + sMsg := MsgInfo{} + sMsg.SendTime = sendTime + if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil { + return utils.Wrap(err,"") + } + err = c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": sMsg}}).Err() + log.NewDebug(operationID, "get mgoSession cost time", getCurrentTimestampByMill()-newTime) + if err != nil { + sChat := UserChat{} + sChat.UID = seqUid + sChat.Msg = append(sChat.Msg, sMsg) + if _, err = c.InsertOne(ctx, &sChat) ; err != nil{ + log.NewDebug(operationID, "InsertOne failed", filter) + return utils.Wrap(err, "") + } + }else{ + log.NewDebug(operationID, "FindOneAndUpdate ok", filter) + } + + log.NewDebug(operationID, "find mgo uid cost time", getCurrentTimestampByMill()-newTime) + return nil +} + func (d *DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error { var seqUid string newTime := getCurrentTimestampByMill() @@ -163,115 +253,137 @@ func (d *DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToD return nil } -func (d *DataBases) DelUserChat(uid string) error { - session := d.mgoSession.Clone() - if session == nil { - return errors.New("session == nil") - } - defer session.Close() - c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) +func (d *DataBases) DelUserChat(uid string) error { + return nil + //session := d.mgoSession.Clone() + //if session == nil { + // return errors.New("session == nil") + //} + //defer session.Close() + // + //c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) + // + //delTime := time.Now().Unix() - int64(config.Config.Mongo.DBRetainChatRecords)*24*3600 + //if err := c.Update(bson.M{"uid": uid}, bson.M{"$pull": bson.M{"msg": bson.M{"sendtime": bson.M{"$lte": delTime}}}}); err != nil { + // return err + //} + // + //return nil +} + + +func (d *DataBases) DelUserChatMongo2(uid string) error { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + filter := bson.M{"uid": uid} delTime := time.Now().Unix() - int64(config.Config.Mongo.DBRetainChatRecords)*24*3600 - if err := c.Update(bson.M{"uid": uid}, bson.M{"$pull": bson.M{"msg": bson.M{"sendtime": bson.M{"$lte": delTime}}}}); err != nil { - return err + if _, err := c.UpdateOne(ctx, filter, bson.M{"$pull": bson.M{"msg": bson.M{"sendtime": bson.M{"$lte": delTime}}}}); err != nil { + return utils.Wrap(err, "") } - return nil } + + func (d *DataBases) MgoUserCount() (int, error) { - session := d.mgoSession.Clone() - if session == nil { - return 0, errors.New("session == nil") - } - defer session.Close() - - c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) - - return c.Find(nil).Count() + return 0, nil + //session := d.mgoSession.Clone() + //if session == nil { + // return 0, errors.New("session == nil") + //} + //defer session.Close() + // + //c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) + // + //return c.Find(nil).Count() } func (d *DataBases) MgoSkipUID(count int) (string, error) { - session := d.mgoSession.Clone() - if session == nil { - return "", errors.New("session == nil") - } - defer session.Close() - - c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) - - sChat := UserChat{} - c.Find(nil).Skip(count).Limit(1).One(&sChat) - return sChat.UID, nil + return "", nil + //session := d.mgoSession.Clone() + //if session == nil { + // return "", errors.New("session == nil") + //} + //defer session.Close() + // + //c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) + // + //sChat := UserChat{} + //c.Find(nil).Skip(count).Limit(1).One(&sChat) + //return sChat.UID, nil } func (d *DataBases) GetGroupMember(groupID string) []string { - groupInfo := GroupMember_x{} - groupInfo.GroupID = groupID - groupInfo.UIDList = make([]string, 0) - - session := d.mgoSession.Clone() - if session == nil { - return groupInfo.UIDList - } - defer session.Close() - - c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup) - - if err := c.Find(bson.M{"groupid": groupInfo.GroupID}).One(&groupInfo); err != nil { - return groupInfo.UIDList - } - - return groupInfo.UIDList + return nil + //groupInfo := GroupMember_x{} + //groupInfo.GroupID = groupID + //groupInfo.UIDList = make([]string, 0) + // + //session := d.mgoSession.Clone() + //if session == nil { + // return groupInfo.UIDList + //} + //defer session.Close() + // + //c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup) + // + //if err := c.Find(bson.M{"groupid": groupInfo.GroupID}).One(&groupInfo); err != nil { + // return groupInfo.UIDList + //} + // + //return groupInfo.UIDList } func (d *DataBases) AddGroupMember(groupID, uid string) error { - session := d.mgoSession.Clone() - if session == nil { - return errors.New("session == nil") - } - defer session.Close() - - c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup) - - n, err := c.Find(bson.M{"groupid": groupID}).Count() - if err != nil { - return err - } - - if n == 0 { - groupInfo := GroupMember_x{} - groupInfo.GroupID = groupID - groupInfo.UIDList = append(groupInfo.UIDList, uid) - err = c.Insert(&groupInfo) - if err != nil { - return err - } - } else { - err = c.Update(bson.M{"groupid": groupID}, bson.M{"$addToSet": bson.M{"uidlist": uid}}) - if err != nil { - return err - } - } - return nil + //session := d.mgoSession.Clone() + //if session == nil { + // return errors.New("session == nil") + //} + //defer session.Close() + // + //c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup) + // + //n, err := c.Find(bson.M{"groupid": groupID}).Count() + //if err != nil { + // return err + //} + // + //if n == 0 { + // groupInfo := GroupMember_x{} + // groupInfo.GroupID = groupID + // groupInfo.UIDList = append(groupInfo.UIDList, uid) + // err = c.Insert(&groupInfo) + // if err != nil { + // return err + // } + //} else { + // err = c.Update(bson.M{"groupid": groupID}, bson.M{"$addToSet": bson.M{"uidlist": uid}}) + // if err != nil { + // return err + // } + //} + // + //return nil } func (d *DataBases) DelGroupMember(groupID, uid string) error { - session := d.mgoSession.Clone() - if session == nil { - return errors.New("session == nil") - } - defer session.Close() - - c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup) - - if err := c.Update(bson.M{"groupid": groupID}, bson.M{"$pull": bson.M{"uidlist": uid}}); err != nil { - return err - } - return nil + //session := d.mgoSession.Clone() + //if session == nil { + // return errors.New("session == nil") + //} + //defer session.Close() + // + //c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup) + // + //if err := c.Update(bson.M{"groupid": groupID}, bson.M{"$pull": bson.M{"uidlist": uid}}); err != nil { + // return err + //} + // + //return nil } func getCurrentTimestampByMill() int64 {