mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-05 20:11:14 +08:00
fix: wrap the error of group user and thrid (#2005)
* fix: wrap the error of group user and thrid * fix: del the chinese comment * fix: fix the make_lint error * fix: fix the ApiTest error
This commit is contained in:
parent
c7dad1a5c1
commit
52b8efba73
@ -120,12 +120,12 @@ func (s *groupServer) NotificationUserInfoUpdate(ctx context.Context, req *pbgro
|
|||||||
groupIDs = append(groupIDs, member.GroupID)
|
groupIDs = append(groupIDs, member.GroupID)
|
||||||
}
|
}
|
||||||
for _, groupID := range groupIDs {
|
for _, groupID := range groupIDs {
|
||||||
if err := s.Notification.GroupMemberInfoSetNotification(ctx, groupID, req.UserID); err != nil {
|
if err = s.Notification.GroupMemberInfoSetNotification(ctx, groupID, req.UserID); err != nil {
|
||||||
log.ZError(ctx, "NotificationUserInfoUpdate setGroupMemberInfo notification failed", err, "groupID", groupID)
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := s.db.DeleteGroupMemberHash(ctx, groupIDs); err != nil {
|
if err = s.db.DeleteGroupMemberHash(ctx, groupIDs); err != nil {
|
||||||
log.ZError(ctx, "NotificationUserInfoUpdate DeleteGroupMemberHash", err, "groupID", groupIDs)
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &pbgroup.NotificationUserInfoUpdateResp{}, nil
|
return &pbgroup.NotificationUserInfoUpdateResp{}, nil
|
||||||
|
@ -209,7 +209,7 @@ func (t *thirdServer) InitiateFormData(ctx context.Context, req *third.InitiateF
|
|||||||
}
|
}
|
||||||
uid, err := uuid.NewRandom()
|
uid, err := uuid.NewRandom()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, errs.Wrap(err, "uuid NewRandom failed")
|
||||||
}
|
}
|
||||||
if key == "" {
|
if key == "" {
|
||||||
date := time.Now().Format("20060102")
|
date := time.Now().Format("20060102")
|
||||||
@ -224,7 +224,7 @@ func (t *thirdServer) InitiateFormData(ctx context.Context, req *third.InitiateF
|
|||||||
}
|
}
|
||||||
mateData, err := json.Marshal(&mate)
|
mateData, err := json.Marshal(&mate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, errs.Wrap(err, "marshal failed")
|
||||||
}
|
}
|
||||||
resp, err := t.s3dataBase.FormData(ctx, key, req.Size, req.ContentType, duration)
|
resp, err := t.s3dataBase.FormData(ctx, key, req.Size, req.ContentType, duration)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -17,6 +17,7 @@ package third
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/OpenIMSDK/tools/errs"
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -50,9 +51,9 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg
|
|||||||
}
|
}
|
||||||
apiURL := config.Object.ApiURL
|
apiURL := config.Object.ApiURL
|
||||||
if apiURL == "" {
|
if apiURL == "" {
|
||||||
return fmt.Errorf("api url is empty")
|
return errs.Wrap(fmt.Errorf("api is empty"))
|
||||||
}
|
}
|
||||||
if _, parseErr := url.Parse(config.Object.ApiURL); parseErr != nil {
|
if _, err := url.Parse(config.Object.ApiURL); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if apiURL[len(apiURL)-1] != '/' {
|
if apiURL[len(apiURL)-1] != '/' {
|
||||||
@ -63,7 +64,7 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// 根据配置文件策略选择 oss 方式
|
// Select the oss method according to the profile policy
|
||||||
enable := config.Object.Enable
|
enable := config.Object.Enable
|
||||||
var o s3.Interface
|
var o s3.Interface
|
||||||
switch enable {
|
switch enable {
|
||||||
|
@ -16,7 +16,7 @@ package user
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@ -70,7 +70,7 @@ func Start(config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, se
|
|||||||
}
|
}
|
||||||
users := make([]*tablerelation.UserModel, 0)
|
users := make([]*tablerelation.UserModel, 0)
|
||||||
if len(config.IMAdmin.UserID) != len(config.IMAdmin.Nickname) {
|
if len(config.IMAdmin.UserID) != len(config.IMAdmin.Nickname) {
|
||||||
return errors.New("len(s.config.AppNotificationAdmin.AppManagerUid) != len(s.config.AppNotificationAdmin.Nickname)")
|
return errs.Wrap(fmt.Errorf("the count of ImAdmin.UserID is not equal to the count of ImAdmin.Nickname"))
|
||||||
}
|
}
|
||||||
for k, v := range config.IMAdmin.UserID {
|
for k, v := range config.IMAdmin.UserID {
|
||||||
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.IMAdmin.Nickname[k], AppMangerLevel: constant.AppNotificationAdmin})
|
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.IMAdmin.Nickname[k], AppMangerLevel: constant.AppNotificationAdmin})
|
||||||
@ -105,9 +105,6 @@ func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesig
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
resp.UsersInfo = convert.UsersDB2Pb(users)
|
resp.UsersInfo = convert.UsersDB2Pb(users)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -131,7 +128,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
|
|||||||
}
|
}
|
||||||
if req.UserInfo.Nickname != "" || req.UserInfo.FaceURL != "" {
|
if req.UserInfo.Nickname != "" || req.UserInfo.FaceURL != "" {
|
||||||
if err = s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
|
if err = s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
|
||||||
log.ZError(ctx, "NotificationUserInfoUpdate", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, friendID := range friends {
|
for _, friendID := range friends {
|
||||||
@ -141,7 +138,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err = s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
|
if err = s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
|
||||||
log.ZError(ctx, "NotificationUserInfoUpdate", err, "userID", req.UserInfo.UserID)
|
return nil, err
|
||||||
}
|
}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
@ -166,7 +163,7 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse
|
|||||||
}
|
}
|
||||||
if req.UserInfo.Nickname != nil || req.UserInfo.FaceURL != nil {
|
if req.UserInfo.Nickname != nil || req.UserInfo.FaceURL != nil {
|
||||||
if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
|
if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
|
||||||
log.ZError(ctx, "NotificationUserInfoUpdate", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, friendID := range friends {
|
for _, friendID := range friends {
|
||||||
@ -176,7 +173,7 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
|
if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
|
||||||
log.ZError(ctx, "NotificationUserInfoUpdate", err, "userID", req.UserInfo.UserID)
|
return nil, err
|
||||||
}
|
}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
4
pkg/common/db/cache/meta_cache.go
vendored
4
pkg/common/db/cache/meta_cache.go
vendored
@ -129,7 +129,7 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
|
|||||||
v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) {
|
v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) {
|
||||||
t, err = fn(ctx)
|
t, err = fn(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", errs.Wrap(err)
|
return "", err
|
||||||
}
|
}
|
||||||
bs, err := json.Marshal(t)
|
bs, err := json.Marshal(t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -204,7 +204,7 @@ func batchGetCache2[T any, K comparable](
|
|||||||
fns func(ctx context.Context, key K) (T, error),
|
fns func(ctx context.Context, key K) (T, error),
|
||||||
) ([]T, error) {
|
) ([]T, error) {
|
||||||
if len(keys) == 0 {
|
if len(keys) == 0 {
|
||||||
return nil, errs.ErrArgs.Wrap("groupID is empty")
|
return nil, nil
|
||||||
}
|
}
|
||||||
res := make([]T, 0, len(keys))
|
res := make([]T, 0, len(keys))
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
|
2
pkg/common/db/cache/msg.go
vendored
2
pkg/common/db/cache/msg.go
vendored
@ -471,7 +471,7 @@ func (c *msgCache) ParallelSetMessageToCache(ctx context.Context, conversationID
|
|||||||
|
|
||||||
err := wg.Wait()
|
err := wg.Wait()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, errs.Wrap(err, "wg.Wait failed")
|
||||||
}
|
}
|
||||||
|
|
||||||
return len(msgs), nil
|
return len(msgs), nil
|
||||||
|
@ -16,6 +16,7 @@ package mgo
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/OpenIMSDK/tools/errs"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/tools/mgoutil"
|
"github.com/OpenIMSDK/tools/mgoutil"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
|
||||||
@ -33,7 +34,7 @@ func NewS3Mongo(db *mongo.Database) (relation.ObjectInfoModelInterface, error) {
|
|||||||
Options: options.Index().SetUnique(true),
|
Options: options.Index().SetUnique(true),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, errs.Wrap(err)
|
||||||
}
|
}
|
||||||
return &S3Mongo{coll: coll}, nil
|
return &S3Mongo{coll: coll}, nil
|
||||||
}
|
}
|
||||||
|
@ -157,7 +157,7 @@ func (u *UserMgo) AddUserCommand(ctx context.Context, userID string, Type int32,
|
|||||||
}
|
}
|
||||||
|
|
||||||
_, err := collection.InsertOne(ctx, doc)
|
_, err := collection.InsertOne(ctx, doc)
|
||||||
return err
|
return errs.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *UserMgo) DeleteUserCommand(ctx context.Context, userID string, Type int32, UUID string) error {
|
func (u *UserMgo) DeleteUserCommand(ctx context.Context, userID string, Type int32, UUID string) error {
|
||||||
@ -170,7 +170,7 @@ func (u *UserMgo) DeleteUserCommand(ctx context.Context, userID string, Type int
|
|||||||
// No records found to update
|
// No records found to update
|
||||||
return errs.Wrap(errs.ErrRecordNotFound)
|
return errs.Wrap(errs.ErrRecordNotFound)
|
||||||
}
|
}
|
||||||
return err
|
return errs.Wrap(err)
|
||||||
}
|
}
|
||||||
func (u *UserMgo) UpdateUserCommand(ctx context.Context, userID string, Type int32, UUID string, val map[string]any) error {
|
func (u *UserMgo) UpdateUserCommand(ctx context.Context, userID string, Type int32, UUID string, val map[string]any) error {
|
||||||
if len(val) == 0 {
|
if len(val) == 0 {
|
||||||
@ -184,7 +184,7 @@ func (u *UserMgo) UpdateUserCommand(ctx context.Context, userID string, Type int
|
|||||||
|
|
||||||
result, err := collection.UpdateOne(ctx, filter, update)
|
result, err := collection.UpdateOne(ctx, filter, update)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errs.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if result.MatchedCount == 0 {
|
if result.MatchedCount == 0 {
|
||||||
@ -233,7 +233,7 @@ func (u *UserMgo) GetUserCommand(ctx context.Context, userID string, Type int32)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := cursor.Err(); err != nil {
|
if err := cursor.Err(); err != nil {
|
||||||
return nil, err
|
return nil, errs.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return commands, nil
|
return commands, nil
|
||||||
@ -244,7 +244,7 @@ func (u *UserMgo) GetAllUserCommand(ctx context.Context, userID string) ([]*user
|
|||||||
|
|
||||||
cursor, err := collection.Find(ctx, filter)
|
cursor, err := collection.Find(ctx, filter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, errs.Wrap(err)
|
||||||
}
|
}
|
||||||
defer cursor.Close(ctx)
|
defer cursor.Close(ctx)
|
||||||
|
|
||||||
@ -261,7 +261,7 @@ func (u *UserMgo) GetAllUserCommand(ctx context.Context, userID string) ([]*user
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := cursor.Decode(&document); err != nil {
|
if err := cursor.Decode(&document); err != nil {
|
||||||
return nil, err
|
return nil, errs.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
commandInfo := &user.AllCommandInfoResp{
|
commandInfo := &user.AllCommandInfoResp{
|
||||||
@ -276,7 +276,7 @@ func (u *UserMgo) GetAllUserCommand(ctx context.Context, userID string) ([]*user
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := cursor.Err(); err != nil {
|
if err := cursor.Err(); err != nil {
|
||||||
return nil, err
|
return nil, errs.Wrap(err)
|
||||||
}
|
}
|
||||||
return commands, nil
|
return commands, nil
|
||||||
}
|
}
|
||||||
|
@ -47,27 +47,27 @@ func resizeImage(img image.Image, maxWidth, maxHeight int) image.Image {
|
|||||||
imgWidth := bounds.Max.X
|
imgWidth := bounds.Max.X
|
||||||
imgHeight := bounds.Max.Y
|
imgHeight := bounds.Max.Y
|
||||||
|
|
||||||
// 计算缩放比例
|
// Calculating scaling
|
||||||
scaleWidth := float64(maxWidth) / float64(imgWidth)
|
scaleWidth := float64(maxWidth) / float64(imgWidth)
|
||||||
scaleHeight := float64(maxHeight) / float64(imgHeight)
|
scaleHeight := float64(maxHeight) / float64(imgHeight)
|
||||||
|
|
||||||
// 如果都为0,则不缩放,返回原始图片
|
// If both are 0, then no scaling is done and the original image is returned
|
||||||
if maxWidth == 0 && maxHeight == 0 {
|
if maxWidth == 0 && maxHeight == 0 {
|
||||||
return img
|
return img
|
||||||
}
|
}
|
||||||
|
|
||||||
// 如果宽度和高度都大于0,则选择较小的缩放比例,以保持宽高比
|
// If both width and height are greater than 0, select a smaller zoom ratio to maintain the aspect ratio
|
||||||
if maxWidth > 0 && maxHeight > 0 {
|
if maxWidth > 0 && maxHeight > 0 {
|
||||||
scale := scaleWidth
|
scale := scaleWidth
|
||||||
if scaleHeight < scaleWidth {
|
if scaleHeight < scaleWidth {
|
||||||
scale = scaleHeight
|
scale = scaleHeight
|
||||||
}
|
}
|
||||||
|
|
||||||
// 计算缩略图尺寸
|
// Calculate Thumbnail Size
|
||||||
thumbnailWidth := int(float64(imgWidth) * scale)
|
thumbnailWidth := int(float64(imgWidth) * scale)
|
||||||
thumbnailHeight := int(float64(imgHeight) * scale)
|
thumbnailHeight := int(float64(imgHeight) * scale)
|
||||||
|
|
||||||
// 使用"image"库的Resample方法生成缩略图
|
// Thumbnails are generated using the Resample method of the "image" library.
|
||||||
thumbnail := image.NewRGBA(image.Rect(0, 0, thumbnailWidth, thumbnailHeight))
|
thumbnail := image.NewRGBA(image.Rect(0, 0, thumbnailWidth, thumbnailHeight))
|
||||||
for y := 0; y < thumbnailHeight; y++ {
|
for y := 0; y < thumbnailHeight; y++ {
|
||||||
for x := 0; x < thumbnailWidth; x++ {
|
for x := 0; x < thumbnailWidth; x++ {
|
||||||
@ -80,12 +80,12 @@ func resizeImage(img image.Image, maxWidth, maxHeight int) image.Image {
|
|||||||
return thumbnail
|
return thumbnail
|
||||||
}
|
}
|
||||||
|
|
||||||
// 如果只指定了宽度或高度,则根据最大不超过的规则生成缩略图
|
// If only width or height is specified, thumbnails are generated based on the maximum not to exceed rule
|
||||||
if maxWidth > 0 {
|
if maxWidth > 0 {
|
||||||
thumbnailWidth := maxWidth
|
thumbnailWidth := maxWidth
|
||||||
thumbnailHeight := int(float64(imgHeight) * scaleWidth)
|
thumbnailHeight := int(float64(imgHeight) * scaleWidth)
|
||||||
|
|
||||||
// 使用"image"库的Resample方法生成缩略图
|
// Thumbnails are generated using the Resample method of the "image" library.
|
||||||
thumbnail := image.NewRGBA(image.Rect(0, 0, thumbnailWidth, thumbnailHeight))
|
thumbnail := image.NewRGBA(image.Rect(0, 0, thumbnailWidth, thumbnailHeight))
|
||||||
for y := 0; y < thumbnailHeight; y++ {
|
for y := 0; y < thumbnailHeight; y++ {
|
||||||
for x := 0; x < thumbnailWidth; x++ {
|
for x := 0; x < thumbnailWidth; x++ {
|
||||||
@ -102,7 +102,7 @@ func resizeImage(img image.Image, maxWidth, maxHeight int) image.Image {
|
|||||||
thumbnailWidth := int(float64(imgWidth) * scaleHeight)
|
thumbnailWidth := int(float64(imgWidth) * scaleHeight)
|
||||||
thumbnailHeight := maxHeight
|
thumbnailHeight := maxHeight
|
||||||
|
|
||||||
// 使用"image"库的Resample方法生成缩略图
|
// Thumbnails are generated using the Resample method of the "image" library.
|
||||||
thumbnail := image.NewRGBA(image.Rect(0, 0, thumbnailWidth, thumbnailHeight))
|
thumbnail := image.NewRGBA(image.Rect(0, 0, thumbnailWidth, thumbnailHeight))
|
||||||
for y := 0; y < thumbnailHeight; y++ {
|
for y := 0; y < thumbnailHeight; y++ {
|
||||||
for x := 0; x < thumbnailWidth; x++ {
|
for x := 0; x < thumbnailWidth; x++ {
|
||||||
@ -115,6 +115,6 @@ func resizeImage(img image.Image, maxWidth, maxHeight int) image.Image {
|
|||||||
return thumbnail
|
return thumbnail
|
||||||
}
|
}
|
||||||
|
|
||||||
// 默认情况下,返回原始图片
|
// By default, the original image is returned
|
||||||
return img
|
return img
|
||||||
}
|
}
|
||||||
|
@ -102,12 +102,11 @@ func buildMongoURI(config *config.GlobalConfig) string {
|
|||||||
maxPoolSize = fmt.Sprint(config.Mongo.MaxPoolSize)
|
maxPoolSize = fmt.Sprint(config.Mongo.MaxPoolSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
uriFormat := "mongodb://%s/%s?maxPoolSize=%s"
|
|
||||||
if username != "" && password != "" {
|
if username != "" && password != "" {
|
||||||
uriFormat = "mongodb://%s:%s@%s/%s?maxPoolSize=%s"
|
|
||||||
return fmt.Sprintf(uriFormat, username, password, address, database, maxPoolSize)
|
return fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%s", username, password, address, database, maxPoolSize)
|
||||||
}
|
}
|
||||||
return fmt.Sprintf(uriFormat, address, database, maxPoolSize)
|
return fmt.Sprintf("mongodb://%s/%s?maxPoolSize=%s", address, database, maxPoolSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
func shouldRetry(err error) bool {
|
func shouldRetry(err error) bool {
|
||||||
|
@ -100,7 +100,7 @@ func (cd *ConnDirect) GetConns(ctx context.Context,
|
|||||||
for _, port := range ports {
|
for _, port := range ports {
|
||||||
conn, err := cd.dialServiceWithoutResolver(ctx, fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", port), append(cd.additionalOpts, opts...)...)
|
conn, err := cd.dialServiceWithoutResolver(ctx, fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", port), append(cd.additionalOpts, opts...)...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("connect to port %d failed,serviceName %s, IP %s\n", port, serviceName, cd.config.Rpc.ListenIP)
|
return nil, errs.Wrap(fmt.Errorf("connect to port %d failed,serviceName %s, IP %s", port, serviceName, cd.config.Rpc.ListenIP))
|
||||||
}
|
}
|
||||||
connections = append(connections, conn)
|
connections = append(connections, conn)
|
||||||
}
|
}
|
||||||
@ -166,7 +166,7 @@ func (cd *ConnDirect) dialServiceWithoutResolver(ctx context.Context, address st
|
|||||||
conn, err := grpc.DialContext(ctx, address, options...)
|
conn, err := grpc.DialContext(ctx, address, options...)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, errs.Wrap(err)
|
||||||
}
|
}
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
@ -18,12 +18,13 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/OpenIMSDK/tools/errs"
|
||||||
|
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/protocol/constant"
|
"github.com/OpenIMSDK/protocol/constant"
|
||||||
"github.com/OpenIMSDK/protocol/msg"
|
"github.com/OpenIMSDK/protocol/msg"
|
||||||
"github.com/OpenIMSDK/protocol/sdkws"
|
"github.com/OpenIMSDK/protocol/sdkws"
|
||||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||||
"github.com/OpenIMSDK/tools/errs"
|
|
||||||
"github.com/OpenIMSDK/tools/log"
|
"github.com/OpenIMSDK/tools/log"
|
||||||
"github.com/OpenIMSDK/tools/utils"
|
"github.com/OpenIMSDK/tools/utils"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
@ -135,7 +136,7 @@ type Message struct {
|
|||||||
func NewMessage(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Message {
|
func NewMessage(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Message {
|
||||||
conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImMsgName)
|
conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImMsgName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
util.ExitWithError(err)
|
||||||
}
|
}
|
||||||
client := msg.NewMsgClient(conn)
|
client := msg.NewMsgClient(conn)
|
||||||
return &Message{discov: discov, conn: conn, Client: client, Config: config}
|
return &Message{discov: discov, conn: conn, Client: client, Config: config}
|
||||||
|
@ -161,7 +161,7 @@ func NewTask[A interface{ TableName() string }, B any, C any](gormDB *gorm.DB, m
|
|||||||
tableName := zero.TableName()
|
tableName := zero.TableName()
|
||||||
coll, err := getColl(obj)
|
coll, err := getColl(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get mongo collection %s failed, err: %w", tableName, err)
|
return errs.Wrap(fmt.Errorf("get mongo collection %s failed, err: %w", tableName, err))
|
||||||
}
|
}
|
||||||
var count int
|
var count int
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -174,7 +174,7 @@ func NewTask[A interface{ TableName() string }, B any, C any](gormDB *gorm.DB, m
|
|||||||
if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1146 {
|
if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1146 {
|
||||||
return nil // table not exist
|
return nil // table not exist
|
||||||
}
|
}
|
||||||
return fmt.Errorf("find mysql table %s failed, err: %w", tableName, err)
|
return errs.Wrap(fmt.Errorf("find mysql table %s failed, err: %w", tableName, err))
|
||||||
}
|
}
|
||||||
if len(res) == 0 {
|
if len(res) == 0 {
|
||||||
return nil
|
return nil
|
||||||
@ -184,7 +184,7 @@ func NewTask[A interface{ TableName() string }, B any, C any](gormDB *gorm.DB, m
|
|||||||
temp[i] = convert(res[i])
|
temp[i] = convert(res[i])
|
||||||
}
|
}
|
||||||
if err := insertMany(coll, temp); err != nil {
|
if err := insertMany(coll, temp); err != nil {
|
||||||
return fmt.Errorf("insert mongo table %s failed, err: %w", tableName, err)
|
return errs.Wrap(fmt.Errorf("insert mongo table %s failed, err: %w", tableName, err))
|
||||||
}
|
}
|
||||||
count += len(res)
|
count += len(res)
|
||||||
if len(res) < batch {
|
if len(res) < batch {
|
||||||
@ -197,7 +197,7 @@ func NewTask[A interface{ TableName() string }, B any, C any](gormDB *gorm.DB, m
|
|||||||
func insertMany(coll *mongo.Collection, objs []any) error {
|
func insertMany(coll *mongo.Collection, objs []any) error {
|
||||||
if _, err := coll.InsertMany(context.Background(), objs); err != nil {
|
if _, err := coll.InsertMany(context.Background(), objs); err != nil {
|
||||||
if !mongo.IsDuplicateKeyError(err) {
|
if !mongo.IsDuplicateKeyError(err) {
|
||||||
return err
|
return errs.Wrap(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for i := range objs {
|
for i := range objs {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user