mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-12-03 02:42:19 +08:00
Merge branch 'openimsdk:main' into fix/script
This commit is contained in:
commit
916658837e
@ -53,12 +53,12 @@
|
|||||||
|
|
||||||
## :busts_in_silhouette: Community
|
## :busts_in_silhouette: Community
|
||||||
|
|
||||||
+ 📚 [OpenIM Community](https://github.com/OpenIMSDK/community)
|
+ 💬 [Follow our Twitter account](https://twitter.com/founder_im63606)
|
||||||
+ 💕 [OpenIM Interest Group](https://github.com/Openim-sigs)
|
+ 👫 [Join our Reddit](https://www.reddit.com/r/OpenIMessaging)
|
||||||
+ 🚀 [Join our Slack community](https://join.slack.com/t/openimsdk/shared_invite/zt-22720d66b-o_FvKxMTGXtcnnnHiMqe9Q)
|
+ 🚀 [Join our Slack community](https://join.slack.com/t/openimsdk/shared_invite/zt-22720d66b-o_FvKxMTGXtcnnnHiMqe9Q)
|
||||||
+ :eyes: [Join our wechat (微信群)](https://openim-1253691595.cos.ap-nanjing.myqcloud.com/WechatIMG20.jpeg)
|
+ :eyes: [Join our wechat (微信群)](https://openim-1253691595.cos.ap-nanjing.myqcloud.com/WechatIMG20.jpeg)
|
||||||
+ 👫 [Join our Reddit](https://www.reddit.com/r/OpenIMessaging)
|
+ 📚 [OpenIM Community](https://github.com/OpenIMSDK/community)
|
||||||
+ 💬 [Follow our Twitter account](https://twitter.com/founder_im63606)
|
+ 💕 [OpenIM Interest Group](https://github.com/Openim-sigs)
|
||||||
|
|
||||||
## Ⓜ️ About OpenIM
|
## Ⓜ️ About OpenIM
|
||||||
|
|
||||||
|
|||||||
@ -78,16 +78,17 @@ type Client struct {
|
|||||||
token string
|
token string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClient(ctx *UserConnContext, conn LongConn, isCompress bool) *Client {
|
// function not used
|
||||||
return &Client{
|
// func newClient(ctx *UserConnContext, conn LongConn, isCompress bool) *Client {
|
||||||
w: new(sync.Mutex),
|
// return &Client{
|
||||||
conn: conn,
|
// w: new(sync.Mutex),
|
||||||
PlatformID: utils.StringToInt(ctx.GetPlatformID()),
|
// conn: conn,
|
||||||
IsCompress: isCompress,
|
// PlatformID: utils.StringToInt(ctx.GetPlatformID()),
|
||||||
UserID: ctx.GetUserID(),
|
// IsCompress: isCompress,
|
||||||
ctx: ctx,
|
// UserID: ctx.GetUserID(),
|
||||||
}
|
// ctx: ctx,
|
||||||
}
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
// ResetClient updates the client's state with new connection and context information.
|
// ResetClient updates the client's state with new connection and context information.
|
||||||
func (c *Client) ResetClient(
|
func (c *Client) ResetClient(
|
||||||
|
|||||||
@ -108,10 +108,12 @@ func (d *GWebSocket) Dial(urlStr string, requestHeader http.Header) (*http.Respo
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *GWebSocket) IsNil() bool {
|
func (d *GWebSocket) IsNil() bool {
|
||||||
if d.conn != nil {
|
return d.conn == nil
|
||||||
return false
|
//
|
||||||
}
|
// if d.conn != nil {
|
||||||
return true
|
// return false
|
||||||
|
// }
|
||||||
|
// return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *GWebSocket) SetConnNil() {
|
func (d *GWebSocket) SetConnNil() {
|
||||||
|
|||||||
@ -61,11 +61,12 @@ type LongConnServer interface {
|
|||||||
MessageHandler
|
MessageHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
var bufferPool = sync.Pool{
|
// bufferPool is unused
|
||||||
New: func() any {
|
// var bufferPool = sync.Pool{
|
||||||
return make([]byte, 1024)
|
// New: func() any {
|
||||||
},
|
// return make([]byte, 1024)
|
||||||
}
|
// },
|
||||||
|
// }
|
||||||
|
|
||||||
type WsServer struct {
|
type WsServer struct {
|
||||||
port int
|
port int
|
||||||
|
|||||||
@ -58,12 +58,12 @@ func (u *UserMap) Get(key string, platformID int) ([]*Client, bool, bool) {
|
|||||||
func (u *UserMap) Set(key string, v *Client) {
|
func (u *UserMap) Set(key string, v *Client) {
|
||||||
allClients, existed := u.m.Load(key)
|
allClients, existed := u.m.Load(key)
|
||||||
if existed {
|
if existed {
|
||||||
log.ZDebug(context.Background(), "Set existed", "user_id", key, "client", *v)
|
log.ZDebug(context.Background(), "Set existed", "user_id", key, "client_user_id", v.UserID)
|
||||||
oldClients := allClients.([]*Client)
|
oldClients := allClients.([]*Client)
|
||||||
oldClients = append(oldClients, v)
|
oldClients = append(oldClients, v)
|
||||||
u.m.Store(key, oldClients)
|
u.m.Store(key, oldClients)
|
||||||
} else {
|
} else {
|
||||||
log.ZDebug(context.Background(), "Set not existed", "user_id", key, "client", *v)
|
log.ZDebug(context.Background(), "Set not existed", "user_id", key, "client_user_id", v.UserID)
|
||||||
var clients []*Client
|
var clients []*Client
|
||||||
clients = append(clients, v)
|
clients = append(clients, v)
|
||||||
u.m.Store(key, clients)
|
u.m.Store(key, clients)
|
||||||
|
|||||||
@ -71,7 +71,7 @@ func StartTransfer(prometheusPort int) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil {
|
if err2 := client.CreateRpcRootNodes(config.Config.GetServiceNames()); err2 != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
|
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
|
||||||
|
|||||||
@ -74,10 +74,10 @@ type OnlineHistoryRedisConsumerHandler struct {
|
|||||||
chArrays [ChannelNum]chan Cmd2Value
|
chArrays [ChannelNum]chan Cmd2Value
|
||||||
msgDistributionCh chan Cmd2Value
|
msgDistributionCh chan Cmd2Value
|
||||||
|
|
||||||
singleMsgSuccessCount uint64
|
// singleMsgSuccessCount uint64
|
||||||
singleMsgFailedCount uint64
|
// singleMsgFailedCount uint64
|
||||||
singleMsgSuccessCountMutex sync.Mutex
|
// singleMsgSuccessCountMutex sync.Mutex
|
||||||
singleMsgFailedCountMutex sync.Mutex
|
// singleMsgFailedCountMutex sync.Mutex
|
||||||
|
|
||||||
msgDatabase controller.CommonMsgDatabase
|
msgDatabase controller.CommonMsgDatabase
|
||||||
conversationRpcClient *rpcclient.ConversationRpcClient
|
conversationRpcClient *rpcclient.ConversationRpcClient
|
||||||
@ -111,62 +111,59 @@ func NewOnlineHistoryRedisConsumerHandler(
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
||||||
for {
|
for cmd := range och.chArrays[channelID] {
|
||||||
select {
|
switch cmd.Cmd {
|
||||||
case cmd := <-och.chArrays[channelID]:
|
case SourceMessages:
|
||||||
switch cmd.Cmd {
|
msgChannelValue := cmd.Value.(MsgChannelValue)
|
||||||
case SourceMessages:
|
ctxMsgList := msgChannelValue.ctxMsgList
|
||||||
msgChannelValue := cmd.Value.(MsgChannelValue)
|
ctx := msgChannelValue.ctx
|
||||||
ctxMsgList := msgChannelValue.ctxMsgList
|
log.ZDebug(
|
||||||
ctx := msgChannelValue.ctx
|
ctx,
|
||||||
log.ZDebug(
|
"msg arrived channel",
|
||||||
|
"channel id",
|
||||||
|
channelID,
|
||||||
|
"msgList length",
|
||||||
|
len(ctxMsgList),
|
||||||
|
"uniqueKey",
|
||||||
|
msgChannelValue.uniqueKey,
|
||||||
|
)
|
||||||
|
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(
|
||||||
|
ctxMsgList,
|
||||||
|
)
|
||||||
|
log.ZDebug(
|
||||||
|
ctx,
|
||||||
|
"msg lens",
|
||||||
|
"storageMsgList",
|
||||||
|
len(storageMsgList),
|
||||||
|
"notStorageMsgList",
|
||||||
|
len(notStorageMsgList),
|
||||||
|
"storageNotificationList",
|
||||||
|
len(storageNotificationList),
|
||||||
|
"notStorageNotificationList",
|
||||||
|
len(notStorageNotificationList),
|
||||||
|
"modifyMsgList",
|
||||||
|
len(modifyMsgList),
|
||||||
|
)
|
||||||
|
conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMsgList[0].message)
|
||||||
|
conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMsgList[0].message)
|
||||||
|
och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList)
|
||||||
|
och.handleNotification(
|
||||||
|
ctx,
|
||||||
|
msgChannelValue.uniqueKey,
|
||||||
|
conversationIDNotification,
|
||||||
|
storageNotificationList,
|
||||||
|
notStorageNotificationList,
|
||||||
|
)
|
||||||
|
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.uniqueKey, conversationIDNotification, modifyMsgList); err != nil {
|
||||||
|
log.ZError(
|
||||||
ctx,
|
ctx,
|
||||||
"msg arrived channel",
|
"msg to modify mq error",
|
||||||
"channel id",
|
err,
|
||||||
channelID,
|
|
||||||
"msgList length",
|
|
||||||
len(ctxMsgList),
|
|
||||||
"uniqueKey",
|
"uniqueKey",
|
||||||
msgChannelValue.uniqueKey,
|
msgChannelValue.uniqueKey,
|
||||||
)
|
|
||||||
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(
|
|
||||||
ctxMsgList,
|
|
||||||
)
|
|
||||||
log.ZDebug(
|
|
||||||
ctx,
|
|
||||||
"msg lens",
|
|
||||||
"storageMsgList",
|
|
||||||
len(storageMsgList),
|
|
||||||
"notStorageMsgList",
|
|
||||||
len(notStorageMsgList),
|
|
||||||
"storageNotificationList",
|
|
||||||
len(storageNotificationList),
|
|
||||||
"notStorageNotificationList",
|
|
||||||
len(notStorageNotificationList),
|
|
||||||
"modifyMsgList",
|
"modifyMsgList",
|
||||||
len(modifyMsgList),
|
modifyMsgList,
|
||||||
)
|
)
|
||||||
conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMsgList[0].message)
|
|
||||||
conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMsgList[0].message)
|
|
||||||
och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList)
|
|
||||||
och.handleNotification(
|
|
||||||
ctx,
|
|
||||||
msgChannelValue.uniqueKey,
|
|
||||||
conversationIDNotification,
|
|
||||||
storageNotificationList,
|
|
||||||
notStorageNotificationList,
|
|
||||||
)
|
|
||||||
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.uniqueKey, conversationIDNotification, modifyMsgList); err != nil {
|
|
||||||
log.ZError(
|
|
||||||
ctx,
|
|
||||||
"msg to modify mq error",
|
|
||||||
err,
|
|
||||||
"uniqueKey",
|
|
||||||
msgChannelValue.uniqueKey,
|
|
||||||
"modifyMsgList",
|
|
||||||
modifyMsgList,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -22,7 +22,6 @@ import (
|
|||||||
|
|
||||||
type MsgUtilsCmd struct {
|
type MsgUtilsCmd struct {
|
||||||
cobra.Command
|
cobra.Command
|
||||||
msgTool *tools.MsgTool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MsgUtilsCmd) AddUserIDFlag() {
|
func (m *MsgUtilsCmd) AddUserIDFlag() {
|
||||||
@ -38,19 +37,19 @@ func (m *MsgUtilsCmd) AddFixAllFlag() {
|
|||||||
m.Command.PersistentFlags().BoolP("fixAll", "f", false, "openIM fix all seqs")
|
m.Command.PersistentFlags().BoolP("fixAll", "f", false, "openIM fix all seqs")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MsgUtilsCmd) getFixAllFlag(cmdLines *cobra.Command) bool {
|
/* func (m *MsgUtilsCmd) getFixAllFlag(cmdLines *cobra.Command) bool {
|
||||||
fixAll, _ := cmdLines.Flags().GetBool("fixAll")
|
fixAll, _ := cmdLines.Flags().GetBool("fixAll")
|
||||||
return fixAll
|
return fixAll
|
||||||
}
|
} */
|
||||||
|
|
||||||
func (m *MsgUtilsCmd) AddClearAllFlag() {
|
func (m *MsgUtilsCmd) AddClearAllFlag() {
|
||||||
m.Command.PersistentFlags().BoolP("clearAll", "c", false, "openIM clear all seqs")
|
m.Command.PersistentFlags().BoolP("clearAll", "c", false, "openIM clear all seqs")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MsgUtilsCmd) getClearAllFlag(cmdLines *cobra.Command) bool {
|
/* func (m *MsgUtilsCmd) getClearAllFlag(cmdLines *cobra.Command) bool {
|
||||||
clearAll, _ := cmdLines.Flags().GetBool("clearAll")
|
clearAll, _ := cmdLines.Flags().GetBool("clearAll")
|
||||||
return clearAll
|
return clearAll
|
||||||
}
|
} */
|
||||||
|
|
||||||
func (m *MsgUtilsCmd) AddSuperGroupIDFlag() {
|
func (m *MsgUtilsCmd) AddSuperGroupIDFlag() {
|
||||||
m.Command.PersistentFlags().StringP("superGroupID", "g", "", "openIM superGroupID")
|
m.Command.PersistentFlags().StringP("superGroupID", "g", "", "openIM superGroupID")
|
||||||
@ -65,19 +64,19 @@ func (m *MsgUtilsCmd) AddBeginSeqFlag() {
|
|||||||
m.Command.PersistentFlags().Int64P("beginSeq", "b", 0, "openIM beginSeq")
|
m.Command.PersistentFlags().Int64P("beginSeq", "b", 0, "openIM beginSeq")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MsgUtilsCmd) getBeginSeqFlag(cmdLines *cobra.Command) int64 {
|
/* func (m *MsgUtilsCmd) getBeginSeqFlag(cmdLines *cobra.Command) int64 {
|
||||||
beginSeq, _ := cmdLines.Flags().GetInt64("beginSeq")
|
beginSeq, _ := cmdLines.Flags().GetInt64("beginSeq")
|
||||||
return beginSeq
|
return beginSeq
|
||||||
}
|
} */
|
||||||
|
|
||||||
func (m *MsgUtilsCmd) AddLimitFlag() {
|
func (m *MsgUtilsCmd) AddLimitFlag() {
|
||||||
m.Command.PersistentFlags().Int64P("limit", "l", 0, "openIM limit")
|
m.Command.PersistentFlags().Int64P("limit", "l", 0, "openIM limit")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MsgUtilsCmd) getLimitFlag(cmdLines *cobra.Command) int64 {
|
/* func (m *MsgUtilsCmd) getLimitFlag(cmdLines *cobra.Command) int64 {
|
||||||
limit, _ := cmdLines.Flags().GetInt64("limit")
|
limit, _ := cmdLines.Flags().GetInt64("limit")
|
||||||
return limit
|
return limit
|
||||||
}
|
} */
|
||||||
|
|
||||||
func (m *MsgUtilsCmd) Execute() error {
|
func (m *MsgUtilsCmd) Execute() error {
|
||||||
return m.Command.Execute()
|
return m.Command.Execute()
|
||||||
@ -134,6 +133,7 @@ func NewSeqCmd() *SeqCmd {
|
|||||||
return seqCmd
|
return seqCmd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (s *SeqCmd) GetSeqCmd() *cobra.Command {
|
func (s *SeqCmd) GetSeqCmd() *cobra.Command {
|
||||||
s.Command.Run = func(cmdLines *cobra.Command, args []string) {
|
s.Command.Run = func(cmdLines *cobra.Command, args []string) {
|
||||||
_, err := tools.InitMsgTool()
|
_, err := tools.InitMsgTool()
|
||||||
|
|||||||
@ -26,7 +26,10 @@ import (
|
|||||||
|
|
||||||
func FriendPb2DB(friend *sdkws.FriendInfo) *relation.FriendModel {
|
func FriendPb2DB(friend *sdkws.FriendInfo) *relation.FriendModel {
|
||||||
dbFriend := &relation.FriendModel{}
|
dbFriend := &relation.FriendModel{}
|
||||||
utils.CopyStructFields(dbFriend, friend)
|
err := utils.CopyStructFields(dbFriend, friend)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
dbFriend.FriendUserID = friend.FriendUser.UserID
|
dbFriend.FriendUserID = friend.FriendUser.UserID
|
||||||
dbFriend.CreateTime = utils.UnixSecondToTime(friend.CreateTime)
|
dbFriend.CreateTime = utils.UnixSecondToTime(friend.CreateTime)
|
||||||
return dbFriend
|
return dbFriend
|
||||||
@ -69,7 +72,11 @@ func FriendsDB2Pb(
|
|||||||
}
|
}
|
||||||
for _, friend := range friendsDB {
|
for _, friend := range friendsDB {
|
||||||
friendPb := &sdkws.FriendInfo{FriendUser: &sdkws.UserInfo{}}
|
friendPb := &sdkws.FriendInfo{FriendUser: &sdkws.UserInfo{}}
|
||||||
utils.CopyStructFields(friendPb, friend)
|
err := utils.CopyStructFields(friendPb, friend)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
friendPb.FriendUser.UserID = users[friend.FriendUserID].UserID
|
friendPb.FriendUser.UserID = users[friend.FriendUserID].UserID
|
||||||
friendPb.FriendUser.Nickname = users[friend.FriendUserID].Nickname
|
friendPb.FriendUser.Nickname = users[friend.FriendUserID].Nickname
|
||||||
friendPb.FriendUser.FaceURL = users[friend.FriendUserID].FaceURL
|
friendPb.FriendUser.FaceURL = users[friend.FriendUserID].FaceURL
|
||||||
@ -79,6 +86,7 @@ func FriendsDB2Pb(
|
|||||||
friendsPb = append(friendsPb, friendPb)
|
friendsPb = append(friendsPb, friendPb)
|
||||||
}
|
}
|
||||||
return friendsPb, nil
|
return friendsPb, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func FriendRequestDB2Pb(
|
func FriendRequestDB2Pb(
|
||||||
|
|||||||
49
pkg/common/db/cache/conversation.go
vendored
49
pkg/common/db/cache/conversation.go
vendored
@ -16,7 +16,6 @@ package cache
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"math/big"
|
"math/big"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@ -220,16 +219,16 @@ func (c *ConversationRedisCache) DelConversations(ownerUserID string, conversati
|
|||||||
return cache
|
return cache
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConversationRedisCache) getConversationIndex(convsation *relationtb.ConversationModel, keys []string) (int, error) {
|
// func (c *ConversationRedisCache) getConversationIndex(convsation *relationtb.ConversationModel, keys []string) (int, error) {
|
||||||
key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID)
|
// key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID)
|
||||||
for _i, _key := range keys {
|
// for _i, _key := range keys {
|
||||||
if _key == key {
|
// if _key == key {
|
||||||
return _i, nil
|
// return _i, nil
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
return 0, errors.New("not found key:" + key + " in keys")
|
// return 0, errors.New("not found key:" + key + " in keys")
|
||||||
}
|
// }
|
||||||
|
|
||||||
func (c *ConversationRedisCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationtb.ConversationModel, error) {
|
func (c *ConversationRedisCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationtb.ConversationModel, error) {
|
||||||
//var keys []string
|
//var keys []string
|
||||||
@ -333,7 +332,7 @@ func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupI
|
|||||||
return cache
|
return cache
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex(conversationID string, conversationIDs []string) (int, error) {
|
/* func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex(conversationID string, conversationIDs []string) (int, error) {
|
||||||
for _i, _conversationID := range conversationIDs {
|
for _i, _conversationID := range conversationIDs {
|
||||||
if _conversationID == conversationID {
|
if _conversationID == conversationID {
|
||||||
return _i, nil
|
return _i, nil
|
||||||
@ -341,21 +340,21 @@ func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex(conversationID strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
return 0, errors.New("not found key:" + conversationID + " in keys")
|
return 0, errors.New("not found key:" + conversationID + " in keys")
|
||||||
}
|
} */
|
||||||
|
|
||||||
//func (c *ConversationRedisCache) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) {
|
/* func (c *ConversationRedisCache) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) {
|
||||||
// conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
|
conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
|
||||||
// if err != nil {
|
if err != nil {
|
||||||
// return nil, err
|
return nil, err
|
||||||
// }
|
}
|
||||||
// var keys []string
|
var keys []string
|
||||||
// for _, conversarionID := range conversationIDs {
|
for _, conversarionID := range conversationIDs {
|
||||||
// keys = append(keys, c.getConversationHasReadSeqKey(ownerUserID, conversarionID))
|
keys = append(keys, c.getConversationHasReadSeqKey(ownerUserID, conversarionID))
|
||||||
// }
|
}
|
||||||
// return batchGetCacheMap(ctx, c.rcClient, keys, conversationIDs, c.expireTime, c.getUserAllHasReadSeqsIndex, func(ctx context.Context) (map[string]int64, error) {
|
return batchGetCacheMap(ctx, c.rcClient, keys, conversationIDs, c.expireTime, c.getUserAllHasReadSeqsIndex, func(ctx context.Context) (map[string]int64, error) {
|
||||||
// return c.conversationDB.GetUserAllHasReadSeqs(ctx, ownerUserID)
|
return c.conversationDB.GetUserAllHasReadSeqs(ctx, ownerUserID)
|
||||||
// })
|
})
|
||||||
//}
|
} */
|
||||||
|
|
||||||
func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache {
|
func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache {
|
||||||
cache := c.NewCache()
|
cache := c.NewCache()
|
||||||
|
|||||||
@ -75,6 +75,7 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
claims := tokenverify.BuildClaims(userID, platformID, a.accessExpire)
|
claims := tokenverify.BuildClaims(userID, platformID, a.accessExpire)
|
||||||
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
|
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
|
||||||
tokenString, err := token.SignedString([]byte(a.accessSecret))
|
tokenString, err := token.SignedString([]byte(a.accessSecret))
|
||||||
|
|||||||
@ -105,7 +105,7 @@ func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context,
|
|||||||
now := time.Now()
|
now := time.Now()
|
||||||
for _, v := range NotUserIDs {
|
for _, v := range NotUserIDs {
|
||||||
temp := new(relationtb.ConversationModel)
|
temp := new(relationtb.ConversationModel)
|
||||||
if err := utils.CopyStructFields(temp, conversation); err != nil {
|
if err = utils.CopyStructFields(temp, conversation); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
temp.OwnerUserID = v
|
temp.OwnerUserID = v
|
||||||
|
|||||||
@ -846,7 +846,7 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(delMsgIndexs) > 0 {
|
if len(delMsgIndexs) > 0 {
|
||||||
if err := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, msgDocModel.DocID, delMsgIndexs); err != nil {
|
if err = db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, msgDocModel.DocID, delMsgIndexs); err != nil {
|
||||||
log.ZError(ctx, "deleteMsgRecursion DeleteMsgsInOneDocByIndex failed", err, "conversationID", conversationID, "index", index)
|
log.ZError(ctx, "deleteMsgRecursion DeleteMsgsInOneDocByIndex failed", err, "conversationID", conversationID, "index", index)
|
||||||
}
|
}
|
||||||
delStruct.minSeq = int64(msgDocModel.Msg[delMsgIndexs[len(delMsgIndexs)-1]].Msg.Seq)
|
delStruct.minSeq = int64(msgDocModel.Msg[delMsgIndexs[len(delMsgIndexs)-1]].Msg.Seq)
|
||||||
|
|||||||
@ -106,7 +106,7 @@ func (c *Controller) InitiateUpload(ctx context.Context, hash string, size int64
|
|||||||
partNumber++
|
partNumber++
|
||||||
}
|
}
|
||||||
if maxParts > 0 && partNumber > 0 && partNumber < maxParts {
|
if maxParts > 0 && partNumber > 0 && partNumber < maxParts {
|
||||||
return nil, errors.New(fmt.Sprintf("too many parts: %d", partNumber))
|
return nil, fmt.Errorf("too many parts: %d", partNumber)
|
||||||
}
|
}
|
||||||
if info, err := c.StatObject(ctx, c.HashPath(hash)); err == nil {
|
if info, err := c.StatObject(ctx, c.HashPath(hash)); err == nil {
|
||||||
return nil, &HashAlreadyExistsError{Object: info}
|
return nil, &HashAlreadyExistsError{Object: info}
|
||||||
|
|||||||
@ -52,8 +52,8 @@ const (
|
|||||||
const successCode = http.StatusOK
|
const successCode = http.StatusOK
|
||||||
|
|
||||||
const (
|
const (
|
||||||
videoSnapshotImagePng = "png"
|
// videoSnapshotImagePng = "png"
|
||||||
videoSnapshotImageJpg = "jpg"
|
// videoSnapshotImageJpg = "jpg"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewCos() (s3.Interface, error) {
|
func NewCos() (s3.Interface, error) {
|
||||||
|
|||||||
@ -140,7 +140,7 @@ func (m *Minio) initMinio(ctx context.Context) error {
|
|||||||
return fmt.Errorf("check bucket exists error: %w", err)
|
return fmt.Errorf("check bucket exists error: %w", err)
|
||||||
}
|
}
|
||||||
if !exists {
|
if !exists {
|
||||||
if err := m.core.Client.MakeBucket(ctx, conf.Bucket, minio.MakeBucketOptions{}); err != nil {
|
if err = m.core.Client.MakeBucket(ctx, conf.Bucket, minio.MakeBucketOptions{}); err != nil {
|
||||||
return fmt.Errorf("make bucket error: %w", err)
|
return fmt.Errorf("make bucket error: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user