mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-26 19:46:57 +08:00
Error code standardization
This commit is contained in:
parent
c318c62542
commit
7338b633c8
@ -1,27 +1,11 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
rpcConversation "Open_IM/internal/rpc/conversation"
|
"Open_IM/internal/rpc/conversation"
|
||||||
|
"Open_IM/internal/startrpc"
|
||||||
"Open_IM/pkg/common/config"
|
"Open_IM/pkg/common/config"
|
||||||
"Open_IM/pkg/common/constant"
|
|
||||||
prome "Open_IM/pkg/common/prome"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
defaultPorts := config.Config.RpcPort.OpenImConversationPort
|
startrpc.Start(config.Config.RpcPort.OpenImConversationPort, config.Config.RpcRegisterName.OpenImConversationName, config.Config.Prometheus.ConversationPrometheusPort, conversation.Start)
|
||||||
rpcPort := flag.Int("port", defaultPorts[0], "RpcConversation default listen port 11300")
|
|
||||||
prometheusPort := flag.Int("prometheus_port", config.Config.Prometheus.ConversationPrometheusPort[0], "conversationPrometheusPort default listen port")
|
|
||||||
flag.Parse()
|
|
||||||
fmt.Println("start conversation rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n")
|
|
||||||
rpcServer := rpcConversation.NewRpcConversationServer(*rpcPort)
|
|
||||||
go func() {
|
|
||||||
err := prome.StartPromeSrv(*prometheusPort)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
rpcServer.Run()
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package check
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"Open_IM/pkg/common/config"
|
"Open_IM/pkg/common/config"
|
||||||
|
"Open_IM/pkg/common/constant"
|
||||||
discoveryRegistry "Open_IM/pkg/discoveryregistry"
|
discoveryRegistry "Open_IM/pkg/discoveryregistry"
|
||||||
"Open_IM/pkg/proto/friend"
|
"Open_IM/pkg/proto/friend"
|
||||||
sdkws "Open_IM/pkg/proto/sdkws"
|
sdkws "Open_IM/pkg/proto/sdkws"
|
||||||
@ -50,11 +51,25 @@ func (f *FriendChecker) IsFriend(ctx context.Context, possibleFriendUserID, user
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *FriendChecker) GetAllPageFriends(ctx context.Context, ownerUserID string) (resp []*sdkws.FriendInfo, err error) {
|
func (f *FriendChecker) GetAllPageFriends(ctx context.Context, ownerUserID string) (resp []*sdkws.FriendInfo, err error) {
|
||||||
|
|
||||||
cc, err := f.getConn()
|
cc, err := f.getConn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
page := int32(0)
|
||||||
resp, err := friend.NewFriendClient(cc).GetPaginationFriends(ctx)
|
req := friend.GetPaginationFriendsReq{UserID: ownerUserID}
|
||||||
|
for {
|
||||||
|
req.Pagination = &sdkws.RequestPagination{PageNumber: page, ShowNumber: constant.ShowNumber}
|
||||||
|
tmp, err := friend.NewFriendClient(cc).GetPaginationFriends(ctx, &req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(tmp.FriendsInfo) == 0 {
|
||||||
|
if tmp.Total == int32(len(resp)) {
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
return nil, constant.ErrData.Wrap("total != resp, but result is nil")
|
||||||
|
}
|
||||||
|
resp = append(resp, tmp.FriendsInfo...)
|
||||||
|
page++
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,132 +2,41 @@ package conversation
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"Open_IM/internal/common/check"
|
"Open_IM/internal/common/check"
|
||||||
chat "Open_IM/internal/rpc/msg"
|
|
||||||
"Open_IM/pkg/common/constant"
|
"Open_IM/pkg/common/constant"
|
||||||
"Open_IM/pkg/common/db/cache"
|
"Open_IM/pkg/common/db/cache"
|
||||||
"Open_IM/pkg/common/db/controller"
|
"Open_IM/pkg/common/db/controller"
|
||||||
"Open_IM/pkg/common/db/relation"
|
"Open_IM/pkg/common/db/relation"
|
||||||
tableRelation "Open_IM/pkg/common/db/table/relation"
|
tableRelation "Open_IM/pkg/common/db/table/relation"
|
||||||
|
"github.com/OpenIMSDK/openKeeper"
|
||||||
|
|
||||||
"Open_IM/pkg/common/log"
|
"Open_IM/internal/common/notification"
|
||||||
promePkg "Open_IM/pkg/common/prome"
|
|
||||||
pbConversation "Open_IM/pkg/proto/conversation"
|
pbConversation "Open_IM/pkg/proto/conversation"
|
||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
"context"
|
"context"
|
||||||
"github.com/dtm-labs/rockscache"
|
|
||||||
"net"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
|
||||||
|
|
||||||
"Open_IM/pkg/common/config"
|
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type conversationServer struct {
|
type conversationServer struct {
|
||||||
rpcPort int
|
groupChecker *check.GroupChecker
|
||||||
rpcRegisterName string
|
|
||||||
etcdSchema string
|
|
||||||
etcdAddr []string
|
|
||||||
groupChecker *check.GroupChecker
|
|
||||||
controller.ConversationInterface
|
controller.ConversationInterface
|
||||||
|
notify *notification.Check
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConversationServer(port int) *conversationServer {
|
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||||
log.NewPrivateLog(constant.LogFileName)
|
db, err := relation.NewGormDB()
|
||||||
c := conversationServer{
|
|
||||||
rpcPort: port,
|
|
||||||
rpcRegisterName: config.Config.RpcRegisterName.OpenImConversationName,
|
|
||||||
etcdSchema: config.Config.Etcd.EtcdSchema,
|
|
||||||
etcdAddr: config.Config.Etcd.EtcdAddr,
|
|
||||||
groupChecker: check.NewGroupChecker(),
|
|
||||||
}
|
|
||||||
var cDB relation.Conversation
|
|
||||||
var cCache cache.ConversationCache
|
|
||||||
//mysql init
|
|
||||||
var mysql relation.Mysql
|
|
||||||
err := mysql.InitConn().AutoMigrateModel(&tableRelation.ConversationModel{})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic("db init err:" + err.Error())
|
return err
|
||||||
}
|
}
|
||||||
if mysql.GormConn() != nil {
|
if err := db.AutoMigrate(&tableRelation.ConversationModel{}); err != nil {
|
||||||
//get gorm model
|
return err
|
||||||
cDB = relation.NewConversationGorm(mysql.GormConn())
|
|
||||||
} else {
|
|
||||||
panic("db init err:" + "conn is nil")
|
|
||||||
}
|
}
|
||||||
//redis init
|
pbConversation.RegisterConversationServer(server, &conversationServer{
|
||||||
var redis cache.RedisClient
|
groupChecker: check.NewGroupChecker(client),
|
||||||
redis.InitRedis()
|
ConversationInterface: controller.NewConversationController(controller.NewConversationDataBase(controller.NewConversationGorm(db), cache.NewConversationRedis(nil))),
|
||||||
rcClient := rockscache.NewClient(redis.GetClient(), rockscache.Options{
|
|
||||||
RandomExpireAdjustment: 0.2,
|
|
||||||
DisableCacheRead: false,
|
|
||||||
DisableCacheDelete: false,
|
|
||||||
StrongConsistency: true,
|
|
||||||
})
|
})
|
||||||
cCache = cache.NewConversationRedis(rcClient)
|
controller.NewConversationDataBase()
|
||||||
|
controller.NewConversationController()
|
||||||
database := controller.NewConversationDataBase(cDB, cCache)
|
return nil
|
||||||
c.ConversationInterface = controller.NewConversationController(database)
|
|
||||||
return &c
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *conversationServer) Run() {
|
|
||||||
log.NewInfo("0", "rpc conversation start...")
|
|
||||||
|
|
||||||
listenIP := ""
|
|
||||||
if config.Config.ListenIP == "" {
|
|
||||||
listenIP = "0.0.0.0"
|
|
||||||
} else {
|
|
||||||
listenIP = config.Config.ListenIP
|
|
||||||
}
|
|
||||||
address := listenIP + ":" + strconv.Itoa(c.rpcPort)
|
|
||||||
|
|
||||||
listener, err := net.Listen("tcp", address)
|
|
||||||
if err != nil {
|
|
||||||
panic("listening err:" + err.Error() + c.rpcRegisterName)
|
|
||||||
}
|
|
||||||
log.NewInfo("0", "listen network success, ", address, listener)
|
|
||||||
//grpc server
|
|
||||||
var grpcOpts []grpc.ServerOption
|
|
||||||
if config.Config.Prometheus.Enable {
|
|
||||||
promePkg.NewGrpcRequestCounter()
|
|
||||||
promePkg.NewGrpcRequestFailedCounter()
|
|
||||||
promePkg.NewGrpcRequestSuccessCounter()
|
|
||||||
grpcOpts = append(grpcOpts, []grpc.ServerOption{
|
|
||||||
// grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme),
|
|
||||||
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
|
|
||||||
grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor),
|
|
||||||
}...)
|
|
||||||
}
|
|
||||||
srv := grpc.NewServer(grpcOpts...)
|
|
||||||
defer srv.GracefulStop()
|
|
||||||
|
|
||||||
//service registers with etcd
|
|
||||||
pbConversation.RegisterConversationServer(srv, c)
|
|
||||||
rpcRegisterIP := config.Config.RpcRegisterIP
|
|
||||||
if config.Config.RpcRegisterIP == "" {
|
|
||||||
rpcRegisterIP, err = utils.GetLocalIP()
|
|
||||||
if err != nil {
|
|
||||||
log.Error("", "GetLocalIP failed ", err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log.NewInfo("", "rpcRegisterIP", rpcRegisterIP)
|
|
||||||
err = rpc.RegisterEtcd(c.etcdSchema, strings.Join(c.etcdAddr, ","), rpcRegisterIP, c.rpcPort, c.rpcRegisterName, 10, "")
|
|
||||||
if err != nil {
|
|
||||||
log.NewError("0", "RegisterEtcd failed ", err.Error(),
|
|
||||||
c.etcdSchema, strings.Join(c.etcdAddr, ","), rpcRegisterIP, c.rpcPort, c.rpcRegisterName)
|
|
||||||
panic(utils.Wrap(err, "register conversation module rpc to etcd err"))
|
|
||||||
}
|
|
||||||
log.NewInfo("0", "RegisterConversationServer ok ", c.etcdSchema, strings.Join(c.etcdAddr, ","), rpcRegisterIP, c.rpcPort, c.rpcRegisterName)
|
|
||||||
err = srv.Serve(listener)
|
|
||||||
if err != nil {
|
|
||||||
log.NewError("0", "Serve failed ", err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.NewInfo("0", "rpc conversation ok")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *conversationServer) GetConversation(ctx context.Context, req *pbConversation.GetConversationReq) (*pbConversation.GetConversationResp, error) {
|
func (c *conversationServer) GetConversation(ctx context.Context, req *pbConversation.GetConversationReq) (*pbConversation.GetConversationResp, error) {
|
||||||
@ -179,7 +88,7 @@ func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbC
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
chat.ConversationChangeNotification(ctx, req.OwnerUserID)
|
c.notify.ConversationChangeNotification(ctx, req.OwnerUserID)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -196,7 +105,7 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
|
|||||||
var err error
|
var err error
|
||||||
isSyncConversation := true
|
isSyncConversation := true
|
||||||
if req.Conversation.ConversationType == constant.GroupChatType {
|
if req.Conversation.ConversationType == constant.GroupChatType {
|
||||||
groupInfo, err := c.groupChecker.GetGroupInfo(req.Conversation.GroupID)
|
groupInfo, err := c.groupChecker.GetGroupInfo(ctx, req.Conversation.GroupID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -213,7 +122,7 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
chat.ConversationSetPrivateNotification(ctx, req.Conversation.OwnerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat)
|
c.notify.ConversationSetPrivateNotification(ctx, req.Conversation.OwnerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
//haveUserID, err := c.ConversationInterface.GetUserIDExistConversation(ctx, req.UserIDList, req.Conversation.ConversationID)
|
//haveUserID, err := c.ConversationInterface.GetUserIDExistConversation(ctx, req.UserIDList, req.Conversation.ConversationID)
|
||||||
@ -247,11 +156,11 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
|
|||||||
|
|
||||||
if isSyncConversation {
|
if isSyncConversation {
|
||||||
for _, v := range req.UserIDList {
|
for _, v := range req.UserIDList {
|
||||||
chat.ConversationChangeNotification(ctx, v)
|
c.notify.ConversationChangeNotification(ctx, v)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for _, v := range req.UserIDList {
|
for _, v := range req.UserIDList {
|
||||||
chat.ConversationUnreadChangeNotification(ctx, v, req.Conversation.ConversationID, req.Conversation.UpdateUnreadCountTime)
|
c.notify.ConversationUnreadChangeNotification(ctx, v, req.Conversation.ConversationID, req.Conversation.UpdateUnreadCountTime)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
|
@ -7,10 +7,10 @@ import (
|
|||||||
"Open_IM/pkg/common/constant"
|
"Open_IM/pkg/common/constant"
|
||||||
"Open_IM/pkg/common/db/controller"
|
"Open_IM/pkg/common/db/controller"
|
||||||
"Open_IM/pkg/common/db/relation"
|
"Open_IM/pkg/common/db/relation"
|
||||||
relationTb "Open_IM/pkg/common/db/table/relation"
|
tablerelation "Open_IM/pkg/common/db/table/relation"
|
||||||
"Open_IM/pkg/common/tokenverify"
|
"Open_IM/pkg/common/tokenverify"
|
||||||
"Open_IM/pkg/common/tracelog"
|
"Open_IM/pkg/common/tracelog"
|
||||||
discoveryRegistry "Open_IM/pkg/discoveryregistry"
|
registry "Open_IM/pkg/discoveryregistry"
|
||||||
pbfriend "Open_IM/pkg/proto/friend"
|
pbfriend "Open_IM/pkg/proto/friend"
|
||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
"context"
|
"context"
|
||||||
@ -23,7 +23,7 @@ type friendServer struct {
|
|||||||
controller.BlackInterface
|
controller.BlackInterface
|
||||||
notification *notification.Check
|
notification *notification.Check
|
||||||
userCheck *check.UserCheck
|
userCheck *check.UserCheck
|
||||||
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
|
RegisterCenter registry.SvcDiscoveryRegistry
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||||
@ -31,7 +31,7 @@ func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := mysql.AutoMigrate(&relationTb.FriendModel{}, &relationTb.FriendRequestModel{}, &relationTb.BlackModel{}); err != nil {
|
if err := mysql.AutoMigrate(&tablerelation.FriendModel{}, &tablerelation.FriendRequestModel{}, &tablerelation.BlackModel{}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
pbfriend.RegisterFriendServer(server, &friendServer{
|
pbfriend.RegisterFriendServer(server, &friendServer{
|
||||||
@ -102,7 +102,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbfriend.Res
|
|||||||
if err := s.userCheck.Access(ctx, req.ToUserID); err != nil {
|
if err := s.userCheck.Access(ctx, req.ToUserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
friendRequest := relationTb.FriendRequestModel{FromUserID: req.FromUserID, ToUserID: req.ToUserID, HandleMsg: req.HandleMsg, HandleResult: req.HandleResult}
|
friendRequest := tablerelation.FriendRequestModel{FromUserID: req.FromUserID, ToUserID: req.ToUserID, HandleMsg: req.HandleMsg, HandleResult: req.HandleResult}
|
||||||
if req.HandleResult == constant.FriendResponseAgree {
|
if req.HandleResult == constant.FriendResponseAgree {
|
||||||
err := s.AgreeFriendRequest(ctx, &friendRequest)
|
err := s.AgreeFriendRequest(ctx, &friendRequest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -158,20 +158,21 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbfriend.SetFri
|
|||||||
|
|
||||||
// ok
|
// ok
|
||||||
func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbfriend.GetDesignatedFriendsReq) (resp *pbfriend.GetDesignatedFriendsResp, err error) {
|
func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbfriend.GetDesignatedFriendsReq) (resp *pbfriend.GetDesignatedFriendsResp, err error) {
|
||||||
|
|
||||||
resp = &pbfriend.GetDesignatedFriendsResp{}
|
resp = &pbfriend.GetDesignatedFriendsResp{}
|
||||||
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
|
|
||||||
return nil, err
|
if utils.Duplicate(req.FriendUserIDs) {
|
||||||
|
return nil, constant.ErrArgs.Wrap("friend userID repeated")
|
||||||
}
|
}
|
||||||
friends, total, err := s.FriendInterface.PageOwnerFriends(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber)
|
friends, err := s.FriendInterface.FindFriendsWithError(ctx, req.OwnerUserID, req.FriendUserIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
resp.FriendsInfo, err = (*convert.NewDBFriend(nil, s.RegisterCenter)).DB2PB(ctx, friends)
|
if resp.FriendsInfo, err = (*convert.NewDBFriend(nil, s.RegisterCenter)).DB2PB(ctx, friends); err != nil {
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
resp.Total = int32(total)
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ok 获取接收到的好友申请(即别人主动申请的)
|
// ok 获取接收到的好友申请(即别人主动申请的)
|
||||||
@ -223,15 +224,17 @@ func (s *friendServer) IsFriend(ctx context.Context, req *pbfriend.IsFriendReq)
|
|||||||
// ok
|
// ok
|
||||||
func (s *friendServer) GetPaginationFriends(ctx context.Context, req *pbfriend.GetPaginationFriendsReq) (resp *pbfriend.GetPaginationFriendsResp, err error) {
|
func (s *friendServer) GetPaginationFriends(ctx context.Context, req *pbfriend.GetPaginationFriendsReq) (resp *pbfriend.GetPaginationFriendsResp, err error) {
|
||||||
resp = &pbfriend.GetPaginationFriendsResp{}
|
resp = &pbfriend.GetPaginationFriendsResp{}
|
||||||
if utils.Duplicate(req.FriendUserIDs) {
|
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
|
||||||
return nil, constant.ErrArgs.Wrap("friend userID repeated")
|
return nil, err
|
||||||
}
|
}
|
||||||
friends, err := s.FriendInterface.FindFriendsWithError(ctx, req.OwnerUserID, req.FriendUserIDs)
|
friends, total, err := s.FriendInterface.PageOwnerFriends(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if resp.FriendsInfo, err = (*convert.NewDBFriend(nil, s.RegisterCenter)).DB2PB(ctx, friends); err != nil {
|
resp.FriendsInfo, err = (*convert.NewDBFriend(nil, s.RegisterCenter)).DB2PB(ctx, friends)
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
resp.Total = int32(total)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
@ -10,7 +10,7 @@ import (
|
|||||||
tablerelation "Open_IM/pkg/common/db/table/relation"
|
tablerelation "Open_IM/pkg/common/db/table/relation"
|
||||||
"Open_IM/pkg/common/tokenverify"
|
"Open_IM/pkg/common/tokenverify"
|
||||||
"Open_IM/pkg/common/tracelog"
|
"Open_IM/pkg/common/tracelog"
|
||||||
discoveryRegistry "Open_IM/pkg/discoveryregistry"
|
registry "Open_IM/pkg/discoveryregistry"
|
||||||
"Open_IM/pkg/proto/sdkws"
|
"Open_IM/pkg/proto/sdkws"
|
||||||
pbuser "Open_IM/pkg/proto/user"
|
pbuser "Open_IM/pkg/proto/user"
|
||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
@ -24,7 +24,8 @@ type userServer struct {
|
|||||||
notification *notification.Check
|
notification *notification.Check
|
||||||
userCheck *check.UserCheck
|
userCheck *check.UserCheck
|
||||||
ConversationChecker *check.ConversationChecker
|
ConversationChecker *check.ConversationChecker
|
||||||
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
|
RegisterCenter registry.SvcDiscoveryRegistry
|
||||||
|
friendCheck *check.FriendChecker
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||||
@ -58,11 +59,6 @@ func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesig
|
|||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *userServer) GetAllPageFriends(ctx context.Context, ownerUserID string) (resp []*sdkws.FriendInfo, err error) {
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// ok
|
// ok
|
||||||
func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) (resp *pbuser.UpdateUserInfoResp, err error) {
|
func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) (resp *pbuser.UpdateUserInfoResp, err error) {
|
||||||
resp = &pbuser.UpdateUserInfoResp{}
|
resp = &pbuser.UpdateUserInfoResp{}
|
||||||
@ -78,7 +74,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
friends, err := s.GetAllPageFriends(ctx, req.UserInfo.UserID)
|
friends, err := s.friendCheck.GetAllPageFriends(ctx, req.UserInfo.UserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -87,9 +83,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
|
|||||||
s.notification.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, v.FriendUser.UserID, tracelog.GetOpUserID(ctx))
|
s.notification.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, v.FriendUser.UserID, tracelog.GetOpUserID(ctx))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
s.notification.UserInfoUpdatedNotification(ctx, tracelog.GetOpUserID(ctx), req.UserInfo.UserID)
|
s.notification.UserInfoUpdatedNotification(ctx, tracelog.GetOpUserID(ctx), req.UserInfo.UserID)
|
||||||
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -315,10 +315,6 @@ func GroupIsBanPrivateChat(status int32) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
|
||||||
TokenKicked = 1001
|
|
||||||
)
|
|
||||||
|
|
||||||
const BigVersion = "v2"
|
const BigVersion = "v2"
|
||||||
|
|
||||||
const LogFileName = "OpenIM.log"
|
const LogFileName = "OpenIM.log"
|
||||||
|
33
pkg/common/db/cache/redis.go
vendored
33
pkg/common/db/cache/redis.go
vendored
@ -35,6 +35,8 @@ const (
|
|||||||
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
|
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
|
||||||
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
|
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
|
||||||
exTypeKeyLocker = "EX_LOCK:"
|
exTypeKeyLocker = "EX_LOCK:"
|
||||||
|
|
||||||
|
uidPidToken = "UID_PID_TOKEN_STATUS:"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Cache interface {
|
type Cache interface {
|
||||||
@ -52,7 +54,9 @@ type Cache interface {
|
|||||||
SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error
|
SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error
|
||||||
|
|
||||||
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
|
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
|
||||||
GetTokenMapByUidPid(ctx context.Context, userID, platformID string) (map[string]int, error)
|
|
||||||
|
GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error)
|
||||||
|
|
||||||
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
|
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
|
||||||
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
|
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
|
||||||
GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
|
GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
|
||||||
@ -209,11 +213,12 @@ func (r *RedisClient) SetGroupMinSeq(ctx context.Context, groupID string, minSeq
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Store userid and platform class to redis
|
// Store userid and platform class to redis
|
||||||
func (r *RedisClient) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
|
func (r *RedisClient) AddTokenFlag(ctx context.Context, userID string, platform string, token string, flag int) error {
|
||||||
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
|
key := uidPidToken + userID + ":" + platform
|
||||||
return r.rdb.HSet(context.Background(), key, token, flag).Err()
|
return r.rdb.HSet(context.Background(), key, token, flag).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//key:userID+platform-> <token, flag>
|
||||||
func (r *RedisClient) GetTokenMapByUidPid(ctx context.Context, userID, platformID string) (map[string]int, error) {
|
func (r *RedisClient) GetTokenMapByUidPid(ctx context.Context, userID, platformID string) (map[string]int, error) {
|
||||||
key := uidPidToken + userID + ":" + platformID
|
key := uidPidToken + userID + ":" + platformID
|
||||||
m, err := r.rdb.HGetAll(context.Background(), key).Result()
|
m, err := r.rdb.HGetAll(context.Background(), key).Result()
|
||||||
@ -223,8 +228,22 @@ func (r *RedisClient) GetTokenMapByUidPid(ctx context.Context, userID, platformI
|
|||||||
}
|
}
|
||||||
return mm, err
|
return mm, err
|
||||||
}
|
}
|
||||||
func (r *RedisClient) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error {
|
|
||||||
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
|
func (r *RedisClient) GetTokensWithoutError(ctx context.Context, userID, platform string) (map[string]int, error) {
|
||||||
|
key := uidPidToken + userID + ":" + platform
|
||||||
|
m, err := r.rdb.HGetAll(context.Background(), key).Result()
|
||||||
|
if err != nil && err == redis.Nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
mm := make(map[string]int)
|
||||||
|
for k, v := range m {
|
||||||
|
mm[k] = utils.StringToInt(v)
|
||||||
|
}
|
||||||
|
return mm, utils.Wrap(err, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RedisClient) SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error {
|
||||||
|
key := uidPidToken + userID + ":" + platform
|
||||||
mm := make(map[string]interface{})
|
mm := make(map[string]interface{})
|
||||||
for k, v := range m {
|
for k, v := range m {
|
||||||
mm[k] = v
|
mm[k] = v
|
||||||
@ -232,8 +251,8 @@ func (r *RedisClient) SetTokenMapByUidPid(ctx context.Context, userID string, pl
|
|||||||
return r.rdb.HSet(context.Background(), key, mm).Err()
|
return r.rdb.HSet(context.Background(), key, mm).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RedisClient) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error {
|
func (r *RedisClient) DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error {
|
||||||
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
|
key := uidPidToken + userID + ":" + platform
|
||||||
return r.rdb.HDel(context.Background(), key, fields...).Err()
|
return r.rdb.HDel(context.Background(), key, fields...).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
34
pkg/common/db/cache/token.go
vendored
34
pkg/common/db/cache/token.go
vendored
@ -5,14 +5,9 @@ import (
|
|||||||
"Open_IM/pkg/common/tokenverify"
|
"Open_IM/pkg/common/tokenverify"
|
||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
"context"
|
"context"
|
||||||
"github.com/go-redis/redis/v8"
|
|
||||||
"github.com/golang-jwt/jwt/v4"
|
"github.com/golang-jwt/jwt/v4"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
uidPidToken = "UID_PID_TOKEN_STATUS:"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Token interface {
|
type Token interface {
|
||||||
//结果为空 不返回错误
|
//结果为空 不返回错误
|
||||||
GetTokensWithoutError(ctx context.Context, userID, platform string) (map[string]int, error)
|
GetTokensWithoutError(ctx context.Context, userID, platform string) (map[string]int, error)
|
||||||
@ -21,9 +16,9 @@ type Token interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type TokenRedis struct {
|
type TokenRedis struct {
|
||||||
RedisClient *RedisClient
|
redisClient *RedisClient
|
||||||
AccessSecret string
|
accessSecret string
|
||||||
AccessExpire int64
|
accessExpire int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTokenRedis(redisClient *RedisClient, accessSecret string, accessExpire int64) *TokenRedis {
|
func NewTokenRedis(redisClient *RedisClient, accessSecret string, accessExpire int64) *TokenRedis {
|
||||||
@ -32,21 +27,12 @@ func NewTokenRedis(redisClient *RedisClient, accessSecret string, accessExpire i
|
|||||||
|
|
||||||
// 结果为空 不返回错误
|
// 结果为空 不返回错误
|
||||||
func (t *TokenRedis) GetTokensWithoutError(ctx context.Context, userID, platform string) (map[string]int, error) {
|
func (t *TokenRedis) GetTokensWithoutError(ctx context.Context, userID, platform string) (map[string]int, error) {
|
||||||
key := uidPidToken + userID + ":" + platform
|
return t.redisClient.GetTokensWithoutError(ctx, userID, platform)
|
||||||
m, err := t.RedisClient.GetClient().HGetAll(context.Background(), key).Result()
|
|
||||||
if err != nil && err == redis.Nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
mm := make(map[string]int)
|
|
||||||
for k, v := range m {
|
|
||||||
mm[k] = utils.StringToInt(v)
|
|
||||||
}
|
|
||||||
return mm, utils.Wrap(err, "")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 创建token
|
// 创建token
|
||||||
func (t *TokenRedis) CreateToken(ctx context.Context, userID string, platform string) (string, error) {
|
func (t *TokenRedis) CreateToken(ctx context.Context, userID string, platform string) (string, error) {
|
||||||
tokens, err := t.GetTokensWithoutError(ctx, userID, platform)
|
tokens, err := t.redisClient.GetTokensWithoutError(ctx, userID, platform)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -58,18 +44,16 @@ func (t *TokenRedis) CreateToken(ctx context.Context, userID string, platform st
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(deleteTokenKey) != 0 {
|
if len(deleteTokenKey) != 0 {
|
||||||
key := uidPidToken + userID + ":" + platform
|
err := t.redisClient.DeleteTokenByUidPid(ctx, userID, platform, deleteTokenKey)
|
||||||
err := t.RedisClient.GetClient().HDel(context.Background(), key, deleteTokenKey...).Err()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
claims := tokenverify.BuildClaims(userID, platform, t.AccessExpire)
|
claims := tokenverify.BuildClaims(userID, platform, t.accessExpire)
|
||||||
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
|
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
|
||||||
tokenString, err := token.SignedString([]byte(t.AccessSecret))
|
tokenString, err := token.SignedString([]byte(t.accessSecret))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", utils.Wrap(err, "")
|
return "", utils.Wrap(err, "")
|
||||||
}
|
}
|
||||||
key := uidPidToken + userID + ":" + platform
|
return tokenString, t.redisClient.AddTokenFlag(ctx, userID, platform, tokenString, constant.NormalToken)
|
||||||
return "", utils.Wrap(t.RedisClient.GetClient().HSet(context.Background(), key, tokenString, constant.NormalToken).Err(), "")
|
|
||||||
}
|
}
|
||||||
|
@ -9,7 +9,6 @@ import (
|
|||||||
type AuthInterface interface {
|
type AuthInterface interface {
|
||||||
//结果为空 不返回错误
|
//结果为空 不返回错误
|
||||||
GetTokensWithoutError(ctx context.Context, userID, platform string) (map[string]int, error)
|
GetTokensWithoutError(ctx context.Context, userID, platform string) (map[string]int, error)
|
||||||
|
|
||||||
//创建token
|
//创建token
|
||||||
CreateToken(ctx context.Context, userID string, platform string) (string, error)
|
CreateToken(ctx context.Context, userID string, platform string) (string, error)
|
||||||
}
|
}
|
||||||
|
@ -30,8 +30,11 @@ message parseTokenResp{
|
|||||||
}
|
}
|
||||||
|
|
||||||
service Auth {
|
service Auth {
|
||||||
|
//生成token
|
||||||
rpc userToken(userTokenReq) returns(userTokenResp);
|
rpc userToken(userTokenReq) returns(userTokenResp);
|
||||||
|
//强制退出登录
|
||||||
rpc forceLogout(forceLogoutReq) returns(forceLogoutResp);
|
rpc forceLogout(forceLogoutReq) returns(forceLogoutResp);
|
||||||
|
//解析token
|
||||||
rpc parseToken(parseTokenReq)returns(parseTokenResp);
|
rpc parseToken(parseTokenReq)returns(parseTokenResp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user