mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-27 03:58:55 +08:00
send message by message receive opt
This commit is contained in:
parent
cd14f3e649
commit
33b9685985
@ -5,6 +5,7 @@ import (
|
|||||||
"Open_IM/internal/push/content_struct"
|
"Open_IM/internal/push/content_struct"
|
||||||
"Open_IM/pkg/common/config"
|
"Open_IM/pkg/common/config"
|
||||||
"Open_IM/pkg/common/constant"
|
"Open_IM/pkg/common/constant"
|
||||||
|
"Open_IM/pkg/common/db"
|
||||||
http2 "Open_IM/pkg/common/http"
|
http2 "Open_IM/pkg/common/http"
|
||||||
"Open_IM/pkg/common/log"
|
"Open_IM/pkg/common/log"
|
||||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||||
@ -105,9 +106,17 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*
|
|||||||
}
|
}
|
||||||
switch pbData.SessionType {
|
switch pbData.SessionType {
|
||||||
case constant.SingleChatType:
|
case constant.SingleChatType:
|
||||||
|
isSend := modifyMessageByUserMessageReceiveOpt(pbData.RecvID, pbData.SendID, constant.SingleChatType, &pbData)
|
||||||
|
if isSend {
|
||||||
err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID)
|
err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID)
|
||||||
|
if err1 != nil {
|
||||||
|
log.NewError(pbData.OperationID, "kafka send msg err:RecvID", pbData.RecvID, pbData.String())
|
||||||
|
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID)
|
err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID)
|
||||||
if err1 != nil || err2 != nil {
|
if err2 != nil {
|
||||||
|
log.NewError(pbData.OperationID, "kafka send msg err:SendID", pbData.SendID, pbData.String())
|
||||||
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
||||||
}
|
}
|
||||||
return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
|
return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
|
||||||
@ -154,18 +163,27 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*
|
|||||||
groupID := pbData.RecvID
|
groupID := pbData.RecvID
|
||||||
for i, v := range reply.MemberList {
|
for i, v := range reply.MemberList {
|
||||||
pbData.RecvID = v.UserId + " " + groupID
|
pbData.RecvID = v.UserId + " " + groupID
|
||||||
|
isSend := modifyMessageByUserMessageReceiveOpt(v.UserId, groupID, constant.GroupChatType, &pbData)
|
||||||
|
if isSend {
|
||||||
err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i))
|
err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.NewError(pbData.OperationID, "kafka send msg err:UserId", v.UserId, pbData.String())
|
||||||
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
for i, v := range addUidList {
|
for i, v := range addUidList {
|
||||||
pbData.RecvID = v + " " + groupID
|
pbData.RecvID = v + " " + groupID
|
||||||
|
isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, &pbData)
|
||||||
|
if isSend {
|
||||||
err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i+1))
|
err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i+1))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.NewError(pbData.OperationID, "kafka send msg err:UserId", v, pbData.String())
|
||||||
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
|
return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
|
||||||
default:
|
default:
|
||||||
return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0)
|
return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0)
|
||||||
@ -193,3 +211,25 @@ func returnMsg(replay *pbChat.UserSendMsgResp, pb *pbChat.UserSendMsgReq, errCod
|
|||||||
replay.SendTime = sendTime
|
replay.SendTime = sendTime
|
||||||
return replay, nil
|
return replay, nil
|
||||||
}
|
}
|
||||||
|
func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType int, msg *pbChat.WSToMsgSvrChatMsg) bool {
|
||||||
|
conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType)
|
||||||
|
opt, err := db.DB.GetConversationMsgOpt(userID, conversationID)
|
||||||
|
if err != nil {
|
||||||
|
log.NewError(msg.OperationID, "GetConversationMsgOpt from redis err", msg.String())
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
switch opt {
|
||||||
|
case constant.ReceiveMessage:
|
||||||
|
return true
|
||||||
|
case constant.NotReceiveMessage:
|
||||||
|
return false
|
||||||
|
case constant.ReceiveNotNotifyMessage:
|
||||||
|
m := utils.JsonStringToMap(msg.OfflineInfo)
|
||||||
|
utils.SetSwitchFromOptions(m, "offlinePush", 0)
|
||||||
|
s := utils.MapToJsonString(m)
|
||||||
|
msg.OfflineInfo = s
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
@ -116,8 +116,11 @@ func JsonStringToMap(str string) (tempMap map[string]interface{}) {
|
|||||||
return tempMap
|
return tempMap
|
||||||
}
|
}
|
||||||
func GetSwitchFromOptions(Options map[string]interface{}, key string) (result bool) {
|
func GetSwitchFromOptions(Options map[string]interface{}, key string) (result bool) {
|
||||||
if flag, ok := Options[key]; !ok || flag == 1 {
|
if flag, ok := Options[key]; !ok || flag.(int) == 1 {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
func SetSwitchFromOptions(Options map[string]interface{}, key string, value interface{}) {
|
||||||
|
Options[key] = value
|
||||||
|
}
|
||||||
|
@ -7,6 +7,7 @@
|
|||||||
package utils
|
package utils
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"Open_IM/pkg/common/constant"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -63,6 +64,15 @@ func GetMsgID(sendID string) string {
|
|||||||
t := int64ToString(GetCurrentTimestampByNano())
|
t := int64ToString(GetCurrentTimestampByNano())
|
||||||
return Md5(t + sendID + int64ToString(rand.Int63n(GetCurrentTimestampByNano())))
|
return Md5(t + sendID + int64ToString(rand.Int63n(GetCurrentTimestampByNano())))
|
||||||
}
|
}
|
||||||
|
func GetConversationIDBySessionType(sourceID string, sessionType int) string {
|
||||||
|
switch sessionType {
|
||||||
|
case constant.SingleChatType:
|
||||||
|
return "single_" + sourceID
|
||||||
|
case constant.GroupChatType:
|
||||||
|
return "group_" + sourceID
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
func int64ToString(i int64) string {
|
func int64ToString(i int64) string {
|
||||||
return strconv.FormatInt(i, 10)
|
return strconv.FormatInt(i, 10)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user