Mirror of https://github.com/openimsdk/open-im-server.git (synced 2025-12-03 19:02:31 +08:00)

Commit 6f0362c4cb: Merge branch 'openimsdk:main' into main
@@ -18,11 +18,13 @@ import (
 	"context"
 	"fmt"
 	"net"
+	"net/http"
 	_ "net/http/pprof"
+	"os"
+	"os/signal"
 	"strconv"
-	ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
-	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
+	"syscall"
+	"time"

 	"github.com/OpenIMSDK/protocol/constant"
 	"github.com/OpenIMSDK/tools/discoveryregistry"
@@ -33,6 +35,8 @@ import (
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
 	kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
+	ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
+	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
 )

 func main() {
@@ -51,13 +55,12 @@ func run(port int, proPort int) error {
 	if port == 0 || proPort == 0 {
 		err := "port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort)
 		log.ZError(context.Background(), err, nil)

 		return fmt.Errorf(err)
 	}

 	rdb, err := cache.NewRedis()
 	if err != nil {
 		log.ZError(context.Background(), "Failed to initialize Redis", err)

 		return err
 	}
 	log.ZInfo(context.Background(), "api start init discov client")
@@ -68,30 +71,29 @@ func run(port int, proPort int) error {
 	client, err = kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
 	if err != nil {
 		log.ZError(context.Background(), "Failed to initialize discovery register", err)

 		return err
 	}

 	if err = client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil {
 		log.ZError(context.Background(), "Failed to create RPC root nodes", err)

 		return err
 	}

 	log.ZInfo(context.Background(), "api register public config to discov")
 	if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.Config.EncodeConfig()); err != nil {
 		log.ZError(context.Background(), "Failed to register public config to discov", err)

 		return err
 	}

 	log.ZInfo(context.Background(), "api register public config to discov success")
 	router := api.NewGinRouter(client, rdb)
-	//////////////////////////////
 	if config.Config.Prometheus.Enable {
 		p := ginProm.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api"))
 		p.SetListenAddress(fmt.Sprintf(":%d", proPort))
 		p.Use(router)
 	}
-	/////////////////////////////////
 	log.ZInfo(context.Background(), "api init router success")

 	var address string
 	if config.Config.Api.ListenIP != "" {
 		address = net.JoinHostPort(config.Config.Api.ListenIP, strconv.Itoa(port))
@@ -100,10 +102,25 @@ func run(port int, proPort int) error {
 	}
 	log.ZInfo(context.Background(), "start api server", "address", address, "OpenIM version", config.Version)

-	err = router.Run(address)
-	if err != nil {
-		log.ZError(context.Background(), "api run failed", err, "address", address)
-		return err
-	}
+	server := http.Server{Addr: address, Handler: router}
+	go func() {
+		err = server.ListenAndServe()
+		if err != nil && err != http.ErrServerClosed {
+			log.ZError(context.Background(), "api run failed", err, "address", address)
+			os.Exit(1)
+		}
+	}()
+
+	sigs := make(chan os.Signal, 1)
+	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+	<-sigs
+
+	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+	defer cancel()
+
+	// graceful shutdown operation.
+	if err := server.Shutdown(ctx); err != nil {
+		log.ZError(context.Background(), "failed to api-server shutdown", err)
+		return err
+	}

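For readers unfamiliar with the pattern the hunk above adopts, here is a minimal, self-contained Go sketch of the same ListenAndServe / signal.Notify / Shutdown sequence. The handler, port, and 15-second timeout are illustrative only and are not taken from the OpenIM configuration.

```go
package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	// Serve in a goroutine so the main goroutine is free to wait for signals.
	server := &http.Server{Addr: ":8080", Handler: mux}
	go func() {
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Printf("listen failed: %v", err)
			os.Exit(1)
		}
	}()

	// Block until SIGINT, SIGTERM, or SIGQUIT arrives.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	<-sigs

	// Give in-flight requests up to 15 seconds to finish, then stop accepting work.
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	if err := server.Shutdown(ctx); err != nil {
		log.Printf("shutdown failed: %v", err)
	}
}
```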
@@ -84,8 +84,8 @@ $ sudo sealos run labring/kubernetes:v1.25.0 labring/helm:v3.8.2 labring/calico:
 If you are local, you can also use Kind and Minikube to test, for example, using Kind:

 ```bash
-$ sGO111MODULE="on" go get sigs.k8s.io/kind@v0.11.1
-$ skind create cluster
+$ GO111MODULE="on" go get sigs.k8s.io/kind@v0.11.1
+$ kind create cluster
 ```

 ### Installing helm
@@ -18,9 +18,12 @@ import (
 	"context"

 	cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"

+	utils2 "github.com/OpenIMSDK/tools/utils"
+
 	"github.com/redis/go-redis/v9"

 	"github.com/OpenIMSDK/protocol/constant"
+	"github.com/OpenIMSDK/protocol/conversation"
 	"github.com/OpenIMSDK/protocol/msg"
 	"github.com/OpenIMSDK/protocol/sdkws"
 	"github.com/OpenIMSDK/tools/errs"
@@ -89,10 +92,7 @@ func (m *msgServer) SetConversationHasReadSeq(
 	return &msg.SetConversationHasReadSeqResp{}, nil
 }

-func (m *msgServer) MarkMsgsAsRead(
-	ctx context.Context,
-	req *msg.MarkMsgsAsReadReq,
-) (resp *msg.MarkMsgsAsReadResp, err error) {
+func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadReq) (resp *msg.MarkMsgsAsReadResp, err error) {
 	if len(req.Seqs) < 1 {
 		return nil, errs.ErrArgs.Wrap("seqs must not be empty")
 	}
@@ -128,10 +128,7 @@ func (m *msgServer) MarkMsgsAsRead(
 	return &msg.MarkMsgsAsReadResp{}, nil
 }

-func (m *msgServer) MarkConversationAsRead(
-	ctx context.Context,
-	req *msg.MarkConversationAsReadReq,
-) (resp *msg.MarkConversationAsReadResp, err error) {
+func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkConversationAsReadReq) (resp *msg.MarkConversationAsReadResp, err error) {
 	conversation, err := m.Conversation.GetConversation(ctx, req.UserID, req.ConversationID)
 	if err != nil {
 		return nil, err
@@ -140,47 +137,34 @@ func (m *msgServer) MarkConversationAsRead(
 	if err != nil && errs.Unwrap(err) != redis.Nil {
 		return nil, err
 	}
-	var seqs []int64
-	log.ZDebug(ctx, "MarkConversationAsRead", "hasReadSeq", hasReadSeq,
-		"req.HasReadSeq", req.HasReadSeq)
-	if conversation.ConversationType == constant.SingleChatType {
-		for i := hasReadSeq + 1; i <= req.HasReadSeq; i++ {
-			seqs = append(seqs, i)
-		}
-
-		if len(seqs) > 0 {
-			log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID)
-			if err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil {
-				return nil, err
-			}
-		}
-		if req.HasReadSeq > hasReadSeq {
-			err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq)
-			if err != nil {
-				return nil, err
-			}
-			hasReadSeq = req.HasReadSeq
-		}
-		if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID,
-			m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq); err != nil {
-			return nil, err
-		}
-	} else if conversation.ConversationType == constant.SuperGroupChatType ||
-		conversation.ConversationType == constant.NotificationChatType {
-		if req.HasReadSeq > hasReadSeq {
-			err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq)
-			if err != nil {
-				return nil, err
-			}
-			hasReadSeq = req.HasReadSeq
-		}
-		if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID,
-			req.UserID, seqs, hasReadSeq); err != nil {
-			return nil, err
-		}
-
-	}
+	seqs := generateSeqs(hasReadSeq, req)
+
+	if len(seqs) > 0 || req.HasReadSeq > hasReadSeq {
+		err = m.updateReadStatus(ctx, req, conversation, seqs, hasReadSeq)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return &msg.MarkConversationAsReadResp{}, nil
+}
+
+func generateSeqs(hasReadSeq int64, req *msg.MarkConversationAsReadReq) []int64 {
+	var seqs []int64
+	for _, val := range req.Seqs {
+		if val > hasReadSeq && !utils2.Contain(val, seqs...) {
+			seqs = append(seqs, val)
+		}
+	}
+	return seqs
+}
+
+func (m *msgServer) updateReadStatus(ctx context.Context, req *msg.MarkConversationAsReadReq, conversation *conversation.Conversation, seqs []int64, hasReadSeq int64) error {
+	if conversation.ConversationType == constant.SingleChatType && len(seqs) > 0 {
+		log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID)
+		if err := m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil {
+			return err
+		}
+	}
 	reqCall := &cbapi.CallbackGroupMsgReadReq{
 		SendID:    conversation.OwnerUserID,
 		ReceiveID: req.UserID,
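As context for the refactor above, here is a standalone sketch of the filtering that the extracted generateSeqs helper performs: keep only sequence numbers strictly greater than the stored has-read seq, skipping duplicates. The plain int64 slice stands in for req.Seqs and the contains helper stands in for utils2.Contain, so the snippet runs without the OpenIM packages.

```go
package main

import "fmt"

// contains stands in for utils2.Contain from the OpenIM tools package.
func contains(target int64, list []int64) bool {
	for _, v := range list {
		if v == target {
			return true
		}
	}
	return false
}

// generateSeqs keeps only seqs strictly greater than hasReadSeq, without duplicates,
// mirroring the helper extracted in this diff.
func generateSeqs(hasReadSeq int64, reqSeqs []int64) []int64 {
	var seqs []int64
	for _, val := range reqSeqs {
		if val > hasReadSeq && !contains(val, seqs) {
			seqs = append(seqs, val)
		}
	}
	return seqs
}

func main() {
	// hasReadSeq = 5: 3 and 5 are dropped, 7 is kept once despite the duplicate.
	fmt.Println(generateSeqs(5, []int64{3, 5, 7, 7, 9})) // [7 9]
}
```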
@@ -191,7 +175,18 @@ func (m *msgServer) MarkConversationAsRead(
 		return nil, err
 	}

-	return &msg.MarkConversationAsReadResp{}, nil
+	if req.HasReadSeq > hasReadSeq {
+		if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil {
+			return err
+		}
+	}
+
+	recvID := m.conversationAndGetRecvID(conversation, req.UserID)
+	if conversation.ConversationType == constant.SuperGroupChatType || conversation.ConversationType == constant.NotificationChatType {
+		recvID = req.UserID
+	}
+
+	return m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, recvID, seqs, req.HasReadSeq)
 }

 func (m *msgServer) sendMarkAsReadNotification(
@@ -15,8 +15,6 @@
 package convert

 import (
-	"time"
-
 	"github.com/OpenIMSDK/protocol/sdkws"

 	relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
@@ -43,7 +41,6 @@ func UserPb2DB(user *sdkws.UserInfo) *relationtb.UserModel {
 	userDB.Nickname = user.Nickname
 	userDB.FaceURL = user.FaceURL
 	userDB.Ex = user.Ex
-	userDB.CreateTime = time.UnixMilli(user.CreateTime)
 	userDB.AppMangerLevel = user.AppMangerLevel
 	userDB.GlobalRecvMsgOpt = user.GlobalRecvMsgOpt
 	return &userDB
pkg/common/db/cache/msg.go (vendored, 28 lines changed)

@@ -645,19 +645,35 @@ func (c *msgCache) PipeDeleteMessages(ctx context.Context, conversationID string
 }

 func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error {
-	vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(conversationID)).Result()
-	if errors.Is(err, redis.Nil) {
-		return nil
-	}
+	var (
+		cursor uint64
+		keys   []string
+		err    error
+
+		key = c.allMessageCacheKey(conversationID)
+	)
+
+	for {
+		// scan up to 10000 at a time, the count (10000) param refers to the number of scans on redis server.
+		// if the count is too small, needs to be run scan on redis frequently.
+		var limit int64 = 10000
+		keys, cursor, err = c.rdb.Scan(ctx, cursor, key, limit).Result()
 		if err != nil {
 			return errs.Wrap(err)
 		}
-	for _, v := range vals {
-		if err := c.rdb.Del(ctx, v).Err(); err != nil {
+		for _, key := range keys {
+			err := c.rdb.Del(ctx, key).Err()
+			if err != nil {
 				return errs.Wrap(err)
 			}
 		}
+
+		// scan end
+		if cursor == 0 {
 			return nil
 		}
+	}
 }

 func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error {
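The switch above from KEYS to SCAN keeps Redis responsive on large keyspaces, since SCAN iterates with a cursor instead of blocking while it enumerates every key. Below is a minimal go-redis v9 sketch of the same cursor loop; the key pattern and COUNT value are illustrative, and it batches the DEL per SCAN page, a common variant of the per-key deletion used in the hunk.

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

// deleteByPattern removes every key matching pattern using SCAN, so the server
// is never blocked the way a single KEYS call can be on a large keyspace.
func deleteByPattern(ctx context.Context, rdb *redis.Client, pattern string) error {
	var cursor uint64
	for {
		// COUNT is only a hint for how much work each SCAN call does on the server.
		keys, next, err := rdb.Scan(ctx, cursor, pattern, 10000).Result()
		if err != nil {
			return err
		}
		if len(keys) > 0 {
			if err := rdb.Del(ctx, keys...).Err(); err != nil {
				return err
			}
		}
		if next == 0 { // cursor 0 means the iteration is complete
			return nil
		}
		cursor = next
	}
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer rdb.Close()

	// "msg_cache:*" is a made-up pattern for illustration only.
	if err := deleteByPattern(context.Background(), rdb, "msg_cache:*"); err != nil {
		fmt.Println("cleanup failed:", err)
	}
}
```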
pkg/common/db/cache/msg_test.go (vendored, 47 lines changed)

@@ -385,3 +385,50 @@ func testParallelDeleteMessagesMix(t *testing.T, cid string, seqs []int64, input
 		assert.EqualValues(t, 1, val) // exists
 	}
 }
+
+func TestCleanUpOneConversationAllMsg(t *testing.T) {
+	rdb := redis.NewClient(&redis.Options{})
+	defer rdb.Close()
+
+	cacher := msgCache{rdb: rdb}
+	count := 1000
+	prefix := fmt.Sprintf("%v", rand.Int63())
+
+	ids := []string{}
+	for i := 0; i < count; i++ {
+		id := fmt.Sprintf("%v-cid-%v", prefix, rand.Int63())
+		ids = append(ids, id)
+
+		key := cacher.allMessageCacheKey(id)
+		rdb.Set(context.Background(), key, "openim", 0)
+	}
+
+	// delete 100 keys with scan.
+	for i := 0; i < 100; i++ {
+		pickedKey := ids[i]
+		err := cacher.CleanUpOneConversationAllMsg(context.Background(), pickedKey)
+		assert.Nil(t, err)
+
+		ls, err := rdb.Keys(context.Background(), pickedKey).Result()
+		assert.Nil(t, err)
+		assert.Equal(t, 0, len(ls))
+
+		rcode, err := rdb.Exists(context.Background(), pickedKey).Result()
+		assert.Nil(t, err)
+		assert.EqualValues(t, 0, rcode) // non-exists
+	}
+
+	sid := fmt.Sprintf("%v-cid-*", prefix)
+	ls, err := rdb.Keys(context.Background(), cacher.allMessageCacheKey(sid)).Result()
+	assert.Nil(t, err)
+	assert.Equal(t, count-100, len(ls))
+
+	// delete fuzzy matching keys.
+	err = cacher.CleanUpOneConversationAllMsg(context.Background(), sid)
+	assert.Nil(t, err)
+
+	// don't contains keys matched `{prefix}-cid-{random}` on redis
+	ls, err = rdb.Keys(context.Background(), cacher.allMessageCacheKey(sid)).Result()
+	assert.Nil(t, err)
+	assert.Equal(t, 0, len(ls))
+}