This commit is contained in:
wangchuxiao 2023-03-08 16:35:18 +08:00
parent ff6a503f53
commit ca1cdbdeb7
10 changed files with 305 additions and 98 deletions

View File

@ -2,9 +2,7 @@ package check
import ( import (
"OpenIM/pkg/common/config" "OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant"
discoveryRegistry "OpenIM/pkg/discoveryregistry" discoveryRegistry "OpenIM/pkg/discoveryregistry"
"OpenIM/pkg/errs"
"OpenIM/pkg/proto/friend" "OpenIM/pkg/proto/friend"
sdkws "OpenIM/pkg/proto/sdkws" sdkws "OpenIM/pkg/proto/sdkws"
"context" "context"
@ -51,26 +49,15 @@ func (f *FriendChecker) IsFriend(ctx context.Context, possibleFriendUserID, user
} }
func (f *FriendChecker) GetAllPageFriends(ctx context.Context, ownerUserID string) (resp []*sdkws.FriendInfo, err error) { func (f *FriendChecker) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) {
cc, err := f.getConn() cc, err := f.getConn()
if err != nil { if err != nil {
return nil, err return nil, err
} }
page := int32(0) req := friend.GetFriendIDsReq{UserID: ownerUserID}
req := friend.GetPaginationFriendsReq{UserID: ownerUserID} resp, err := friend.NewFriendClient(cc).GetFriendIDs(ctx, &req)
for { if err != nil {
req.Pagination = &sdkws.RequestPagination{PageNumber: page, ShowNumber: constant.ShowNumber} return nil, err
tmp, err := friend.NewFriendClient(cc).GetPaginationFriends(ctx, &req)
if err != nil {
return nil, err
}
if len(tmp.FriendsInfo) == 0 {
if tmp.Total == int32(len(resp)) {
return resp, nil
}
return nil, errs.ErrData.Wrap("The total number of results and expectations are different, but result is nil")
}
resp = append(resp, tmp.FriendsInfo...)
page++
} }
return resp.FriendIDs, err
} }

View File

@ -47,21 +47,3 @@ func (m *MsgCheck) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMess
resp, err := msg.NewMsgClient(cc).PullMessageBySeqs(ctx, req) resp, err := msg.NewMsgClient(cc).PullMessageBySeqs(ctx, req)
return resp, err return resp, err
} }
//func (m *MsgCheck) SendMsg(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
// cc, err := m.getConn()
// if err != nil {
// return nil, err
// }
// resp, err := msg.NewMsgClient(cc).SendMsg(ctx, req)
// return resp, err
//}
//
//func (m *MsgCheck) SendMsg(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
// cc, err := m.getConn()
// if err != nil {
// return nil, err
// }
// resp, err := msg.NewMsgClient(cc).SendMsg(ctx, req)
// return resp, err
//}

View File

@ -24,7 +24,7 @@ func NewOnlineHistoryMongoConsumerHandler(database controller.MsgDatabase) *Onli
mc := &OnlineHistoryMongoConsumerHandler{ mc := &OnlineHistoryMongoConsumerHandler{
historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic}, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic},
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo), config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo),
msgDatabase: database, msgDatabase: database,
} }
return mc return mc

View File

@ -83,13 +83,13 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
if err != nil { if err != nil {
return nil, err return nil, err
} }
friends, err := s.friendCheck.GetAllPageFriends(ctx, req.UserInfo.UserID) friends, err := s.friendCheck.GetFriendIDs(ctx, req.UserInfo.UserID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
go func() { go func() {
for _, v := range friends { for _, v := range friends {
s.notification.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, v.FriendUser.UserID, tracelog.GetOpUserID(ctx)) s.notification.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, v, tracelog.GetOpUserID(ctx))
} }
}() }()
s.notification.UserInfoUpdatedNotification(ctx, tracelog.GetOpUserID(ctx), req.UserInfo.UserID) s.notification.UserInfoUpdatedNotification(ctx, tracelog.GetOpUserID(ctx), req.UserInfo.UserID)

View File

