mirror of https://github.com/matrix-org/dendrite
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1474 lines
49 KiB
Go
1474 lines
49 KiB
Go
package syncapi
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"reflect"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/gorilla/mux"
|
|
"github.com/matrix-org/dendrite/internal/caching"
|
|
"github.com/matrix-org/dendrite/internal/httputil"
|
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
|
"github.com/matrix-org/dendrite/setup/config"
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/tidwall/gjson"
|
|
|
|
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
|
"github.com/matrix-org/dendrite/syncapi/routing"
|
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
|
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
|
|
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
|
"github.com/matrix-org/dendrite/roomserver"
|
|
"github.com/matrix-org/dendrite/roomserver/api"
|
|
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
|
"github.com/matrix-org/dendrite/syncapi/types"
|
|
"github.com/matrix-org/dendrite/test"
|
|
"github.com/matrix-org/dendrite/test/testrig"
|
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
|
)
|
|
|
|
type syncRoomserverAPI struct {
|
|
rsapi.SyncRoomserverAPI
|
|
rooms []*test.Room
|
|
}
|
|
|
|
func (s *syncRoomserverAPI) QueryUserIDForSender(ctx context.Context, roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
|
|
return spec.NewUserID(string(senderID), true)
|
|
}
|
|
|
|
func (s *syncRoomserverAPI) QuerySenderIDForUser(ctx context.Context, roomID spec.RoomID, userID spec.UserID) (*spec.SenderID, error) {
|
|
senderID := spec.SenderID(userID.String())
|
|
return &senderID, nil
|
|
}
|
|
|
|
func (s *syncRoomserverAPI) QueryLatestEventsAndState(ctx context.Context, req *rsapi.QueryLatestEventsAndStateRequest, res *rsapi.QueryLatestEventsAndStateResponse) error {
|
|
var room *test.Room
|
|
for _, r := range s.rooms {
|
|
if r.ID == req.RoomID {
|
|
room = r
|
|
break
|
|
}
|
|
}
|
|
if room == nil {
|
|
res.RoomExists = false
|
|
return nil
|
|
}
|
|
res.RoomVersion = room.Version
|
|
return nil // TODO: return state
|
|
}
|
|
|
|
func (s *syncRoomserverAPI) QuerySharedUsers(ctx context.Context, req *rsapi.QuerySharedUsersRequest, res *rsapi.QuerySharedUsersResponse) error {
|
|
res.UserIDsToCount = make(map[string]int)
|
|
return nil
|
|
}
|
|
func (s *syncRoomserverAPI) QueryBulkStateContent(ctx context.Context, req *rsapi.QueryBulkStateContentRequest, res *rsapi.QueryBulkStateContentResponse) error {
|
|
return nil
|
|
}
|
|
|
|
func (s *syncRoomserverAPI) QueryMembershipForUser(ctx context.Context, req *rsapi.QueryMembershipForUserRequest, res *rsapi.QueryMembershipForUserResponse) error {
|
|
res.IsRoomForgotten = false
|
|
res.RoomExists = true
|
|
return nil
|
|
}
|
|
|
|
func (s *syncRoomserverAPI) QueryMembershipAtEvent(
|
|
ctx context.Context,
|
|
roomID spec.RoomID,
|
|
eventIDs []string,
|
|
senderID spec.SenderID,
|
|
) (map[string]*rstypes.HeaderedEvent, error) {
|
|
return map[string]*rstypes.HeaderedEvent{}, nil
|
|
}
|
|
|
|
type syncUserAPI struct {
|
|
userapi.SyncUserAPI
|
|
accounts []userapi.Device
|
|
}
|
|
|
|
func (s *syncUserAPI) QueryAccessToken(ctx context.Context, req *userapi.QueryAccessTokenRequest, res *userapi.QueryAccessTokenResponse) error {
|
|
for _, acc := range s.accounts {
|
|
if acc.AccessToken == req.AccessToken {
|
|
res.Device = &acc
|
|
return nil
|
|
}
|
|
}
|
|
res.Err = "unknown user"
|
|
return nil
|
|
}
|
|
|
|
func (s *syncUserAPI) QueryKeyChanges(ctx context.Context, req *userapi.QueryKeyChangesRequest, res *userapi.QueryKeyChangesResponse) error {
|
|
return nil
|
|
}
|
|
|
|
func (s *syncUserAPI) QueryOneTimeKeys(ctx context.Context, req *userapi.QueryOneTimeKeysRequest, res *userapi.QueryOneTimeKeysResponse) error {
|
|
return nil
|
|
}
|
|
|
|
func (s *syncUserAPI) PerformLastSeenUpdate(ctx context.Context, req *userapi.PerformLastSeenUpdateRequest, res *userapi.PerformLastSeenUpdateResponse) error {
|
|
return nil
|
|
}
|
|
|
|
func TestSyncAPIAccessTokens(t *testing.T) {
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
testSyncAccessTokens(t, dbType)
|
|
})
|
|
}
|
|
|
|
func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
|
|
user := test.NewUser(t)
|
|
room := test.NewRoom(t, user)
|
|
alice := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: user.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
|
|
routers := httputil.NewRouters()
|
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
|
natsInstance := jetstream.NATSInstance{}
|
|
defer close()
|
|
|
|
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
|
|
msgs := toNATSMsgs(t, cfg, room.Events()...)
|
|
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics)
|
|
testrig.MustPublishMsgs(t, jsctx, msgs...)
|
|
|
|
testCases := []struct {
|
|
name string
|
|
req *http.Request
|
|
wantCode int
|
|
wantJoinedRooms []string
|
|
}{
|
|
{
|
|
name: "missing access token",
|
|
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"timeout": "0",
|
|
})),
|
|
wantCode: 401,
|
|
},
|
|
{
|
|
name: "unknown access token",
|
|
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": "foo",
|
|
"timeout": "0",
|
|
})),
|
|
wantCode: 401,
|
|
},
|
|
{
|
|
name: "valid access token",
|
|
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"timeout": "0",
|
|
})),
|
|
wantCode: 200,
|
|
wantJoinedRooms: []string{room.ID},
|
|
},
|
|
}
|
|
|
|
syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
|
|
// wait for the last sent eventID to come down sync
|
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
|
|
return gjson.Get(syncBody, path).Exists()
|
|
})
|
|
|
|
for _, tc := range testCases {
|
|
w := httptest.NewRecorder()
|
|
routers.Client.ServeHTTP(w, tc.req)
|
|
if w.Code != tc.wantCode {
|
|
t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
|
|
}
|
|
if tc.wantJoinedRooms != nil {
|
|
var res types.Response
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Fatalf("%s: failed to decode response body: %s", tc.name, err)
|
|
}
|
|
if len(res.Rooms.Join) != len(tc.wantJoinedRooms) {
|
|
t.Errorf("%s: got %v joined rooms, want %v.\nResponse: %+v", tc.name, len(res.Rooms.Join), len(tc.wantJoinedRooms), res)
|
|
}
|
|
t.Logf("res: %+v", res.Rooms.Join[room.ID])
|
|
|
|
gotEventIDs := make([]string, len(res.Rooms.Join[room.ID].Timeline.Events))
|
|
for i, ev := range res.Rooms.Join[room.ID].Timeline.Events {
|
|
gotEventIDs[i] = ev.EventID
|
|
}
|
|
test.AssertEventIDsEqual(t, gotEventIDs, room.Events())
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestSyncAPIEventFormatPowerLevels(t *testing.T) {
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
testSyncEventFormatPowerLevels(t, dbType)
|
|
})
|
|
}
|
|
|
|
func testSyncEventFormatPowerLevels(t *testing.T, dbType test.DBType) {
|
|
user := test.NewUser(t)
|
|
setRoomVersion := func(t *testing.T, r *test.Room) { r.Version = gomatrixserverlib.RoomVersionPseudoIDs }
|
|
room := test.NewRoom(t, user, setRoomVersion)
|
|
alice := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: user.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
room.CreateAndInsert(t, user, spec.MRoomPowerLevels, gomatrixserverlib.PowerLevelContent{
|
|
Users: map[string]int64{
|
|
user.ID: 100,
|
|
},
|
|
}, test.WithStateKey(""))
|
|
|
|
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
|
|
routers := httputil.NewRouters()
|
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
|
natsInstance := jetstream.NATSInstance{}
|
|
defer close()
|
|
|
|
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
|
|
msgs := toNATSMsgs(t, cfg, room.Events()...)
|
|
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics)
|
|
testrig.MustPublishMsgs(t, jsctx, msgs...)
|
|
|
|
testCases := []struct {
|
|
name string
|
|
wantCode int
|
|
wantJoinedRooms []string
|
|
eventFormat synctypes.ClientEventFormat
|
|
}{
|
|
{
|
|
name: "Client format",
|
|
wantCode: 200,
|
|
wantJoinedRooms: []string{room.ID},
|
|
eventFormat: synctypes.FormatSync,
|
|
},
|
|
{
|
|
name: "Federation format",
|
|
wantCode: 200,
|
|
wantJoinedRooms: []string{room.ID},
|
|
eventFormat: synctypes.FormatSyncFederation,
|
|
},
|
|
}
|
|
|
|
syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
|
|
// wait for the last sent eventID to come down sync
|
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
|
|
return gjson.Get(syncBody, path).Exists()
|
|
})
|
|
|
|
for _, tc := range testCases {
|
|
format := ""
|
|
if tc.eventFormat == synctypes.FormatSyncFederation {
|
|
format = "federation"
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"timeout": "0",
|
|
"filter": fmt.Sprintf(`{"event_format":"%s"}`, format),
|
|
})))
|
|
if w.Code != tc.wantCode {
|
|
t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
|
|
}
|
|
if tc.wantJoinedRooms != nil {
|
|
var res types.Response
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Fatalf("%s: failed to decode response body: %s", tc.name, err)
|
|
}
|
|
if len(res.Rooms.Join) != len(tc.wantJoinedRooms) {
|
|
t.Errorf("%s: got %v joined rooms, want %v.\nResponse: %+v", tc.name, len(res.Rooms.Join), len(tc.wantJoinedRooms), res)
|
|
}
|
|
t.Logf("res: %+v", res.Rooms.Join[room.ID])
|
|
|
|
gotEventIDs := make([]string, len(res.Rooms.Join[room.ID].Timeline.Events))
|
|
for i, ev := range res.Rooms.Join[room.ID].Timeline.Events {
|
|
gotEventIDs[i] = ev.EventID
|
|
}
|
|
test.AssertEventIDsEqual(t, gotEventIDs, room.Events())
|
|
|
|
event := room.CreateAndInsert(t, user, spec.MRoomPowerLevels, gomatrixserverlib.PowerLevelContent{
|
|
Users: map[string]int64{
|
|
user.ID: 100,
|
|
"@otheruser:localhost": 50,
|
|
},
|
|
}, test.WithStateKey(""))
|
|
|
|
msgs := toNATSMsgs(t, cfg, event)
|
|
testrig.MustPublishMsgs(t, jsctx, msgs...)
|
|
|
|
syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
|
|
// wait for the last sent eventID to come down sync
|
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
|
|
return gjson.Get(syncBody, path).Exists()
|
|
})
|
|
|
|
since := res.NextBatch.String()
|
|
w := httptest.NewRecorder()
|
|
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"timeout": "0",
|
|
"filter": fmt.Sprintf(`{"event_format":"%s"}`, format),
|
|
"since": since,
|
|
})))
|
|
if w.Code != 200 {
|
|
t.Errorf("since=%s got HTTP %d want 200", since, w.Code)
|
|
}
|
|
|
|
res = *types.NewResponse()
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Errorf("failed to decode response body: %s", err)
|
|
}
|
|
if len(res.Rooms.Join) != 1 {
|
|
t.Fatalf("since=%s got %d joined rooms, want 1", since, len(res.Rooms.Join))
|
|
}
|
|
gotEventIDs = make([]string, len(res.Rooms.Join[room.ID].Timeline.Events))
|
|
for j, ev := range res.Rooms.Join[room.ID].Timeline.Events {
|
|
gotEventIDs[j] = ev.EventID
|
|
if ev.Type == spec.MRoomPowerLevels {
|
|
content := gomatrixserverlib.PowerLevelContent{}
|
|
err := json.Unmarshal(ev.Content, &content)
|
|
if err != nil {
|
|
t.Errorf("failed to unmarshal power level content: %s", err)
|
|
}
|
|
otherUserLevel := content.UserLevel("@otheruser:localhost")
|
|
if otherUserLevel != 50 {
|
|
t.Errorf("Expected user PL of %d but got %d", 50, otherUserLevel)
|
|
}
|
|
}
|
|
}
|
|
events := []*rstypes.HeaderedEvent{room.Events()[len(room.Events())-1]}
|
|
test.AssertEventIDsEqual(t, gotEventIDs, events)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Tests what happens when we create a room and then /sync before all events from /createRoom have
|
|
// been sent to the syncapi
|
|
func TestSyncAPICreateRoomSyncEarly(t *testing.T) {
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
testSyncAPICreateRoomSyncEarly(t, dbType)
|
|
})
|
|
}
|
|
|
|
func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
|
|
t.Skip("Skipped, possibly fixed")
|
|
user := test.NewUser(t)
|
|
room := test.NewRoom(t, user)
|
|
alice := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: user.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
|
|
routers := httputil.NewRouters()
|
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
|
defer close()
|
|
natsInstance := jetstream.NATSInstance{}
|
|
|
|
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
|
|
// order is:
|
|
// m.room.create
|
|
// m.room.member
|
|
// m.room.power_levels
|
|
// m.room.join_rules
|
|
// m.room.history_visibility
|
|
msgs := toNATSMsgs(t, cfg, room.Events()...)
|
|
sinceTokens := make([]string, len(msgs))
|
|
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics)
|
|
for i, msg := range msgs {
|
|
testrig.MustPublishMsgs(t, jsctx, msg)
|
|
time.Sleep(100 * time.Millisecond)
|
|
w := httptest.NewRecorder()
|
|
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"timeout": "0",
|
|
})))
|
|
if w.Code != 200 {
|
|
t.Errorf("got HTTP %d want 200", w.Code)
|
|
continue
|
|
}
|
|
var res types.Response
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Errorf("failed to decode response body: %s", err)
|
|
}
|
|
sinceTokens[i] = res.NextBatch.String()
|
|
if i == 0 { // create event does not produce a room section
|
|
if len(res.Rooms.Join) != 0 {
|
|
t.Fatalf("i=%v got %d joined rooms, want 0", i, len(res.Rooms.Join))
|
|
}
|
|
} else { // we should have that room somewhere
|
|
if len(res.Rooms.Join) != 1 {
|
|
t.Fatalf("i=%v got %d joined rooms, want 1", i, len(res.Rooms.Join))
|
|
}
|
|
}
|
|
}
|
|
|
|
// sync with no token "" and with the penultimate token and this should neatly return room events in the timeline block
|
|
sinceTokens = append([]string{""}, sinceTokens[:len(sinceTokens)-1]...)
|
|
|
|
t.Logf("waited for events to be consumed; syncing with %v", sinceTokens)
|
|
for i, since := range sinceTokens {
|
|
w := httptest.NewRecorder()
|
|
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"timeout": "0",
|
|
"since": since,
|
|
})))
|
|
if w.Code != 200 {
|
|
t.Errorf("since=%s got HTTP %d want 200", since, w.Code)
|
|
}
|
|
var res types.Response
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Errorf("failed to decode response body: %s", err)
|
|
}
|
|
if len(res.Rooms.Join) != 1 {
|
|
t.Fatalf("since=%s got %d joined rooms, want 1", since, len(res.Rooms.Join))
|
|
}
|
|
t.Logf("since=%s res state:%+v res timeline:%+v", since, res.Rooms.Join[room.ID].State.Events, res.Rooms.Join[room.ID].Timeline.Events)
|
|
gotEventIDs := make([]string, len(res.Rooms.Join[room.ID].Timeline.Events))
|
|
for j, ev := range res.Rooms.Join[room.ID].Timeline.Events {
|
|
gotEventIDs[j] = ev.EventID
|
|
}
|
|
test.AssertEventIDsEqual(t, gotEventIDs, room.Events()[i:])
|
|
}
|
|
}
|
|
|
|
// Test that if we hit /sync we get back presence: online, regardless of whether messages get delivered
|
|
// via NATS. Regression test for a flakey test "User sees their own presence in a sync"
|
|
func TestSyncAPIUpdatePresenceImmediately(t *testing.T) {
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
testSyncAPIUpdatePresenceImmediately(t, dbType)
|
|
})
|
|
}
|
|
|
|
func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
|
|
user := test.NewUser(t)
|
|
alice := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: user.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
|
|
routers := httputil.NewRouters()
|
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
|
cfg.Global.Presence.EnableOutbound = true
|
|
cfg.Global.Presence.EnableInbound = true
|
|
defer close()
|
|
natsInstance := jetstream.NATSInstance{}
|
|
|
|
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
|
|
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, caching.DisableMetrics)
|
|
w := httptest.NewRecorder()
|
|
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"timeout": "0",
|
|
"set_presence": "online",
|
|
})))
|
|
if w.Code != 200 {
|
|
t.Fatalf("got HTTP %d want %d", w.Code, 200)
|
|
}
|
|
var res types.Response
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Errorf("failed to decode response body: %s", err)
|
|
}
|
|
if len(res.Presence.Events) != 1 {
|
|
t.Fatalf("expected 1 presence events, got: %+v", res.Presence.Events)
|
|
}
|
|
if res.Presence.Events[0].Sender != alice.UserID {
|
|
t.Errorf("sender: got %v want %v", res.Presence.Events[0].Sender, alice.UserID)
|
|
}
|
|
if res.Presence.Events[0].Type != "m.presence" {
|
|
t.Errorf("type: got %v want %v", res.Presence.Events[0].Type, "m.presence")
|
|
}
|
|
if gjson.ParseBytes(res.Presence.Events[0].Content).Get("presence").Str != "online" {
|
|
t.Errorf("content: not online, got %v", res.Presence.Events[0].Content)
|
|
}
|
|
|
|
}
|
|
|
|
// This is mainly what Sytest is doing in "test_history_visibility"
|
|
func TestMessageHistoryVisibility(t *testing.T) {
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
testHistoryVisibility(t, dbType)
|
|
})
|
|
}
|
|
|
|
func testHistoryVisibility(t *testing.T, dbType test.DBType) {
|
|
type result struct {
|
|
seeWithoutJoin bool
|
|
seeBeforeJoin bool
|
|
seeAfterInvite bool
|
|
}
|
|
|
|
// create the users
|
|
alice := test.NewUser(t)
|
|
aliceDev := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: alice.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "ALICE",
|
|
}
|
|
|
|
bob := test.NewUser(t)
|
|
|
|
bobDev := userapi.Device{
|
|
ID: "BOBID",
|
|
UserID: bob.ID,
|
|
AccessToken: "BOD_BEARER_TOKEN",
|
|
DisplayName: "BOB",
|
|
}
|
|
|
|
ctx := context.Background()
|
|
// check guest and normal user accounts
|
|
for _, accType := range []userapi.AccountType{userapi.AccountTypeGuest, userapi.AccountTypeUser} {
|
|
testCases := []struct {
|
|
historyVisibility gomatrixserverlib.HistoryVisibility
|
|
wantResult result
|
|
}{
|
|
{
|
|
historyVisibility: gomatrixserverlib.HistoryVisibilityWorldReadable,
|
|
wantResult: result{
|
|
seeWithoutJoin: true,
|
|
seeBeforeJoin: true,
|
|
seeAfterInvite: true,
|
|
},
|
|
},
|
|
{
|
|
historyVisibility: gomatrixserverlib.HistoryVisibilityShared,
|
|
wantResult: result{
|
|
seeWithoutJoin: false,
|
|
seeBeforeJoin: true,
|
|
seeAfterInvite: true,
|
|
},
|
|
},
|
|
{
|
|
historyVisibility: gomatrixserverlib.HistoryVisibilityInvited,
|
|
wantResult: result{
|
|
seeWithoutJoin: false,
|
|
seeBeforeJoin: false,
|
|
seeAfterInvite: true,
|
|
},
|
|
},
|
|
{
|
|
historyVisibility: gomatrixserverlib.HistoryVisibilityJoined,
|
|
wantResult: result{
|
|
seeWithoutJoin: false,
|
|
seeBeforeJoin: false,
|
|
seeAfterInvite: false,
|
|
},
|
|
},
|
|
}
|
|
|
|
bobDev.AccountType = accType
|
|
userType := "guest"
|
|
if accType == userapi.AccountTypeUser {
|
|
userType = "real user"
|
|
}
|
|
|
|
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
|
|
cfg.ClientAPI.RateLimiting = config.RateLimiting{Enabled: false}
|
|
routers := httputil.NewRouters()
|
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
|
defer close()
|
|
natsInstance := jetstream.NATSInstance{}
|
|
|
|
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
|
|
|
|
// Use the actual internal roomserver API
|
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
|
rsAPI.SetFederationAPI(nil, nil)
|
|
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, caching.DisableMetrics)
|
|
|
|
for _, tc := range testCases {
|
|
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
|
|
t.Run(testname, func(t *testing.T) {
|
|
// create a room with the given visibility
|
|
room := test.NewRoom(t, alice, test.RoomHistoryVisibility(tc.historyVisibility))
|
|
|
|
// send the events/messages to NATS to create the rooms
|
|
beforeJoinBody := fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)
|
|
beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": beforeJoinBody})
|
|
eventsToSend := append(room.Events(), beforeJoinEv)
|
|
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil {
|
|
t.Fatalf("failed to send events: %v", err)
|
|
}
|
|
syncUntil(t, routers, aliceDev.AccessToken, false,
|
|
func(syncBody string) bool {
|
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, beforeJoinBody)
|
|
return gjson.Get(syncBody, path).Exists()
|
|
},
|
|
)
|
|
|
|
// There is only one event, we expect only to be able to see this, if the room is world_readable
|
|
w := httptest.NewRecorder()
|
|
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": bobDev.AccessToken,
|
|
"dir": "b",
|
|
"filter": `{"lazy_load_members":true}`, // check that lazy loading doesn't break history visibility
|
|
})))
|
|
if w.Code != 200 {
|
|
t.Logf("%s", w.Body.String())
|
|
t.Fatalf("got HTTP %d want %d", w.Code, 200)
|
|
}
|
|
// We only care about the returned events at this point
|
|
var res struct {
|
|
Chunk []synctypes.ClientEvent `json:"chunk"`
|
|
}
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Errorf("failed to decode response body: %s", err)
|
|
}
|
|
|
|
verifyEventVisible(t, tc.wantResult.seeWithoutJoin, beforeJoinEv, res.Chunk)
|
|
|
|
// Create invite, a message, join the room and create another message.
|
|
inviteEv := room.CreateAndInsert(t, alice, "m.room.member", map[string]interface{}{"membership": "invite"}, test.WithStateKey(bob.ID))
|
|
afterInviteEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After invite in a %s room", tc.historyVisibility)})
|
|
joinEv := room.CreateAndInsert(t, bob, "m.room.member", map[string]interface{}{"membership": "join"}, test.WithStateKey(bob.ID))
|
|
afterJoinBody := fmt.Sprintf("After join in a %s room", tc.historyVisibility)
|
|
msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": afterJoinBody})
|
|
|
|
eventsToSend = append([]*rstypes.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv)
|
|
|
|
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil {
|
|
t.Fatalf("failed to send events: %v", err)
|
|
}
|
|
syncUntil(t, routers, aliceDev.AccessToken, false,
|
|
func(syncBody string) bool {
|
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, afterJoinBody)
|
|
return gjson.Get(syncBody, path).Exists()
|
|
},
|
|
)
|
|
|
|
// Verify the messages after/before invite are visible or not
|
|
w = httptest.NewRecorder()
|
|
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": bobDev.AccessToken,
|
|
"dir": "b",
|
|
})))
|
|
if w.Code != 200 {
|
|
t.Logf("%s", w.Body.String())
|
|
t.Fatalf("got HTTP %d want %d", w.Code, 200)
|
|
}
|
|
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
|
|
t.Errorf("failed to decode response body: %s", err)
|
|
}
|
|
// verify results
|
|
verifyEventVisible(t, tc.wantResult.seeBeforeJoin, beforeJoinEv, res.Chunk)
|
|
verifyEventVisible(t, tc.wantResult.seeAfterInvite, afterInviteEv, res.Chunk)
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
func verifyEventVisible(t *testing.T, wantVisible bool, wantVisibleEvent *rstypes.HeaderedEvent, chunk []synctypes.ClientEvent) {
|
|
t.Helper()
|
|
if wantVisible {
|
|
for _, ev := range chunk {
|
|
if ev.EventID == wantVisibleEvent.EventID() {
|
|
return
|
|
}
|
|
}
|
|
t.Fatalf("expected to see event %s but didn't: %+v", wantVisibleEvent.EventID(), chunk)
|
|
} else {
|
|
for _, ev := range chunk {
|
|
if ev.EventID == wantVisibleEvent.EventID() {
|
|
t.Fatalf("expected not to see event %s: %+v", wantVisibleEvent.EventID(), string(ev.Content))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestGetMembership(t *testing.T) {
|
|
alice := test.NewUser(t)
|
|
|
|
aliceDev := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: alice.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
bob := test.NewUser(t)
|
|
bobDev := userapi.Device{
|
|
ID: "BOBID",
|
|
UserID: bob.ID,
|
|
AccessToken: "notjoinedtoanyrooms",
|
|
}
|
|
|
|
testCases := []struct {
|
|
name string
|
|
roomID string
|
|
additionalEvents func(t *testing.T, room *test.Room)
|
|
request func(t *testing.T, room *test.Room) *http.Request
|
|
wantOK bool
|
|
wantMemberCount int
|
|
useSleep bool // :/
|
|
}{
|
|
{
|
|
name: "/members - Alice joined",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
}))
|
|
},
|
|
wantOK: true,
|
|
wantMemberCount: 1,
|
|
},
|
|
{
|
|
name: "/members - Bob never joined",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": bobDev.AccessToken,
|
|
}))
|
|
},
|
|
wantOK: false,
|
|
},
|
|
{
|
|
name: "Alice leaves before Bob joins, should not be able to see Bob",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
}))
|
|
},
|
|
additionalEvents: func(t *testing.T, room *test.Room) {
|
|
room.CreateAndInsert(t, alice, spec.MRoomMember, map[string]interface{}{
|
|
"membership": "leave",
|
|
}, test.WithStateKey(alice.ID))
|
|
room.CreateAndInsert(t, bob, spec.MRoomMember, map[string]interface{}{
|
|
"membership": "join",
|
|
}, test.WithStateKey(bob.ID))
|
|
},
|
|
useSleep: true,
|
|
wantOK: true,
|
|
wantMemberCount: 1,
|
|
},
|
|
{
|
|
name: "Alice leaves after Bob joins, should be able to see Bob",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
}))
|
|
},
|
|
additionalEvents: func(t *testing.T, room *test.Room) {
|
|
room.CreateAndInsert(t, bob, spec.MRoomMember, map[string]interface{}{
|
|
"membership": "join",
|
|
}, test.WithStateKey(bob.ID))
|
|
room.CreateAndInsert(t, alice, spec.MRoomMember, map[string]interface{}{
|
|
"membership": "leave",
|
|
}, test.WithStateKey(alice.ID))
|
|
},
|
|
useSleep: true,
|
|
wantOK: true,
|
|
wantMemberCount: 2,
|
|
},
|
|
{
|
|
name: "'at' specified, returns memberships before Bob joins",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
"at": "t2_5",
|
|
}))
|
|
},
|
|
additionalEvents: func(t *testing.T, room *test.Room) {
|
|
room.CreateAndInsert(t, bob, spec.MRoomMember, map[string]interface{}{
|
|
"membership": "join",
|
|
}, test.WithStateKey(bob.ID))
|
|
},
|
|
useSleep: true,
|
|
wantOK: true,
|
|
wantMemberCount: 1,
|
|
},
|
|
{
|
|
name: "'membership=leave' specified, returns no memberships",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
"membership": "leave",
|
|
}))
|
|
},
|
|
wantOK: true,
|
|
wantMemberCount: 0,
|
|
},
|
|
{
|
|
name: "'not_membership=join' specified, returns no memberships",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
"not_membership": "join",
|
|
}))
|
|
},
|
|
wantOK: true,
|
|
wantMemberCount: 0,
|
|
},
|
|
{
|
|
name: "'not_membership=leave' & 'membership=join' specified, returns correct memberships",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", room.ID), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
"not_membership": "leave",
|
|
"membership": "join",
|
|
}))
|
|
},
|
|
additionalEvents: func(t *testing.T, room *test.Room) {
|
|
room.CreateAndInsert(t, bob, spec.MRoomMember, map[string]interface{}{
|
|
"membership": "join",
|
|
}, test.WithStateKey(bob.ID))
|
|
room.CreateAndInsert(t, bob, spec.MRoomMember, map[string]interface{}{
|
|
"membership": "leave",
|
|
}, test.WithStateKey(bob.ID))
|
|
},
|
|
wantOK: true,
|
|
wantMemberCount: 1,
|
|
},
|
|
{
|
|
name: "non-existent room ID",
|
|
request: func(t *testing.T, room *test.Room) *http.Request {
|
|
return test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/members", "!notavalidroom:test"), test.WithQueryParams(map[string]string{
|
|
"access_token": aliceDev.AccessToken,
|
|
}))
|
|
},
|
|
wantOK: false,
|
|
},
|
|
}
|
|
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
|
|
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
|
|
routers := httputil.NewRouters()
|
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
|
defer close()
|
|
natsInstance := jetstream.NATSInstance{}
|
|
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
|
|
|
|
// Use an actual roomserver for this
|
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
|
rsAPI.SetFederationAPI(nil, nil)
|
|
|
|
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, caching.DisableMetrics)
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
room := test.NewRoom(t, alice)
|
|
t.Cleanup(func() {
|
|
t.Logf("running cleanup for %s", tc.name)
|
|
})
|
|
// inject additional events
|
|
if tc.additionalEvents != nil {
|
|
tc.additionalEvents(t, room)
|
|
}
|
|
if err := api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
|
|
t.Fatalf("failed to send events: %v", err)
|
|
}
|
|
|
|
// wait for the events to come down sync
|
|
if tc.useSleep {
|
|
time.Sleep(time.Millisecond * 100)
|
|
} else {
|
|
syncUntil(t, routers, aliceDev.AccessToken, false, func(syncBody string) bool {
|
|
// wait for the last sent eventID to come down sync
|
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
|
|
return gjson.Get(syncBody, path).Exists()
|
|
})
|
|
}
|
|
|
|
w := httptest.NewRecorder()
|
|
routers.Client.ServeHTTP(w, tc.request(t, room))
|
|
if w.Code != 200 && tc.wantOK {
|
|
t.Logf("%s", w.Body.String())
|
|
t.Fatalf("got HTTP %d want %d", w.Code, 200)
|
|
}
|
|
t.Logf("[%s] Resp: %s", tc.name, w.Body.String())
|
|
|
|
// check we got the expected events
|
|
if tc.wantOK {
|
|
memberCount := len(gjson.GetBytes(w.Body.Bytes(), "chunk").Array())
|
|
if memberCount != tc.wantMemberCount {
|
|
t.Fatalf("expected %d members, got %d", tc.wantMemberCount, memberCount)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestSendToDevice(t *testing.T) {
|
|
test.WithAllDatabases(t, testSendToDevice)
|
|
}
|
|
|
|
func testSendToDevice(t *testing.T, dbType test.DBType) {
|
|
user := test.NewUser(t)
|
|
alice := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: user.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
|
|
routers := httputil.NewRouters()
|
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
|
defer close()
|
|
natsInstance := jetstream.NATSInstance{}
|
|
|
|
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
|
|
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, caching.DisableMetrics)
|
|
|
|
producer := producers.SyncAPIProducer{
|
|
TopicSendToDeviceEvent: cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
|
|
JetStream: jsctx,
|
|
}
|
|
|
|
msgCounter := 0
|
|
|
|
testCases := []struct {
|
|
name string
|
|
since string
|
|
want []string
|
|
sendMessagesCount int
|
|
}{
|
|
{
|
|
name: "initial sync, no messages",
|
|
want: []string{},
|
|
},
|
|
{
|
|
name: "initial sync, one new message",
|
|
sendMessagesCount: 1,
|
|
want: []string{
|
|
"message 1",
|
|
},
|
|
},
|
|
{
|
|
name: "initial sync, two new messages", // we didn't advance the since token, so we'll receive two messages
|
|
sendMessagesCount: 1,
|
|
want: []string{
|
|
"message 1",
|
|
"message 2",
|
|
},
|
|
},
|
|
{
|
|
name: "incremental sync, one message", // this deletes message 1, as we advanced the since token
|
|
since: types.StreamingToken{SendToDevicePosition: 1}.String(),
|
|
want: []string{
|
|
"message 2",
|
|
},
|
|
},
|
|
{
|
|
name: "failed incremental sync, one message", // didn't advance since, so still the same message
|
|
since: types.StreamingToken{SendToDevicePosition: 1}.String(),
|
|
want: []string{
|
|
"message 2",
|
|
},
|
|
},
|
|
{
|
|
name: "incremental sync, no message", // this should delete message 2
|
|
since: types.StreamingToken{SendToDevicePosition: 2}.String(), // next_batch from previous sync
|
|
want: []string{},
|
|
},
|
|
{
|
|
name: "incremental sync, three new messages",
|
|
since: types.StreamingToken{SendToDevicePosition: 2}.String(),
|
|
sendMessagesCount: 3,
|
|
want: []string{
|
|
"message 3", // message 2 was deleted in the previous test
|
|
"message 4",
|
|
"message 5",
|
|
},
|
|
},
|
|
{
|
|
name: "initial sync, three messages", // we expect three messages, as we didn't go beyond "2"
|
|
want: []string{
|
|
"message 3",
|
|
"message 4",
|
|
"message 5",
|
|
},
|
|
},
|
|
{
|
|
name: "incremental sync, no messages", // advance the sync token, no new messages
|
|
since: types.StreamingToken{SendToDevicePosition: 5}.String(),
|
|
want: []string{},
|
|
},
|
|
}
|
|
|
|
ctx := context.Background()
|
|
for _, tc := range testCases {
|
|
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
|
|
for i := 0; i < tc.sendMessagesCount; i++ {
|
|
msgCounter++
|
|
msg := json.RawMessage(fmt.Sprintf(`{"dummy":"message %d"}`, msgCounter))
|
|
if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
|
|
t.Fatalf("unable to send to device message: %v", err)
|
|
}
|
|
}
|
|
|
|
syncUntil(t, routers, alice.AccessToken,
|
|
len(tc.want) == 0,
|
|
func(body string) bool {
|
|
return gjson.Get(body, fmt.Sprintf(`to_device.events.#(content.dummy=="message %d")`, msgCounter)).Exists()
|
|
},
|
|
)
|
|
|
|
// Execute a /sync request, recording the response
|
|
w := httptest.NewRecorder()
|
|
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
"since": tc.since,
|
|
})))
|
|
|
|
// Extract the to_device.events, # gets all values of an array, in this case a string slice with "message $counter" entries
|
|
events := gjson.Get(w.Body.String(), "to_device.events.#.content.dummy").Array()
|
|
got := make([]string, len(events))
|
|
for i := range events {
|
|
got[i] = events[i].String()
|
|
}
|
|
|
|
// Ensure the messages we received are as we expect them to be
|
|
if !reflect.DeepEqual(got, tc.want) {
|
|
t.Logf("[%s|since=%s]: Sync: %s", tc.name, tc.since, w.Body.String())
|
|
t.Fatalf("[%s|since=%s]: got: %+v, want: %+v", tc.name, tc.since, got, tc.want)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestContext(t *testing.T) {
|
|
test.WithAllDatabases(t, testContext)
|
|
}
|
|
|
|
func testContext(t *testing.T, dbType test.DBType) {
|
|
|
|
tests := []struct {
|
|
name string
|
|
roomID string
|
|
eventID string
|
|
params map[string]string
|
|
wantError bool
|
|
wantStateLength int
|
|
wantBeforeLength int
|
|
wantAfterLength int
|
|
}{
|
|
{
|
|
name: "invalid filter",
|
|
params: map[string]string{
|
|
"filter": "{",
|
|
},
|
|
wantError: true,
|
|
},
|
|
{
|
|
name: "invalid limit",
|
|
params: map[string]string{
|
|
"limit": "abc",
|
|
},
|
|
wantError: true,
|
|
},
|
|
{
|
|
name: "high limit",
|
|
params: map[string]string{
|
|
"limit": "100000",
|
|
},
|
|
},
|
|
{
|
|
name: "fine limit",
|
|
params: map[string]string{
|
|
"limit": "10",
|
|
},
|
|
},
|
|
{
|
|
name: "last event without lazy loading",
|
|
wantStateLength: 5,
|
|
},
|
|
{
|
|
name: "last event with lazy loading",
|
|
params: map[string]string{
|
|
"filter": `{"lazy_load_members":true}`,
|
|
},
|
|
wantStateLength: 1,
|
|
},
|
|
{
|
|
name: "invalid room",
|
|
roomID: "!doesnotexist",
|
|
wantError: true,
|
|
},
|
|
{
|
|
name: "invalid eventID",
|
|
eventID: "$doesnotexist",
|
|
wantError: true,
|
|
},
|
|
{
|
|
name: "state is limited",
|
|
params: map[string]string{
|
|
"limit": "1",
|
|
},
|
|
wantStateLength: 1,
|
|
},
|
|
{
|
|
name: "events are not limited",
|
|
wantBeforeLength: 5,
|
|
},
|
|
{
|
|
name: "all events are limited",
|
|
params: map[string]string{
|
|
"limit": "1",
|
|
},
|
|
wantStateLength: 1,
|
|
wantBeforeLength: 1,
|
|
wantAfterLength: 1,
|
|
},
|
|
}
|
|
|
|
user := test.NewUser(t)
|
|
alice := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: user.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
|
|
routers := httputil.NewRouters()
|
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
|
defer close()
|
|
|
|
// Use an actual roomserver for this
|
|
natsInstance := jetstream.NATSInstance{}
|
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
|
rsAPI.SetFederationAPI(nil, nil)
|
|
|
|
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches, caching.DisableMetrics)
|
|
|
|
room := test.NewRoom(t, user)
|
|
|
|
room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "hello world 1!"})
|
|
room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "hello world 2!"})
|
|
thirdMsg := room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "hello world3!"})
|
|
room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "hello world4!"})
|
|
|
|
if err := api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
|
|
t.Fatalf("failed to send events: %v", err)
|
|
}
|
|
|
|
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
|
|
|
|
syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
|
|
// wait for the last sent eventID to come down sync
|
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, thirdMsg.EventID())
|
|
return gjson.Get(syncBody, path).Exists()
|
|
})
|
|
|
|
for _, tc := range tests {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
params := map[string]string{
|
|
"access_token": alice.AccessToken,
|
|
}
|
|
w := httptest.NewRecorder()
|
|
// test overrides
|
|
roomID := room.ID
|
|
if tc.roomID != "" {
|
|
roomID = tc.roomID
|
|
}
|
|
eventID := thirdMsg.EventID()
|
|
if tc.eventID != "" {
|
|
eventID = tc.eventID
|
|
}
|
|
requestPath := fmt.Sprintf("/_matrix/client/v3/rooms/%s/context/%s", roomID, eventID)
|
|
if tc.params != nil {
|
|
for k, v := range tc.params {
|
|
params[k] = v
|
|
}
|
|
}
|
|
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params)))
|
|
|
|
if tc.wantError && w.Code == 200 {
|
|
t.Fatalf("Expected an error, but got none")
|
|
}
|
|
t.Log(w.Body.String())
|
|
resp := routing.ContextRespsonse{}
|
|
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if tc.wantStateLength > 0 && tc.wantStateLength != len(resp.State) {
|
|
t.Fatalf("expected %d state events, got %d", tc.wantStateLength, len(resp.State))
|
|
}
|
|
if tc.wantBeforeLength > 0 && tc.wantBeforeLength != len(resp.EventsBefore) {
|
|
t.Fatalf("expected %d before events, got %d", tc.wantBeforeLength, len(resp.EventsBefore))
|
|
}
|
|
if tc.wantAfterLength > 0 && tc.wantAfterLength != len(resp.EventsAfter) {
|
|
t.Fatalf("expected %d after events, got %d", tc.wantAfterLength, len(resp.EventsAfter))
|
|
}
|
|
|
|
if !tc.wantError && resp.Event.EventID != eventID {
|
|
t.Fatalf("unexpected eventID %s, expected %s", resp.Event.EventID, eventID)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestUpdateRelations(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
eventContent map[string]interface{}
|
|
eventType string
|
|
}{
|
|
{
|
|
name: "empty event content should not error",
|
|
},
|
|
{
|
|
name: "unable to unmarshal event should not error",
|
|
eventContent: map[string]interface{}{
|
|
"m.relates_to": map[string]interface{}{
|
|
"event_id": map[string]interface{}{}, // this should be a string and not struct
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "empty event ID is ignored",
|
|
eventContent: map[string]interface{}{
|
|
"m.relates_to": map[string]interface{}{
|
|
"event_id": "",
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "empty rel_type is ignored",
|
|
eventContent: map[string]interface{}{
|
|
"m.relates_to": map[string]interface{}{
|
|
"event_id": "$randomEventID",
|
|
"rel_type": "",
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "redactions are ignored",
|
|
eventType: spec.MRoomRedaction,
|
|
eventContent: map[string]interface{}{
|
|
"m.relates_to": map[string]interface{}{
|
|
"event_id": "$randomEventID",
|
|
"rel_type": "m.replace",
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "valid event is correctly written",
|
|
eventContent: map[string]interface{}{
|
|
"m.relates_to": map[string]interface{}{
|
|
"event_id": "$randomEventID",
|
|
"rel_type": "m.replace",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
ctx := context.Background()
|
|
|
|
alice := test.NewUser(t)
|
|
room := test.NewRoom(t, alice)
|
|
|
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
|
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
|
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
|
t.Cleanup(close)
|
|
db, err := storage.NewSyncServerDatasource(processCtx.Context(), cm, &cfg.SyncAPI.Database)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
evType := "m.room.message"
|
|
if tc.eventType != "" {
|
|
evType = tc.eventType
|
|
}
|
|
ev := room.CreateEvent(t, alice, evType, tc.eventContent)
|
|
err = db.UpdateRelations(ctx, ev)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
})
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestRemoveEditedEventFromSearchIndex(t *testing.T) {
|
|
user := test.NewUser(t)
|
|
alice := userapi.Device{
|
|
ID: "ALICEID",
|
|
UserID: user.ID,
|
|
AccessToken: "ALICE_BEARER_TOKEN",
|
|
DisplayName: "Alice",
|
|
AccountType: userapi.AccountTypeUser,
|
|
}
|
|
|
|
routers := httputil.NewRouters()
|
|
|
|
cfg, processCtx, close := testrig.CreateConfig(t, test.DBTypeSQLite)
|
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
|
defer close()
|
|
|
|
// Use an actual roomserver for this
|
|
natsInstance := jetstream.NATSInstance{}
|
|
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
|
|
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
|
|
|
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
|
rsAPI.SetFederationAPI(nil, nil)
|
|
|
|
room := test.NewRoom(t, user)
|
|
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics)
|
|
|
|
if err := api.SendEvents(processCtx.Context(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
|
|
t.Fatalf("failed to send events: %v", err)
|
|
}
|
|
|
|
ev1 := room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "first"})
|
|
ev2 := room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{
|
|
"body": " * first",
|
|
"m.new_content": map[string]interface{}{
|
|
"body": "first",
|
|
"msgtype": "m.text",
|
|
},
|
|
"m.relates_to": map[string]interface{}{
|
|
"event_id": ev1.EventID(),
|
|
"rel_type": "m.replace",
|
|
},
|
|
})
|
|
events := []*rstypes.HeaderedEvent{ev1, ev2}
|
|
|
|
for _, e := range events {
|
|
roomEvents := append([]*rstypes.HeaderedEvent{}, e)
|
|
if err := api.SendEvents(processCtx.Context(), rsAPI, api.KindNew, roomEvents, "test", "test", "test", nil, false); err != nil {
|
|
t.Fatalf("failed to send events: %v", err)
|
|
}
|
|
|
|
syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
|
|
// wait for the last sent eventID to come down sync
|
|
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, e.EventID())
|
|
|
|
return gjson.Get(syncBody, path).Exists()
|
|
})
|
|
|
|
// We search that event is the only one nad is the exact event we sent
|
|
searchResult := searchRequest(t, routers.Client, alice.AccessToken, "first", []string{room.ID})
|
|
results := gjson.GetBytes(searchResult, fmt.Sprintf(`search_categories.room_events.groups.room_id.%s.results`, room.ID))
|
|
assert.True(t, results.Exists(), "Should be a search response")
|
|
assert.Equal(t, 1, len(results.Array()), "Should be exactly one result")
|
|
assert.Equal(t, e.EventID(), results.Array()[0].String(), "Should be only found exact event")
|
|
}
|
|
}
|
|
|
|
func searchRequest(t *testing.T, router *mux.Router, accessToken, searchTerm string, roomList []string) []byte {
|
|
t.Helper()
|
|
w := httptest.NewRecorder()
|
|
rq := test.NewRequest(t, "POST", "/_matrix/client/v3/search", test.WithQueryParams(map[string]string{
|
|
"access_token": accessToken,
|
|
}), test.WithJSONBody(t, map[string]interface{}{
|
|
"search_categories": map[string]interface{}{
|
|
"room_events": map[string]interface{}{
|
|
"filters": roomList,
|
|
"search_term": searchTerm,
|
|
},
|
|
},
|
|
}))
|
|
|
|
router.ServeHTTP(w, rq)
|
|
assert.Equal(t, 200, w.Code)
|
|
defer w.Result().Body.Close()
|
|
body, err := io.ReadAll(w.Result().Body)
|
|
assert.NoError(t, err)
|
|
return body
|
|
}
|
|
func syncUntil(t *testing.T,
|
|
routers httputil.Routers, accessToken string,
|
|
skip bool,
|
|
checkFunc func(syncBody string) bool,
|
|
) {
|
|
t.Helper()
|
|
if checkFunc == nil {
|
|
t.Fatalf("No checkFunc defined")
|
|
}
|
|
if skip {
|
|
return
|
|
}
|
|
// loop on /sync until we receive the last send message or timeout after 5 seconds, since we don't know if the message made it
|
|
// to the syncAPI when hitting /sync
|
|
done := make(chan bool)
|
|
defer close(done)
|
|
go func() {
|
|
for {
|
|
w := httptest.NewRecorder()
|
|
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
|
"access_token": accessToken,
|
|
"timeout": "1000",
|
|
})))
|
|
if checkFunc(w.Body.String()) {
|
|
done <- true
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
case <-time.After(time.Second * 5):
|
|
t.Fatalf("Timed out waiting for messages")
|
|
}
|
|
}
|
|
|
|
func toNATSMsgs(t *testing.T, cfg *config.Dendrite, input ...*rstypes.HeaderedEvent) []*nats.Msg {
|
|
result := make([]*nats.Msg, len(input))
|
|
for i, ev := range input {
|
|
var addsStateIDs []string
|
|
if ev.StateKey() != nil {
|
|
addsStateIDs = append(addsStateIDs, ev.EventID())
|
|
}
|
|
result[i] = testrig.NewOutputEventMsg(t, cfg, ev.RoomID().String(), api.OutputEvent{
|
|
Type: rsapi.OutputTypeNewRoomEvent,
|
|
NewRoomEvent: &rsapi.OutputNewRoomEvent{
|
|
Event: ev,
|
|
AddsStateEventIDs: addsStateIDs,
|
|
HistoryVisibility: ev.Visibility,
|
|
},
|
|
})
|
|
}
|
|
return result
|
|
}
|