mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
Merge branch 'tuoyun' into superGroup
# Conflicts: # config/config.yaml # internal/msg_gateway/gate/logic.go # internal/msg_gateway/gate/ws_server.go # internal/push/logic/push_to_client.go # internal/rpc/group/group.go # internal/rpc/msg/send_msg.go # pkg/common/config/config.go # pkg/proto/sdk_ws/ws.pb.go
This commit is contained in:
commit
a0581618a4
@ -235,8 +235,6 @@ groupMessageHasReadReceiptEnable: false
|
||||
#单聊已读开启
|
||||
singleMessageHasReadReceiptEnable: false
|
||||
|
||||
|
||||
|
||||
#token config
|
||||
tokenpolicy:
|
||||
accessSecret: "open_im_server" #token生成相关,默认即可
|
||||
@ -487,30 +485,6 @@ notification:
|
||||
defaultTips:
|
||||
tips: "group member info set"
|
||||
|
||||
groupMemberSetToOrdinaryUser:
|
||||
conversation:
|
||||
reliabilityLevel: 3
|
||||
unreadCount: false
|
||||
offlinePush:
|
||||
switch: false
|
||||
title: "groupMemberSetToOrdinaryUser title"
|
||||
desc: "groupMemberSetToOrdinaryUser desc"
|
||||
ext: "groupMemberSetToOrdinaryUser ext"
|
||||
defaultTips:
|
||||
tips: "was set to ordinaryUser"
|
||||
|
||||
groupMemberSetToAdmin:
|
||||
conversation:
|
||||
reliabilityLevel: 3
|
||||
unreadCount: false
|
||||
offlinePush:
|
||||
switch: false
|
||||
title: "groupMemberSetToAdmin title"
|
||||
desc: "groupMemberSetToAdmin desc"
|
||||
ext: "groupMemberSetToAdmin ext"
|
||||
defaultTips:
|
||||
tips: "was set to admin"
|
||||
#############################organization#################################
|
||||
|
||||
organizationChanged:
|
||||
conversation:
|
||||
@ -727,4 +701,4 @@ demo:
|
||||
imAPIURL: http://127.0.0.1:10002
|
||||
|
||||
rtc:
|
||||
signalTimeout: 300
|
||||
signalTimeout: 60
|
||||
|
@ -21,9 +21,7 @@ func GetRTCInvitationInfo(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req)
|
||||
var ok bool
|
||||
var errInfo string
|
||||
ok, _, errInfo = token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), req.OperationID)
|
||||
ok, userID, errInfo := token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), req.OperationID)
|
||||
if !ok {
|
||||
errMsg := req.OperationID + " " + "GetUserIDFromToken failed " + errInfo + " token:" + c.Request.Header.Get("token")
|
||||
log.NewError(req.OperationID, errMsg)
|
||||
@ -37,6 +35,10 @@ func GetRTCInvitationInfo(c *gin.Context) {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": err.Error()})
|
||||
return
|
||||
}
|
||||
if err := db.DB.DelUserSignalList(userID); err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), "DelUserSignalList result:", err.Error())
|
||||
}
|
||||
|
||||
resp.Data.OpUserID = invitationInfo.OpUserID
|
||||
resp.Data.Invitation.RoomID = invitationInfo.Invitation.RoomID
|
||||
resp.Data.Invitation.SessionType = invitationInfo.Invitation.SessionType
|
||||
@ -45,6 +47,9 @@ func GetRTCInvitationInfo(c *gin.Context) {
|
||||
resp.Data.Invitation.InviteeUserIDList = invitationInfo.Invitation.InviteeUserIDList
|
||||
resp.Data.Invitation.MediaType = invitationInfo.Invitation.MediaType
|
||||
resp.Data.Invitation.Timeout = invitationInfo.Invitation.Timeout
|
||||
resp.Data.Invitation.InitiateTime = invitationInfo.Invitation.InitiateTime
|
||||
resp.Data.Invitation.PlatformID = invitationInfo.Invitation.PlatformID
|
||||
resp.Data.Invitation.CustomData = invitationInfo.Invitation.CustomData
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
|
||||
@ -83,6 +88,9 @@ func GetRTCInvitationInfoStartApp(c *gin.Context) {
|
||||
resp.Data.Invitation.InviteeUserIDList = invitationInfo.Invitation.InviteeUserIDList
|
||||
resp.Data.Invitation.MediaType = invitationInfo.Invitation.MediaType
|
||||
resp.Data.Invitation.Timeout = invitationInfo.Invitation.Timeout
|
||||
resp.Data.Invitation.InitiateTime = invitationInfo.Invitation.InitiateTime
|
||||
resp.Data.Invitation.PlatformID = invitationInfo.Invitation.PlatformID
|
||||
resp.Data.Invitation.CustomData = invitationInfo.Invitation.CustomData
|
||||
c.JSON(http.StatusOK, resp)
|
||||
|
||||
}
|
||||
|
@ -91,9 +91,9 @@ func (ws *WServer) getSeqReq(conn *UserConn, m *Req) {
|
||||
ws.getSeqResp(conn, m, nReply)
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdk_ws.GetMaxAndMinSeqResp) {
|
||||
|
||||
func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *pbChat.GetMaxAndMinSeqResp) {
|
||||
log.Debug(m.OperationID, "getSeqResp come here ", pb.String())
|
||||
b, _ := proto.Marshal(pb)
|
||||
mReply := Resp{
|
||||
@ -108,7 +108,7 @@ func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdk_ws.GetMaxAndMinSeq
|
||||
}
|
||||
|
||||
func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) {
|
||||
log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq start", m.SendID, m.ReqIdentifier, m.MsgIncr)
|
||||
log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq start", m.SendID, m.ReqIdentifier, m.MsgIncr, string(m.Data))
|
||||
nReply := new(sdk_ws.PullMessageBySeqListResp)
|
||||
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsgBySeqList)
|
||||
if isPass {
|
||||
@ -117,7 +117,7 @@ func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) {
|
||||
rpcReq.UserID = m.SendID
|
||||
rpcReq.OperationID = m.OperationID
|
||||
rpcReq.GroupSeqList = data.(sdk_ws.PullMessageBySeqListReq).GroupSeqList
|
||||
log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq middle", m.SendID, m.ReqIdentifier, m.MsgIncr, data.(sdk_ws.PullMessageBySeqListReq).SeqList, data.(sdk_ws.PullMessageBySeqListReq).GroupSeqList)
|
||||
log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq middle", m.SendID, m.ReqIdentifier, m.MsgIncr, data.(sdk_ws.PullMessageBySeqListReq).SeqList)
|
||||
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
|
||||
msgClient := pbChat.NewChatClient(grpcConn)
|
||||
reply, err := msgClient.PullMessageBySeqList(context.Background(), &rpcReq)
|
||||
@ -157,7 +157,6 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
|
||||
sendMsgAllCountLock.Lock()
|
||||
sendMsgAllCount++
|
||||
sendMsgAllCountLock.Unlock()
|
||||
//stat.GaugeVecApiMethod.WithLabelValues("ws_send_message_count").Inc()
|
||||
log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, m.Data)
|
||||
|
||||
nReply := new(pbChat.SendMsgResp)
|
||||
@ -248,14 +247,8 @@ func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) {
|
||||
nReply.ErrMsg = err.Error()
|
||||
ws.sendSignalMsgResp(conn, 200, err.Error(), m, &signalResp)
|
||||
} else {
|
||||
log.NewInfo(pbData.OperationID, "rpc call success to sendMsgReq", reply.String())
|
||||
// save invitation info for offline push
|
||||
if err := db.DB.NewCacheSignalInfo(pbData.MsgData); err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), m, &signalResp)
|
||||
ws.sendSignalMsgResp(conn, 200, err.Error(), m, &signalResp)
|
||||
} else {
|
||||
ws.sendSignalMsgResp(conn, 0, "", m, &signalResp)
|
||||
}
|
||||
log.NewInfo(pbData.OperationID, "rpc call success to sendMsgReq", reply.String(), signalResp.String(), m)
|
||||
ws.sendSignalMsgResp(conn, 0, "", m, &signalResp)
|
||||
}
|
||||
} else {
|
||||
log.NewError(m.OperationID, utils.GetSelfFuncName(), respPb.IsPass, respPb.CommonResp.ErrCode, respPb.CommonResp.ErrMsg)
|
||||
|
@ -89,11 +89,25 @@ func (ws *WServer) readMsg(conn *UserConn) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (ws *WServer) SetWriteTimeout(conn *UserConn, timeout int) {
|
||||
conn.w.Lock()
|
||||
defer conn.w.Unlock()
|
||||
conn.SetWriteDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
|
||||
}
|
||||
|
||||
func (ws *WServer) writeMsg(conn *UserConn, a int, msg []byte) error {
|
||||
conn.w.Lock()
|
||||
defer conn.w.Unlock()
|
||||
conn.SetWriteDeadline(time.Now().Add(time.Duration(60) * time.Second))
|
||||
return conn.WriteMessage(a, msg)
|
||||
}
|
||||
|
||||
func (ws *WServer) SetWriteTimeoutWriteMsg(conn *UserConn, a int, msg []byte, timeout int) error {
|
||||
conn.w.Lock()
|
||||
defer conn.w.Unlock()
|
||||
conn.SetWriteDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
|
||||
return conn.WriteMessage(a, msg)
|
||||
}
|
||||
func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn *UserConn, token string, operationID string) {
|
||||
switch config.Config.MultiLoginPolicy {
|
||||
@ -104,7 +118,7 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn
|
||||
ws.sendKickMsg(oldConn, newConn)
|
||||
m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID))
|
||||
if err != nil && err != redis.ErrNil {
|
||||
log.NewError(operationID, "get token from redis err", err.Error())
|
||||
log.NewError(operationID, "get token from redis err", err.Error(), uid)
|
||||
return
|
||||
}
|
||||
if m == nil {
|
||||
@ -160,7 +174,7 @@ func (ws *WServer) sendKickMsg(oldConn, newConn *UserConn) {
|
||||
}
|
||||
err = ws.writeMsg(oldConn, websocket.BinaryMessage, b.Bytes())
|
||||
if err != nil {
|
||||
log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "WS WriteMsg error", oldConn.RemoteAddr().String(), newConn.RemoteAddr().String(), err.Error())
|
||||
log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "sendKickMsg WS WriteMsg error", oldConn.RemoteAddr().String(), newConn.RemoteAddr().String(), err.Error())
|
||||
}
|
||||
}
|
||||
func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string) {
|
||||
|
@ -460,7 +460,9 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS
|
||||
//och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}}
|
||||
//sess.MarkMessage(msg, "")
|
||||
rwLock.Lock()
|
||||
cMsg = append(cMsg, msg)
|
||||
if len(msg.Value) != 0 {
|
||||
cMsg = append(cMsg, msg)
|
||||
}
|
||||
rwLock.Unlock()
|
||||
sess.MarkMessage(msg, "")
|
||||
//och.TriggerCmd(OnlineTopicBusy)
|
||||
@ -539,7 +541,7 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
|
||||
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName)
|
||||
if grpcConn == nil {
|
||||
log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String())
|
||||
pid, offset, err := producer.SendMessage(&mqPushMsg)
|
||||
pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID)
|
||||
if err != nil {
|
||||
log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
|
||||
}
|
||||
@ -549,7 +551,7 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
|
||||
_, err := msgClient.PushMsg(context.Background(), &rpcPushMsg)
|
||||
if err != nil {
|
||||
log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error())
|
||||
pid, offset, err := producer.SendMessage(&mqPushMsg)
|
||||
pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID)
|
||||
if err != nil {
|
||||
log.Error(message.OperationID, "kafka send failed", mqPushMsg.OperationID, "send data", mqPushMsg.String(), "pid", pid, "offset", offset, "err", err.Error())
|
||||
}
|
||||
|
@ -34,7 +34,7 @@ func (pc *PersistentConsumerHandler) Init() {
|
||||
|
||||
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
||||
msg := cMsg.Value
|
||||
log.NewInfo("msg come here mysql!!!", "", "msg", string(msg))
|
||||
log.NewInfo("msg come here mysql!!!", "", "msg", string(msg), msgKey)
|
||||
var tag bool
|
||||
msgFromMQ := pbMsg.MsgDataToMQ{}
|
||||
err := proto.Unmarshal(msg, &msgFromMQ)
|
||||
@ -42,6 +42,7 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMes
|
||||
log.NewError(msgFromMQ.OperationID, "msg_transfer Unmarshal msg err", "msg", string(msg), "err", err.Error())
|
||||
return
|
||||
}
|
||||
log.Debug(msgFromMQ.OperationID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String())
|
||||
//Control whether to store history messages (mysql)
|
||||
isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
|
||||
//Only process receiver data
|
||||
@ -73,8 +74,12 @@ func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
|
||||
func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||
claim sarama.ConsumerGroupClaim) error {
|
||||
for msg := range claim.Messages() {
|
||||
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
||||
pc.msgHandle[msg.Topic](msg, string(msg.Key), sess)
|
||||
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
|
||||
if len(msg.Value) != 0 {
|
||||
pc.msgHandle[msg.Topic](msg, string(msg.Key), sess)
|
||||
} else {
|
||||
log.Error("", "msg get from kafka but is nil", msg.Key)
|
||||
}
|
||||
sess.MarkMessage(msg, "")
|
||||
}
|
||||
return nil
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"Open_IM/internal/push"
|
||||
"Open_IM/pkg/common/config"
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/db"
|
||||
"Open_IM/pkg/common/log"
|
||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||
pbCache "Open_IM/pkg/proto/cache"
|
||||
@ -62,11 +63,15 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
|
||||
log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData)
|
||||
successCount++
|
||||
if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
|
||||
// save invitation info for offline push
|
||||
for _, v := range wsResult {
|
||||
if v.OnlinePush {
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := db.DB.HandleSignalInfo(pushMsg.OperationID, pushMsg.MsgData); err != nil {
|
||||
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), err.Error(), pushMsg.MsgData)
|
||||
}
|
||||
//Use offline push messaging
|
||||
var UIDList []string
|
||||
UIDList = append(UIDList, pushMsg.PushToUserID)
|
||||
@ -246,9 +251,7 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) {
|
||||
} else {
|
||||
log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func GetOfflinePushOpts(pushMsg *pbPush.PushMsgReq) (opts push.PushOpts, err error) {
|
||||
@ -267,3 +270,45 @@ func GetOfflinePushOpts(pushMsg *pbPush.PushMsgReq) (opts push.PushOpts, err err
|
||||
}
|
||||
return opts, nil
|
||||
}
|
||||
|
||||
//func SendMsgByWS(m *pbChat.WSToMsgSvrChatMsg) {
|
||||
// m.MsgID = rpcChat.GetMsgID(m.SendID)
|
||||
// m.ClientMsgID = m.MsgID
|
||||
// switch m.SessionType {
|
||||
// case constant.SingleChatType:
|
||||
// sendMsgToKafka(m, m.SendID, "msgKey--sendID")
|
||||
// sendMsgToKafka(m, m.RecvID, "msgKey--recvID")
|
||||
// case constant.GroupChatType:
|
||||
// etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName)
|
||||
// client := pbGroup.NewGroupClient(etcdConn)
|
||||
// req := &pbGroup.Req{
|
||||
// GroupID: m.RecvID,
|
||||
// Token: config.Config.Secret,
|
||||
// OperationID: m.OperationID,
|
||||
// }
|
||||
// reply, err := client.(context.Background(), req)
|
||||
// if err != nil {
|
||||
// log.Error(m.Token, m.OperationID, "rpc getGroupInfo failed, err = %s", err.Error())
|
||||
// return
|
||||
// }
|
||||
// if reply.ErrorCode != 0 {
|
||||
// log.Error(m.Token, m.OperationID, "rpc getGroupInfo failed, err = %s", reply.ErrorMsg)
|
||||
// return
|
||||
// }
|
||||
// groupID := m.RecvID
|
||||
// for i, v := range reply.MemberList {
|
||||
// m.RecvID = v.UserId + " " + groupID
|
||||
// sendMsgToKafka(m, utils.IntToString(i), "msgKey--recvID+\" \"+groupID")
|
||||
// }
|
||||
// default:
|
||||
//
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//func sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string, flag string) {
|
||||
// pid, offset, err := producer.SendMessage(m, key)
|
||||
// if err != nil {
|
||||
// log.ErrorByKv("kafka send failed", m.OperationID, "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), flag, key)
|
||||
// }
|
||||
//
|
||||
//}
|
||||
|
@ -113,7 +113,6 @@ func userRelationshipVerification(data *pbChat.SendMsgReq) (bool, int32, string)
|
||||
} else {
|
||||
return true, 0, ""
|
||||
}
|
||||
|
||||
}
|
||||
func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) {
|
||||
msg.ServerMsgID = GetMsgID(msg.SendID)
|
||||
|
@ -75,6 +75,9 @@ type GetRTCInvitationInfoResp struct {
|
||||
Timeout int32 `json:"timeout"`
|
||||
MediaType string `json:"mediaType"`
|
||||
SessionType int32 `json:"sessionType"`
|
||||
InitiateTime int32 `json:"initiateTime"`
|
||||
PlatformID int32 `json:"platformID"`
|
||||
CustomData string `json:"customData"`
|
||||
} `json:"invitation"`
|
||||
OfflinePushInfo struct{} `json:"offlinePushInfo"`
|
||||
} `json:"data"`
|
||||
|
@ -230,7 +230,7 @@ var ContentType2PushContent = map[int64]string{
|
||||
AtText: "[有人@你]",
|
||||
GroupMsg: "你收到一条群聊消息",
|
||||
Common: "你收到一条新消息",
|
||||
SignalMsg: "音視頻通話邀請",
|
||||
SignalMsg: "音视频通话邀请",
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -95,42 +95,50 @@ func (d *DataBases) CleanUpOneUserAllMsgFromRedis(userID string, operationID str
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DataBases) NewCacheSignalInfo(msg *pbCommon.MsgData) error {
|
||||
func (d *DataBases) HandleSignalInfo(operationID string, msg *pbCommon.MsgData) error {
|
||||
req := &pbRtc.SignalReq{}
|
||||
if err := proto.Unmarshal(msg.Content, req); err != nil {
|
||||
return err
|
||||
}
|
||||
//log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "SignalReq: ", req.String())
|
||||
var inviteeUserIDList []string
|
||||
switch invitationInfo := req.Payload.(type) {
|
||||
var isInviteSignal bool
|
||||
switch signalInfo := req.Payload.(type) {
|
||||
case *pbRtc.SignalReq_Invite:
|
||||
inviteeUserIDList = invitationInfo.Invite.Invitation.InviteeUserIDList
|
||||
inviteeUserIDList = signalInfo.Invite.Invitation.InviteeUserIDList
|
||||
isInviteSignal = true
|
||||
case *pbRtc.SignalReq_InviteInGroup:
|
||||
inviteeUserIDList = invitationInfo.InviteInGroup.Invitation.InviteeUserIDList
|
||||
inviteeUserIDList = signalInfo.InviteInGroup.Invitation.InviteeUserIDList
|
||||
isInviteSignal = true
|
||||
case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept:
|
||||
return errors.New("signalInfo do not need offlinePush")
|
||||
default:
|
||||
log2.NewDebug("", utils.GetSelfFuncName(), "req type not invite", string(msg.Content))
|
||||
log2.NewDebug(operationID, utils.GetSelfFuncName(), "req invalid type", string(msg.Content))
|
||||
return nil
|
||||
}
|
||||
for _, userID := range inviteeUserIDList {
|
||||
timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
if isInviteSignal {
|
||||
log2.NewInfo(operationID, utils.GetSelfFuncName(), "invite userID list:", inviteeUserIDList)
|
||||
for _, userID := range inviteeUserIDList {
|
||||
log2.NewInfo(operationID, utils.GetSelfFuncName(), "invite userID:", userID)
|
||||
timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
keyList := SignalListCache + userID
|
||||
err = d.rdb.LPush(context.Background(), keyList, msg.ClientMsgID).Err()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = d.rdb.Expire(context.Background(), keyList, time.Duration(timeout)*time.Second).Err()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key := SignalCache + msg.ClientMsgID
|
||||
err = d.rdb.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
keyList := SignalListCache + userID
|
||||
err = d.rdb.LPush(context.Background(), keyList, msg.ClientMsgID).Err()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = d.rdb.Expire(context.Background(), keyList, time.Duration(timeout)*time.Second).Err()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key := SignalCache + msg.ClientMsgID
|
||||
err = d.rdb.Set(context.Background(), key, msg.Content, time.Duration(timeout)*time.Second).Err()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -149,15 +157,17 @@ func (d *DataBases) GetSignalInfoFromCacheByClientMsgID(clientMsgID string) (inv
|
||||
switch req2 := req.Payload.(type) {
|
||||
case *pbRtc.SignalReq_Invite:
|
||||
invitationInfo.Invitation = req2.Invite.Invitation
|
||||
invitationInfo.OpUserID = req2.Invite.OpUserID
|
||||
case *pbRtc.SignalReq_InviteInGroup:
|
||||
invitationInfo.Invitation = req2.InviteInGroup.Invitation
|
||||
invitationInfo.OpUserID = req2.InviteInGroup.OpUserID
|
||||
}
|
||||
return invitationInfo, err
|
||||
}
|
||||
|
||||
func (d *DataBases) GetAvailableSignalInvitationInfo(userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) {
|
||||
keyList := SignalListCache + userID
|
||||
result := d.rdb.RPop(context.Background(), keyList)
|
||||
result := d.rdb.LPop(context.Background(), keyList)
|
||||
if err = result.Err(); err != nil {
|
||||
return nil, utils.Wrap(err, "GetAvailableSignalInvitationInfo failed")
|
||||
}
|
||||
@ -170,14 +180,14 @@ func (d *DataBases) GetAvailableSignalInvitationInfo(userID string) (invitationI
|
||||
if err != nil {
|
||||
return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID")
|
||||
}
|
||||
err = d.delUserSingalList(userID)
|
||||
err = d.DelUserSignalList(userID)
|
||||
if err != nil {
|
||||
return nil, utils.Wrap(err, "GetSignalInfoFromCacheByClientMsgID")
|
||||
}
|
||||
return invitationInfo, nil
|
||||
}
|
||||
|
||||
func (d *DataBases) delUserSingalList(userID string) error {
|
||||
func (d *DataBases) DelUserSignalList(userID string) error {
|
||||
keyList := SignalListCache + userID
|
||||
err := d.rdb.Del(context.Background(), keyList).Err()
|
||||
return err
|
||||
|
@ -2,6 +2,7 @@ package kafka
|
||||
|
||||
import (
|
||||
log2 "Open_IM/pkg/common/log"
|
||||
"errors"
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/golang/protobuf/proto"
|
||||
)
|
||||
@ -32,18 +33,22 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
|
||||
return &p
|
||||
}
|
||||
|
||||
func (p *Producer) SendMessage(m proto.Message, key ...string) (int32, int64, error) {
|
||||
func (p *Producer) SendMessage(m proto.Message, key string, operationID string) (int32, int64, error) {
|
||||
log2.Info(operationID, "SendMessage", "key ", key, m.String(), p.producer)
|
||||
kMsg := &sarama.ProducerMessage{}
|
||||
kMsg.Topic = p.topic
|
||||
if len(key) == 1 {
|
||||
kMsg.Key = sarama.StringEncoder(key[0])
|
||||
}
|
||||
kMsg.Key = sarama.StringEncoder(key)
|
||||
bMsg, err := proto.Marshal(m)
|
||||
if err != nil {
|
||||
log2.Error("", "", "proto marshal err = %s", err.Error())
|
||||
log2.Error(operationID, "", "proto marshal err = %s", err.Error())
|
||||
return -1, -1, err
|
||||
}
|
||||
if len(bMsg) == 0 {
|
||||
return 0, 0, errors.New("msg content is nil")
|
||||
}
|
||||
kMsg.Value = sarama.ByteEncoder(bMsg)
|
||||
|
||||
return p.producer.SendMessage(kMsg)
|
||||
log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer)
|
||||
a, b, c := p.producer.SendMessage(kMsg)
|
||||
log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg, p.producer)
|
||||
return a, b, c
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -122,8 +122,9 @@ message InvitationInfo {
|
||||
string roomID = 5;
|
||||
int32 timeout = 6;
|
||||
string mediaType = 7;
|
||||
int32 platformID = 8;
|
||||
int32 sessionType = 9;
|
||||
int32 platformID = 8;
|
||||
int32 sessionType = 9;
|
||||
int32 initiateTime = 10;
|
||||
}
|
||||
|
||||
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -519,8 +519,9 @@ message InvitationInfo {
|
||||
string roomID = 5;
|
||||
int32 timeout = 6;
|
||||
string mediaType = 7;
|
||||
int32 platformID = 8;
|
||||
int32 sessionType = 9;
|
||||
int32 platformID = 8;
|
||||
int32 sessionType = 9;
|
||||
int32 initiateTime = 10;
|
||||
}
|
||||
|
||||
message ParticipantMetaData{
|
||||
|
Loading…
x
Reference in New Issue
Block a user