@ -11,6 +11,7 @@ import (
"OpenIM/pkg/common/tracelog" "OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils" "OpenIM/pkg/utils"
"context" "context"
"errors"
"fmt" "fmt"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"math" "math"
@ -22,6 +23,8 @@ type MsgTool struct {
groupDatabase controller.GroupDatabase groupDatabase controller.GroupDatabase
} }
var errSeq = errors.New("cache max seq and mongo max seq is diff > 10")
func NewMsgTool(msgDatabase controller.MsgDatabase, userDatabase controller.UserDatabase, groupDatabase controller.GroupDatabase) *MsgTool { func NewMsgTool(msgDatabase controller.MsgDatabase, userDatabase controller.UserDatabase, groupDatabase controller.GroupDatabase) *MsgTool {
return &MsgTool{ return &MsgTool{
msgDatabase: msgDatabase, msgDatabase: msgDatabase,
@ -125,7 +128,9 @@ func (c *MsgTool) fixGroupSeq(ctx context.Context, groupID string, userIDs []str
continue continue
} }
} }
c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion) if err := c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil {
log.NewWarn(tracelog.GetOperationID(ctx), "cache max seq and mongo max seq is diff > 10", groupID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion)
}
return nil return nil
} }
@ -162,10 +167,11 @@ func (c *MsgTool) GetAndFixGroupUserSeq(ctx context.Context, userID string, grou
return minSeqCache, nil return minSeqCache, nil
} }
func (c *MsgTool) CheckMaxSeqWithMongo(ctx context.Context, sourceID string, maxSeqCache, maxSeqMongo int64, diffusionType int) { func (c *MsgTool) CheckMaxSeqWithMongo(ctx context.Context, sourceID string, maxSeqCache, maxSeqMongo int64, diffusionType int) error {
if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 { if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 {
log.NewWarn(tracelog.GetOperationID(ctx), "cache max seq and mongo max seq is diff > 10", sourceID, maxSeqCache, maxSeqMongo, diffusionType) return errSeq
} }
return nil
} }
func (c *MsgTool) ShowUserSeqs(ctx context.Context, userID string) { func (c *MsgTool) ShowUserSeqs(ctx context.Context, userID string) {

View File

@ -2,22 +2,24 @@ package tools
import ( import (
"OpenIM/pkg/common/constant" "OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/cache"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/proto/sdkws" "OpenIM/pkg/proto/sdkws"
"OpenIM/pkg/utils"
"context" "context"
"fmt" "github.com/golang/protobuf/proto"
"go.mongodb.org/mongo-driver/bson"
"strconv" "strconv"
"github.com/go-redis/redis/v8" unRelationTb "OpenIM/pkg/common/db/table/unrelation"
"github.com/golang/protobuf/proto" "OpenIM/pkg/common/db/unrelation"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"testing" "testing"
"time" "time"
) )
func GenUserChat(startSeq, stopSeq, delSeq, index int64, userID string) *mongo2.UserChat { func GenMsgDoc(startSeq, stopSeq, delSeq, index int64, userID string) *unRelationTb.MsgDocModel {
chat := &mongo2.UserChat{UID: userID + strconv.Itoa(int(index))} msgDoc := &unRelationTb.MsgDocModel{DocID: userID + strconv.Itoa(int(index))}
for i := startSeq; i <= stopSeq; i++ { for i := startSeq; i <= stopSeq; i++ {
msg := sdkws.MsgData{ msg := sdkws.MsgData{
SendID: "sendID1", SendID: "sendID1",
@ -31,57 +33,281 @@ func GenUserChat(startSeq, stopSeq, delSeq, index int64, userID string) *mongo2.
SessionType: 1, SessionType: 1,
MsgFrom: 100, MsgFrom: 100,
ContentType: 101, ContentType: 101,
Content: []byte("testFaceURL.com"), Content: []byte("testFaceURL"),
Seq: i, Seq: i,
SendTime: time.Now().Unix(), SendTime: time.Now().Unix(),
CreateTime: time.Now().Unix(), CreateTime: time.Now().Unix(),
Status: 1, Status: 1,
} }
bytes, _ := proto.Marshal(&msg) bytes, _ := proto.Marshal(&msg)
sendTime := 0 var sendTime int64
chat.Msg = append(chat.Msg, mongo2.MsgInfo{SendTime: int64(sendTime), Msg: bytes}) if i <= delSeq {
sendTime = 10000
} else {
sendTime = utils.GetCurrentTimestampByMill()
}
msgDoc.Msg = append(msgDoc.Msg, unRelationTb.MsgInfoModel{SendTime: int64(sendTime), Msg: bytes})
} }
return chat return msgDoc
} }
func SetUserMaxSeq(userID string, seq int) error { func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
return redisClient.Set(context.Background(), "REDIS_USER_INCR_SEQ"+userID, seq, 0).Err() operationID := "test"
}
func CreateChat(userChat *mongo2.UserChat) error { rdb, err := cache.NewRedis()
_, err := mongoClient.InsertOne(context.Background(), userChat)
return err
}
func TestDeleteUserMsgsAndSetMinSeq(t *testing.T) {
operationID := getCronTaskOperationID()
redisClient = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:16379",
Password: "openIM123", // no password set
DB: 13, // use default DB
})
mongoUri := fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
"root", "openIM123", "127.0.0.1:37017",
"openIM", 100)
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(mongoUri))
mongoClient = client.Database("openIM").Collection("msg")
testUID1 := "test_del_id1"
//testUID2 := "test_del_id2"
//testUID3 := "test_del_id3"
//testUID4 := "test_del_id4"
//testUID5 := "test_del_id5"
//testUID6 := "test_del_id6"
err = SetUserMaxSeq(testUID1, 600)
userChat := GenUserChat(1, 500, 200, 0, testUID1)
err = CreateChat(userChat)
if err := DeleteUserMsgsAndSetMinSeq(operationID, testUID1); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID1)
}
if err := checkMaxSeqWithMongo(operationID, testUID1, constant.WriteDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID1)
}
if err != nil { if err != nil {
t.Error("err is not nil", testUID1, err.Error()) return
}
mgo, err := unrelation.NewMongo()
if err != nil {
return
}
cacheModel := cache.NewCacheModel(rdb)
mongoClient := mgo.GetDatabase().Collection(unRelationTb.MsgDocModel{}.TableName())
ctx := context.Background()
tracelog.SetOperationID(ctx, operationID)
testUID1 := "test_del_id1"
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID1 + ":" + strconv.Itoa(0)})
if err != nil {
t.Error("DeleteOne failed")
return
}
err = cacheModel.SetUserMaxSeq(ctx, testUID1, 600)
if err != nil {
t.Error("SetUserMaxSeq failed")
}
msgDoc := GenMsgDoc(1, 600, 200, 0, testUID1)
if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil {
t.Error("InsertOne failed", testUID1)
}
msgTools, err := InitMsgTool()
if err != nil {
t.Error("init failed")
return
}
msgTools.ClearUsersMsg(ctx, []string{testUID1})
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID1)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID1, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID1)
}
if minSeqMongo != minSeqCache {
t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache)
}
if minSeqCache != 201 {
t.Error("test1 is not the same", "minSeq:", minSeqCache, "targetSeq", 201)
}
/////// uid2
testUID2 := "test_del_id2"
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID2 + ":" + strconv.Itoa(0)})
if err != nil {
t.Error("delete failed")
}
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID2 + ":" + strconv.Itoa(1)})
if err != nil {
t.Error("delete failed")
}
err = cacheModel.SetUserMaxSeq(ctx, testUID2, 7000)
if err != nil {
t.Error("SetUserMaxSeq failed")
}
msgDoc = GenMsgDoc(1, 4999, 5000, 0, testUID2)
msgDoc2 := GenMsgDoc(5000, 7000, 6000, 1, testUID2)
if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil {
t.Error("InsertOne failed", testUID1)
}
if _, err := mongoClient.InsertOne(ctx, msgDoc2); err != nil {
t.Error("InsertOne failed", testUID1)
}
msgTools.ClearUsersMsg(ctx, []string{testUID2})
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID2)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID2, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID2)
}
if minSeqMongo != minSeqCache {
t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache)
}
if minSeqCache != 6001 {
t.Error("test1 is not the same", "minSeq:", minSeqCache, "targetSeq", 201)
}
/////// uid3
testUID3 := "test_del_id3"
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID3 + ":" + strconv.Itoa(0)})
if err != nil {
t.Error("delete failed")
}
err = cacheModel.SetUserMaxSeq(ctx, testUID3, 4999)
if err != nil {
t.Error("SetUserMaxSeq failed")
}
msgDoc = GenMsgDoc(1, 4999, 5000, 0, testUID3)
if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil {
t.Error("InsertOne failed", testUID3)
}
msgTools.ClearUsersMsg(ctx, []string{testUID3})
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID3)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID3, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID3)
}
if minSeqMongo != minSeqCache {
t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache)
}
if minSeqCache != 5000 {
t.Error("test1 is not the same", "minSeq:", minSeqCache, "targetSeq", 201)
}
//// uid4
testUID4 := "test_del_id4"
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID4 + ":" + strconv.Itoa(0)})
if err != nil {
t.Error("delete failed")
}
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID4 + ":" + strconv.Itoa(1)})
if err != nil {
t.Error("delete failed")
}
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID4 + ":" + strconv.Itoa(2)})
if err != nil {
t.Error("delete failed")
}
err = cacheModel.SetUserMaxSeq(ctx, testUID4, 12000)
msgDoc = GenMsgDoc(1, 4999, 5000, 0, testUID4)
msgDoc2 = GenMsgDoc(5000, 9999, 10000, 1, testUID4)
msgDoc3 := GenMsgDoc(10000, 12000, 11000, 2, testUID4)
if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil {
t.Error("InsertOne failed", testUID4)
}
if _, err := mongoClient.InsertOne(ctx, msgDoc2); err != nil {
t.Error("InsertOne failed", testUID4)
}
if _, err := mongoClient.InsertOne(ctx, msgDoc3); err != nil {
t.Error("InsertOne failed", testUID4)
}
msgTools.ClearUsersMsg(ctx, []string{testUID4})
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID4)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID4, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID4)
}
if minSeqMongo != minSeqCache {
t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache)
}
if minSeqCache != 5000 {
t.Error("test1 is not the same", "minSeq:", minSeqCache)
}
testUID5 := "test_del_id5"
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID5 + ":" + strconv.Itoa(0)})
if err != nil {
t.Error("delete failed")
}
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID5 + ":" + strconv.Itoa(1)})
if err != nil {
t.Error("delete failed")
}
err = cacheModel.SetUserMaxSeq(ctx, testUID5, 9999)
msgDoc = GenMsgDoc(1, 4999, 5000, 0, testUID5)
msgDoc2 = GenMsgDoc(5000, 9999, 10000, 1, testUID5)
if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil {
t.Error("InsertOne failed", testUID5)
}
if _, err := mongoClient.InsertOne(ctx, msgDoc2); err != nil {
t.Error("InsertOne failed", testUID5)
}
msgTools.ClearUsersMsg(ctx, []string{testUID5})
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID5)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID5, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID5)
}
if minSeqMongo != minSeqCache {
t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache)
}
if minSeqCache != 10000 {
t.Error("test1 is not the same", "minSeq:", minSeqCache)
}
testUID6 := "test_del_id6"
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID6 + ":" + strconv.Itoa(0)})
if err != nil {
t.Error("delete failed")
}
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID6 + ":" + strconv.Itoa(1)})
if err != nil {
t.Error("delete failed")
}
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID6 + ":" + strconv.Itoa(2)})
if err != nil {
t.Error("delete failed")
}
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID6 + ":" + strconv.Itoa(3)})
if err != nil {
t.Error("delete failed")
}
msgDoc = GenMsgDoc(1, 4999, 5000, 0, testUID6)
msgDoc2 = GenMsgDoc(5000, 9999, 10000, 1, testUID6)
msgDoc3 = GenMsgDoc(10000, 14999, 13000, 2, testUID6)
msgDoc4 := GenMsgDoc(15000, 19999, 0, 3, testUID6)
if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil {
t.Error("InsertOne failed", testUID4)
}
if _, err := mongoClient.InsertOne(ctx, msgDoc2); err != nil {
t.Error("InsertOne failed", testUID4)
}
if _, err := mongoClient.InsertOne(ctx, msgDoc3); err != nil {
t.Error("InsertOne failed", testUID4)
}
if _, err := mongoClient.InsertOne(ctx, msgDoc4); err != nil {
t.Error("InsertOne failed", testUID4)
}
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID6)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID6, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID6)
}
if minSeqMongo != minSeqCache {
t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache)
}
if minSeqCache != 13001 {
t.Error("test1 is not the same", "minSeq:", minSeqCache)
} }
} }

