mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-11-05 11:52:10 +08:00
feat: optimize openim config code
This commit is contained in:
parent
3150e5377a
commit
f499cecc3c
@ -452,7 +452,7 @@ Next, let's demonstrate several ways to use the `WrapMsg` function:
|
|||||||
```go
|
```go
|
||||||
// "github.com/OpenIMSDK/tools/errs"
|
// "github.com/OpenIMSDK/tools/errs"
|
||||||
err := errors.New("original error")
|
err := errors.New("original error")
|
||||||
wrappedErr := WrapMsg(err, "")
|
wrappedErr := errs.WrapMsg(err, "")
|
||||||
// wrappedErr will contain the original error and its call stack
|
// wrappedErr will contain the original error and its call stack
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -461,7 +461,7 @@ wrappedErr := WrapMsg(err, "")
|
|||||||
```go
|
```go
|
||||||
// "github.com/OpenIMSDK/tools/errs"
|
// "github.com/OpenIMSDK/tools/errs"
|
||||||
err := errors.New("original error")
|
err := errors.New("original error")
|
||||||
wrappedErr := WrapMsg(err, "additional error information")
|
wrappedErr := errs.WrapMsg(err, "additional error information")
|
||||||
// wrappedErr will contain the original error, call stack, and "additional error information"
|
// wrappedErr will contain the original error, call stack, and "additional error information"
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -470,7 +470,7 @@ wrappedErr := WrapMsg(err, "additional error information")
|
|||||||
```go
|
```go
|
||||||
// "github.com/OpenIMSDK/tools/errs"
|
// "github.com/OpenIMSDK/tools/errs"
|
||||||
err := errors.New("original error")
|
err := errors.New("original error")
|
||||||
wrappedErr := WrapMsg(err, "problem occurred", "code", 404, "url", "http://example.com")
|
wrappedErr := errs.WrapMsg(err, "problem occurred", "code", 404, "url", "http://example.com")
|
||||||
// wrappedErr will contain the original error, call stack, and "problem occurred code=404, url=http://example.com"
|
// wrappedErr will contain the original error, call stack, and "problem occurred code=404, url=http://example.com"
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -479,8 +479,29 @@ wrappedErr := WrapMsg(err, "problem occurred", "code", 404, "url", "http://examp
|
|||||||
```go
|
```go
|
||||||
// "github.com/OpenIMSDK/tools/errs"
|
// "github.com/OpenIMSDK/tools/errs"
|
||||||
err := errors.New("original error")
|
err := errors.New("original error")
|
||||||
wrappedErr := WrapMsg(err, "", "user", "john_doe", "action", "login")
|
wrappedErr := errs.WrapMsg(err, "", "user", "john_doe", "action", "login")
|
||||||
// wrappedErr will contain the original error, call stack, and "user=john_doe, action=login"
|
// wrappedErr will contain the original error, call stack, and "user=john_doe, action=login"
|
||||||
```
|
```
|
||||||
|
|
||||||
These examples demonstrate how the `WrapMsg` function can flexibly handle error messages and context data, helping developers to more effectively track and debug their programs.
|
> [!TIP] WThese examples demonstrate how the `errs.WrapMsg` function can flexibly handle error messages and context data, helping developers to more effectively track and debug their programs.
|
||||||
|
|
||||||
|
|
||||||
|
### Example 5: Dynamic Key-Value Pairs from Context
|
||||||
|
Suppose we have some runtime context variables, such as a user ID and the type of operation being performed, and we want to include these variables in the error message. This can help with later debugging and identifying the specific environment of the issue.
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Define some context variables
|
||||||
|
userID := "user123"
|
||||||
|
operation := "update profile"
|
||||||
|
errorCode := 500
|
||||||
|
requestURL := "http://example.com/updateProfile"
|
||||||
|
|
||||||
|
// Create a new error
|
||||||
|
err := errors.New("original error")
|
||||||
|
|
||||||
|
// Wrap the error, including dynamic key-value pairs from the context
|
||||||
|
wrappedErr := errs.WrapMsg(err, "operation failed", "user", userID, "action", operation, "code", errorCode, "url", requestURL)
|
||||||
|
// wrappedErr will contain the original error, call stack, and "operation failed user=user123, action=update profile, code=500, url=http://example.com/updateProfile"
|
||||||
|
```
|
||||||
|
|
||||||
|
> [!TIP]In this example, the `WrapMsg` function accepts not just a static error message and additional information, but also dynamic key-value pairs generated from the code's execution context, such as the user ID, operation type, error code, and the URL of the request. Including this contextual information in the error message makes it easier for developers to understand and resolve the issue.
|
||||||
@ -62,12 +62,13 @@ func (u *UserMap) Set(key string, v *Client) {
|
|||||||
oldClients := allClients.([]*Client)
|
oldClients := allClients.([]*Client)
|
||||||
oldClients = append(oldClients, v)
|
oldClients = append(oldClients, v)
|
||||||
u.m.Store(key, oldClients)
|
u.m.Store(key, oldClients)
|
||||||
} else {
|
|
||||||
log.ZDebug(context.Background(), "Set not existed", "user_id", key, "client_user_id", v.UserID)
|
|
||||||
var clients []*Client
|
|
||||||
clients = append(clients, v)
|
|
||||||
u.m.Store(key, clients)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.ZDebug(context.Background(), "Set not existed", "user_id", key, "client_user_id", v.UserID)
|
||||||
|
|
||||||
|
var clients []*Client
|
||||||
|
clients = append(clients, v)
|
||||||
|
u.m.Store(key, clients)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *UserMap) delete(key string, connRemoteAddr string) (isDeleteUser bool) {
|
func (u *UserMap) delete(key string, connRemoteAddr string) (isDeleteUser bool) {
|
||||||
|
|||||||
@ -92,12 +92,7 @@ func Start(config *config.GlobalConfig, prometheusPort int) error {
|
|||||||
return msgTransfer.Start(prometheusPort, config)
|
return msgTransfer.Start(prometheusPort, config)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMsgTransfer(
|
func NewMsgTransfer(kafkaConf *config.Kafka,msgDatabase controller.CommonMsgDatabase,conversationRpcClient *rpcclient.ConversationRpcClient,groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) {
|
||||||
kafkaConf *config.Kafka,
|
|
||||||
msgDatabase controller.CommonMsgDatabase,
|
|
||||||
conversationRpcClient *rpcclient.ConversationRpcClient,
|
|
||||||
groupRpcClient *rpcclient.GroupRpcClient,
|
|
||||||
) (*MsgTransfer, error) {
|
|
||||||
historyCH, err := NewOnlineHistoryRedisConsumerHandler(kafkaConf, msgDatabase, conversationRpcClient, groupRpcClient)
|
historyCH, err := NewOnlineHistoryRedisConsumerHandler(kafkaConf, msgDatabase, conversationRpcClient, groupRpcClient)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -116,8 +111,9 @@ func NewMsgTransfer(
|
|||||||
func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig) error {
|
func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig) error {
|
||||||
fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
|
fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
|
||||||
if prometheusPort <= 0 {
|
if prometheusPort <= 0 {
|
||||||
return errs.Wrap(errors.New("prometheusPort not correct"))
|
return errs.WrapMsg(errors.New("invalid prometheus port"), "prometheusPort validation failed", "providedPort", prometheusPort)
|
||||||
}
|
}
|
||||||
|
|
||||||
m.ctx, m.cancel = context.WithCancel(context.Background())
|
m.ctx, m.cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -138,7 +134,7 @@ func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig) err
|
|||||||
http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg}))
|
http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg}))
|
||||||
err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil)
|
err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil)
|
||||||
if err != nil && err != http.ErrServerClosed {
|
if err != nil && err != http.ErrServerClosed {
|
||||||
netErr = errs.WrapMsg(err, fmt.Sprintf("prometheus start err: %d", prometheusPort))
|
netErr = errs.WrapMsg(err, "prometheus start error", "prometheusPort", prometheusPort)
|
||||||
netDone <- struct{}{}
|
netDone <- struct{}{}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|||||||
@ -81,12 +81,7 @@ type OnlineHistoryRedisConsumerHandler struct {
|
|||||||
groupRpcClient *rpcclient.GroupRpcClient
|
groupRpcClient *rpcclient.GroupRpcClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewOnlineHistoryRedisConsumerHandler(
|
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) {
|
||||||
kafkaConf *config.Kafka,
|
|
||||||
database controller.CommonMsgDatabase,
|
|
||||||
conversationRpcClient *rpcclient.ConversationRpcClient,
|
|
||||||
groupRpcClient *rpcclient.GroupRpcClient,
|
|
||||||
) (*OnlineHistoryRedisConsumerHandler, error) {
|
|
||||||
var och OnlineHistoryRedisConsumerHandler
|
var och OnlineHistoryRedisConsumerHandler
|
||||||
och.msgDatabase = database
|
och.msgDatabase = database
|
||||||
och.msgDistributionCh = make(chan Cmd2Value) // no buffer channel
|
och.msgDistributionCh = make(chan Cmd2Value) // no buffer channel
|
||||||
@ -95,6 +90,7 @@ func NewOnlineHistoryRedisConsumerHandler(
|
|||||||
och.chArrays[i] = make(chan Cmd2Value, 50)
|
och.chArrays[i] = make(chan Cmd2Value, 50)
|
||||||
go och.Run(i)
|
go och.Run(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
och.conversationRpcClient = conversationRpcClient
|
och.conversationRpcClient = conversationRpcClient
|
||||||
och.groupRpcClient = groupRpcClient
|
och.groupRpcClient = groupRpcClient
|
||||||
var err error
|
var err error
|
||||||
@ -267,22 +263,13 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(
|
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData) {
|
||||||
ctx context.Context,
|
|
||||||
key, conversationID string,
|
|
||||||
msgs []*sdkws.MsgData,
|
|
||||||
) {
|
|
||||||
for _, v := range msgs {
|
for _, v := range msgs {
|
||||||
och.msgDatabase.MsgToPushMQ(ctx, key, conversationID, v) // nolint: errcheck
|
och.msgDatabase.MsgToPushMQ(ctx, key, conversationID, v) // nolint: errcheck
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(
|
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*sdkws.MsgData) {
|
||||||
ctx context.Context,
|
|
||||||
key, conversationID string,
|
|
||||||
storageList, notStorageList []*sdkws.MsgData,
|
|
||||||
) {
|
|
||||||
och.toPushTopic(ctx, key, conversationID, notStorageList)
|
och.toPushTopic(ctx, key, conversationID, notStorageList)
|
||||||
if len(storageList) > 0 {
|
if len(storageList) > 0 {
|
||||||
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
|
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
|
||||||
|
|||||||
@ -66,12 +66,7 @@ func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database cont
|
|||||||
return mc, nil
|
return mc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(
|
func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Context, cMsg *sarama.ConsumerMessage, key string, session sarama.ConsumerGroupSession) {
|
||||||
ctx context.Context,
|
|
||||||
cMsg *sarama.ConsumerMessage,
|
|
||||||
key string,
|
|
||||||
session sarama.ConsumerGroupSession,
|
|
||||||
) {
|
|
||||||
msg := cMsg.Value
|
msg := cMsg.Value
|
||||||
msgFromMQ := pbmsg.MsgDataToMongoByMQ{}
|
msgFromMQ := pbmsg.MsgDataToMongoByMQ{}
|
||||||
err := proto.Unmarshal(msg, &msgFromMQ)
|
err := proto.Unmarshal(msg, &msgFromMQ)
|
||||||
|
|||||||
@ -27,13 +27,7 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/http"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
func callbackOfflinePush(
|
func callbackOfflinePush(ctx context.Context, callback *config.Callback, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error {
|
||||||
ctx context.Context,
|
|
||||||
callback *config.Callback,
|
|
||||||
userIDs []string,
|
|
||||||
msg *sdkws.MsgData,
|
|
||||||
offlinePushUserIDs *[]string,
|
|
||||||
) error {
|
|
||||||
if !callback.CallbackOfflinePush.Enable || msg.ContentType == constant.Typing {
|
if !callback.CallbackOfflinePush.Enable || msg.ContentType == constant.Typing {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -101,6 +101,7 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||||
func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||||
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||||
|
|||||||
@ -26,7 +26,6 @@ func GetContent(msg *sdkws.MsgData) string {
|
|||||||
_ = proto.Unmarshal(msg.Content, &tips)
|
_ = proto.Unmarshal(msg.Content, &tips)
|
||||||
content := tips.JsonDetail
|
content := tips.JsonDetail
|
||||||
return content
|
return content
|
||||||
} else {
|
|
||||||
return string(msg.Content)
|
|
||||||
}
|
}
|
||||||
|
return string(msg.Content)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -196,9 +196,7 @@ func (c *conversationServer) SetConversation(ctx context.Context, req *pbconvers
|
|||||||
}
|
}
|
||||||
|
|
||||||
// nolint
|
// nolint
|
||||||
func (c *conversationServer) SetConversations(ctx context.Context,
|
func (c *conversationServer) SetConversations(ctx context.Context, req *pbconversation.SetConversationsReq) (*pbconversation.SetConversationsResp, error) {
|
||||||
req *pbconversation.SetConversationsReq,
|
|
||||||
) (*pbconversation.SetConversationsResp, error) {
|
|
||||||
if req.Conversation == nil {
|
if req.Conversation == nil {
|
||||||
return nil, errs.ErrArgs.WrapMsg("conversation must not be nil")
|
return nil, errs.ErrArgs.WrapMsg("conversation must not be nil")
|
||||||
}
|
}
|
||||||
@ -279,6 +277,7 @@ func (c *conversationServer) SetConversations(ctx context.Context,
|
|||||||
conversation2.IsPrivateChat = req.Conversation.IsPrivateChat.Value
|
conversation2.IsPrivateChat = req.Conversation.IsPrivateChat.Value
|
||||||
conversations = append(conversations, &conversation2)
|
conversations = append(conversations, &conversation2)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.conversationDatabase.SyncPeerUserPrivateConversationTx(ctx, conversations); err != nil {
|
if err := c.conversationDatabase.SyncPeerUserPrivateConversationTx(ctx, conversations); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -293,20 +292,24 @@ func (c *conversationServer) SetConversations(ctx context.Context,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.Conversation.BurnDuration != nil {
|
if req.Conversation.BurnDuration != nil {
|
||||||
m["burn_duration"] = req.Conversation.BurnDuration.Value
|
m["burn_duration"] = req.Conversation.BurnDuration.Value
|
||||||
if req.Conversation.BurnDuration.Value != conv.BurnDuration {
|
if req.Conversation.BurnDuration.Value != conv.BurnDuration {
|
||||||
unequal++
|
unequal++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.conversationDatabase.SetUsersConversationFieldTx(ctx, req.UserIDs, &conversation, m); err != nil {
|
if err := c.conversationDatabase.SetUsersConversationFieldTx(ctx, req.UserIDs, &conversation, m); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if unequal > 0 {
|
if unequal > 0 {
|
||||||
for _, v := range req.UserIDs {
|
for _, v := range req.UserIDs {
|
||||||
c.conversationNotificationSender.ConversationChangeNotification(ctx, v, []string{req.Conversation.ConversationID})
|
c.conversationNotificationSender.ConversationChangeNotification(ctx, v, []string{req.Conversation.ConversationID})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &pbconversation.SetConversationsResp{}, nil
|
return &pbconversation.SetConversationsResp{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -401,10 +404,7 @@ func (c *conversationServer) GetConversationsByConversationID(
|
|||||||
return &pbconversation.GetConversationsByConversationIDResp{Conversations: convert.ConversationsDB2Pb(conversations)}, nil
|
return &pbconversation.GetConversationsByConversationIDResp{Conversations: convert.ConversationsDB2Pb(conversations)}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *conversationServer) GetConversationOfflinePushUserIDs(
|
func (c *conversationServer) GetConversationOfflinePushUserIDs(ctx context.Context, req *pbconversation.GetConversationOfflinePushUserIDsReq) (*pbconversation.GetConversationOfflinePushUserIDsResp, error) {
|
||||||
ctx context.Context,
|
|
||||||
req *pbconversation.GetConversationOfflinePushUserIDsReq,
|
|
||||||
) (*pbconversation.GetConversationOfflinePushUserIDsResp, error) {
|
|
||||||
if req.ConversationID == "" {
|
if req.ConversationID == "" {
|
||||||
return nil, errs.ErrArgs.WrapMsg("conversationID is empty")
|
return nil, errs.ErrArgs.WrapMsg("conversationID is empty")
|
||||||
}
|
}
|
||||||
@ -428,12 +428,7 @@ func (c *conversationServer) GetConversationOfflinePushUserIDs(
|
|||||||
return &pbconversation.GetConversationOfflinePushUserIDsResp{UserIDs: utils.Keys(userIDSet)}, nil
|
return &pbconversation.GetConversationOfflinePushUserIDsResp{UserIDs: utils.Keys(userIDSet)}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *conversationServer) conversationSort(
|
func (c *conversationServer) conversationSort(conversations map[int64]string, resp *pbconversation.GetSortedConversationListResp, conversation_unreadCount map[string]int64, conversationMsg map[string]*pbconversation.ConversationElem) {
|
||||||
conversations map[int64]string,
|
|
||||||
resp *pbconversation.GetSortedConversationListResp,
|
|
||||||
conversation_unreadCount map[string]int64,
|
|
||||||
conversationMsg map[string]*pbconversation.ConversationElem,
|
|
||||||
) {
|
|
||||||
keys := []int64{}
|
keys := []int64{}
|
||||||
for key := range conversations {
|
for key := range conversations {
|
||||||
keys = append(keys, key)
|
keys = append(keys, key)
|
||||||
@ -527,10 +522,7 @@ func (c *conversationServer) getConversationInfo(
|
|||||||
return conversationMsg, nil
|
return conversationMsg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *conversationServer) GetConversationNotReceiveMessageUserIDs(
|
func (c *conversationServer) GetConversationNotReceiveMessageUserIDs(ctx context.Context, req *pbconversation.GetConversationNotReceiveMessageUserIDsReq) (*pbconversation.GetConversationNotReceiveMessageUserIDsResp, error) {
|
||||||
ctx context.Context,
|
|
||||||
req *pbconversation.GetConversationNotReceiveMessageUserIDsReq,
|
|
||||||
) (*pbconversation.GetConversationNotReceiveMessageUserIDsResp, error) {
|
|
||||||
userIDs, err := c.conversationDatabase.GetConversationNotReceiveMessageUserIDs(ctx, req.ConversationID)
|
userIDs, err := c.conversationDatabase.GetConversationNotReceiveMessageUserIDs(ctx, req.ConversationID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@ -58,10 +58,13 @@ func (s *friendServer) RemoveBlack(ctx context.Context, req *pbfriend.RemoveBlac
|
|||||||
if err := s.userRpcClient.Access(ctx, req.OwnerUserID); err != nil {
|
if err := s.userRpcClient.Access(ctx, req.OwnerUserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.blackDatabase.Delete(ctx, []*relation.BlackModel{{OwnerUserID: req.OwnerUserID, BlockUserID: req.BlackUserID}}); err != nil {
|
if err := s.blackDatabase.Delete(ctx, []*relation.BlackModel{{OwnerUserID: req.OwnerUserID, BlockUserID: req.BlackUserID}}); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.notificationSender.BlackDeletedNotification(ctx, req)
|
s.notificationSender.BlackDeletedNotification(ctx, req)
|
||||||
|
|
||||||
return &pbfriend.RemoveBlackResp{}, nil
|
return &pbfriend.RemoveBlackResp{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -77,6 +77,7 @@ func CallbackAfterSetFriendRemark(ctx context.Context, callback *config.Callback
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func CallbackBeforeAddBlack(ctx context.Context, callback *config.Callback, req *pbfriend.AddBlackReq) error {
|
func CallbackBeforeAddBlack(ctx context.Context, callback *config.Callback, req *pbfriend.AddBlackReq) error {
|
||||||
if !callback.CallbackBeforeAddBlack.Enable {
|
if !callback.CallbackBeforeAddBlack.Enable {
|
||||||
return nil
|
return nil
|
||||||
@ -92,6 +93,7 @@ func CallbackBeforeAddBlack(ctx context.Context, callback *config.Callback, req
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func CallbackAfterAddFriend(ctx context.Context, callback *config.Callback, req *pbfriend.ApplyToAddFriendReq) error {
|
func CallbackAfterAddFriend(ctx context.Context, callback *config.Callback, req *pbfriend.ApplyToAddFriendReq) error {
|
||||||
if !callback.CallbackAfterAddFriend.Enable {
|
if !callback.CallbackAfterAddFriend.Enable {
|
||||||
return nil
|
return nil
|
||||||
@ -109,6 +111,7 @@ func CallbackAfterAddFriend(ctx context.Context, callback *config.Callback, req
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func CallbackBeforeAddFriendAgree(ctx context.Context, callback *config.Callback, req *pbfriend.RespondFriendApplyReq) error {
|
func CallbackBeforeAddFriendAgree(ctx context.Context, callback *config.Callback, req *pbfriend.RespondFriendApplyReq) error {
|
||||||
if !callback.CallbackBeforeAddFriendAgree.Enable {
|
if !callback.CallbackBeforeAddFriendAgree.Enable {
|
||||||
return nil
|
return nil
|
||||||
@ -126,6 +129,7 @@ func CallbackBeforeAddFriendAgree(ctx context.Context, callback *config.Callback
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func CallbackAfterDeleteFriend(ctx context.Context, callback *config.Callback, req *pbfriend.DeleteFriendReq) error {
|
func CallbackAfterDeleteFriend(ctx context.Context, callback *config.Callback, req *pbfriend.DeleteFriendReq) error {
|
||||||
if !callback.CallbackAfterDeleteFriend.Enable {
|
if !callback.CallbackAfterDeleteFriend.Enable {
|
||||||
return nil
|
return nil
|
||||||
@ -141,6 +145,7 @@ func CallbackAfterDeleteFriend(ctx context.Context, callback *config.Callback, r
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func CallbackBeforeImportFriends(ctx context.Context, callback *config.Callback, req *pbfriend.ImportFriendReq) error {
|
func CallbackBeforeImportFriends(ctx context.Context, callback *config.Callback, req *pbfriend.ImportFriendReq) error {
|
||||||
if !callback.CallbackBeforeImportFriends.Enable {
|
if !callback.CallbackBeforeImportFriends.Enable {
|
||||||
return nil
|
return nil
|
||||||
@ -159,6 +164,7 @@ func CallbackBeforeImportFriends(ctx context.Context, callback *config.Callback,
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func CallbackAfterImportFriends(ctx context.Context, callback *config.Callback, req *pbfriend.ImportFriendReq) error {
|
func CallbackAfterImportFriends(ctx context.Context, callback *config.Callback, req *pbfriend.ImportFriendReq) error {
|
||||||
if !callback.CallbackAfterImportFriends.Enable {
|
if !callback.CallbackAfterImportFriends.Enable {
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@ -154,6 +154,7 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *pbfriend.ImportFr
|
|||||||
if utils.Duplicate(req.FriendUserIDs) {
|
if utils.Duplicate(req.FriendUserIDs) {
|
||||||
return nil, errs.ErrArgs.WrapMsg("friend userID repeated")
|
return nil, errs.ErrArgs.WrapMsg("friend userID repeated")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := CallbackBeforeImportFriends(ctx, &s.config.Callback, req); err != nil {
|
if err := CallbackBeforeImportFriends(ctx, &s.config.Callback, req); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -61,6 +61,7 @@ func CheckAdmin(ctx context.Context, manager *config.Manager, imAdmin *config.IM
|
|||||||
}
|
}
|
||||||
return errs.ErrNoPermission.WrapMsg(fmt.Sprintf("user %s is not admin userID", mcontext.GetOpUserID(ctx)))
|
return errs.ErrNoPermission.WrapMsg(fmt.Sprintf("user %s is not admin userID", mcontext.GetOpUserID(ctx)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func CheckIMAdmin(ctx context.Context, config *config.GlobalConfig) error {
|
func CheckIMAdmin(ctx context.Context, config *config.GlobalConfig) error {
|
||||||
if utils.Contain(mcontext.GetOpUserID(ctx), config.IMAdmin.UserID...) {
|
if utils.Contain(mcontext.GetOpUserID(ctx), config.IMAdmin.UserID...) {
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@ -110,7 +110,7 @@ func (c *ConversationRpcClient) GetConversationsByConversationID(ctx context.Con
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if len(resp.Conversations) == 0 {
|
if len(resp.Conversations) == 0 {
|
||||||
return nil, errs.ErrRecordNotFound.Wrap(fmt.Sprintf("conversationIDs: %v not found", conversationIDs))
|
return nil, errs.ErrRecordNotFound.WrapMsg(fmt.Sprintf("conversationIDs: %v not found", conversationIDs))
|
||||||
}
|
}
|
||||||
return resp.Conversations, nil
|
return resp.Conversations, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -63,7 +63,7 @@ func (g *GroupRpcClient) GetGroupInfos(
|
|||||||
if ids := utils.Single(groupIDs, utils.Slice(resp.GroupInfos, func(e *sdkws.GroupInfo) string {
|
if ids := utils.Single(groupIDs, utils.Slice(resp.GroupInfos, func(e *sdkws.GroupInfo) string {
|
||||||
return e.GroupID
|
return e.GroupID
|
||||||
})); len(ids) > 0 {
|
})); len(ids) > 0 {
|
||||||
return nil, errs.ErrGroupIDNotFound.Wrap(strings.Join(ids, ","))
|
return nil, errs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ","))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return resp.GroupInfos, nil
|
return resp.GroupInfos, nil
|
||||||
@ -108,7 +108,7 @@ func (g *GroupRpcClient) GetGroupMemberInfos(
|
|||||||
if ids := utils.Single(userIDs, utils.Slice(resp.Members, func(e *sdkws.GroupMemberFullInfo) string {
|
if ids := utils.Single(userIDs, utils.Slice(resp.Members, func(e *sdkws.GroupMemberFullInfo) string {
|
||||||
return e.UserID
|
return e.UserID
|
||||||
})); len(ids) > 0 {
|
})); len(ids) > 0 {
|
||||||
return nil, errs.ErrNotInGroupYet.Wrap(strings.Join(ids, ","))
|
return nil, errs.ErrNotInGroupYet.WrapMsg(strings.Join(ids, ","))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return resp.Members, nil
|
return resp.Members, nil
|
||||||
|
|||||||
@ -262,8 +262,7 @@ func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, s
|
|||||||
n := sdkws.NotificationElem{Detail: utils.StructToJsonString(m)}
|
n := sdkws.NotificationElem{Detail: utils.StructToJsonString(m)}
|
||||||
content, err := json.Marshal(&n)
|
content, err := json.Marshal(&n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errInfo := fmt.Sprintf("MsgClient Notification json.Marshal failed, sendID:%s, recvID:%s, contentType:%d, msg:%s", sendID, recvID, contentType, m)
|
return errs.WrapMsg(err, "json.Marshal failed", "sendID", sendID, "recvID", recvID, "contentType", contentType, "msg", utils.StructToJsonString(m))
|
||||||
return errs.Wrap(err, errInfo)
|
|
||||||
}
|
}
|
||||||
notificationOpt := ¬ificationOpt{}
|
notificationOpt := ¬ificationOpt{}
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
@ -275,15 +274,12 @@ func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, s
|
|||||||
if notificationOpt.WithRpcGetUsername && s.getUserInfo != nil {
|
if notificationOpt.WithRpcGetUsername && s.getUserInfo != nil {
|
||||||
userInfo, err = s.getUserInfo(ctx, sendID)
|
userInfo, err = s.getUserInfo(ctx, sendID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errInfo := fmt.Sprintf("getUserInfo failed, sendID:%s", sendID)
|
return errs.WrapMsg(err, "getUserInfo failed", "sendID", sendID)
|
||||||
return errs.Wrap(err, errInfo)
|
|
||||||
} else {
|
|
||||||
msg.SenderNickname = userInfo.Nickname
|
|
||||||
msg.SenderFaceURL = userInfo.FaceURL
|
|
||||||
}
|
}
|
||||||
|
msg.SenderNickname = userInfo.Nickname
|
||||||
|
msg.SenderFaceURL = userInfo.FaceURL
|
||||||
}
|
}
|
||||||
var offlineInfo sdkws.OfflinePushInfo
|
var offlineInfo sdkws.OfflinePushInfo
|
||||||
var title, desc, ex string
|
|
||||||
msg.SendID = sendID
|
msg.SendID = sendID
|
||||||
msg.RecvID = recvID
|
msg.RecvID = recvID
|
||||||
msg.Content = content
|
msg.Content = content
|
||||||
@ -302,15 +298,11 @@ func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, s
|
|||||||
options := config.GetOptionsByNotification(optionsConfig)
|
options := config.GetOptionsByNotification(optionsConfig)
|
||||||
s.SetOptionsByContentType(ctx, options, contentType)
|
s.SetOptionsByContentType(ctx, options, contentType)
|
||||||
msg.Options = options
|
msg.Options = options
|
||||||
offlineInfo.Title = title
|
|
||||||
offlineInfo.Desc = desc
|
|
||||||
offlineInfo.Ex = ex
|
|
||||||
msg.OfflinePushInfo = &offlineInfo
|
msg.OfflinePushInfo = &offlineInfo
|
||||||
req.MsgData = &msg
|
req.MsgData = &msg
|
||||||
_, err = s.sendMsg(ctx, &req)
|
_, err = s.sendMsg(ctx, &req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errInfo := fmt.Sprintf("MsgClient Notification SendMsg failed, req:%s", &req)
|
return errs.WrapMsg(err, "SendMsg failed", "req", fmt.Sprintf("%+v", req))
|
||||||
return errs.Wrap(err, errInfo)
|
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -198,12 +198,14 @@ func (f *FriendNotificationSender) FriendRemarkSetNotification(ctx context.Conte
|
|||||||
tips.FromToUserID.ToUserID = toUserID
|
tips.FromToUserID.ToUserID = toUserID
|
||||||
return f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips)
|
return f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FriendNotificationSender) FriendsInfoUpdateNotification(ctx context.Context, toUserID string, friendIDs []string) error {
|
func (f *FriendNotificationSender) FriendsInfoUpdateNotification(ctx context.Context, toUserID string, friendIDs []string) error {
|
||||||
tips := sdkws.FriendsInfoUpdateTips{FromToUserID: &sdkws.FromToUserID{}}
|
tips := sdkws.FriendsInfoUpdateTips{FromToUserID: &sdkws.FromToUserID{}}
|
||||||
tips.FromToUserID.ToUserID = toUserID
|
tips.FromToUserID.ToUserID = toUserID
|
||||||
tips.FriendIDs = friendIDs
|
tips.FriendIDs = friendIDs
|
||||||
return f.Notification(ctx, toUserID, toUserID, constant.FriendsInfoUpdateNotification, &tips)
|
return f.Notification(ctx, toUserID, toUserID, constant.FriendsInfoUpdateNotification, &tips)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FriendNotificationSender) BlackAddedNotification(ctx context.Context, req *pbfriend.AddBlackReq) error {
|
func (f *FriendNotificationSender) BlackAddedNotification(ctx context.Context, req *pbfriend.AddBlackReq) error {
|
||||||
tips := sdkws.BlackAddedTips{FromToUserID: &sdkws.FromToUserID{}}
|
tips := sdkws.BlackAddedTips{FromToUserID: &sdkws.FromToUserID{}}
|
||||||
tips.FromToUserID.FromUserID = req.OwnerUserID
|
tips.FromToUserID.FromUserID = req.OwnerUserID
|
||||||
@ -221,11 +223,7 @@ func (f *FriendNotificationSender) BlackDeletedNotification(ctx context.Context,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FriendNotificationSender) FriendInfoUpdatedNotification(
|
func (f *FriendNotificationSender) FriendInfoUpdatedNotification(ctx context.Context, changedUserID string, needNotifiedUserID string) {
|
||||||
ctx context.Context,
|
|
||||||
changedUserID string,
|
|
||||||
needNotifiedUserID string,
|
|
||||||
) {
|
|
||||||
tips := sdkws.UserInfoUpdatedTips{UserID: changedUserID}
|
tips := sdkws.UserInfoUpdatedTips{UserID: changedUserID}
|
||||||
if err := f.Notification(ctx, mcontext.GetOpUserID(ctx), needNotifiedUserID,
|
if err := f.Notification(ctx, mcontext.GetOpUserID(ctx), needNotifiedUserID,
|
||||||
constant.FriendInfoUpdatedNotification, &tips); err != nil {
|
constant.FriendInfoUpdatedNotification, &tips); err != nil {
|
||||||
|
|||||||
@ -33,15 +33,9 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewGroupNotificationSender(
|
func NewGroupNotificationSender(db controller.GroupDatabase, msgRpcClient *rpcclient.MessageRpcClient, userRpcClient *rpcclient.UserRpcClient, config *config.GlobalConfig, fn func(ctx context.Context, userIDs []string) ([]CommonUser, error)) *GroupNotificationSender {
|
||||||
db controller.GroupDatabase,
|
|
||||||
msgRpcClient *rpcclient.MessageRpcClient,
|
|
||||||
userRpcClient *rpcclient.UserRpcClient,
|
|
||||||
config *config.GlobalConfig,
|
|
||||||
fn func(ctx context.Context, userIDs []string) ([]CommonUser, error),
|
|
||||||
) *GroupNotificationSender {
|
|
||||||
return &GroupNotificationSender{
|
return &GroupNotificationSender{
|
||||||
NotificationSender: rpcclient.NewNotificationSender(config, rpcclient.WithRpcClient(msgRpcClient), rpcclient.WithUserRpcClient(userRpcClient)),
|
NotificationSender: rpcclient.NewNotificationSender(&config.Notification, rpcclient.WithRpcClient(msgRpcClient), rpcclient.WithUserRpcClient(userRpcClient)),
|
||||||
getUsersInfo: fn,
|
getUsersInfo: fn,
|
||||||
db: db,
|
db: db,
|
||||||
config: config,
|
config: config,
|
||||||
@ -96,7 +90,7 @@ func (g *GroupNotificationSender) getUser(ctx context.Context, userID string) (*
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if len(users) == 0 {
|
if len(users) == 0 {
|
||||||
return nil, errs.ErrUserIDNotFound.Wrap(fmt.Sprintf("user %s not found", userID))
|
return nil, errs.ErrUserIDNotFound.WrapMsg(fmt.Sprintf("user %s not found", userID))
|
||||||
}
|
}
|
||||||
return &sdkws.PublicUserInfo{
|
return &sdkws.PublicUserInfo{
|
||||||
UserID: users[0].GetUserID(),
|
UserID: users[0].GetUserID(),
|
||||||
@ -178,7 +172,7 @@ func (g *GroupNotificationSender) getGroupMember(ctx context.Context, groupID st
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if len(members) == 0 {
|
if len(members) == 0 {
|
||||||
return nil, errs.ErrInternalServer.Wrap(fmt.Sprintf("group %s member %s not found", groupID, userID))
|
return nil, errs.ErrInternalServer.WrapMsg(fmt.Sprintf("group %s member %s not found", groupID, userID))
|
||||||
}
|
}
|
||||||
return members[0], nil
|
return members[0], nil
|
||||||
}
|
}
|
||||||
@ -253,7 +247,7 @@ func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws
|
|||||||
}
|
}
|
||||||
userID := mcontext.GetOpUserID(ctx)
|
userID := mcontext.GetOpUserID(ctx)
|
||||||
if groupID != "" {
|
if groupID != "" {
|
||||||
if authverify.IsManagerUserID(userID, g.config) {
|
if authverify.IsManagerUserID(userID, &g.config.Manager, &g.config.IMAdmin) {
|
||||||
*opUser = &sdkws.GroupMemberFullInfo{
|
*opUser = &sdkws.GroupMemberFullInfo{
|
||||||
GroupID: groupID,
|
GroupID: groupID,
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
|
|||||||
@ -29,7 +29,7 @@ type MsgNotificationSender struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewMsgNotificationSender(config *config.GlobalConfig, opts ...rpcclient.NotificationSenderOptions) *MsgNotificationSender {
|
func NewMsgNotificationSender(config *config.GlobalConfig, opts ...rpcclient.NotificationSenderOptions) *MsgNotificationSender {
|
||||||
return &MsgNotificationSender{rpcclient.NewNotificationSender(config, opts...)}
|
return &MsgNotificationSender{rpcclient.NewNotificationSender(&config.Notification, opts...)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MsgNotificationSender) UserDeleteMsgsNotification(ctx context.Context, userID, conversationID string, seqs []int64) error {
|
func (m *MsgNotificationSender) UserDeleteMsgsNotification(ctx context.Context, userID, conversationID string, seqs []int64) error {
|
||||||
|
|||||||
@ -85,7 +85,7 @@ func (u *UserRpcClient) GetUsersInfo(ctx context.Context, userIDs []string) ([]*
|
|||||||
if ids := utils.Single(userIDs, utils.Slice(resp.UsersInfo, func(e *sdkws.UserInfo) string {
|
if ids := utils.Single(userIDs, utils.Slice(resp.UsersInfo, func(e *sdkws.UserInfo) string {
|
||||||
return e.UserID
|
return e.UserID
|
||||||
})); len(ids) > 0 {
|
})); len(ids) > 0 {
|
||||||
return nil, errs.ErrUserIDNotFound.Wrap(strings.Join(ids, ","))
|
return nil, errs.ErrUserIDNotFound.WrapMsg(strings.Join(ids, ","))
|
||||||
}
|
}
|
||||||
return resp.UsersInfo, nil
|
return resp.UsersInfo, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -23,21 +23,24 @@ import (
|
|||||||
"github.com/OpenIMSDK/tools/errs"
|
"github.com/OpenIMSDK/tools/errs"
|
||||||
)
|
)
|
||||||
|
|
||||||
// OutDir creates the absolute path name from path and checks path exists.
|
// OutDir creates the absolute path name from path and checks if the path exists and is a directory.
|
||||||
// Returns absolute path including trailing '/' or error if path does not exist.
|
// Returns absolute path including trailing '/' or error if the path does not exist or is not a directory.
|
||||||
func OutDir(path string) (string, error) {
|
func OutDir(path string) (string, error) {
|
||||||
outDir, err := filepath.Abs(path)
|
outDir, err := filepath.Abs(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", errs.WrapMsg(err, "output directory %s does not exist", path)
|
return "", errs.WrapMsg(err, "failed to resolve absolute path", "path", path)
|
||||||
}
|
}
|
||||||
|
|
||||||
stat, err := os.Stat(outDir)
|
stat, err := os.Stat(outDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", errs.WrapMsg(err, "output directory %s does not exist", outDir)
|
if os.IsNotExist(err) {
|
||||||
|
return "", errs.WrapMsg(err, "output directory does not exist", "path", outDir)
|
||||||
|
}
|
||||||
|
return "", errs.WrapMsg(err, "failed to stat output directory", "path", outDir)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !stat.IsDir() {
|
if !stat.IsDir() {
|
||||||
return "", errs.WrapMsg(err, "output directory %s is not a directory", outDir)
|
return "", errs.Wrap(fmt.Errorf("specified path %s is not a directory", outDir)) // Correctly constructs a new error as 'err' would be nil here
|
||||||
}
|
}
|
||||||
outDir += "/"
|
outDir += "/"
|
||||||
return outDir, nil
|
return outDir, nil
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user