mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 11:22:10 +08:00 
			
		
		
		
	Refactoring code
This commit is contained in:
		
							parent
							
								
									10a6f951eb
								
							
						
					
					
						commit
						1af134dfad
					
				@ -1 +1 @@
 | 
				
			|||||||
Subproject commit e43ec7d427a84702eea7a6aaa358a7a0a809019d
 | 
					Subproject commit bdb27fcff0e9785cb0f92d9d04bdafe544a04de0
 | 
				
			||||||
							
								
								
									
										69
									
								
								cmd/test/main.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								cmd/test/main.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,69 @@
 | 
				
			|||||||
 | 
					package main
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"Open_IM/pkg/utils"
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"go.mongodb.org/mongo-driver/bson"
 | 
				
			||||||
 | 
						"go.mongodb.org/mongo-driver/mongo"
 | 
				
			||||||
 | 
						"go.mongodb.org/mongo-driver/mongo/options"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type MongoMsg struct {
 | 
				
			||||||
 | 
						UID string
 | 
				
			||||||
 | 
						Msg []string
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func main()  {
 | 
				
			||||||
 | 
						//"mongodb://%s:%s@%s/%s/?maxPoolSize=%d"
 | 
				
			||||||
 | 
						uri := "mongodb://user:pass@sample.host:27017/?maxPoolSize=20&w=majority"
 | 
				
			||||||
 | 
						DBAddress := "127.0.0.1:37017"
 | 
				
			||||||
 | 
						DBDatabase := "new-test-db"
 | 
				
			||||||
 | 
						Collection := "new-test-collection"
 | 
				
			||||||
 | 
						DBMaxPoolSize := 100
 | 
				
			||||||
 | 
						uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d",
 | 
				
			||||||
 | 
							DBAddress,DBDatabase,
 | 
				
			||||||
 | 
							DBMaxPoolSize)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						mongoClient, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							panic(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						filter := bson.M{"uid":"my_uid"}
 | 
				
			||||||
 | 
						ctx, _ := context.WithTimeout(context.Background(), 30*time.Second)
 | 
				
			||||||
 | 
						for i:=0; i < 2; i++{
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if err = mongoClient.Database(DBDatabase).Collection(Collection).FindOneAndUpdate(ctx, filter,
 | 
				
			||||||
 | 
								bson.M{"$push": bson.M{"msg": utils.Int32ToString(int32(i))}}).Err(); err != nil{
 | 
				
			||||||
 | 
								fmt.Println("FindOneAndUpdate failed ", i, )
 | 
				
			||||||
 | 
								var mmsg MongoMsg
 | 
				
			||||||
 | 
								mmsg.UID = "my_uid"
 | 
				
			||||||
 | 
								mmsg.Msg = append(mmsg.Msg, utils.Int32ToString(int32(i)))
 | 
				
			||||||
 | 
								_, err := mongoClient.Database(DBDatabase).Collection(Collection).InsertOne(ctx, &mmsg)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									fmt.Println("insertone failed ", err.Error(), i)
 | 
				
			||||||
 | 
								} else{
 | 
				
			||||||
 | 
									fmt.Println("insertone ok ", i)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							}else {
 | 
				
			||||||
 | 
								fmt.Println("FindOneAndUpdate ok ", i)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var mmsg MongoMsg
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if  err = mongoClient.Database(DBDatabase).Collection(Collection).FindOne(ctx, filter).Decode(&mmsg); err != nil {
 | 
				
			||||||
 | 
							fmt.Println("findone failed ", err.Error())
 | 
				
			||||||
 | 
						}else{
 | 
				
			||||||
 | 
							fmt.Println("findone ok ", mmsg.UID)
 | 
				
			||||||
 | 
							for i, v:=range mmsg.Msg{
 | 
				
			||||||
 | 
								fmt.Println("find value: ", i, v)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										1
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										1
									
								
								go.mod
									
									
									
									
									
								
							@ -45,6 +45,7 @@ require (
 | 
				
			|||||||
	github.com/tencentyun/qcloud-cos-sts-sdk v0.0.0-20210325043845-84a0811633ca
 | 
						github.com/tencentyun/qcloud-cos-sts-sdk v0.0.0-20210325043845-84a0811633ca
 | 
				
			||||||
	github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
 | 
						github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
 | 
				
			||||||
	go.etcd.io/etcd v0.0.0-20200402134248-51bdeb39e698
 | 
						go.etcd.io/etcd v0.0.0-20200402134248-51bdeb39e698
 | 
				
			||||||
 | 
						go.mongodb.org/mongo-driver v1.8.3
 | 
				
			||||||
	golang.org/x/image v0.0.0-20210220032944-ac19c3e999fb
 | 
						golang.org/x/image v0.0.0-20210220032944-ac19c3e999fb
 | 
				
			||||||
	golang.org/x/net v0.0.0-20210917221730-978cfadd31cf
 | 
						golang.org/x/net v0.0.0-20210917221730-978cfadd31cf
 | 
				
			||||||
	google.golang.org/grpc v1.40.0
 | 
						google.golang.org/grpc v1.40.0
 | 
				
			||||||
 | 
				
			|||||||
@ -31,3 +31,18 @@ type SetReceiveMessageOptResp struct {
 | 
				
			|||||||
	CommResp
 | 
						CommResp
 | 
				
			||||||
	ConversationOptResultList []*OptResult `json:"data"`
 | 
						ConversationOptResultList []*OptResult `json:"data"`
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					//type Conversation struct {
 | 
				
			||||||
 | 
					//	OwnerUserID      string `gorm:"column:owner_user_id;primary_key;type:char(128)" json:"OwnerUserID"`
 | 
				
			||||||
 | 
					//	ConversationID   string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"`
 | 
				
			||||||
 | 
					//	ConversationType int32  `gorm:"column:conversation_type" json:"conversationType"`
 | 
				
			||||||
 | 
					//	UserID           string `gorm:"column:user_id;type:char(64)" json:"userID"`
 | 
				
			||||||
 | 
					//	GroupID          string `gorm:"column:group_id;type:char(128)" json:"groupID"`
 | 
				
			||||||
 | 
					//	RecvMsgOpt       int32  `gorm:"column:recv_msg_opt" json:"recvMsgOpt"`
 | 
				
			||||||
 | 
					//	UnreadCount      int32  `gorm:"column:unread_count" json:"unreadCount"`
 | 
				
			||||||
 | 
					//	DraftTextTime    int64  `gorm:"column:draft_text_time" json:"draftTextTime"`
 | 
				
			||||||
 | 
					//	IsPinned         bool   `gorm:"column:is_pinned" json:"isPinned"`
 | 
				
			||||||
 | 
					//	AttachedInfo     string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"`
 | 
				
			||||||
 | 
					//	Ex               string `gorm:"column:ex;type:varchar(1024)" json:"ex"`
 | 
				
			||||||
 | 
					//}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -16,6 +16,20 @@ type ApiUserInfo struct {
 | 
				
			|||||||
	Ex          string `json:"ex" binding:"omitempty,max=1024"`
 | 
						Ex          string `json:"ex" binding:"omitempty,max=1024"`
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					//type Conversation struct {
 | 
				
			||||||
 | 
					//	OwnerUserID      string `gorm:"column:owner_user_id;primary_key;type:char(128)" json:"OwnerUserID"`
 | 
				
			||||||
 | 
					//	ConversationID   string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"`
 | 
				
			||||||
 | 
					//	ConversationType int32  `gorm:"column:conversation_type" json:"conversationType"`
 | 
				
			||||||
 | 
					//	UserID           string `gorm:"column:user_id;type:char(64)" json:"userID"`
 | 
				
			||||||
 | 
					//	GroupID          string `gorm:"column:group_id;type:char(128)" json:"groupID"`
 | 
				
			||||||
 | 
					//	RecvMsgOpt       int32  `gorm:"column:recv_msg_opt" json:"recvMsgOpt"`
 | 
				
			||||||
 | 
					//	UnreadCount      int32  `gorm:"column:unread_count" json:"unreadCount"`
 | 
				
			||||||
 | 
					//	DraftTextTime    int64  `gorm:"column:draft_text_time" json:"draftTextTime"`
 | 
				
			||||||
 | 
					//	IsPinned         bool   `gorm:"column:is_pinned" json:"isPinned"`
 | 
				
			||||||
 | 
					//	AttachedInfo     string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"`
 | 
				
			||||||
 | 
					//	Ex               string `gorm:"column:ex;type:varchar(1024)" json:"ex"`
 | 
				
			||||||
 | 
					//}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type GroupAddMemberInfo struct {
 | 
					type GroupAddMemberInfo struct {
 | 
				
			||||||
	UserID    string `json:"userID" binding:"required"`
 | 
						UserID    string `json:"userID" binding:"required"`
 | 
				
			||||||
	RoleLevel int32  `json:"roleLevel" binding:"required"`
 | 
						RoleLevel int32  `json:"roleLevel" binding:"required"`
 | 
				
			||||||
 | 
				
			|||||||
@ -182,3 +182,5 @@ func GroupIsBanPrivateChat(status int32) bool {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	return true
 | 
						return true
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					 const BigVersion = "v3"
 | 
				
			||||||
@ -3,9 +3,16 @@ package db
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"Open_IM/pkg/common/config"
 | 
						"Open_IM/pkg/common/config"
 | 
				
			||||||
	"Open_IM/pkg/common/log"
 | 
						"Open_IM/pkg/common/log"
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
	"github.com/garyburd/redigo/redis"
 | 
						"github.com/garyburd/redigo/redis"
 | 
				
			||||||
	"gopkg.in/mgo.v2"
 | 
						"gopkg.in/mgo.v2"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						//"go.mongodb.org/mongo-driver/bson"
 | 
				
			||||||
 | 
						"go.mongodb.org/mongo-driver/mongo"
 | 
				
			||||||
 | 
						"go.mongodb.org/mongo-driver/mongo/options"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var DB DataBases
 | 
					var DB DataBases
 | 
				
			||||||
@ -14,6 +21,7 @@ type DataBases struct {
 | 
				
			|||||||
	MysqlDB    mysqlDB
 | 
						MysqlDB    mysqlDB
 | 
				
			||||||
	mgoSession *mgo.Session
 | 
						mgoSession *mgo.Session
 | 
				
			||||||
	redisPool  *redis.Pool
 | 
						redisPool  *redis.Pool
 | 
				
			||||||
 | 
						mongoClient *mongo.Client
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func key(dbAddress, dbName string) string {
 | 
					func key(dbAddress, dbName string) string {
 | 
				
			||||||
@ -22,10 +30,21 @@ func key(dbAddress, dbName string) string {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func init() {
 | 
					func init() {
 | 
				
			||||||
	var mgoSession *mgo.Session
 | 
						var mgoSession *mgo.Session
 | 
				
			||||||
 | 
						var mongoClient *mongo.Client
 | 
				
			||||||
	var err1 error
 | 
						var err1 error
 | 
				
			||||||
	//mysql init
 | 
						//mysql init
 | 
				
			||||||
	initMysqlDB()
 | 
						initMysqlDB()
 | 
				
			||||||
	// mongo init
 | 
						// mongo init
 | 
				
			||||||
 | 
						// "mongodb://sysop:moon@localhost/records"
 | 
				
			||||||
 | 
						// uri := "mongodb://user:pass@sample.host:27017/?maxPoolSize=20&w=majority"
 | 
				
			||||||
 | 
						uri := fmt.Sprintf("mongodb://%s:%s@%s/%s/?maxPoolSize=%d",
 | 
				
			||||||
 | 
							config.Config.Mongo.DBUserName, config.Config.Mongo.DBPassword,
 | 
				
			||||||
 | 
							config.Config.Mongo.DBAddress[0],config.Config.Mongo.DBDatabase,
 | 
				
			||||||
 | 
							config.Config.Mongo.DBMaxPoolSize)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						mongoClient, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	mgoDailInfo := &mgo.DialInfo{
 | 
						mgoDailInfo := &mgo.DialInfo{
 | 
				
			||||||
		Addrs:     config.Config.Mongo.DBAddress,
 | 
							Addrs:     config.Config.Mongo.DBAddress,
 | 
				
			||||||
		Direct:    config.Config.Mongo.DBDirect,
 | 
							Direct:    config.Config.Mongo.DBDirect,
 | 
				
			||||||
@ -36,17 +55,19 @@ func init() {
 | 
				
			|||||||
		Password:  config.Config.Mongo.DBPassword,
 | 
							Password:  config.Config.Mongo.DBPassword,
 | 
				
			||||||
		PoolLimit: config.Config.Mongo.DBMaxPoolSize,
 | 
							PoolLimit: config.Config.Mongo.DBMaxPoolSize,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	mgoSession, err := mgo.DialWithInfo(mgoDailInfo)
 | 
						mgoSession, err = mgo.DialWithInfo(mgoDailInfo)
 | 
				
			||||||
	if err != nil {
 | 
					
 | 
				
			||||||
		log.NewError("mgo init err", err.Error(), mgoDailInfo)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
 | 
							log.NewError(" mongo.Connect  failed, try ", err.Error(), uri)
 | 
				
			||||||
		time.Sleep(time.Duration(30) * time.Second)
 | 
							time.Sleep(time.Duration(30) * time.Second)
 | 
				
			||||||
 | 
							mongoClient, err1 = mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))
 | 
				
			||||||
		mgoSession, err1 = mgo.DialWithInfo(mgoDailInfo)
 | 
							mgoSession, err1 = mgo.DialWithInfo(mgoDailInfo)
 | 
				
			||||||
		if err1 != nil {
 | 
							if err1 != nil {
 | 
				
			||||||
 | 
								log.NewError(" mongo.Connect  failed, panic", err.Error(), uri)
 | 
				
			||||||
			panic(err1.Error())
 | 
								panic(err1.Error())
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						DB.mongoClient = mongoClient
 | 
				
			||||||
	DB.mgoSession = mgoSession
 | 
						DB.mgoSession = mgoSession
 | 
				
			||||||
	DB.mgoSession.SetMode(mgo.Monotonic, true)
 | 
						DB.mgoSession.SetMode(mgo.Monotonic, true)
 | 
				
			||||||
	c := DB.mgoSession.DB(config.Config.Mongo.DBDatabase).C(cChat)
 | 
						c := DB.mgoSession.DB(config.Config.Mongo.DBDatabase).C(cChat)
 | 
				
			||||||
@ -55,6 +76,7 @@ func init() {
 | 
				
			|||||||
		panic(err.Error())
 | 
							panic(err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// redis pool init
 | 
						// redis pool init
 | 
				
			||||||
	DB.redisPool = &redis.Pool{
 | 
						DB.redisPool = &redis.Pool{
 | 
				
			||||||
		MaxIdle:     config.Config.Redis.DBMaxIdle,
 | 
							MaxIdle:     config.Config.Redis.DBMaxIdle,
 | 
				
			||||||
 | 
				
			|||||||
@ -6,10 +6,12 @@ import (
 | 
				
			|||||||
	pbMsg "Open_IM/pkg/proto/chat"
 | 
						pbMsg "Open_IM/pkg/proto/chat"
 | 
				
			||||||
	open_im_sdk "Open_IM/pkg/proto/sdk_ws"
 | 
						open_im_sdk "Open_IM/pkg/proto/sdk_ws"
 | 
				
			||||||
	"Open_IM/pkg/utils"
 | 
						"Open_IM/pkg/utils"
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
	"github.com/garyburd/redigo/redis"
 | 
						//"github.com/garyburd/redigo/redis"
 | 
				
			||||||
	"github.com/golang/protobuf/proto"
 | 
						"github.com/golang/protobuf/proto"
 | 
				
			||||||
	"gopkg.in/mgo.v2/bson"
 | 
						"gopkg.in/mgo.v2/bson"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@ -34,33 +36,35 @@ type GroupMember_x struct {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (d *DataBases) GetMinSeqFromMongo(uid string) (MinSeq uint32, err error) {
 | 
					func (d *DataBases) GetMinSeqFromMongo(uid string) (MinSeq uint32, err error) {
 | 
				
			||||||
	var i, NB uint32
 | 
						return 1, nil
 | 
				
			||||||
	var seqUid string
 | 
						//var i, NB uint32
 | 
				
			||||||
	session := d.mgoSession.Clone()
 | 
						//var seqUid string
 | 
				
			||||||
	if session == nil {
 | 
						//session := d.mgoSession.Clone()
 | 
				
			||||||
		return MinSeq, errors.New("session == nil")
 | 
						//if session == nil {
 | 
				
			||||||
	}
 | 
						//	return MinSeq, errors.New("session == nil")
 | 
				
			||||||
	defer session.Close()
 | 
						//}
 | 
				
			||||||
	c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
 | 
						//defer session.Close()
 | 
				
			||||||
	MaxSeq, err := d.GetUserMaxSeq(uid)
 | 
						//c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
 | 
				
			||||||
	if err != nil && err != redis.ErrNil {
 | 
						//MaxSeq, err := d.GetUserMaxSeq(uid)
 | 
				
			||||||
		return MinSeq, err
 | 
						//if err != nil && err != redis.ErrNil {
 | 
				
			||||||
	}
 | 
						//	return MinSeq, err
 | 
				
			||||||
	NB = uint32(MaxSeq / singleGocMsgNum)
 | 
						//}
 | 
				
			||||||
	for i = 0; i <= NB; i++ {
 | 
						//NB = uint32(MaxSeq / singleGocMsgNum)
 | 
				
			||||||
		seqUid = indexGen(uid, i)
 | 
						//for i = 0; i <= NB; i++ {
 | 
				
			||||||
		n, err := c.Find(bson.M{"uid": seqUid}).Count()
 | 
						//	seqUid = indexGen(uid, i)
 | 
				
			||||||
		if err == nil && n != 0 {
 | 
						//	n, err := c.Find(bson.M{"uid": seqUid}).Count()
 | 
				
			||||||
			if i == 0 {
 | 
						//	if err == nil && n != 0 {
 | 
				
			||||||
				MinSeq = 1
 | 
						//		if i == 0 {
 | 
				
			||||||
			} else {
 | 
						//			MinSeq = 1
 | 
				
			||||||
				MinSeq = uint32(i * singleGocMsgNum)
 | 
						//		} else {
 | 
				
			||||||
			}
 | 
						//			MinSeq = uint32(i * singleGocMsgNum)
 | 
				
			||||||
			break
 | 
						//		}
 | 
				
			||||||
		}
 | 
						//		break
 | 
				
			||||||
	}
 | 
						//	}
 | 
				
			||||||
	return MinSeq, nil
 | 
						//}
 | 
				
			||||||
 | 
						//return MinSeq, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (d *DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
 | 
					func (d *DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
 | 
				
			||||||
	var hasSeqList []uint32
 | 
						var hasSeqList []uint32
 | 
				
			||||||
	singleCount := 0
 | 
						singleCount := 0
 | 
				
			||||||
@ -115,6 +119,61 @@ func (d *DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID st
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	return seqMsg, nil
 | 
						return seqMsg, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
 | 
				
			||||||
 | 
						var hasSeqList []uint32
 | 
				
			||||||
 | 
						singleCount := 0
 | 
				
			||||||
 | 
						ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
 | 
				
			||||||
 | 
						c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						m := func(uid string, seqList []uint32) map[string][]uint32 {
 | 
				
			||||||
 | 
							t := make(map[string][]uint32)
 | 
				
			||||||
 | 
							for i := 0; i < len(seqList); i++ {
 | 
				
			||||||
 | 
								seqUid := getSeqUid(uid, seqList[i])
 | 
				
			||||||
 | 
								if value, ok := t[seqUid]; !ok {
 | 
				
			||||||
 | 
									var temp []uint32
 | 
				
			||||||
 | 
									t[seqUid] = append(temp, seqList[i])
 | 
				
			||||||
 | 
								} else {
 | 
				
			||||||
 | 
									t[seqUid] = append(value, seqList[i])
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return t
 | 
				
			||||||
 | 
						}(uid, seqList)
 | 
				
			||||||
 | 
						sChat := UserChat{}
 | 
				
			||||||
 | 
						for seqUid, value := range m {
 | 
				
			||||||
 | 
							if err = c.FindOne(ctx, bson.M{"uid": seqUid}).Decode(&sChat); err != nil {
 | 
				
			||||||
 | 
								log.NewError(operationID, "not find seqUid", seqUid, value, uid, seqList, err.Error())
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							singleCount = 0
 | 
				
			||||||
 | 
							for i := 0; i < len(sChat.Msg); i++ {
 | 
				
			||||||
 | 
								msg := new(open_im_sdk.MsgData)
 | 
				
			||||||
 | 
								if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil {
 | 
				
			||||||
 | 
									log.NewError(operationID, "Unmarshal err", seqUid, value, uid, seqList, err.Error())
 | 
				
			||||||
 | 
									return nil, err
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if isContainInt32(msg.Seq, value) {
 | 
				
			||||||
 | 
									seqMsg = append(seqMsg, msg)
 | 
				
			||||||
 | 
									hasSeqList = append(hasSeqList, msg.Seq)
 | 
				
			||||||
 | 
									singleCount++
 | 
				
			||||||
 | 
									if singleCount == len(value) {
 | 
				
			||||||
 | 
										break
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(hasSeqList) != len(seqList) {
 | 
				
			||||||
 | 
							var diff []uint32
 | 
				
			||||||
 | 
							diff = utils.Difference(hasSeqList, seqList)
 | 
				
			||||||
 | 
							exceptionMSg := genExceptionMessageBySeqList(diff)
 | 
				
			||||||
 | 
							seqMsg = append(seqMsg, exceptionMSg...)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return seqMsg, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*open_im_sdk.MsgData) {
 | 
					func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*open_im_sdk.MsgData) {
 | 
				
			||||||
	for _, v := range seqList {
 | 
						for _, v := range seqList {
 | 
				
			||||||
		msg := new(open_im_sdk.MsgData)
 | 
							msg := new(open_im_sdk.MsgData)
 | 
				
			||||||
@ -124,6 +183,37 @@ func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*open_im_sdk
 | 
				
			|||||||
	return exceptionMsg
 | 
						return exceptionMsg
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (d *DataBases) SaveUserChatMongo2(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error {
 | 
				
			||||||
 | 
						ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
 | 
				
			||||||
 | 
						c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
 | 
				
			||||||
 | 
						newTime := getCurrentTimestampByMill()
 | 
				
			||||||
 | 
						operationID := ""
 | 
				
			||||||
 | 
						seqUid := getSeqUid(uid, m.MsgData.Seq)
 | 
				
			||||||
 | 
						filter := bson.M{"uid": seqUid}
 | 
				
			||||||
 | 
						var err error
 | 
				
			||||||
 | 
						sMsg := MsgInfo{}
 | 
				
			||||||
 | 
						sMsg.SendTime = sendTime
 | 
				
			||||||
 | 
						if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil {
 | 
				
			||||||
 | 
							return  utils.Wrap(err,"")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						err = c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": sMsg}}).Err()
 | 
				
			||||||
 | 
						log.NewDebug(operationID, "get mgoSession cost time", getCurrentTimestampByMill()-newTime)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							sChat := UserChat{}
 | 
				
			||||||
 | 
							sChat.UID = seqUid
 | 
				
			||||||
 | 
							sChat.Msg = append(sChat.Msg, sMsg)
 | 
				
			||||||
 | 
							if _, err = c.InsertOne(ctx, &sChat) ; err != nil{
 | 
				
			||||||
 | 
								log.NewDebug(operationID, "InsertOne failed", filter)
 | 
				
			||||||
 | 
								return utils.Wrap(err, "")
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}else{
 | 
				
			||||||
 | 
							log.NewDebug(operationID, "FindOneAndUpdate ok", filter)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						log.NewDebug(operationID, "find mgo uid cost time", getCurrentTimestampByMill()-newTime)
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (d *DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error {
 | 
					func (d *DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error {
 | 
				
			||||||
	var seqUid string
 | 
						var seqUid string
 | 
				
			||||||
	newTime := getCurrentTimestampByMill()
 | 
						newTime := getCurrentTimestampByMill()
 | 
				
			||||||
@ -163,115 +253,137 @@ func (d *DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToD
 | 
				
			|||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (d *DataBases) DelUserChat(uid string) error {
 | 
					 | 
				
			||||||
	session := d.mgoSession.Clone()
 | 
					 | 
				
			||||||
	if session == nil {
 | 
					 | 
				
			||||||
		return errors.New("session == nil")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer session.Close()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
 | 
					func (d *DataBases) DelUserChat(uid string) error {
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
						//session := d.mgoSession.Clone()
 | 
				
			||||||
 | 
						//if session == nil {
 | 
				
			||||||
 | 
						//	return errors.New("session == nil")
 | 
				
			||||||
 | 
						//}
 | 
				
			||||||
 | 
						//defer session.Close()
 | 
				
			||||||
 | 
						//
 | 
				
			||||||
 | 
						//c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
 | 
				
			||||||
 | 
						//
 | 
				
			||||||
 | 
						//delTime := time.Now().Unix() - int64(config.Config.Mongo.DBRetainChatRecords)*24*3600
 | 
				
			||||||
 | 
						//if err := c.Update(bson.M{"uid": uid}, bson.M{"$pull": bson.M{"msg": bson.M{"sendtime": bson.M{"$lte": delTime}}}}); err != nil {
 | 
				
			||||||
 | 
						//	return err
 | 
				
			||||||
 | 
						//}
 | 
				
			||||||
 | 
						//
 | 
				
			||||||
 | 
						//return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (d *DataBases) DelUserChatMongo2(uid string) error {
 | 
				
			||||||
 | 
						ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
 | 
				
			||||||
 | 
						c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
 | 
				
			||||||
 | 
						filter := bson.M{"uid": uid}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	delTime := time.Now().Unix() - int64(config.Config.Mongo.DBRetainChatRecords)*24*3600
 | 
						delTime := time.Now().Unix() - int64(config.Config.Mongo.DBRetainChatRecords)*24*3600
 | 
				
			||||||
	if err := c.Update(bson.M{"uid": uid}, bson.M{"$pull": bson.M{"msg": bson.M{"sendtime": bson.M{"$lte": delTime}}}}); err != nil {
 | 
						if _, err := c.UpdateOne(ctx, filter, bson.M{"$pull": bson.M{"msg": bson.M{"sendtime": bson.M{"$lte": delTime}}}}); err != nil {
 | 
				
			||||||
		return err
 | 
							return utils.Wrap(err, "")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (d *DataBases) MgoUserCount() (int, error) {
 | 
					func (d *DataBases) MgoUserCount() (int, error) {
 | 
				
			||||||
	session := d.mgoSession.Clone()
 | 
						return 0, nil
 | 
				
			||||||
	if session == nil {
 | 
						//session := d.mgoSession.Clone()
 | 
				
			||||||
		return 0, errors.New("session == nil")
 | 
						//if session == nil {
 | 
				
			||||||
	}
 | 
						//	return 0, errors.New("session == nil")
 | 
				
			||||||
	defer session.Close()
 | 
						//}
 | 
				
			||||||
 | 
						//defer session.Close()
 | 
				
			||||||
	c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
 | 
						//
 | 
				
			||||||
 | 
						//c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
 | 
				
			||||||
	return c.Find(nil).Count()
 | 
						//
 | 
				
			||||||
 | 
						//return c.Find(nil).Count()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (d *DataBases) MgoSkipUID(count int) (string, error) {
 | 
					func (d *DataBases) MgoSkipUID(count int) (string, error) {
 | 
				
			||||||
	session := d.mgoSession.Clone()
 | 
						return "", nil
 | 
				
			||||||
	if session == nil {
 | 
						//session := d.mgoSession.Clone()
 | 
				
			||||||
		return "", errors.New("session == nil")
 | 
						//if session == nil {
 | 
				
			||||||
	}
 | 
						//	return "", errors.New("session == nil")
 | 
				
			||||||
	defer session.Close()
 | 
						//}
 | 
				
			||||||
 | 
						//defer session.Close()
 | 
				
			||||||
	c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
 | 
						//
 | 
				
			||||||
 | 
						//c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
 | 
				
			||||||
	sChat := UserChat{}
 | 
						//
 | 
				
			||||||
	c.Find(nil).Skip(count).Limit(1).One(&sChat)
 | 
						//sChat := UserChat{}
 | 
				
			||||||
	return sChat.UID, nil
 | 
						//c.Find(nil).Skip(count).Limit(1).One(&sChat)
 | 
				
			||||||
 | 
						//return sChat.UID, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (d *DataBases) GetGroupMember(groupID string) []string {
 | 
					func (d *DataBases) GetGroupMember(groupID string) []string {
 | 
				
			||||||
	groupInfo := GroupMember_x{}
 | 
						return nil
 | 
				
			||||||
	groupInfo.GroupID = groupID
 | 
						//groupInfo := GroupMember_x{}
 | 
				
			||||||
	groupInfo.UIDList = make([]string, 0)
 | 
						//groupInfo.GroupID = groupID
 | 
				
			||||||
 | 
						//groupInfo.UIDList = make([]string, 0)
 | 
				
			||||||
	session := d.mgoSession.Clone()
 | 
						//
 | 
				
			||||||
	if session == nil {
 | 
						//session := d.mgoSession.Clone()
 | 
				
			||||||
		return groupInfo.UIDList
 | 
						//if session == nil {
 | 
				
			||||||
	}
 | 
						//	return groupInfo.UIDList
 | 
				
			||||||
	defer session.Close()
 | 
						//}
 | 
				
			||||||
 | 
						//defer session.Close()
 | 
				
			||||||
	c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup)
 | 
						//
 | 
				
			||||||
 | 
						//c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup)
 | 
				
			||||||
	if err := c.Find(bson.M{"groupid": groupInfo.GroupID}).One(&groupInfo); err != nil {
 | 
						//
 | 
				
			||||||
		return groupInfo.UIDList
 | 
						//if err := c.Find(bson.M{"groupid": groupInfo.GroupID}).One(&groupInfo); err != nil {
 | 
				
			||||||
	}
 | 
						//	return groupInfo.UIDList
 | 
				
			||||||
 | 
						//}
 | 
				
			||||||
	return groupInfo.UIDList
 | 
						//
 | 
				
			||||||
 | 
						//return groupInfo.UIDList
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (d *DataBases) AddGroupMember(groupID, uid string) error {
 | 
					func (d *DataBases) AddGroupMember(groupID, uid string) error {
 | 
				
			||||||
	session := d.mgoSession.Clone()
 | 
					 | 
				
			||||||
	if session == nil {
 | 
					 | 
				
			||||||
		return errors.New("session == nil")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer session.Close()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	n, err := c.Find(bson.M{"groupid": groupID}).Count()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if n == 0 {
 | 
					 | 
				
			||||||
		groupInfo := GroupMember_x{}
 | 
					 | 
				
			||||||
		groupInfo.GroupID = groupID
 | 
					 | 
				
			||||||
		groupInfo.UIDList = append(groupInfo.UIDList, uid)
 | 
					 | 
				
			||||||
		err = c.Insert(&groupInfo)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		err = c.Update(bson.M{"groupid": groupID}, bson.M{"$addToSet": bson.M{"uidlist": uid}})
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
 | 
						//session := d.mgoSession.Clone()
 | 
				
			||||||
 | 
						//if session == nil {
 | 
				
			||||||
 | 
						//	return errors.New("session == nil")
 | 
				
			||||||
 | 
						//}
 | 
				
			||||||
 | 
						//defer session.Close()
 | 
				
			||||||
 | 
						//
 | 
				
			||||||
 | 
						//c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup)
 | 
				
			||||||
 | 
						//
 | 
				
			||||||
 | 
						//n, err := c.Find(bson.M{"groupid": groupID}).Count()
 | 
				
			||||||
 | 
						//if err != nil {
 | 
				
			||||||
 | 
						//	return err
 | 
				
			||||||
 | 
						//}
 | 
				
			||||||
 | 
						//
 | 
				
			||||||
 | 
						//if n == 0 {
 | 
				
			||||||
 | 
						//	groupInfo := GroupMember_x{}
 | 
				
			||||||
 | 
						//	groupInfo.GroupID = groupID
 | 
				
			||||||
 | 
						//	groupInfo.UIDList = append(groupInfo.UIDList, uid)
 | 
				
			||||||
 | 
						//	err = c.Insert(&groupInfo)
 | 
				
			||||||
 | 
						//	if err != nil {
 | 
				
			||||||
 | 
						//		return err
 | 
				
			||||||
 | 
						//	}
 | 
				
			||||||
 | 
						//} else {
 | 
				
			||||||
 | 
						//	err = c.Update(bson.M{"groupid": groupID}, bson.M{"$addToSet": bson.M{"uidlist": uid}})
 | 
				
			||||||
 | 
						//	if err != nil {
 | 
				
			||||||
 | 
						//		return err
 | 
				
			||||||
 | 
						//	}
 | 
				
			||||||
 | 
						//}
 | 
				
			||||||
 | 
						//
 | 
				
			||||||
 | 
						//return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (d *DataBases) DelGroupMember(groupID, uid string) error {
 | 
					func (d *DataBases) DelGroupMember(groupID, uid string) error {
 | 
				
			||||||
	session := d.mgoSession.Clone()
 | 
					 | 
				
			||||||
	if session == nil {
 | 
					 | 
				
			||||||
		return errors.New("session == nil")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer session.Close()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if err := c.Update(bson.M{"groupid": groupID}, bson.M{"$pull": bson.M{"uidlist": uid}}); err != nil {
 | 
					 | 
				
			||||||
		return err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
 | 
						//session := d.mgoSession.Clone()
 | 
				
			||||||
 | 
						//if session == nil {
 | 
				
			||||||
 | 
						//	return errors.New("session == nil")
 | 
				
			||||||
 | 
						//}
 | 
				
			||||||
 | 
						//defer session.Close()
 | 
				
			||||||
 | 
						//
 | 
				
			||||||
 | 
						//c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup)
 | 
				
			||||||
 | 
						//
 | 
				
			||||||
 | 
						//if err := c.Update(bson.M{"groupid": groupID}, bson.M{"$pull": bson.M{"uidlist": uid}}); err != nil {
 | 
				
			||||||
 | 
						//	return err
 | 
				
			||||||
 | 
						//}
 | 
				
			||||||
 | 
						//
 | 
				
			||||||
 | 
						//return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func getCurrentTimestampByMill() int64 {
 | 
					func getCurrentTimestampByMill() int64 {
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user