View File

@ -5,6 +5,7 @@ import (
"context" "context"
) )
// for mongoDB
type ExtendMsgDatabase interface { type ExtendMsgDatabase interface {
CreateExtendMsgSet(ctx context.Context, set *unRelationTb.ExtendMsgSetModel) error CreateExtendMsgSet(ctx context.Context, set *unRelationTb.ExtendMsgSetModel) error
GetAllExtendMsgSet(ctx context.Context, ID string, opts *unRelationTb.GetAllExtendMsgSetOpts) (sets []*unRelationTb.ExtendMsgSetModel, err error) GetAllExtendMsgSet(ctx context.Context, ID string, opts *unRelationTb.GetAllExtendMsgSetOpts) (sets []*unRelationTb.ExtendMsgSetModel, err error)

View File

@ -1,10 +1,12 @@
package controller package controller
import ( import (
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant" "OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/cache" "OpenIM/pkg/common/db/cache"
unRelationTb "OpenIM/pkg/common/db/table/unrelation" unRelationTb "OpenIM/pkg/common/db/table/unrelation"
"OpenIM/pkg/common/db/unrelation" "OpenIM/pkg/common/db/unrelation"
"OpenIM/pkg/common/kafka"
"OpenIM/pkg/common/log" "OpenIM/pkg/common/log"
"OpenIM/pkg/common/prome" "OpenIM/pkg/common/prome"
"OpenIM/pkg/common/tracelog" "OpenIM/pkg/common/tracelog"
@ -68,7 +70,7 @@ type MsgDatabase interface {
DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
SetSendMsgStatus(ctx context.Context, id string, status int32) error SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error) GetSendMsgStatus(ctx context.Context, id string) (int32, error)
MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error
GetUserMaxSeq(ctx context.Context, userID string) (int64, error) GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
GetUserMinSeq(ctx context.Context, userID string) (int64, error) GetUserMinSeq(ctx context.Context, userID string) (int64, error)
GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
@ -79,6 +81,7 @@ func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel ca
return &msgDatabase{ return &msgDatabase{
msgDocDatabase: msgDocModel, msgDocDatabase: msgDocModel,
cache: cacheModel, cache: cacheModel,
producer: kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic),
} }
} }
@ -93,6 +96,8 @@ type msgDatabase struct {
msgDocDatabase unRelationTb.MsgDocModelInterface msgDocDatabase unRelationTb.MsgDocModelInterface
extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface
cache cache.Model cache cache.Model
producer *kafka.Producer
// model
msg unRelationTb.MsgDocModel msg unRelationTb.MsgDocModel
extendMsgSetModel unRelationTb.ExtendMsgSetModel extendMsgSetModel unRelationTb.ExtendMsgSetModel
} }
@ -165,9 +170,9 @@ func (db *msgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32,
return db.cache.GetSendMsgStatus(ctx, id) return db.cache.GetSendMsgStatus(ctx, id)
} }
func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error { func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error {
//TODO implement me _, _, err := db.producer.SendMessage(ctx, key, msg2mq)
panic("implement me") return err
} }
func (db *msgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { func (db *msgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {

View File

@ -10,7 +10,7 @@ import (
const ( const (
singleGocMsgNum = 5000 singleGocMsgNum = 5000
CChat = "msg" msg = "msg"
OldestList = 0 OldestList = 0
NewestList = -1 NewestList = -1
) )
@ -38,7 +38,7 @@ type MsgDocModelInterface interface {
} }
func (MsgDocModel) TableName() string { func (MsgDocModel) TableName() string {
return CChat return msg
} }
func (MsgDocModel) GetSingleGocMsgNum() int64 { func (MsgDocModel) GetSingleGocMsgNum() int64 {

View File

@ -46,7 +46,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
return &p return &p
} }
func (p *Producer) SendMessage(ctx context.Context, m proto.Message, key string) (int32, int64, error) { func (p *Producer) SendMessage(ctx context.Context, key string, m proto.Message) (int32, int64, error) {
operationID := tracelog.GetOperationID(ctx) operationID := tracelog.GetOperationID(ctx)
log.Info(operationID, "SendMessage", "key ", key, m.String(), p.producer) log.Info(operationID, "SendMessage", "key ", key, m.String(), p.producer)
kMsg := &sarama.ProducerMessage{} kMsg := &sarama.ProducerMessage{}