-rw-r--r--  .circleci/config.yml | 60
-rwxr-xr-x  .circleci/merge_base_branch.sh | 13
-rw-r--r--  .gitignore | 2
-rw-r--r--  .travis.yml | 8
-rw-r--r--  CHANGES.md | 84
-rw-r--r--  CONTRIBUTING.rst | 38
-rw-r--r--  MANIFEST.in | 8
-rw-r--r--  MAP.rst | 35
-rw-r--r--  README.rst | 112
-rw-r--r--  UPGRADE.rst | 6
-rw-r--r--  contrib/grafana/synapse.json | 592
-rw-r--r--  docker/Dockerfile | 72
-rw-r--r--  docker/Dockerfile-pgtests | 12
-rw-r--r--  docker/README.md | 1
-rw-r--r--  docker/conf/homeserver.yaml | 2
-rwxr-xr-x  docker/run_pg_tests.sh | 20
-rwxr-xr-x  docker/start.py | 3
-rwxr-xr-x  jenkins-dendron-haproxy-postgres.sh | 23
-rwxr-xr-x  jenkins-dendron-postgres.sh | 20
-rwxr-xr-x  jenkins-flake8.sh | 22
-rwxr-xr-x  jenkins-postgres.sh | 18
-rwxr-xr-x  jenkins-sqlite.sh | 16
-rwxr-xr-x  jenkins-unittests.sh | 30
-rwxr-xr-x  jenkins/clone.sh | 44
-rwxr-xr-x  scripts-dev/copyrighter-sql.pl | 33
-rwxr-xr-x  scripts-dev/copyrighter.pl | 33
-rwxr-xr-x  scripts-dev/dump_macaroon.py | 2
-rwxr-xr-x  scripts-dev/next_github_number.sh | 9
-rwxr-xr-x  scripts/register_new_matrix_user | 18
-rw-r--r--  synapse/__init__.py | 2
-rw-r--r--  synapse/api/filtering.py | 2
-rw-r--r--  synapse/api/urls.py | 2
-rw-r--r--  synapse/app/__init__.py | 2
-rw-r--r--  synapse/app/appservice.py | 3
-rw-r--r--  synapse/app/client_reader.py | 3
-rw-r--r--  synapse/app/event_creator.py | 3
-rw-r--r--  synapse/app/federation_reader.py | 3
-rw-r--r--  synapse/app/federation_sender.py | 3
-rw-r--r--  synapse/app/frontend_proxy.py | 3
-rwxr-xr-x  synapse/app/homeserver.py | 10
-rw-r--r--  synapse/app/media_repository.py | 3
-rw-r--r--  synapse/app/pusher.py | 3
-rw-r--r--  synapse/app/synchrotron.py | 3
-rwxr-xr-x  synapse/app/synctl.py | 284
-rw-r--r--  synapse/app/user_dir.py | 3
-rw-r--r--  synapse/config/__main__.py | 2
-rw-r--r--  synapse/event_auth.py | 4
-rw-r--r--  synapse/events/__init__.py | 13
-rw-r--r--  synapse/federation/federation_client.py | 38
-rw-r--r--  synapse/federation/federation_server.py | 34
-rw-r--r--  synapse/federation/transaction_queue.py | 36
-rw-r--r--  synapse/handlers/directory.py | 19
-rw-r--r--  synapse/handlers/e2e_keys.py | 2
-rw-r--r--  synapse/handlers/federation.py | 387
-rw-r--r--  synapse/handlers/message.py | 25
-rw-r--r--  synapse/handlers/profile.py | 2
-rw-r--r--  synapse/handlers/register.py | 1
-rw-r--r--  synapse/handlers/room_member.py | 5
-rw-r--r--  synapse/handlers/sync.py | 13
-rw-r--r--  synapse/handlers/typing.py | 23
-rw-r--r--  synapse/http/client.py | 4
-rw-r--r--  synapse/http/endpoint.py | 13
-rw-r--r--  synapse/http/matrixfederationclient.py | 403
-rw-r--r--  synapse/http/request_metrics.py | 22
-rw-r--r--  synapse/http/server.py | 49
-rw-r--r--  synapse/http/site.py | 35
-rw-r--r--  synapse/metrics/background_process_metrics.py | 6
-rw-r--r--  synapse/notifier.py | 29
-rw-r--r--  synapse/push/mailer.py | 2
-rw-r--r--  synapse/python_dependencies.py | 34
-rw-r--r--  synapse/rest/media/v1/download_resource.py | 1
-rw-r--r--  synapse/rest/media/v1/preview_url_resource.py | 1
-rw-r--r--  synapse/state/__init__.py | 9
-rw-r--r--  synapse/state/v1.py | 14
-rw-r--r--  synapse/storage/client_ips.py | 34
-rw-r--r--  synapse/storage/directory.py | 1
-rw-r--r--  synapse/storage/monthly_active_users.py | 5
-rw-r--r--  synapse/storage/state.py | 30
-rw-r--r--  synapse/storage/transactions.py | 31
-rw-r--r--  synapse/util/async_helpers.py | 88
-rw-r--r--  synapse/util/caches/__init__.py | 27
-rw-r--r--  synapse/util/caches/expiringcache.py | 40
-rw-r--r--  synapse/util/logcontext.py | 41
-rw-r--r--  synapse/util/retryutils.py | 2
-rwxr-xr-x [l---------]  synctl | 295
-rwxr-xr-x  test_postgresql.sh | 12
-rw-r--r--  tests/http/test_fedclient.py | 43
-rw-r--r--  tests/replication/slave/storage/_base.py | 35
-rw-r--r--  tests/replication/slave/storage/test_events.py | 6
-rw-r--r--  tests/server.py | 89
-rw-r--r--  tests/server_notices/test_consent.py | 100
-rw-r--r--  tests/storage/test_client_ips.py | 158
-rw-r--r--  tests/storage/test_state.py | 39
-rw-r--r--  tests/storage/test_transactions.py | 45
-rw-r--r--  tests/test_federation.py | 112
-rw-r--r--  tests/test_server.py | 74
-rw-r--r--  tests/unittest.py | 88
-rw-r--r--  tests/util/test_expiring_cache.py | 1
-rw-r--r--  tests/util/test_logcontext.py | 5
-rw-r--r--  tests/utils.py | 137
-rw-r--r--  tox.ini | 32
101 files changed, 2745 insertions, 1719 deletions
diff --git a/.circleci/config.yml b/.circleci/config.yml
index 605430fb..ec3848b0 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -1,5 +1,27 @@
version: 2
jobs:
+ dockerhubuploadrelease:
+ machine: true
+ steps:
+ - checkout
+ - run: docker build -f docker/Dockerfile -t matrixdotorg/synapse:${CIRCLE_TAG} .
+ - run: docker build -f docker/Dockerfile -t matrixdotorg/synapse:${CIRCLE_TAG}-py3 --build-arg PYTHON_VERSION=3.6 .
+ - run: docker login --username $DOCKER_HUB_USERNAME --password $DOCKER_HUB_PASSWORD
+ - run: docker push matrixdotorg/synapse:${CIRCLE_TAG}
+ - run: docker push matrixdotorg/synapse:${CIRCLE_TAG}-py3
+ dockerhubuploadlatest:
+ machine: true
+ steps:
+ - checkout
+ - run: docker build -f docker/Dockerfile -t matrixdotorg/synapse:${CIRCLE_SHA1} .
+ - run: docker build -f docker/Dockerfile -t matrixdotorg/synapse:${CIRCLE_SHA1}-py3 --build-arg PYTHON_VERSION=3.6 .
+ - run: docker login --username $DOCKER_HUB_USERNAME --password $DOCKER_HUB_PASSWORD
+ - run: docker tag matrixdotorg/synapse:${CIRCLE_SHA1} matrixdotorg/synapse:latest
+ - run: docker tag matrixdotorg/synapse:${CIRCLE_SHA1}-py3 matrixdotorg/synapse:latest-py3
+ - run: docker push matrixdotorg/synapse:${CIRCLE_SHA1}
+ - run: docker push matrixdotorg/synapse:${CIRCLE_SHA1}-py3
+ - run: docker push matrixdotorg/synapse:latest
+ - run: docker push matrixdotorg/synapse:latest-py3
sytestpy2:
machine: true
steps:
@@ -99,23 +121,45 @@ workflows:
version: 2
build:
jobs:
- - sytestpy2
- - sytestpy2postgres
- - sytestpy3
- - sytestpy3postgres
+ - sytestpy2:
+ filters:
+ branches:
+ only: /develop|master|release-.*/
+ - sytestpy2postgres:
+ filters:
+ branches:
+ only: /develop|master|release-.*/
+ - sytestpy3:
+ filters:
+ branches:
+ only: /develop|master|release-.*/
+ - sytestpy3postgres:
+ filters:
+ branches:
+ only: /develop|master|release-.*/
- sytestpy2merged:
filters:
branches:
- ignore: /develop|master/
+ ignore: /develop|master|release-.*/
- sytestpy2postgresmerged:
filters:
branches:
- ignore: /develop|master/
+ ignore: /develop|master|release-.*/
- sytestpy3merged:
filters:
branches:
- ignore: /develop|master/
+ ignore: /develop|master|release-.*/
- sytestpy3postgresmerged:
filters:
branches:
- ignore: /develop|master/
+ ignore: /develop|master|release-.*/
+ - dockerhubuploadrelease:
+ filters:
+ tags:
+ only: /v[0-9].[0-9]+.[0-9]+.*/
+ branches:
+ ignore: /.*/
+ - dockerhubuploadlatest:
+ filters:
+ branches:
+ only: master
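For reference, the release job above amounts to the following, runnable by hand (a sketch; assumes Docker Hub credentials in DOCKER_HUB_USERNAME and DOCKER_HUB_PASSWORD, with v0.33.6 as an example tag):

    # Build and push the release images, mirroring dockerhubuploadrelease.
    export CIRCLE_TAG=v0.33.6   # example tag
    docker build -f docker/Dockerfile -t matrixdotorg/synapse:${CIRCLE_TAG} .
    docker build -f docker/Dockerfile -t matrixdotorg/synapse:${CIRCLE_TAG}-py3 \
        --build-arg PYTHON_VERSION=3.6 .
    docker login --username "$DOCKER_HUB_USERNAME" --password "$DOCKER_HUB_PASSWORD"
    docker push matrixdotorg/synapse:${CIRCLE_TAG}
    docker push matrixdotorg/synapse:${CIRCLE_TAG}-py3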
diff --git a/.circleci/merge_base_branch.sh b/.circleci/merge_base_branch.sh
index 9614eb91..6b0bf3aa 100755
--- a/.circleci/merge_base_branch.sh
+++ b/.circleci/merge_base_branch.sh
@@ -9,13 +9,16 @@ source $BASH_ENV
if [[ -z "${CIRCLE_PR_NUMBER}" ]]
then
- echo "Can't figure out what the PR number is!"
- exit 1
+ echo "Can't figure out what the PR number is! Assuming merge target is develop."
+
+ # It probably hasn't had a PR opened yet. Since all PRs land on develop, we
+ # can probably assume it's based on it and will be merged into it.
+ GITBASE="develop"
+else
+ # Get the reference, using the GitHub API
+ GITBASE=`curl -q https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_PR_NUMBER} | jq -r '.base.ref'`
fi
-# Get the reference, using the GitHub API
-GITBASE=`curl -q https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_PR_NUMBER} | jq -r '.base.ref'`
-
# Show what we are before
git show -s
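The new fallback can be exercised outside CI (a sketch; 1234 is a placeholder, not a PR number from this changeset):

    # With no PR number the script now assumes the merge target is develop;
    # with one, the base branch is read from the GitHub API as before.
    CIRCLE_PR_NUMBER=1234   # placeholder PR number
    curl -s https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_PR_NUMBER} \
        | jq -r '.base.ref'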
diff --git a/.gitignore b/.gitignore
index 17181853..3b2252ad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,9 +1,11 @@
*.pyc
.*.swp
*~
+*.lock
.DS_Store
_trial_temp/
+_trial_temp*/
logs/
dbs/
*.egg
diff --git a/.travis.yml b/.travis.yml
index b3ee66da..2077f6af 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -21,6 +21,9 @@ matrix:
env: TOX_ENV=py27
- python: 2.7
+ env: TOX_ENV=py27-old
+
+ - python: 2.7
env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4"
services:
- postgresql
@@ -32,6 +35,11 @@ matrix:
env: TOX_ENV=py36
- python: 3.6
+ env: TOX_ENV=py36-postgres TRIAL_FLAGS="-j 4"
+ services:
+ - postgresql
+
+ - python: 3.6
env: TOX_ENV=check_isort
- python: 3.6
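The new py36-postgres entry corresponds to a tox environment that can also be run locally (a sketch; assumes a local PostgreSQL that the current user can create databases on):

    # Same environment the new Travis matrix entry runs.
    export TRIAL_FLAGS="-j 4"
    tox -e py36-postgres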
diff --git a/CHANGES.md b/CHANGES.md
index 45d3cdb1..048b9f95 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,11 +1,91 @@
+Synapse 0.33.6 (2018-10-04)
+===========================
+
+Internal Changes
+----------------
+
+- Pin to prometheus_client<0.4 to avoid renaming all of our metrics ([\#4002](https://github.com/matrix-org/synapse/issues/4002))
+
+
+Synapse 0.33.6rc1 (2018-10-03)
+==============================
+
+Features
+--------
+
+- Add the ability to change MAX_UPLOAD_SIZE for the Docker container via an environment variable. ([\#3883](https://github.com/matrix-org/synapse/issues/3883))
+- Report "python_version" in the phone home stats ([\#3894](https://github.com/matrix-org/synapse/issues/3894))
+- Always LL ourselves if we're in a room ([\#3916](https://github.com/matrix-org/synapse/issues/3916))
+- Include eventid in log lines when processing incoming federation transactions ([\#3959](https://github.com/matrix-org/synapse/issues/3959))
+- Remove spurious check which made 'localhost' servers not work ([\#3964](https://github.com/matrix-org/synapse/issues/3964))
+
+
+Bugfixes
+--------
+
+- Fix problem when playing media from Chrome using direct URL (thanks @remjey!) ([\#3578](https://github.com/matrix-org/synapse/issues/3578))
+- support registering regular users non-interactively with register_new_matrix_user script ([\#3836](https://github.com/matrix-org/synapse/issues/3836))
+- Fix broken invite email links for self hosted riots ([\#3868](https://github.com/matrix-org/synapse/issues/3868))
+- Don't ratelimit autojoins ([\#3879](https://github.com/matrix-org/synapse/issues/3879))
+- Fix 500 error when deleting unknown room alias ([\#3889](https://github.com/matrix-org/synapse/issues/3889))
+- Fix some b'abcd' noise in logs and metrics ([\#3892](https://github.com/matrix-org/synapse/issues/3892), [\#3895](https://github.com/matrix-org/synapse/issues/3895))
+- When we join a room, always try the server we used for the alias lookup first, to avoid unresponsive and out-of-date servers. ([\#3899](https://github.com/matrix-org/synapse/issues/3899))
+- Fix incorrect server-name indication for outgoing federation requests ([\#3907](https://github.com/matrix-org/synapse/issues/3907))
+- Fix adding client IPs to the database failing on Python 3. ([\#3908](https://github.com/matrix-org/synapse/issues/3908))
+- Fix bug where things occasionally were not being timed out correctly. ([\#3910](https://github.com/matrix-org/synapse/issues/3910))
+- Fix bug where outbound federation would stop talking to some servers when using workers ([\#3914](https://github.com/matrix-org/synapse/issues/3914))
+- Fix some instances of ExpiringCache not expiring cache items ([\#3932](https://github.com/matrix-org/synapse/issues/3932), [\#3980](https://github.com/matrix-org/synapse/issues/3980))
+- Fix out-of-bounds error when LLing yourself ([\#3936](https://github.com/matrix-org/synapse/issues/3936))
+- Sending server notices regarding user consent now works on Python 3. ([\#3938](https://github.com/matrix-org/synapse/issues/3938))
+- Fix exceptions from metrics handler ([\#3956](https://github.com/matrix-org/synapse/issues/3956))
+- Fix error message for events with m.room.create missing from auth_events ([\#3960](https://github.com/matrix-org/synapse/issues/3960))
+- Fix errors due to concurrent monthly_active_user upserts ([\#3961](https://github.com/matrix-org/synapse/issues/3961))
+- Fix exceptions when processing incoming events over federation ([\#3968](https://github.com/matrix-org/synapse/issues/3968))
+- Replaced all occurrences of e.message with str(e). Contributed by Schnuffle ([\#3970](https://github.com/matrix-org/synapse/issues/3970))
+- Fix lazy loaded sync in the presence of rejected state events ([\#3986](https://github.com/matrix-org/synapse/issues/3986))
+- Fix error when logging incomplete HTTP requests ([\#3990](https://github.com/matrix-org/synapse/issues/3990))
+
+
+Internal Changes
+----------------
+
+- Unit tests can now be run under PostgreSQL in Docker using ``test_postgresql.sh``. ([\#3699](https://github.com/matrix-org/synapse/issues/3699))
+- Speed up calculation of typing updates for replication ([\#3794](https://github.com/matrix-org/synapse/issues/3794))
+- Remove documentation regarding installation on Cygwin; the use of WSL is recommended instead. ([\#3873](https://github.com/matrix-org/synapse/issues/3873))
+- Fix typo in README, synaspse -> synapse ([\#3897](https://github.com/matrix-org/synapse/issues/3897))
+- Increase the timeout when filling missing events in federation requests ([\#3903](https://github.com/matrix-org/synapse/issues/3903))
+- Improve the logging when handling a federation transaction ([\#3904](https://github.com/matrix-org/synapse/issues/3904), [\#3966](https://github.com/matrix-org/synapse/issues/3966))
+- Improve logging of outbound federation requests ([\#3906](https://github.com/matrix-org/synapse/issues/3906), [\#3909](https://github.com/matrix-org/synapse/issues/3909))
+- Fix the docker image building on python 3 ([\#3911](https://github.com/matrix-org/synapse/issues/3911))
+- Add a regression test for logging failed HTTP requests on Python 3. ([\#3912](https://github.com/matrix-org/synapse/issues/3912))
+- Comments and interface cleanup for on_receive_pdu ([\#3924](https://github.com/matrix-org/synapse/issues/3924))
+- Fix spurious exceptions when remote http client closes connection ([\#3925](https://github.com/matrix-org/synapse/issues/3925))
+- Log exceptions thrown by background tasks ([\#3927](https://github.com/matrix-org/synapse/issues/3927))
+- Add a cache to get_destination_retry_timings ([\#3933](https://github.com/matrix-org/synapse/issues/3933), [\#3991](https://github.com/matrix-org/synapse/issues/3991))
+- Automate pushes to docker hub ([\#3946](https://github.com/matrix-org/synapse/issues/3946))
+- Require attrs 16.0.0 or later ([\#3947](https://github.com/matrix-org/synapse/issues/3947))
+- Fix incompatibility with python3 on alpine ([\#3948](https://github.com/matrix-org/synapse/issues/3948))
+- Run the test suite on the oldest supported versions of our dependencies in CI. ([\#3952](https://github.com/matrix-org/synapse/issues/3952))
+- CircleCI now only runs merged jobs on PRs, and commit jobs on develop, master, and release branches. ([\#3957](https://github.com/matrix-org/synapse/issues/3957))
+- Fix docstrings and add tests for state store methods ([\#3958](https://github.com/matrix-org/synapse/issues/3958))
+- fix docstring for FederationClient.get_state_for_room ([\#3963](https://github.com/matrix-org/synapse/issues/3963))
+- Run notify_app_services as a bg process ([\#3965](https://github.com/matrix-org/synapse/issues/3965))
+- Clarifications in FederationHandler ([\#3967](https://github.com/matrix-org/synapse/issues/3967))
+- Further reduce the docker image size ([\#3972](https://github.com/matrix-org/synapse/issues/3972))
+- Build py3 docker images for docker hub too ([\#3976](https://github.com/matrix-org/synapse/issues/3976))
+- Updated the installation instructions to point to the matrix-synapse package on PyPI. ([\#3985](https://github.com/matrix-org/synapse/issues/3985))
+- Disable USE_FROZEN_DICTS for unittests by default. ([\#3987](https://github.com/matrix-org/synapse/issues/3987))
+- Remove unused Jenkins and development related files from the repo. ([\#3988](https://github.com/matrix-org/synapse/issues/3988))
+- Improve stacktraces in certain exceptions in the logs ([\#3989](https://github.com/matrix-org/synapse/issues/3989))
+
+
Synapse 0.33.5.1 (2018-09-25)
=============================
Internal Changes
----------------
-- Fix incompatibility with older Twisted version in tests. Thanks
- @OlegGirko! ([\#3940](https://github.com/matrix-org/synapse/issues/3940))
+- Fix incompatibility with older Twisted version in tests. Thanks @OlegGirko! ([\#3940](https://github.com/matrix-org/synapse/issues/3940))
Synapse 0.33.5 (2018-09-24)
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index f9de78a4..6ef7d48d 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -30,12 +30,28 @@ use github's pull request workflow to review the contribution, and either ask
you to make any refinements needed or merge it and make them ourselves. The
changes will then land on master when we next do a release.
-We use `Jenkins <http://matrix.org/jenkins>`_ and
-`Travis <https://travis-ci.org/matrix-org/synapse>`_ for continuous
-integration. All pull requests to synapse get automatically tested by Travis;
-the Jenkins builds require an adminstrator to start them. If your change
-breaks the build, this will be shown in github, so please keep an eye on the
-pull request for feedback.
+We use `CircleCI <https://circleci.com/gh/matrix-org>`_ and `Travis CI
+<https://travis-ci.org/matrix-org/synapse>`_ for continuous integration. All
+pull requests to synapse get automatically tested by Travis and CircleCI.
+If your change breaks the build, this will be shown in GitHub, so please
+keep an eye on the pull request for feedback.
+
+To run unit tests in a local development environment, you can use:
+
+- ``tox -e py27`` (requires tox to be installed by ``pip install tox``) for
+ SQLite-backed Synapse on Python 2.7.
+- ``tox -e py35`` for SQLite-backed Synapse on Python 3.5.
+- ``tox -e py36`` for SQLite-backed Synapse on Python 3.6.
+- ``tox -e py27-postgres`` for PostgreSQL-backed Synapse on Python 2.7
+ (requires a running local PostgreSQL with access to create databases).
+- ``./test_postgresql.sh`` for PostgreSQL-backed Synapse on Python 2.7
+ (requires Docker). Entirely self-contained, recommended if you don't want to
+ set up PostgreSQL yourself.
+
+Docker images are available for running the integration tests (SyTest) locally,
+see the `documentation in the SyTest repo
+<https://github.com/matrix-org/sytest/blob/develop/docker/README.md>`_ for more
+information.
Code style
~~~~~~~~~~
@@ -77,7 +93,8 @@ AUTHORS.rst file for the project in question. Please feel free to include a
change to AUTHORS.rst in your pull request to list yourself and a short
description of the area(s) you've worked on. Also, we sometimes have swag to
give away to contributors - if you feel that Matrix-branded apparel is missing
-from your life, please mail us your shipping address to matrix at matrix.org and we'll try to fix it :)
+from your life, please mail us your shipping address to matrix at matrix.org and
+we'll try to fix it :)
Sign off
~~~~~~~~
@@ -144,4 +161,9 @@ flag to ``git commit``, which uses the name and email set in your
Conclusion
~~~~~~~~~~
-That's it! Matrix is a very open and collaborative project as you might expect given our obsession with open communication. If we're going to successfully matrix together all the fragmented communication technologies out there we are reliant on contributions and collaboration from the community to do so. So please get involved - and we hope you have as much fun hacking on Matrix as we do!
+That's it! Matrix is a very open and collaborative project as you might expect
+given our obsession with open communication. If we're going to successfully
+matrix together all the fragmented communication technologies out there we are
+reliant on contributions and collaboration from the community to do so. So
+please get involved - and we hope you have as much fun hacking on Matrix as we
+do!
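A minimal local run of the unit tests described above (a sketch; assumes only a Python 2.7 interpreter and pip on the path):

    pip install tox   # one-off setup, per CONTRIBUTING.rst
    tox -e py27       # SQLite-backed Synapse on Python 2.7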
diff --git a/MANIFEST.in b/MANIFEST.in
index e0826ba5..c6a37ac6 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -23,12 +23,9 @@ recursive-include synapse/static *.gif
recursive-include synapse/static *.html
recursive-include synapse/static *.js
-exclude jenkins.sh
-exclude jenkins*.sh
-exclude jenkins*
exclude Dockerfile
exclude .dockerignore
-recursive-exclude jenkins *.sh
+exclude test_postgresql.sh
include pyproject.toml
recursive-include changelog.d *
@@ -37,3 +34,6 @@ prune .github
prune demo/etc
prune docker
prune .circleci
+
+exclude jenkins*
+recursive-exclude jenkins *.sh
diff --git a/MAP.rst b/MAP.rst
deleted file mode 100644
index 0f8e9818..00000000
--- a/MAP.rst
+++ /dev/null
@@ -1,35 +0,0 @@
-Directory Structure
-===================
-
-Warning: this may be a bit stale...
-
-::
-
- .
- ├── cmdclient Basic CLI python Matrix client
- ├── demo Scripts for running standalone Matrix demos
- ├── docs All doc, including the draft Matrix API spec
- │   ├── client-server The client-server Matrix API spec
- │   ├── model Domain-specific elements of the Matrix API spec
- │   ├── server-server The server-server model of the Matrix API spec
- │   └── sphinx The internal API doc of the Synapse homeserver
- ├── experiments Early experiments of using Synapse's internal APIs
- ├── graph Visualisation of Matrix's distributed message store
- ├── synapse The reference Matrix homeserver implementation
- │   ├── api Common building blocks for the APIs
- │   │   ├── events Definition of state representation Events
- │   │   └── streams Definition of streamable Event objects
- │   ├── app The __main__ entry point for the homeserver
- │   ├── crypto The PKI client/server used for secure federation
- │   │   └── resource PKI helper objects (e.g. keys)
- │   ├── federation Server-server state replication logic
- │   ├── handlers The main business logic of the homeserver
- │   ├── http Wrappers around Twisted's HTTP server & client
- │   ├── rest Servlet-style RESTful API
- │   ├── storage Persistence subsystem (currently only sqlite3)
- │   │   └── schema sqlite persistence schema
- │   └── util Synapse-specific utilities
- ├── tests Unit tests for the Synapse homeserver
- └── webclient Basic AngularJS Matrix web client
-
-
diff --git a/README.rst b/README.rst
index cfcf8b52..e1ea351f 100644
--- a/README.rst
+++ b/README.rst
@@ -81,7 +81,7 @@ Thanks for using Matrix!
Synapse Installation
====================
-Synapse is the reference python/twisted Matrix homeserver implementation.
+Synapse is the reference Python/Twisted Matrix homeserver implementation.
System requirements:
@@ -91,12 +91,13 @@ System requirements:
Installing from source
----------------------
+
(Prebuilt packages are available for some platforms - see `Platform-Specific
Instructions`_.)
-Synapse is written in python but some of the libraries it uses are written in
-C. So before we can install synapse itself we need a working C compiler and the
-header files for python C extensions.
+Synapse is written in Python but some of the libraries it uses are written in
+C. So before we can install Synapse itself we need a working C compiler and the
+header files for Python C extensions.
Installing prerequisites on Ubuntu or Debian::
@@ -143,21 +144,27 @@ Installing prerequisites on OpenBSD::
doas pkg_add python libffi py-pip py-setuptools sqlite3 py-virtualenv \
libxslt
-To install the synapse homeserver run::
+To install the Synapse homeserver run::
virtualenv -p python2.7 ~/.synapse
source ~/.synapse/bin/activate
pip install --upgrade pip
pip install --upgrade setuptools
- pip install https://github.com/matrix-org/synapse/tarball/master
+ pip install matrix-synapse
-This installs synapse, along with the libraries it uses, into a virtual
+This installs Synapse, along with the libraries it uses, into a virtual
environment under ``~/.synapse``. Feel free to pick a different directory
if you prefer.
+This Synapse installation can then be later upgraded by using pip again with the
+update flag::
+
+ source ~/.synapse/bin/activate
+ pip install -U matrix-synapse
+
In case of problems, please see the _`Troubleshooting` section below.
-There is an offical synapse image available at
+There is an official synapse image available at
https://hub.docker.com/r/matrixdotorg/synapse/tags/ which can be used with
the docker-compose file available at `contrib/docker <contrib/docker>`_. Further information on
this including configuration options is available in the README on
@@ -167,7 +174,7 @@ Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a
Dockerfile to automate a synapse server in a single Docker image, at
https://hub.docker.com/r/avhost/docker-matrix/tags/
-Configuring synapse
+Configuring Synapse
-------------------
Before you can start Synapse, you will need to generate a configuration
@@ -249,26 +256,6 @@ Setting up a TURN server
For reliable VoIP calls to be routed via this homeserver, you MUST configure
a TURN server. See `<docs/turn-howto.rst>`_ for details.
-IPv6
-----
-
-As of Synapse 0.19 we finally support IPv6, many thanks to @kyrias and @glyph
-for providing PR #1696.
-
-However, for federation to work on hosts with IPv6 DNS servers you **must**
-be running Twisted 17.1.0 or later - see https://github.com/matrix-org/synapse/issues/1002
-for details. We can't make Synapse depend on Twisted 17.1 by default
-yet as it will break most older distributions (see https://github.com/matrix-org/synapse/pull/1909)
-so if you are using operating system dependencies you'll have to install your
-own Twisted 17.1 package via pip or backports etc.
-
-If you're running in a virtualenv then pip should have installed the newest
-Twisted automatically, but if your virtualenv is old you will need to manually
-upgrade to a newer Twisted dependency via:
-
- pip install Twisted>=17.1.0
-
-
Running Synapse
===============
@@ -444,8 +431,7 @@ settings require a slightly more difficult installation process.
using the ``.`` command, rather than ``bash``'s ``source``.
5) Optionally, use ``pip`` to install ``lxml``, which Synapse needs to parse
webpages for their titles.
-6) Use ``pip`` to install this repository: ``pip install
- https://github.com/matrix-org/synapse/tarball/master``
+6) Use ``pip`` to install this repository: ``pip install matrix-synapse``
7) Optionally, change ``_synapse``'s shell to ``/bin/false`` to reduce the
chance of a compromised Synapse server being used to take over your box.
@@ -459,37 +445,13 @@ https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/services/misc/matrix-
Windows Install
---------------
-Synapse can be installed on Cygwin. It requires the following Cygwin packages:
-
-- gcc
-- git
-- libffi-devel
-- openssl (and openssl-devel, python-openssl)
-- python
-- python-setuptools
-
-The content repository requires additional packages and will be unable to process
-uploads without them:
-
-- libjpeg8
-- libjpeg8-devel
-- zlib
-
-If you choose to install Synapse without these packages, you will need to reinstall
-``pillow`` for changes to be applied, e.g. ``pip uninstall pillow`` ``pip install
-pillow --user``
-
-Troubleshooting:
-
-- You may need to upgrade ``setuptools`` to get this to work correctly:
- ``pip install setuptools --upgrade``.
-- You may encounter errors indicating that ``ffi.h`` is missing, even with
- ``libffi-devel`` installed. If you do, copy the ``.h`` files:
- ``cp /usr/lib/libffi-3.0.13/include/*.h /usr/include``
-- You may need to install libsodium from source in order to install PyNacl. If
- you do, you may need to create a symlink to ``libsodium.a`` so ``ld`` can find
- it: ``ln -s /usr/local/lib/libsodium.a /usr/lib/libsodium.a``
+If you wish to run or develop Synapse on Windows, the Windows Subsystem For
+Linux provides a Linux environment on Windows 10 which is capable of using the
+Debian, Fedora, or source installation methods. More information about WSL can
+be found at https://docs.microsoft.com/en-us/windows/wsl/install-win10 for
+Windows 10 and https://docs.microsoft.com/en-us/windows/wsl/install-on-server
+for Windows Server.
Troubleshooting
===============
@@ -497,7 +459,7 @@ Troubleshooting
Troubleshooting Installation
----------------------------
-Synapse requires pip 1.7 or later, so if your OS provides too old a version you
+Synapse requires pip 8 or later, so if your OS provides too old a version you
may need to manually upgrade it::
sudo pip install --upgrade pip
@@ -532,28 +494,6 @@ failing, e.g.::
pip install twisted
-On OS X, if you encounter clang: error: unknown argument: '-mno-fused-madd' you
-will need to export CFLAGS=-Qunused-arguments.
-
-Troubleshooting Running
------------------------
-
-If synapse fails with ``missing "sodium.h"`` crypto errors, you may need
-to manually upgrade PyNaCL, as synapse uses NaCl (https://nacl.cr.yp.to/) for
-encryption and digital signatures.
-Unfortunately PyNACL currently has a few issues
-(https://github.com/pyca/pynacl/issues/53) and
-(https://github.com/pyca/pynacl/issues/79) that mean it may not install
-correctly, causing all tests to fail with errors about missing "sodium.h". To
-fix try re-installing from PyPI or directly from
-(https://github.com/pyca/pynacl)::
-
- # Install from PyPI
- pip install --user --upgrade --force pynacl
-
- # Install from github
- pip install --user https://github.com/pyca/pynacl/tarball/master
-
Running out of File Handles
~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -908,7 +848,7 @@ to install using pip and a virtualenv::
virtualenv -p python2.7 env
source env/bin/activate
- python synapse/python_dependencies.py | xargs pip install
+ python -m synapse.python_dependencies | xargs pip install
pip install lxml mock
This will run a process of downloading and installing all the needed
@@ -968,7 +908,7 @@ improvement in overall amount, and especially in terms of giving back RAM
to the OS. To use it, the library must simply be put in the LD_PRELOAD
environment variable when launching Synapse. On Debian, this can be done
by installing the ``libjemalloc1`` package and adding this line to
-``/etc/default/matrix-synaspse``::
+``/etc/default/matrix-synapse``::
LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.1
diff --git a/UPGRADE.rst b/UPGRADE.rst
index f6bb1070..6cf3169f 100644
--- a/UPGRADE.rst
+++ b/UPGRADE.rst
@@ -18,7 +18,7 @@ instructions that may be required are listed later in this document.
.. code:: bash
- pip install --upgrade --process-dependency-links https://github.com/matrix-org/synapse/tarball/master
+ pip install --upgrade --process-dependency-links matrix-synapse
# restart synapse
synctl restart
@@ -48,11 +48,11 @@ returned by the Client-Server API:
# configured on port 443.
curl -kv https://<host.name>/_matrix/client/versions 2>&1 | grep "Server:"
-Upgrading to $NEXT_VERSION
+Upgrading to v0.27.3
====================
This release expands the anonymous usage stats sent if the opt-in
-``report_stats`` configuration is set to ``true``. We now capture RSS memory
+``report_stats`` configuration is set to ``true``. We now capture RSS memory
and cpu use at a very coarse level. This requires administrators to install
the optional ``psutil`` python module.
diff --git a/contrib/grafana/synapse.json b/contrib/grafana/synapse.json
index c5861259..dc3f4a1d 100644
--- a/contrib/grafana/synapse.json
+++ b/contrib/grafana/synapse.json
@@ -14,7 +14,7 @@
"type": "grafana",
"id": "grafana",
"name": "Grafana",
- "version": "5.2.0"
+ "version": "5.2.4"
},
{
"type": "panel",
@@ -54,7 +54,7 @@
"gnetId": null,
"graphTooltip": 0,
"id": null,
- "iteration": 1533598785368,
+ "iteration": 1537878047048,
"links": [
{
"asDropdown": true,
@@ -86,7 +86,7 @@
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 9,
@@ -118,7 +118,7 @@
"steppedLine": false,
"targets": [
{
- "expr": "process_cpu_seconds:rate2m{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
+ "expr": "rate(process_cpu_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{job}}-{{index}} ",
@@ -179,7 +179,7 @@
"mode": "spectrum"
},
"dataFormat": "tsbuckets",
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"gridPos": {
"h": 9,
"w": 12,
@@ -525,7 +525,7 @@
"x": 0,
"y": 25
},
- "id": 48,
+ "id": 50,
"legend": {
"avg": false,
"current": false,
@@ -549,8 +549,9 @@
"steppedLine": false,
"targets": [
{
- "expr": "rate(synapse_storage_schedule_time_sum{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])/rate(synapse_storage_schedule_time_count[$bucket_size])",
+ "expr": "rate(python_twisted_reactor_tick_time_sum{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])/rate(python_twisted_reactor_tick_time_count[$bucket_size])",
"format": "time_series",
+ "interval": "",
"intervalFactor": 2,
"legendFormat": "{{job}}-{{index}}",
"refId": "A",
@@ -560,7 +561,7 @@
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Avg time waiting for db conn",
+ "title": "Avg reactor tick time",
"tooltip": {
"shared": true,
"sort": 0,
@@ -576,12 +577,11 @@
},
"yaxes": [
{
- "decimals": null,
"format": "s",
- "label": "",
+ "label": null,
"logBase": 1,
"max": null,
- "min": "0",
+ "min": null,
"show": true
},
{
@@ -604,6 +604,7 @@
"dashLength": 10,
"dashes": false,
"datasource": "$datasource",
+ "description": "Shows the time in which the given percentage of reactor ticks completed, over the sampled timespan",
"fill": 1,
"gridPos": {
"h": 7,
@@ -611,7 +612,7 @@
"x": 12,
"y": 25
},
- "id": 49,
+ "id": 105,
"legend": {
"avg": false,
"current": false,
@@ -629,33 +630,47 @@
"pointradius": 5,
"points": false,
"renderer": "flot",
- "seriesOverrides": [
- {
- "alias": "/^up/",
- "legend": false,
- "yaxis": 2
- }
- ],
+ "seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
- "expr": "scrape_duration_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
+ "expr": "histogram_quantile(0.99, rate(python_twisted_reactor_tick_time_bucket{index=~\"$index\",instance=\"$instance\",job=~\"$job\"}[$bucket_size]))",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
- "legendFormat": "{{job}}-{{index}}",
+ "legendFormat": "{{job}}-{{index}} 99%",
"refId": "A",
"step": 20
+ },
+ {
+ "expr": "histogram_quantile(0.95, rate(python_twisted_reactor_tick_time_bucket{index=~\"$index\",instance=\"$instance\",job=~\"$job\"}[$bucket_size]))",
+ "format": "time_series",
+ "intervalFactor": 1,
+ "legendFormat": "{{job}}-{{index}} 95%",
+ "refId": "B"
+ },
+ {
+ "expr": "histogram_quantile(0.90, rate(python_twisted_reactor_tick_time_bucket{index=~\"$index\",instance=\"$instance\",job=~\"$job\"}[$bucket_size]))",
+ "format": "time_series",
+ "intervalFactor": 1,
+ "legendFormat": "{{job}}-{{index}} 90%",
+ "refId": "C"
+ },
+ {
+ "expr": "",
+ "format": "time_series",
+ "intervalFactor": 1,
+ "refId": "D"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Prometheus scrape time",
+ "title": "Reactor tick quantiles",
"tooltip": {
- "shared": true,
+ "shared": false,
"sort": 0,
"value_type": "individual"
},
@@ -673,16 +688,15 @@
"label": null,
"logBase": 1,
"max": null,
- "min": "0",
+ "min": null,
"show": true
},
{
- "decimals": 0,
- "format": "none",
- "label": "",
+ "format": "short",
+ "label": null,
"logBase": 1,
- "max": "0",
- "min": "-1",
+ "max": null,
+ "min": null,
"show": false
}
],
@@ -697,14 +711,14 @@
"dashLength": 10,
"dashes": false,
"datasource": "$datasource",
- "fill": 1,
+ "fill": 0,
"gridPos": {
"h": 7,
"w": 12,
"x": 0,
"y": 32
},
- "id": 50,
+ "id": 53,
"legend": {
"avg": false,
"current": false,
@@ -728,19 +742,17 @@
"steppedLine": false,
"targets": [
{
- "expr": "rate(python_twisted_reactor_tick_time_sum{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])/rate(python_twisted_reactor_tick_time_count[$bucket_size])",
+ "expr": "min_over_time(up{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
"format": "time_series",
- "interval": "",
"intervalFactor": 2,
"legendFormat": "{{job}}-{{index}}",
- "refId": "A",
- "step": 20
+ "refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Avg reactor tick time",
+ "title": "Up",
"tooltip": {
"shared": true,
"sort": 0,
@@ -756,7 +768,7 @@
},
"yaxes": [
{
- "format": "s",
+ "format": "short",
"label": null,
"logBase": 1,
"max": null,
@@ -769,7 +781,7 @@
"logBase": 1,
"max": null,
"min": null,
- "show": false
+ "show": true
}
],
"yaxis": {
@@ -783,26 +795,19 @@
"dashLength": 10,
"dashes": false,
"datasource": "$datasource",
- "editable": true,
- "error": false,
"fill": 1,
- "grid": {},
"gridPos": {
"h": 7,
"w": 12,
"x": 12,
"y": 32
},
- "id": 5,
+ "id": 49,
"legend": {
- "alignAsTable": false,
"avg": false,
"current": false,
- "hideEmpty": false,
- "hideZero": false,
"max": false,
"min": false,
- "rightSide": false,
"show": true,
"total": false,
"values": false
@@ -817,10 +822,9 @@
"renderer": "flot",
"seriesOverrides": [
{
- "alias": "/user/"
- },
- {
- "alias": "/system/"
+ "alias": "/^up/",
+ "legend": false,
+ "yaxis": 2
}
],
"spaceLength": 10,
@@ -828,44 +832,19 @@
"steppedLine": false,
"targets": [
{
- "expr": "rate(process_cpu_system_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
- "format": "time_series",
- "intervalFactor": 1,
- "legendFormat": "{{job}}-{{index}} system ",
- "metric": "",
- "refId": "B",
- "step": 20
- },
- {
- "expr": "rate(process_cpu_user_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+ "expr": "scrape_duration_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
"format": "time_series",
- "hide": false,
"interval": "",
- "intervalFactor": 1,
- "legendFormat": "{{job}}-{{index}} user",
+ "intervalFactor": 2,
+ "legendFormat": "{{job}}-{{index}}",
"refId": "A",
"step": 20
}
],
- "thresholds": [
- {
- "colorMode": "custom",
- "line": true,
- "lineColor": "rgba(216, 200, 27, 0.27)",
- "op": "gt",
- "value": 0.5
- },
- {
- "colorMode": "custom",
- "line": true,
- "lineColor": "rgba(234, 112, 112, 0.22)",
- "op": "gt",
- "value": 0.8
- }
- ],
+ "thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "CPU",
+ "title": "Prometheus scrape time",
"tooltip": {
"shared": true,
"sort": 0,
@@ -881,20 +860,21 @@
},
"yaxes": [
{
- "decimals": null,
- "format": "percentunit",
- "label": "",
+ "format": "s",
+ "label": null,
"logBase": 1,
- "max": "1.2",
- "min": 0,
+ "max": null,
+ "min": "0",
"show": true
},
{
- "format": "short",
+ "decimals": 0,
+ "format": "none",
+ "label": "",
"logBase": 1,
- "max": null,
- "min": null,
- "show": true
+ "max": "0",
+ "min": "-1",
+ "show": false
}
],
"yaxis": {
@@ -907,20 +887,27 @@
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
- "fill": 0,
+ "datasource": "$datasource",
+ "editable": true,
+ "error": false,
+ "fill": 1,
+ "grid": {},
"gridPos": {
"h": 7,
"w": 12,
"x": 0,
"y": 39
},
- "id": 53,
+ "id": 5,
"legend": {
+ "alignAsTable": false,
"avg": false,
"current": false,
+ "hideEmpty": false,
+ "hideZero": false,
"max": false,
"min": false,
+ "rightSide": false,
"show": true,
"total": false,
"values": false
@@ -933,23 +920,57 @@
"pointradius": 5,
"points": false,
"renderer": "flot",
- "seriesOverrides": [],
+ "seriesOverrides": [
+ {
+ "alias": "/user/"
+ },
+ {
+ "alias": "/system/"
+ }
+ ],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
- "expr": "min_over_time(up{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+ "expr": "rate(process_cpu_system_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
"format": "time_series",
- "intervalFactor": 2,
- "legendFormat": "{{job}}-{{index}}",
- "refId": "A"
+ "intervalFactor": 1,
+ "legendFormat": "{{job}}-{{index}} system ",
+ "metric": "",
+ "refId": "B",
+ "step": 20
+ },
+ {
+ "expr": "rate(process_cpu_user_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+ "format": "time_series",
+ "hide": false,
+ "interval": "",
+ "intervalFactor": 1,
+ "legendFormat": "{{job}}-{{index}} user",
+ "refId": "A",
+ "step": 20
+ }
+ ],
+ "thresholds": [
+ {
+ "colorMode": "custom",
+ "line": true,
+ "lineColor": "rgba(216, 200, 27, 0.27)",
+ "op": "gt",
+ "value": 0.5
+ },
+ {
+ "colorMode": "custom",
+ "line": true,
+ "lineColor": "rgba(234, 112, 112, 0.22)",
+ "op": "gt",
+ "value": 0.8
}
],
- "thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Up",
+ "title": "CPU",
"tooltip": {
"shared": true,
"sort": 0,
@@ -965,16 +986,16 @@
},
"yaxes": [
{
- "format": "short",
- "label": null,
+ "decimals": null,
+ "format": "percentunit",
+ "label": "",
"logBase": 1,
- "max": null,
- "min": null,
+ "max": "1.2",
+ "min": 0,
"show": true
},
{
"format": "short",
- "label": null,
"logBase": 1,
"max": null,
"min": null,
@@ -1013,7 +1034,7 @@
"h": 7,
"w": 12,
"x": 0,
- "y": 49
+ "y": 47
},
"id": 40,
"legend": {
@@ -1098,7 +1119,7 @@
"h": 7,
"w": 12,
"x": 12,
- "y": 49
+ "y": 47
},
"id": 46,
"legend": {
@@ -1187,7 +1208,7 @@
"h": 7,
"w": 12,
"x": 0,
- "y": 56
+ "y": 54
},
"id": 44,
"legend": {
@@ -1276,7 +1297,7 @@
"h": 7,
"w": 12,
"x": 12,
- "y": 56
+ "y": 54
},
"id": 45,
"legend": {
@@ -1383,7 +1404,7 @@
"h": 8,
"w": 12,
"x": 0,
- "y": 48
+ "y": 62
},
"id": 4,
"legend": {
@@ -1490,7 +1511,7 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 48
+ "y": 62
},
"id": 32,
"legend": {
@@ -1578,7 +1599,7 @@
"h": 8,
"w": 12,
"x": 0,
- "y": 56
+ "y": 70
},
"id": 23,
"legend": {
@@ -1688,7 +1709,7 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 56
+ "y": 70
},
"id": 52,
"legend": {
@@ -1795,7 +1816,7 @@
"h": 8,
"w": 12,
"x": 0,
- "y": 64
+ "y": 78
},
"id": 7,
"legend": {
@@ -1886,7 +1907,7 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 64
+ "y": 78
},
"id": 47,
"legend": {
@@ -1969,13 +1990,13 @@
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 9,
"w": 12,
"x": 0,
- "y": 72
+ "y": 86
},
"id": 103,
"legend": {
@@ -2069,13 +2090,13 @@
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 9,
"w": 12,
"x": 0,
- "y": 23
+ "y": 49
},
"id": 99,
"legend": {
@@ -2154,13 +2175,13 @@
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
- "y": 23
+ "y": 49
},
"id": 101,
"legend": {
@@ -2186,17 +2207,24 @@
"steppedLine": false,
"targets": [
{
- "expr": "rate(synapse_background_process_db_txn_duration_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+ "expr": "rate(synapse_background_process_db_txn_duration_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]) + rate(synapse_background_process_db_sched_duration_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
"format": "time_series",
+ "hide": false,
"intervalFactor": 1,
"legendFormat": "{{job}}-{{index}} {{name}}",
"refId": "A"
+ },
+ {
+ "expr": "",
+ "format": "time_series",
+ "intervalFactor": 1,
+ "refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "DB usage by background jobs",
+ "title": "DB usage by background jobs (including scheduling time)",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2252,13 +2280,13 @@
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 9,
"w": 12,
"x": 0,
- "y": 25
+ "y": 64
},
"id": 79,
"legend": {
@@ -2336,13 +2364,13 @@
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
- "y": 25
+ "y": 64
},
"id": 83,
"legend": {
@@ -2447,7 +2475,7 @@
"h": 7,
"w": 12,
"x": 0,
- "y": 23
+ "y": 65
},
"id": 51,
"legend": {
@@ -2551,6 +2579,194 @@
"dashLength": 10,
"dashes": false,
"datasource": "$datasource",
+ "fill": 1,
+ "gridPos": {
+ "h": 7,
+ "w": 12,
+ "x": 0,
+ "y": 24
+ },
+ "id": 48,
+ "legend": {
+ "avg": false,
+ "current": false,
+ "max": false,
+ "min": false,
+ "show": true,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "rate(synapse_storage_schedule_time_sum{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])/rate(synapse_storage_schedule_time_count[$bucket_size])",
+ "format": "time_series",
+ "intervalFactor": 2,
+ "legendFormat": "{{job}}-{{index}}",
+ "refId": "A",
+ "step": 20
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Avg time waiting for db conn",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "decimals": null,
+ "format": "s",
+ "label": "",
+ "logBase": 1,
+ "max": null,
+ "min": "0",
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": false
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "$datasource",
+ "description": "Shows the time in which the given percentage of database queries were scheduled, over the sampled timespan",
+ "fill": 1,
+ "gridPos": {
+ "h": 7,
+ "w": 12,
+ "x": 12,
+ "y": 24
+ },
+ "id": 104,
+ "legend": {
+ "avg": false,
+ "current": false,
+ "max": false,
+ "min": false,
+ "show": true,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "histogram_quantile(0.99, rate(synapse_storage_schedule_time_bucket{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]))",
+ "format": "time_series",
+ "hide": false,
+ "intervalFactor": 1,
+ "legendFormat": "{{job}} {{index}} 99%",
+ "refId": "A",
+ "step": 20
+ },
+ {
+ "expr": "histogram_quantile(0.95, rate(synapse_storage_schedule_time_bucket{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]))",
+ "format": "time_series",
+ "intervalFactor": 1,
+ "legendFormat": "{{job}} {{index}} 95%",
+ "refId": "B"
+ },
+ {
+ "expr": "histogram_quantile(0.90, rate(synapse_storage_schedule_time_bucket{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]))",
+ "format": "time_series",
+ "intervalFactor": 1,
+ "legendFormat": "{{job}} {{index}} 90%",
+ "refId": "C"
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Db scheduling time quantiles",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "decimals": null,
+ "format": "s",
+ "label": "",
+ "logBase": 1,
+ "max": null,
+ "min": "0",
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": false
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "$datasource",
"editable": true,
"error": false,
"fill": 0,
@@ -2559,7 +2775,7 @@
"h": 7,
"w": 12,
"x": 0,
- "y": 25
+ "y": 31
},
"id": 10,
"legend": {
@@ -2648,7 +2864,7 @@
"h": 7,
"w": 12,
"x": 12,
- "y": 25
+ "y": 31
},
"id": 11,
"legend": {
@@ -2672,11 +2888,11 @@
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
- "stack": false,
+ "stack": true,
"steppedLine": true,
"targets": [
{
- "expr": "topk(5, rate(synapse_storage_transaction_time_sum{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]))",
+ "expr": "rate(synapse_storage_transaction_time_sum{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
"format": "time_series",
"instant": false,
"interval": "",
@@ -2753,7 +2969,7 @@
"h": 13,
"w": 12,
"x": 0,
- "y": 17
+ "y": 60
},
"id": 12,
"legend": {
@@ -2841,7 +3057,7 @@
"h": 13,
"w": 12,
"x": 12,
- "y": 17
+ "y": 60
},
"id": 26,
"legend": {
@@ -2929,7 +3145,7 @@
"h": 13,
"w": 12,
"x": 0,
- "y": 30
+ "y": 73
},
"id": 13,
"legend": {
@@ -3017,7 +3233,7 @@
"h": 13,
"w": 12,
"x": 12,
- "y": 30
+ "y": 73
},
"id": 27,
"legend": {
@@ -3105,7 +3321,7 @@
"h": 13,
"w": 12,
"x": 0,
- "y": 43
+ "y": 86
},
"id": 28,
"legend": {
@@ -3192,7 +3408,7 @@
"h": 13,
"w": 12,
"x": 12,
- "y": 43
+ "y": 86
},
"id": 25,
"legend": {
@@ -3295,7 +3511,7 @@
"h": 10,
"w": 12,
"x": 0,
- "y": 55
+ "y": 68
},
"id": 1,
"legend": {
@@ -3387,7 +3603,7 @@
"h": 10,
"w": 12,
"x": 12,
- "y": 55
+ "y": 68
},
"id": 8,
"legend": {
@@ -3477,7 +3693,7 @@
"h": 10,
"w": 12,
"x": 0,
- "y": 65
+ "y": 78
},
"id": 38,
"legend": {
@@ -3563,7 +3779,7 @@
"h": 10,
"w": 12,
"x": 12,
- "y": 65
+ "y": 78
},
"id": 39,
"legend": {
@@ -3643,13 +3859,13 @@
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 9,
"w": 12,
"x": 0,
- "y": 75
+ "y": 88
},
"id": 65,
"legend": {
@@ -3745,13 +3961,13 @@
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 9,
"w": 12,
"x": 0,
- "y": 90
+ "y": 27
},
"id": 91,
"legend": {
@@ -3841,7 +4057,7 @@
"h": 9,
"w": 12,
"x": 12,
- "y": 90
+ "y": 27
},
"id": 21,
"legend": {
@@ -3920,13 +4136,13 @@
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 9,
"w": 12,
"x": 0,
- "y": 99
+ "y": 36
},
"id": 89,
"legend": {
@@ -4006,13 +4222,13 @@
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
- "y": 99
+ "y": 36
},
"id": 93,
"legend": {
@@ -4027,7 +4243,7 @@
"lines": true,
"linewidth": 1,
"links": [],
- "nullPointMode": "null",
+ "nullPointMode": "connected",
"percentage": false,
"pointradius": 5,
"points": false,
@@ -4090,13 +4306,13 @@
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 9,
"w": 12,
"x": 0,
- "y": 108
+ "y": 45
},
"id": 95,
"legend": {
@@ -4189,7 +4405,7 @@
"h": 9,
"w": 12,
"x": 12,
- "y": 108
+ "y": 45
},
"heatmap": {},
"highlightCards": true,
@@ -4251,13 +4467,13 @@
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 7,
"w": 12,
"x": 0,
- "y": 19
+ "y": 97
},
"id": 2,
"legend": {
@@ -4357,20 +4573,24 @@
"min": null,
"show": true
}
- ]
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 7,
"w": 12,
"x": 12,
- "y": 19
+ "y": 97
},
"id": 41,
"legend": {
@@ -4439,20 +4659,24 @@
"min": null,
"show": true
}
- ]
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 7,
"w": 12,
"x": 0,
- "y": 26
+ "y": 104
},
"id": 42,
"legend": {
@@ -4520,20 +4744,24 @@
"min": null,
"show": true
}
- ]
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 7,
"w": 12,
"x": 12,
- "y": 26
+ "y": 104
},
"id": 43,
"legend": {
@@ -4601,7 +4829,11 @@
"min": null,
"show": true
}
- ]
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
}
],
"repeat": null,
@@ -4623,7 +4855,7 @@
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 9,
@@ -4644,7 +4876,7 @@
"lines": true,
"linewidth": 1,
"links": [],
- "nullPointMode": "null",
+ "nullPointMode": "connected",
"percentage": false,
"pointradius": 5,
"points": false,
@@ -4708,7 +4940,7 @@
"bars": false,
"dashLength": 10,
"dashes": false,
- "datasource": "${DS_PROMETHEUS}",
+ "datasource": "$datasource",
"fill": 1,
"gridPos": {
"h": 9,
@@ -4729,7 +4961,7 @@
"lines": true,
"linewidth": 1,
"links": [],
- "nullPointMode": "null",
+ "nullPointMode": "connected",
"percentage": false,
"pointradius": 5,
"points": false,
@@ -4856,9 +5088,19 @@
"selected": false,
"text": "5m",
"value": "5m"
+ },
+ {
+ "selected": false,
+ "text": "10m",
+ "value": "10m"
+ },
+ {
+ "selected": false,
+ "text": "15m",
+ "value": "15m"
}
],
- "query": "30s,1m,2m,5m",
+ "query": "30s,1m,2m,5m,10m,15m",
"refresh": 2,
"type": "interval"
},
@@ -4872,7 +5114,7 @@
"multi": false,
"name": "instance",
"options": [],
- "query": "label_values(process_cpu_user_seconds_total{job=~\"synapse.*\"}, instance)",
+ "query": "label_values(synapse_util_metrics_block_ru_utime_seconds, instance)",
"refresh": 2,
"regex": "",
"sort": 0,
@@ -4895,7 +5137,7 @@
"multiFormat": "regex values",
"name": "job",
"options": [],
- "query": "label_values(process_cpu_user_seconds_total{job=~\"synapse.*\"}, job)",
+ "query": "label_values(synapse_util_metrics_block_ru_utime_seconds, job)",
"refresh": 2,
"refresh_on_load": false,
"regex": "",
@@ -4919,7 +5161,7 @@
"multiFormat": "regex values",
"name": "index",
"options": [],
- "query": "label_values(process_cpu_user_seconds_total{job=~\"synapse.*\"}, index)",
+ "query": "label_values(synapse_util_metrics_block_ru_utime_seconds, index)",
"refresh": 2,
"refresh_on_load": false,
"regex": "",
@@ -4965,5 +5207,5 @@
"timezone": "",
"title": "Synapse",
"uid": "000000012",
- "version": 127
+ "version": 3
}
\ No newline at end of file
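The quantile panels added above are plain PromQL, so they can be sanity-checked against the metrics source directly (a sketch; assumes a Prometheus server on localhost:9090 scraping Synapse):

    # Evaluate the 99th-percentile reactor tick time over a 2m window.
    curl -sG http://localhost:9090/api/v1/query \
        --data-urlencode 'query=histogram_quantile(0.99, rate(python_twisted_reactor_tick_time_bucket[2m]))'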
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 20d3fe3b..db44c02a 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -1,8 +1,13 @@
-FROM docker.io/python:2-alpine3.8
+ARG PYTHON_VERSION=2
-COPY . /synapse
+###
+### Stage 0: builder
+###
+FROM docker.io/python:${PYTHON_VERSION}-alpine3.8 as builder
+
+# install the OS build deps
-RUN apk add --no-cache --virtual .build_deps \
+RUN apk add \
build-base \
libffi-dev \
libjpeg-turbo-dev \
@@ -10,30 +15,47 @@ RUN apk add --no-cache --virtual .build_deps \
libxslt-dev \
linux-headers \
postgresql-dev \
- zlib-dev \
- && cd /synapse \
- && apk add --no-cache --virtual .runtime_deps \
- libffi \
- libjpeg-turbo \
- libressl \
- libxslt \
- libpq \
- zlib \
- su-exec \
- && pip install --upgrade \
+ zlib-dev
+
+# build things which have slow build steps, before we copy synapse, so that
+# the layer can be cached.
+#
+# (we really just care about caching a wheel here, as the "pip install" below
+# will install them again.)
+
+RUN pip install --prefix="/install" --no-warn-script-location \
+ cryptography \
+ msgpack-python \
+ pillow \
+ pynacl
+
+# now install synapse and all of the python deps to /install.
+
+COPY . /synapse
+RUN pip install --prefix="/install" --no-warn-script-location \
lxml \
- pip \
psycopg2 \
- setuptools \
- && mkdir -p /synapse/cache \
- && pip install -f /synapse/cache --upgrade --process-dependency-links . \
- && mv /synapse/docker/start.py /synapse/docker/conf / \
- && rm -rf \
- setup.cfg \
- setup.py \
- synapse \
- && apk del .build_deps
-
+ /synapse
+
+###
+### Stage 1: runtime
+###
+
+FROM docker.io/python:${PYTHON_VERSION}-alpine3.8
+
+RUN apk add --no-cache --virtual .runtime_deps \
+ libffi \
+ libjpeg-turbo \
+ libressl \
+ libxslt \
+ libpq \
+ zlib \
+ su-exec
+
+COPY --from=builder /install /usr/local
+COPY ./docker/start.py /start.py
+COPY ./docker/conf /conf
+
VOLUME ["/data"]
EXPOSE 8008/tcp 8448/tcp
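With the new build argument, the one Dockerfile now produces an image for either interpreter (a sketch; the local tag names are illustrative):

    # The default build is Python 2; pass PYTHON_VERSION for a Python 3
    # image, as the CircleCI jobs do.
    docker build -f docker/Dockerfile -t synapse:py2 .
    docker build -f docker/Dockerfile -t synapse:py3 --build-arg PYTHON_VERSION=3.6 .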
diff --git a/docker/Dockerfile-pgtests b/docker/Dockerfile-pgtests
new file mode 100644
index 00000000..7da8eeb9
--- /dev/null
+++ b/docker/Dockerfile-pgtests
@@ -0,0 +1,12 @@
+# Use the Sytest image that comes with a lot of the build dependencies
+# pre-installed
+FROM matrixdotorg/sytest:latest
+
+# The Sytest image doesn't come with python, so install that
+RUN apt-get -qq install -y python python-dev python-pip
+
+# We need tox to run the tests in run_pg_tests.sh
+RUN pip install tox
+
+ADD run_pg_tests.sh /pg_tests.sh
+ENTRYPOINT /pg_tests.sh
diff --git a/docker/README.md b/docker/README.md
index 038c78f7..3c00d1e9 100644
--- a/docker/README.md
+++ b/docker/README.md
@@ -88,6 +88,7 @@ variables are available for configuration:
* ``SYNAPSE_TURN_URIS``, set this variable to the comma-separated list of TURN
uris to enable TURN for this homeserver.
* ``SYNAPSE_TURN_SECRET``, set this to the TURN shared secret if required.
+* ``SYNAPSE_MAX_UPLOAD_SIZE``, set this variable to change the max upload size [default `10M`].
Shared secrets, which will be initialized to random values if not set:
diff --git a/docker/conf/homeserver.yaml b/docker/conf/homeserver.yaml
index 6bc25bb4..cfe88788 100644
--- a/docker/conf/homeserver.yaml
+++ b/docker/conf/homeserver.yaml
@@ -85,7 +85,7 @@ federation_rc_concurrent: 3
media_store_path: "/data/media"
uploads_path: "/data/uploads"
-max_upload_size: "10M"
+max_upload_size: "{{ SYNAPSE_MAX_UPLOAD_SIZE or "10M" }}"
max_image_pixels: "32M"
dynamic_thumbnails: false
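Note: start.py renders this file through Jinja2 (see the `convert` helper in the start.py diff below), so the `or "10M"` clause falls back to the default whenever SYNAPSE_MAX_UPLOAD_SIZE is unset, because an undefined Jinja2 variable is falsy. A minimal sketch of that behaviour:

    import jinja2

    tmpl = jinja2.Template('max_upload_size: "{{ SYNAPSE_MAX_UPLOAD_SIZE or "10M" }}"')
    print(tmpl.render())                               # max_upload_size: "10M"
    print(tmpl.render(SYNAPSE_MAX_UPLOAD_SIZE="50M"))  # max_upload_size: "50M"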
diff --git a/docker/run_pg_tests.sh b/docker/run_pg_tests.sh
new file mode 100755
index 00000000..e77424c4
--- /dev/null
+++ b/docker/run_pg_tests.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+# This script runs the PostgreSQL tests inside a Docker container. It expects
+# the relevant source files to be mounted into /src (done automatically by the
+# caller script). It will set up the database, run it, and then use the tox
+# configuration to run the tests.
+
+set -e
+
+# Set PGUSER so Synapse's tests know what user to connect to the database with
+export PGUSER=postgres
+
+# Initialise & start the database
+su -c '/usr/lib/postgresql/9.6/bin/initdb -D /var/lib/postgresql/data -E "UTF-8" --lc-collate="en_US.UTF-8" --lc-ctype="en_US.UTF-8" --username=postgres' postgres
+su -c '/usr/lib/postgresql/9.6/bin/pg_ctl -w -D /var/lib/postgresql/data start' postgres
+
+# Run the tests
+cd /src
+export TRIAL_FLAGS="-j 4"
+tox --workdir=/tmp -e py27-postgres
diff --git a/docker/start.py b/docker/start.py
index 90e8b9c5..346df8c8 100755
--- a/docker/start.py
+++ b/docker/start.py
@@ -5,6 +5,7 @@ import os
import sys
import subprocess
import glob
+import codecs
# Utility functions
convert = lambda src, dst, environ: open(dst, "w").write(jinja2.Template(open(src).read()).render(**environ))
@@ -23,7 +24,7 @@ def generate_secrets(environ, secrets):
with open(filename) as handle: value = handle.read()
else:
print("Generating a random secret for {}".format(name))
- value = os.urandom(32).encode("hex")
+ value = codecs.encode(os.urandom(32), "hex").decode()
with open(filename, "w") as handle: handle.write(value)
environ[secret] = value
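The codecs change above is the standard Python 2/3 compatibility fix: bytes.encode("hex") only exists on Python 2. A small sketch of both behaviours:

    import codecs
    import os

    raw = os.urandom(4)
    # Python 2 only: raw.encode("hex") -> "8f02a17f"; AttributeError on Python 3
    hexed = codecs.encode(raw, "hex").decode()  # works on both, e.g. "8f02a17f"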
diff --git a/jenkins-dendron-haproxy-postgres.sh b/jenkins-dendron-haproxy-postgres.sh
deleted file mode 100755
index 07979bf8..00000000
--- a/jenkins-dendron-haproxy-postgres.sh
+++ /dev/null
@@ -1,23 +0,0 @@
-#!/bin/bash
-
-set -eux
-
-: ${WORKSPACE:="$(pwd)"}
-
-export WORKSPACE
-export PYTHONDONTWRITEBYTECODE=yep
-export SYNAPSE_CACHE_FACTOR=1
-
-export HAPROXY_BIN=/home/haproxy/haproxy-1.6.11/haproxy
-
-./jenkins/prepare_synapse.sh
-./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git
-./jenkins/clone.sh dendron https://github.com/matrix-org/dendron.git
-./dendron/jenkins/build_dendron.sh
-./sytest/jenkins/prep_sytest_for_postgres.sh
-
-./sytest/jenkins/install_and_run.sh \
- --python $WORKSPACE/.tox/py27/bin/python \
- --synapse-directory $WORKSPACE \
- --dendron $WORKSPACE/dendron/bin/dendron \
- --haproxy \
diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh
deleted file mode 100755
index 3b932fe3..00000000
--- a/jenkins-dendron-postgres.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/bash
-
-set -eux
-
-: ${WORKSPACE:="$(pwd)"}
-
-export WORKSPACE
-export PYTHONDONTWRITEBYTECODE=yep
-export SYNAPSE_CACHE_FACTOR=1
-
-./jenkins/prepare_synapse.sh
-./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git
-./jenkins/clone.sh dendron https://github.com/matrix-org/dendron.git
-./dendron/jenkins/build_dendron.sh
-./sytest/jenkins/prep_sytest_for_postgres.sh
-
-./sytest/jenkins/install_and_run.sh \
- --python $WORKSPACE/.tox/py27/bin/python \
- --synapse-directory $WORKSPACE \
- --dendron $WORKSPACE/dendron/bin/dendron \
diff --git a/jenkins-flake8.sh b/jenkins-flake8.sh
deleted file mode 100755
index 11f1cab6..00000000
--- a/jenkins-flake8.sh
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/bin/bash
-
-set -eux
-
-: ${WORKSPACE:="$(pwd)"}
-
-export PYTHONDONTWRITEBYTECODE=yep
-export SYNAPSE_CACHE_FACTOR=1
-
-# Output test results as junit xml
-export TRIAL_FLAGS="--reporter=subunit"
-export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml"
-# Write coverage reports to a separate file for each process
-export COVERAGE_OPTS="-p"
-export DUMP_COVERAGE_COMMAND="coverage help"
-
-# Output flake8 violations to violations.flake8.log
-export PEP8SUFFIX="--output-file=violations.flake8.log"
-
-rm .coverage* || echo "No coverage files to remove"
-
-tox -e packaging -e pep8
diff --git a/jenkins-postgres.sh b/jenkins-postgres.sh
deleted file mode 100755
index 1afb7363..00000000
--- a/jenkins-postgres.sh
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/bin/bash
-
-set -eux
-
-: ${WORKSPACE:="$(pwd)"}
-
-export WORKSPACE
-export PYTHONDONTWRITEBYTECODE=yep
-export SYNAPSE_CACHE_FACTOR=1
-
-./jenkins/prepare_synapse.sh
-./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git
-
-./sytest/jenkins/prep_sytest_for_postgres.sh
-
-./sytest/jenkins/install_and_run.sh \
- --python $WORKSPACE/.tox/py27/bin/python \
- --synapse-directory $WORKSPACE \
diff --git a/jenkins-sqlite.sh b/jenkins-sqlite.sh
deleted file mode 100755
index baf4713a..00000000
--- a/jenkins-sqlite.sh
+++ /dev/null
@@ -1,16 +0,0 @@
-#!/bin/bash
-
-set -eux
-
-: ${WORKSPACE:="$(pwd)"}
-
-export WORKSPACE
-export PYTHONDONTWRITEBYTECODE=yep
-export SYNAPSE_CACHE_FACTOR=1
-
-./jenkins/prepare_synapse.sh
-./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git
-
-./sytest/jenkins/install_and_run.sh \
- --python $WORKSPACE/.tox/py27/bin/python \
- --synapse-directory $WORKSPACE \
diff --git a/jenkins-unittests.sh b/jenkins-unittests.sh
deleted file mode 100755
index 4c2f103e..00000000
--- a/jenkins-unittests.sh
+++ /dev/null
@@ -1,30 +0,0 @@
-#!/bin/bash
-
-set -eux
-
-: ${WORKSPACE:="$(pwd)"}
-
-export PYTHONDONTWRITEBYTECODE=yep
-export SYNAPSE_CACHE_FACTOR=1
-
-# Output test results as junit xml
-export TRIAL_FLAGS="--reporter=subunit"
-export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml"
-# Write coverage reports to a separate file for each process
-export COVERAGE_OPTS="-p"
-export DUMP_COVERAGE_COMMAND="coverage help"
-
-# Output flake8 violations to violations.flake8.log
-# Don't exit with non-0 status code on Jenkins,
-# so that the build steps continue and a later step can decided whether to
-# UNSTABLE or FAILURE this build.
-export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?"
-
-rm .coverage* || echo "No coverage files to remove"
-
-tox --notest -e py27
-TOX_BIN=$WORKSPACE/.tox/py27/bin
-python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install
-$TOX_BIN/pip install lxml
-
-tox -e py27
diff --git a/jenkins/clone.sh b/jenkins/clone.sh
deleted file mode 100755
index ab30ac77..00000000
--- a/jenkins/clone.sh
+++ /dev/null
@@ -1,44 +0,0 @@
-#! /bin/bash
-
-# This clones a project from github into a named subdirectory
-# If the project has a branch with the same name as this branch
-# then it will checkout that branch after cloning.
-# Otherwise it will checkout "origin/develop."
-# The first argument is the name of the directory to checkout
-# the branch into.
-# The second argument is the URL of the remote repository to checkout.
-# Usually something like https://github.com/matrix-org/sytest.git
-
-set -eux
-
-NAME=$1
-PROJECT=$2
-BASE=".$NAME-base"
-
-# Update our mirror.
-if [ ! -d ".$NAME-base" ]; then
- # Create a local mirror of the source repository.
- # This saves us from having to download the entire repository
- # when this script is next run.
- git clone "$PROJECT" "$BASE" --mirror
-else
- # Fetch any updates from the source repository.
- (cd "$BASE"; git fetch -p)
-fi
-
-# Remove the existing repository so that we have a clean copy
-rm -rf "$NAME"
-# Cloning with --shared means that we will share portions of the
-# .git directory with our local mirror.
-git clone "$BASE" "$NAME" --shared
-
-# Jenkins may have supplied us with the name of the branch in the
-# environment. Otherwise we will have to guess based on the current
-# commit.
-: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"}
-cd "$NAME"
-# check out the relevant branch
-git checkout "${GIT_BRANCH}" || (
- echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop"
- git checkout "origin/develop"
-)
diff --git a/scripts-dev/copyrighter-sql.pl b/scripts-dev/copyrighter-sql.pl
deleted file mode 100755
index 13e630fc..00000000
--- a/scripts-dev/copyrighter-sql.pl
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/usr/bin/perl -pi
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-$copyright = <<EOT;
-/* Copyright 2016 OpenMarket Ltd
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-EOT
-
-s/^(# -\*- coding: utf-8 -\*-\n)?/$1$copyright/ if ($. == 1);
diff --git a/scripts-dev/copyrighter.pl b/scripts-dev/copyrighter.pl
deleted file mode 100755
index 03656f69..00000000
--- a/scripts-dev/copyrighter.pl
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/usr/bin/perl -pi
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-$copyright = <<EOT;
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-EOT
-
-s/^(# -\*- coding: utf-8 -\*-\n)?/$1$copyright/ if ($. == 1);
diff --git a/scripts-dev/dump_macaroon.py b/scripts-dev/dump_macaroon.py
index 6e45be75..fcc55688 100755
--- a/scripts-dev/dump_macaroon.py
+++ b/scripts-dev/dump_macaroon.py
@@ -21,4 +21,4 @@ try:
verifier.verify(macaroon, key)
print "Signature is correct"
except Exception as e:
- print e.message
+ print str(e)
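BaseException.message is gone in Python 3 (and was deprecated since Python 2.6), so str(e) is the portable spelling; the same substitution recurs throughout this commit. For example:

    try:
        raise ValueError("bad macaroon")
    except Exception as e:
        print(str(e))  # "bad macaroon" on Python 2 and 3
        # e.message    # AttributeError on Python 3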
diff --git a/scripts-dev/next_github_number.sh b/scripts-dev/next_github_number.sh
new file mode 100755
index 00000000..37628002
--- /dev/null
+++ b/scripts-dev/next_github_number.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+set -e
+
+# Fetch the current GitHub issue number, add one to it -- presto! The likely
+# next PR number.
+CURRENT_NUMBER=`curl -s "https://api.github.com/repos/matrix-org/synapse/issues?state=all&per_page=1" | jq -r ".[0].number"`
+CURRENT_NUMBER=$((CURRENT_NUMBER+1))
+echo $CURRENT_NUMBER
\ No newline at end of file
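For reference, the same lookup as a standard-library Python sketch; it mirrors the endpoint and jq filter above, and like the script it reports the highest issue/PR number plus one:

    import json
    from urllib.request import urlopen  # Python 3

    url = ("https://api.github.com/repos/matrix-org/synapse/issues"
           "?state=all&per_page=1")
    latest = json.loads(urlopen(url).read().decode("utf-8"))[0]["number"]
    print(latest + 1)  # the likely next GitHub number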
diff --git a/scripts/register_new_matrix_user b/scripts/register_new_matrix_user
index 8c3d4293..91bdb3a2 100755
--- a/scripts/register_new_matrix_user
+++ b/scripts/register_new_matrix_user
@@ -133,7 +133,7 @@ def register_new_user(user, password, server_location, shared_secret, admin):
print "Passwords do not match"
sys.exit(1)
- if not admin:
+ if admin is None:
admin = raw_input("Make admin [no]: ")
if admin in ("y", "yes", "true"):
admin = True
@@ -160,10 +160,16 @@ if __name__ == "__main__":
default=None,
help="New password for user. Will prompt if omitted.",
)
- parser.add_argument(
+ admin_group = parser.add_mutually_exclusive_group()
+ admin_group.add_argument(
"-a", "--admin",
action="store_true",
- help="Register new user as an admin. Will prompt if omitted.",
+ help="Register new user as an admin. Will prompt if --no-admin is not set either.",
+ )
+ admin_group.add_argument(
+ "--no-admin",
+ action="store_true",
+ help="Register new user as a regular user. Will prompt if --admin is not set either.",
)
group = parser.add_mutually_exclusive_group(required=True)
@@ -197,4 +203,8 @@ if __name__ == "__main__":
else:
secret = args.shared_secret
- register_new_user(args.user, args.password, args.server_url, secret, args.admin)
+ admin = None
+ if args.admin or args.no_admin:
+ admin = args.admin
+
+ register_new_user(args.user, args.password, args.server_url, secret, admin)
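The mutually exclusive --admin/--no-admin pair yields a tri-state: explicitly an admin, explicitly not, or None meaning "prompt the operator". A self-contained sketch of the same pattern (argument names as above):

    import argparse

    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-a", "--admin", action="store_true")
    group.add_argument("--no-admin", action="store_true")
    args = parser.parse_args(["--no-admin"])

    admin = None                    # None: neither flag given, so prompt later
    if args.admin or args.no_admin:
        admin = args.admin          # True only if --admin was passed
    print(admin)                    # False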
diff --git a/synapse/__init__.py b/synapse/__init__.py
index b1f7a89f..43c5821a 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -27,4 +27,4 @@ try:
except ImportError:
pass
-__version__ = "0.33.5.1"
+__version__ = "0.33.6"
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index a31a9a17..eed8c67e 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -226,7 +226,7 @@ class Filtering(object):
jsonschema.validate(user_filter_json, USER_FILTER_SCHEMA,
format_checker=FormatChecker())
except jsonschema.ValidationError as e:
- raise SynapseError(400, e.message)
+ raise SynapseError(400, str(e))
class FilterCollection(object):
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index 71347912..6d9f1ca0 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -64,7 +64,7 @@ class ConsentURIBuilder(object):
"""
mac = hmac.new(
key=self._hmac_secret,
- msg=user_id,
+ msg=user_id.encode('ascii'),
digestmod=sha256,
).hexdigest()
consent_uri = "%s_matrix/consent?%s" % (
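On Python 3, hmac.new() requires bytes for both the key and the message, hence the encode('ascii') above; for instance:

    import hmac
    from hashlib import sha256

    key = b"form-secret"            # illustrative value
    user_id = "@alice:example.com"
    mac = hmac.new(key=key, msg=user_id.encode("ascii"),
                   digestmod=sha256).hexdigest()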
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index 3b6b9368..c3afcc57 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -24,7 +24,7 @@ try:
python_dependencies.check_requirements()
except python_dependencies.MissingRequirementError as e:
message = "\n".join([
- "Missing Requirement: %s" % (e.message,),
+ "Missing Requirement: %s" % (str(e),),
"To install run:",
" pip install --upgrade --force \"%s\"" % (e.dependency,),
"",
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 86b50674..8559e141 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -136,7 +136,7 @@ def start(config_options):
"Synapse appservice", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.appservice"
@@ -172,7 +172,6 @@ def start(config_options):
def start():
ps.get_datastore().start_profiling()
- ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index ce2b113d..76aed8c6 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -153,7 +153,7 @@ def start(config_options):
"Synapse client reader", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.client_reader"
@@ -181,7 +181,6 @@ def start(config_options):
ss.start_listening(config.worker_listeners)
def start():
- ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index f98e456e..9060ab14 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -169,7 +169,7 @@ def start(config_options):
"Synapse event creator", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.event_creator"
@@ -199,7 +199,6 @@ def start(config_options):
ss.start_listening(config.worker_listeners)
def start():
- ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 60f59735..228a297f 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -140,7 +140,7 @@ def start(config_options):
"Synapse federation reader", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.federation_reader"
@@ -168,7 +168,6 @@ def start(config_options):
ss.start_listening(config.worker_listeners)
def start():
- ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 60dd09aa..e9a99d76 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -160,7 +160,7 @@ def start(config_options):
"Synapse federation sender", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.federation_sender"
@@ -201,7 +201,6 @@ def start(config_options):
def start():
ps.get_datastore().start_profiling()
- ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
_base.start_worker_reactor("synapse-federation-sender", config)
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 8c0b9c67..fc4b25de 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -228,7 +228,7 @@ def start(config_options):
"Synapse frontend proxy", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.frontend_proxy"
@@ -258,7 +258,6 @@ def start(config_options):
ss.start_listening(config.worker_listeners)
def start():
- ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index ac97e196..e3f0d99a 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -301,7 +301,7 @@ class SynapseHomeServer(HomeServer):
try:
database_engine.check_database(db_conn.cursor())
except IncorrectDatabaseSetup as e:
- quit_with_error(e.message)
+ quit_with_error(str(e))
# Gauges to expose monthly active user control metrics
@@ -328,7 +328,7 @@ def setup(config_options):
config_options,
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
if not config:
@@ -384,10 +384,8 @@ def setup(config_options):
def start():
hs.get_pusherpool().start()
- hs.get_state_handler().start_caching()
hs.get_datastore().start_profiling()
hs.get_datastore().start_doing_background_updates()
- hs.get_federation_client().start_get_pdu_cache()
reactor.callWhenRunning(start)
@@ -457,6 +455,10 @@ def run(hs):
stats["homeserver"] = hs.config.server_name
stats["timestamp"] = now
stats["uptime_seconds"] = uptime
+ version = sys.version_info
+ stats["python_version"] = "{}.{}.{}".format(
+ version.major, version.minor, version.micro
+ )
stats["total_users"] = yield hs.get_datastore().count_all_users()
total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
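sys.version_info is a named tuple, so the reported python_version stat is simply its first three fields joined with dots:

    import sys

    v = sys.version_info
    print("{}.{}.{}".format(v.major, v.minor, v.micro))  # e.g. "2.7.15"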
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index e3dbb3b4..acc0487a 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -133,7 +133,7 @@ def start(config_options):
"Synapse media repository", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.media_repository"
@@ -168,7 +168,6 @@ def start(config_options):
ss.start_listening(config.worker_listeners)
def start():
- ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 244c604d..630dcda4 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -191,7 +191,7 @@ def start(config_options):
"Synapse pusher", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.pusher"
@@ -228,7 +228,6 @@ def start(config_options):
def start():
ps.get_pusherpool().start()
ps.get_datastore().start_profiling()
- ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 66623407..9a7fc6ee 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -410,7 +410,7 @@ def start(config_options):
"Synapse synchrotron", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.synchrotron"
@@ -435,7 +435,6 @@ def start(config_options):
def start():
ss.get_datastore().start_profiling()
- ss.get_state_handler().start_caching()
reactor.callWhenRunning(start)
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
deleted file mode 100755
index d658f967..00000000
--- a/synapse/app/synctl.py
+++ /dev/null
@@ -1,284 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import argparse
-import collections
-import errno
-import glob
-import os
-import os.path
-import signal
-import subprocess
-import sys
-import time
-
-from six import iteritems
-
-import yaml
-
-SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
-
-GREEN = "\x1b[1;32m"
-YELLOW = "\x1b[1;33m"
-RED = "\x1b[1;31m"
-NORMAL = "\x1b[m"
-
-
-def pid_running(pid):
- try:
- os.kill(pid, 0)
- return True
- except OSError as err:
- if err.errno == errno.EPERM:
- return True
- return False
-
-
-def write(message, colour=NORMAL, stream=sys.stdout):
- if colour == NORMAL:
- stream.write(message + "\n")
- else:
- stream.write(colour + message + NORMAL + "\n")
-
-
-def abort(message, colour=RED, stream=sys.stderr):
- write(message, colour, stream)
- sys.exit(1)
-
-
-def start(configfile):
- write("Starting ...")
- args = SYNAPSE
- args.extend(["--daemonize", "-c", configfile])
-
- try:
- subprocess.check_call(args)
- write("started synapse.app.homeserver(%r)" %
- (configfile,), colour=GREEN)
- except subprocess.CalledProcessError as e:
- write(
- "error starting (exit code: %d); see above for logs" % e.returncode,
- colour=RED,
- )
-
-
-def start_worker(app, configfile, worker_configfile):
- args = [
- "python", "-B",
- "-m", app,
- "-c", configfile,
- "-c", worker_configfile
- ]
-
- try:
- subprocess.check_call(args)
- write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
- except subprocess.CalledProcessError as e:
- write(
- "error starting %s(%r) (exit code: %d); see above for logs" % (
- app, worker_configfile, e.returncode,
- ),
- colour=RED,
- )
-
-
-def stop(pidfile, app):
- if os.path.exists(pidfile):
- pid = int(open(pidfile).read())
- try:
- os.kill(pid, signal.SIGTERM)
- write("stopped %s" % (app,), colour=GREEN)
- except OSError as err:
- if err.errno == errno.ESRCH:
- write("%s not running" % (app,), colour=YELLOW)
- elif err.errno == errno.EPERM:
- abort("Cannot stop %s: Operation not permitted" % (app,))
- else:
- abort("Cannot stop %s: Unknown error" % (app,))
-
-
-Worker = collections.namedtuple("Worker", [
- "app", "configfile", "pidfile", "cache_factor"
-])
-
-
-def main():
-
- parser = argparse.ArgumentParser()
-
- parser.add_argument(
- "action",
- choices=["start", "stop", "restart"],
- help="whether to start, stop or restart the synapse",
- )
- parser.add_argument(
- "configfile",
- nargs="?",
- default="homeserver.yaml",
- help="the homeserver config file, defaults to homeserver.yaml",
- )
- parser.add_argument(
- "-w", "--worker",
- metavar="WORKERCONFIG",
- help="start or stop a single worker",
- )
- parser.add_argument(
- "-a", "--all-processes",
- metavar="WORKERCONFIGDIR",
- help="start or stop all the workers in the given directory"
- " and the main synapse process",
- )
-
- options = parser.parse_args()
-
- if options.worker and options.all_processes:
- write(
- 'Cannot use "--worker" with "--all-processes"',
- stream=sys.stderr
- )
- sys.exit(1)
-
- configfile = options.configfile
-
- if not os.path.exists(configfile):
- write(
- "No config file found\n"
- "To generate a config file, run '%s -c %s --generate-config"
- " --server-name=<server name>'\n" % (
- " ".join(SYNAPSE), options.configfile
- ),
- stream=sys.stderr,
- )
- sys.exit(1)
-
- with open(configfile) as stream:
- config = yaml.load(stream)
-
- pidfile = config["pid_file"]
- cache_factor = config.get("synctl_cache_factor")
- start_stop_synapse = True
-
- if cache_factor:
- os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
-
- cache_factors = config.get("synctl_cache_factors", {})
- for cache_name, factor in iteritems(cache_factors):
- os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
-
- worker_configfiles = []
- if options.worker:
- start_stop_synapse = False
- worker_configfile = options.worker
- if not os.path.exists(worker_configfile):
- write(
- "No worker config found at %r" % (worker_configfile,),
- stream=sys.stderr,
- )
- sys.exit(1)
- worker_configfiles.append(worker_configfile)
-
- if options.all_processes:
- # To start the main synapse with -a you need to add a worker file
- # with worker_app == "synapse.app.homeserver"
- start_stop_synapse = False
- worker_configdir = options.all_processes
- if not os.path.isdir(worker_configdir):
- write(
- "No worker config directory found at %r" % (worker_configdir,),
- stream=sys.stderr,
- )
- sys.exit(1)
- worker_configfiles.extend(sorted(glob.glob(
- os.path.join(worker_configdir, "*.yaml")
- )))
-
- workers = []
- for worker_configfile in worker_configfiles:
- with open(worker_configfile) as stream:
- worker_config = yaml.load(stream)
- worker_app = worker_config["worker_app"]
- if worker_app == "synapse.app.homeserver":
- # We need to special case all of this to pick up options that may
- # be set in the main config file or in this worker config file.
- worker_pidfile = (
- worker_config.get("pid_file")
- or pidfile
- )
- worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
- daemonize = worker_config.get("daemonize") or config.get("daemonize")
- assert daemonize, "Main process must have daemonize set to true"
-
- # The master process doesn't support using worker_* config.
- for key in worker_config:
- if key == "worker_app": # But we allow worker_app
- continue
- assert not key.startswith("worker_"), \
- "Main process cannot use worker_* config"
- else:
- worker_pidfile = worker_config["worker_pid_file"]
- worker_daemonize = worker_config["worker_daemonize"]
- assert worker_daemonize, "In config %r: expected '%s' to be True" % (
- worker_configfile, "worker_daemonize")
- worker_cache_factor = worker_config.get("synctl_cache_factor")
- workers.append(Worker(
- worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
- ))
-
- action = options.action
-
- if action == "stop" or action == "restart":
- for worker in workers:
- stop(worker.pidfile, worker.app)
-
- if start_stop_synapse:
- stop(pidfile, "synapse.app.homeserver")
-
- # Wait for synapse to actually shutdown before starting it again
- if action == "restart":
- running_pids = []
- if start_stop_synapse and os.path.exists(pidfile):
- running_pids.append(int(open(pidfile).read()))
- for worker in workers:
- if os.path.exists(worker.pidfile):
- running_pids.append(int(open(worker.pidfile).read()))
- if len(running_pids) > 0:
- write("Waiting for process to exit before restarting...")
- for running_pid in running_pids:
- while pid_running(running_pid):
- time.sleep(0.2)
- write("All processes exited; now restarting...")
-
- if action == "start" or action == "restart":
- if start_stop_synapse:
- # Check if synapse is already running
- if os.path.exists(pidfile) and pid_running(int(open(pidfile).read())):
- abort("synapse.app.homeserver already running")
- start(configfile)
-
- for worker in workers:
- if worker.cache_factor:
- os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
-
- start_worker(worker.app, configfile, worker.configfile)
-
- if cache_factor:
- os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
- else:
- os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
-
-
-if __name__ == "__main__":
- main()
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 96ffcaf0..0a5f62b5 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -188,7 +188,7 @@ def start(config_options):
"Synapse user directory", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.user_dir"
@@ -229,7 +229,6 @@ def start(config_options):
def start():
ps.get_datastore().start_profiling()
- ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
diff --git a/synapse/config/__main__.py b/synapse/config/__main__.py
index 58c97a70..8fccf573 100644
--- a/synapse/config/__main__.py
+++ b/synapse/config/__main__.py
@@ -25,7 +25,7 @@ if __name__ == "__main__":
try:
config = HomeServerConfig.load_config("", sys.argv[3:])
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
print (getattr(config, key))
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index 6baeccca..02fa46ef 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -98,9 +98,9 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
creation_event = auth_events.get((EventTypes.Create, ""), None)
if not creation_event:
- raise SynapseError(
+ raise AuthError(
403,
- "Room %r does not exist" % (event.room_id,)
+ "No create event in auth events",
)
creating_domain = get_domain_from_id(event.room_id)
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index b782af63..12f1eb0a 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -13,15 +13,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import os
+from distutils.util import strtobool
+
import six
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
# Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents
-# bugs where we accidentally share e.g. signature dicts. However, converting
-# a dict to frozen_dicts is expensive.
-USE_FROZEN_DICTS = True
+# bugs where we accidentally share e.g. signature dicts. However, converting a
+# dict to frozen_dicts is expensive.
+#
+# NOTE: This is overridden by the Synapse worker apps via their configuration, but
+# for the sake of tests, it is set here while it cannot be configured on the
+# homeserver object itself.
+USE_FROZEN_DICTS = strtobool(os.environ.get("SYNAPSE_USE_FROZEN_DICTS", "0"))
class _EventInternalMetadata(object):
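distutils' strtobool accepts the usual boolean spellings and returns an int, so the frozen-dict behaviour can be toggled from the environment:

    from distutils.util import strtobool

    strtobool("0")    # 0; the default above, frozen dicts disabled
    strtobool("1")    # 1
    strtobool("yes")  # 1; "y", "t", "true" and "on" also count as true
    strtobool("foo")  # raises ValueError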
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index fe67b2ff..d05ed91d 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -66,6 +66,14 @@ class FederationClient(FederationBase):
self.state = hs.get_state_handler()
self.transport_layer = hs.get_federation_transport_client()
+ self._get_pdu_cache = ExpiringCache(
+ cache_name="get_pdu_cache",
+ clock=self._clock,
+ max_len=1000,
+ expiry_ms=120 * 1000,
+ reset_expiry_on_get=False,
+ )
+
def _clear_tried_cache(self):
"""Clear pdu_destination_tried cache"""
now = self._clock.time_msec()
@@ -82,17 +90,6 @@ class FederationClient(FederationBase):
if destination_dict:
self.pdu_destination_tried[event_id] = destination_dict
- def start_get_pdu_cache(self):
- self._get_pdu_cache = ExpiringCache(
- cache_name="get_pdu_cache",
- clock=self._clock,
- max_len=1000,
- expiry_ms=120 * 1000,
- reset_expiry_on_get=False,
- )
-
- self._get_pdu_cache.start()
-
@log_function
def make_query(self, destination, query_type, args,
retry_on_dns_fail=False, ignore_backoff=False):
@@ -212,8 +209,6 @@ class FederationClient(FederationBase):
Will attempt to get the PDU from each destination in the list until
one succeeds.
- This will persist the PDU locally upon receipt.
-
Args:
destinations (list): Which home servers to query
event_id (str): event to fetch
@@ -229,10 +224,9 @@ class FederationClient(FederationBase):
# TODO: Rate limit the number of times we try and get the same event.
- if self._get_pdu_cache:
- ev = self._get_pdu_cache.get(event_id)
- if ev:
- defer.returnValue(ev)
+ ev = self._get_pdu_cache.get(event_id)
+ if ev:
+ defer.returnValue(ev)
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
@@ -285,7 +279,7 @@ class FederationClient(FederationBase):
)
continue
- if self._get_pdu_cache is not None and signed_pdu:
+ if signed_pdu:
self._get_pdu_cache[event_id] = signed_pdu
defer.returnValue(signed_pdu)
@@ -293,8 +287,7 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
@log_function
def get_state_for_room(self, destination, room_id, event_id):
- """Requests all of the `current` state PDUs for a given room from
- a remote home server.
+ """Requests all of the room state at a given event from a remote home server.
Args:
destination (str): The remote homeserver to query for the state.
@@ -302,9 +295,10 @@ class FederationClient(FederationBase):
event_id (str): The id of the event we want the state at.
Returns:
- Deferred: Results in a list of PDUs.
+ Deferred[Tuple[List[EventBase], List[EventBase]]]:
+ A list of events in the state, and a list of events in the auth chain
+ for the given event.
"""
-
try:
# First we try and ask for just the IDs, as thats far quicker if
# we have most of the state and auth_chain already.
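With the cache built unconditionally in the constructor, get_pdu no longer needs its "is the cache enabled" guards. For illustration, a hand-rolled cache in the same spirit (a sketch with a hypothetical name, not Synapse's ExpiringCache):

    import time

    class TinyExpiringCache(object):         # hypothetical, for illustration
        def __init__(self, expiry_ms):
            self._expiry = expiry_ms / 1000.0
            self._store = {}                  # key -> (inserted_at, value)

        def __setitem__(self, key, value):
            self._store[key] = (time.time(), value)

        def get(self, key, default=None):
            entry = self._store.get(key)
            if entry is None:
                return default
            inserted_at, value = entry
            if time.time() - inserted_at > self._expiry:
                del self._store[key]          # evict lazily on read
                return default
            return value

    cache = TinyExpiringCache(expiry_ms=120 * 1000)
    cache["$event_id"] = "pdu"
    cache.get("$event_id")                    # "pdu", until 120s have elapsed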
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index dbee404e..819e8f73 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -46,6 +46,7 @@ from synapse.replication.http.federation import (
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.logcontext import nested_logging_context
from synapse.util.logutils import log_function
# when processing incoming transactions, we try to handle multiple rooms in
@@ -187,21 +188,22 @@ class FederationServer(FederationBase):
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
- try:
- yield self._handle_received_pdu(
- origin, pdu
- )
- pdu_results[event_id] = {}
- except FederationError as e:
- logger.warn("Error handling PDU %s: %s", event_id, e)
- pdu_results[event_id] = {"error": str(e)}
- except Exception as e:
- f = failure.Failure()
- pdu_results[event_id] = {"error": str(e)}
- logger.error(
- "Failed to handle PDU %s: %s",
- event_id, f.getTraceback().rstrip(),
- )
+ with nested_logging_context(event_id):
+ try:
+ yield self._handle_received_pdu(
+ origin, pdu
+ )
+ pdu_results[event_id] = {}
+ except FederationError as e:
+ logger.warn("Error handling PDU %s: %s", event_id, e)
+ pdu_results[event_id] = {"error": str(e)}
+ except Exception as e:
+ f = failure.Failure()
+ pdu_results[event_id] = {"error": str(e)}
+ logger.error(
+ "Failed to handle PDU %s: %s",
+ event_id, f.getTraceback().rstrip(),
+ )
yield concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
@@ -618,7 +620,7 @@ class FederationServer(FederationBase):
)
yield self.handler.on_receive_pdu(
- origin, pdu, get_missing=True, sent_to_us_directly=True,
+ origin, pdu, sent_to_us_directly=True,
)
def __str__(self):
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 8cbf8c4f..98b59508 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -137,26 +137,6 @@ class TransactionQueue(object):
self._processing_pending_presence = False
- def can_send_to(self, destination):
- """Can we send messages to the given server?
-
- We can't send messages to ourselves. If we are running on localhost
- then we can only federation with other servers running on localhost.
- Otherwise we only federate with servers on a public domain.
-
- Args:
- destination(str): The server we are possibly trying to send to.
- Returns:
- bool: True if we can send to the server.
- """
-
- if destination == self.server_name:
- return False
- if self.server_name.startswith("localhost"):
- return destination.startswith("localhost")
- else:
- return not destination.startswith("localhost")
-
def notify_new_events(self, current_id):
"""This gets called when we have some new events we might want to
send out to other servers.
@@ -279,10 +259,7 @@ class TransactionQueue(object):
self._order += 1
destinations = set(destinations)
- destinations = set(
- dest for dest in destinations if self.can_send_to(dest)
- )
-
+ destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
if not destinations:
@@ -358,7 +335,7 @@ class TransactionQueue(object):
for destinations, states in hosts_and_states:
for destination in destinations:
- if not self.can_send_to(destination):
+ if destination == self.server_name:
continue
self.pending_presence_by_dest.setdefault(
@@ -377,7 +354,8 @@ class TransactionQueue(object):
content=content,
)
- if not self.can_send_to(destination):
+ if destination == self.server_name:
+ logger.info("Not sending EDU to ourselves")
return
sent_edus_counter.inc()
@@ -392,10 +370,8 @@ class TransactionQueue(object):
self._attempt_new_transaction(destination)
def send_device_messages(self, destination):
- if destination == self.server_name or destination == "localhost":
- return
-
- if not self.can_send_to(destination):
+ if destination == self.server_name:
+ logger.info("Not sending device update to ourselves")
return
self._attempt_new_transaction(destination)
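set.discard is used above instead of set.remove because it is a no-op when the element is absent:

    destinations = {"remote.example.org", "my.server.name"}
    destinations.discard("my.server.name")  # removed
    destinations.discard("my.server.name")  # already absent: no KeyError, unlike remove()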
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index ef866da1..18741c5f 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -20,7 +20,14 @@ import string
from twisted.internet import defer
from synapse.api.constants import EventTypes
-from synapse.api.errors import AuthError, CodeMessageException, Codes, SynapseError
+from synapse.api.errors import (
+ AuthError,
+ CodeMessageException,
+ Codes,
+ NotFoundError,
+ StoreError,
+ SynapseError,
+)
from synapse.types import RoomAlias, UserID, get_domain_from_id
from ._base import BaseHandler
@@ -109,7 +116,13 @@ class DirectoryHandler(BaseHandler):
def delete_association(self, requester, user_id, room_alias):
# association deletion for human users
- can_delete = yield self._user_can_delete_alias(room_alias, user_id)
+ try:
+ can_delete = yield self._user_can_delete_alias(room_alias, user_id)
+ except StoreError as e:
+ if e.code == 404:
+ raise NotFoundError("Unknown room alias")
+ raise
+
if not can_delete:
raise AuthError(
403, "You don't have permission to delete the alias.",
@@ -320,7 +333,7 @@ class DirectoryHandler(BaseHandler):
def _user_can_delete_alias(self, alias, user_id):
creator = yield self.store.get_room_alias_creator(alias.to_string())
- if creator and creator == user_id:
+ if creator is not None and creator == user_id:
defer.returnValue(True)
is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 578e9250..9dc46aa1 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -341,7 +341,7 @@ class E2eKeysHandler(object):
def _exception_to_failure(e):
if isinstance(e, CodeMessageException):
return {
- "status": e.code, "message": e.message,
+ "status": e.code, "message": str(e),
}
if isinstance(e, NotRetryingDestination):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0c68e8a4..45d955e6 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -18,7 +18,6 @@
import itertools
import logging
-import sys
import six
from six import iteritems, itervalues
@@ -69,6 +68,27 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
+def shortstr(iterable, maxitems=5):
+ """If iterable has maxitems or fewer, return the stringification of a list
+ containing those items.
+
+ Otherwise, return the stringification of a list with the first maxitems items,
+ followed by "...".
+
+ Args:
+ iterable (Iterable): iterable to truncate
+ maxitems (int): number of items to return before truncating
+
+ Returns:
+ unicode
+ """
+
+ items = list(itertools.islice(iterable, maxitems + 1))
+ if len(items) <= maxitems:
+ return str(items)
+ return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]"
+
+
class FederationHandler(BaseHandler):
"""Handles events that originated from federation.
Responsible for:
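shortstr, added above, keeps the new per-event log lines bounded; for example:

    shortstr(["$a", "$b", "$c"])      # "['$a', '$b', '$c']"
    shortstr("abcdefgh", maxitems=3)  # "['a', 'b', 'c', ...]"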
@@ -85,7 +105,7 @@ class FederationHandler(BaseHandler):
self.hs = hs
- self.store = hs.get_datastore()
+ self.store = hs.get_datastore() # type: synapse.storage.DataStore
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
@@ -114,9 +134,8 @@ class FederationHandler(BaseHandler):
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
@defer.inlineCallbacks
- @log_function
def on_receive_pdu(
- self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+ self, origin, pdu, sent_to_us_directly=False,
):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@@ -125,14 +144,23 @@ class FederationHandler(BaseHandler):
origin (str): server which initiated the /send/ transaction. Will
be used to fetch missing events or state.
pdu (FrozenEvent): received PDU
- get_missing (bool): True if we should fetch missing prev_events
+ sent_to_us_directly (bool): True if this event was pushed to us; False if
+ we pulled it as the result of a missing prev_event.
Returns (Deferred): completes with None
"""
+ room_id = pdu.room_id
+ event_id = pdu.event_id
+
+ logger.info(
+ "[%s %s] handling received PDU: %s",
+ room_id, event_id, pdu,
+ )
+
# We reprocess pdus when we have seen them only as outliers
existing = yield self.store.get_event(
- pdu.event_id,
+ event_id,
allow_none=True,
allow_rejected=True,
)
@@ -147,7 +175,7 @@ class FederationHandler(BaseHandler):
)
)
if already_seen:
- logger.debug("Already seen pdu %s", pdu.event_id)
+ logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
return
# do some initial sanity-checking of the event. In particular, make
@@ -156,6 +184,7 @@ class FederationHandler(BaseHandler):
try:
self._sanity_check_event(pdu)
except SynapseError as err:
+ logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id)
raise FederationError(
"ERROR",
err.code,
@@ -165,10 +194,12 @@ class FederationHandler(BaseHandler):
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
- if pdu.room_id in self.room_queues:
- logger.info("Ignoring PDU %s for room %s from %s for now; join "
- "in progress", pdu.event_id, pdu.room_id, origin)
- self.room_queues[pdu.room_id].append((pdu, origin))
+ if room_id in self.room_queues:
+ logger.info(
+ "[%s %s] Queuing PDU from %s for now: join in progress",
+ room_id, event_id, origin,
+ )
+ self.room_queues[room_id].append((pdu, origin))
return
# If we're no longer in the room just ditch the event entirely. This
@@ -179,7 +210,7 @@ class FederationHandler(BaseHandler):
# we should check if we *are* in fact in the room. If we are then we
# can magically rejoin the room.
is_in_room = yield self.auth.check_host_in_room(
- pdu.room_id,
+ room_id,
self.server_name
)
if not is_in_room:
@@ -188,8 +219,8 @@ class FederationHandler(BaseHandler):
)
if was_in_room:
logger.info(
- "Ignoring PDU %s for room %s from %s as we've left the room!",
- pdu.event_id, pdu.room_id, origin,
+ "[%s %s] Ignoring PDU from %s as we've left the room",
+ room_id, event_id, origin,
)
defer.returnValue(None)
@@ -204,8 +235,8 @@ class FederationHandler(BaseHandler):
)
logger.debug(
- "_handle_new_pdu min_depth for %s: %d",
- pdu.room_id, min_depth
+ "[%s %s] min_depth: %d",
+ room_id, event_id, min_depth,
)
prevs = {e_id for e_id, _ in pdu.prev_events}
@@ -218,17 +249,18 @@ class FederationHandler(BaseHandler):
# send to the clients.
pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth:
- if get_missing and prevs - seen:
+ missing_prevs = prevs - seen
+ if sent_to_us_directly and missing_prevs:
# If we're missing stuff, ensure we only fetch stuff one
# at a time.
logger.info(
- "Acquiring lock for room %r to fetch %d missing events: %r...",
- pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+ "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
+ room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
)
with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
logger.info(
- "Acquired lock for room %r to fetch %d missing events",
- pdu.room_id, len(prevs - seen),
+ "[%s %s] Acquired room lock to fetch %d missing prev_events",
+ room_id, event_id, len(missing_prevs),
)
yield self._get_missing_events_for_pdu(
@@ -241,69 +273,150 @@ class FederationHandler(BaseHandler):
if not prevs - seen:
logger.info(
- "Found all missing prev events for %s", pdu.event_id
+ "[%s %s] Found all missing prev_events",
+ room_id, event_id,
)
- elif prevs - seen:
+ elif missing_prevs:
logger.info(
- "Not fetching %d missing events for room %r,event %s: %r...",
- len(prevs - seen), pdu.room_id, pdu.event_id,
- list(prevs - seen)[:5],
+ "[%s %s] Not recursively fetching %d missing prev_events: %s",
+ room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
)
- if sent_to_us_directly and prevs - seen:
- # If they have sent it to us directly, and the server
- # isn't telling us about the auth events that it's
- # made a message referencing, we explode
- raise FederationError(
- "ERROR",
- 403,
- (
- "Your server isn't divulging details about prev_events "
- "referenced in this event."
- ),
- affected=pdu.event_id,
- )
- elif prevs - seen:
- # Calculate the state of the previous events, and
- # de-conflict them to find the current state.
- state_groups = []
+ if prevs - seen:
+ # We've still not been able to get all of the prev_events for this event.
+ #
+ # In this case, we need to fall back to asking another server in the
+ # federation for the state at this event. That's ok provided we then
+ # resolve the state against other bits of the DAG before using it (which
+ # will ensure that you can't just take over a room by sending an event,
+ # withholding its prev_events, and declaring yourself to be an admin in
+ # the subsequent state request).
+ #
+ # Now, if we're pulling this event as a missing prev_event, then clearly
+ # this event is not going to become the only forward-extremity and we are
+ # guaranteed to resolve its state against our existing forward
+ # extremities, so that should be fine.
+ #
+ # On the other hand, if this event was pushed to us, it is possible for
+ # it to become the only forward-extremity in the room, and we would then
+ # trust its state to be the state for the whole room. This is very bad.
+ # Further, if the event was pushed to us, there is no excuse for us not to
+ # have all the prev_events. We therefore reject any such events.
+ #
+ # XXX this really feels like it could/should be merged with the above,
+ # but there is an interaction with min_depth that I'm not really
+ # following.
+
+ if sent_to_us_directly:
+ logger.warn(
+ "[%s %s] Failed to fetch %d prev events: rejecting",
+ room_id, event_id, len(prevs - seen),
+ )
+ raise FederationError(
+ "ERROR",
+ 403,
+ (
+ "Your server isn't divulging details about prev_events "
+ "referenced in this event."
+ ),
+ affected=pdu.event_id,
+ )
+
+ # Calculate the state after each of the previous events, and
+ # resolve them to find the correct state at the current event.
auth_chains = set()
+ event_map = {
+ event_id: pdu,
+ }
try:
# Get the state of the events we know about
- ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
- state_groups.append(ours)
+ ours = yield self.store.get_state_groups_ids(room_id, seen)
+
+ # state_maps is a list of mappings from (type, state_key) to event_id
+ # type: list[dict[tuple[str, str], str]]
+ state_maps = list(ours.values())
+
+ # we don't need this any more, let's delete it.
+ del ours
# Ask the remote server for the states we don't
# know about
for p in prevs - seen:
- state, got_auth_chain = (
- yield self.federation_client.get_state_for_room(
- origin, pdu.room_id, p
- )
+ logger.info(
+ "[%s %s] Requesting state at missing prev_event %s",
+ room_id, event_id, p,
)
- auth_chains.update(got_auth_chain)
- state_group = {(x.type, x.state_key): x.event_id for x in state}
- state_groups.append(state_group)
+
+ with logcontext.nested_logging_context(p):
+ # note that if any of the missing prevs share missing state or
+ # auth events, the requests to fetch those events are deduped
+ # by the get_pdu_cache in federation_client.
+ remote_state, got_auth_chain = (
+ yield self.federation_client.get_state_for_room(
+ origin, room_id, p,
+ )
+ )
+
+ # we want the state *after* p; get_state_for_room returns the
+ # state *before* p.
+ remote_event = yield self.federation_client.get_pdu(
+ [origin], p, outlier=True,
+ )
+
+ if remote_event is None:
+ raise Exception(
+ "Unable to get missing prev_event %s" % (p, )
+ )
+
+ if remote_event.is_state():
+ remote_state.append(remote_event)
+
+ # XXX hrm I'm not convinced that duplicate events will compare
+ # for equality, so I'm not sure this does what the author
+ # hoped.
+ auth_chains.update(got_auth_chain)
+
+ remote_state_map = {
+ (x.type, x.state_key): x.event_id for x in remote_state
+ }
+ state_maps.append(remote_state_map)
+
+ for x in remote_state:
+ event_map[x.event_id] = x
# Resolve any conflicting state
+ @defer.inlineCallbacks
def fetch(ev_ids):
- return self.store.get_events(
- ev_ids, get_prev_content=False, check_redacted=False
+ fetched = yield self.store.get_events(
+ ev_ids, get_prev_content=False, check_redacted=False,
)
+ # add any events we fetch here to the `event_map` so that we
+ # can use them to build the state event list below.
+ event_map.update(fetched)
+ defer.returnValue(fetched)
- room_version = yield self.store.get_room_version(pdu.room_id)
+ room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_factory(
- room_version, state_groups, {pdu.event_id: pdu}, fetch
+ room_version, state_maps, event_map, fetch,
)
- state = (yield self.store.get_events(state_map.values())).values()
+ # we need to give _process_received_pdu the actual state events
+ # rather than event ids, so generate that now.
+ state = [
+ event_map[e] for e in six.itervalues(state_map)
+ ]
auth_chain = list(auth_chains)
except Exception:
+ logger.warn(
+ "[%s %s] Error attempting to resolve state at missing "
+ "prev_events",
+ room_id, event_id, exc_info=True,
+ )
raise FederationError(
"ERROR",
403,
"We can't get valid state history.",
- affected=pdu.event_id,
+ affected=event_id,
)
yield self._process_received_pdu(
@@ -322,15 +435,16 @@ class FederationHandler(BaseHandler):
prevs (set(str)): List of event ids which we are missing
min_depth (int): Minimum depth of events to return.
"""
- # We recalculate seen, since it may have changed.
+
+ room_id = pdu.room_id
+ event_id = pdu.event_id
+
seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
return
- latest = yield self.store.get_latest_event_ids_in_room(
- pdu.room_id
- )
+ latest = yield self.store.get_latest_event_ids_in_room(room_id)
# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
@@ -338,8 +452,8 @@ class FederationHandler(BaseHandler):
latest |= seen
logger.info(
- "Missing %d events for room %r pdu %s: %r...",
- len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
+ "[%s %s]: Requesting %d prev_events: %s",
+ room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
)
# XXX: we set timeout to 10s to help workaround
@@ -360,49 +474,88 @@ class FederationHandler(BaseHandler):
# apparently.
#
# see https://github.com/matrix-org/synapse/pull/1744
+ #
+ # ----
+ #
+ # Update richvdh 2018/09/18: There are a number of problems with timing this
+ # request out aggressively on the client side:
+ #
+ # - it plays badly with the server-side rate-limiter, which starts tarpitting you
+ # if you send too many requests at once, so you end up with the server carefully
+ # working through the backlog of your requests, which you have already timed
+ # out.
+ #
+ # - for this request in particular, we now (as of
+ # https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
+ # server can't produce a plausible-looking set of prev_events - so we become
+ # much more likely to reject the event.
+ #
+ # - contrary to what it says above, we do *not* fall back to fetching fresh state
+ # for the room if get_missing_events times out. Rather, we give up processing
+ # the PDU whose prevs we are missing, which then makes it much more likely that
+ # we'll end up back here for the *next* PDU in the list, which exacerbates the
+ # problem.
+ #
+ # - the aggressive 10s timeout was introduced to deal with incoming federation
+ # requests taking 8 hours to process. It's not entirely clear why that was going
+ # on; certainly there were other issues causing traffic storms which are now
+ # resolved, and I think in any case we may be more sensible about our locking
+ # now. We're *certainly* more sensible about our logging.
+ #
+ # All that said: Let's try increasing the timeout to 60s and see what happens.
missing_events = yield self.federation_client.get_missing_events(
origin,
- pdu.room_id,
+ room_id,
earliest_events_ids=list(latest),
latest_events=[pdu],
limit=10,
min_depth=min_depth,
- timeout=10000,
+ timeout=60000,
)
logger.info(
- "Got %d events: %r...",
- len(missing_events), [e.event_id for e in missing_events[:5]]
+ "[%s %s]: Got %d prev_events: %s",
+ room_id, event_id, len(missing_events), shortstr(missing_events),
)
# We want to sort these by depth so we process them and
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)
- for e in missing_events:
- logger.info("Handling found event %s", e.event_id)
- try:
- yield self.on_receive_pdu(
- origin,
- e,
- get_missing=False
- )
- except FederationError as e:
- if e.code == 403:
- logger.warn("Event %s failed history check.")
- else:
- raise
+ for ev in missing_events:
+ logger.info(
+ "[%s %s] Handling received prev_event %s",
+ room_id, event_id, ev.event_id,
+ )
+ with logcontext.nested_logging_context(ev.event_id):
+ try:
+ yield self.on_receive_pdu(
+ origin,
+ ev,
+ sent_to_us_directly=False,
+ )
+ except FederationError as e:
+ if e.code == 403:
+ logger.warn(
+ "[%s %s] Received prev_event %s failed history check.",
+ room_id, event_id, ev.event_id,
+ )
+ else:
+ raise
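A condensed sketch of the per-event pattern above, with `process_pdu` standing in for the recursive `on_receive_pdu` call: each fetched prev_event is handled inside its own nested logging context, so every log line it produces carries the event ID.

    from twisted.internet import defer

    from synapse.util import logcontext

    @defer.inlineCallbacks
    def handle_missing_events(missing_events, process_pdu):
        # Shallowest events first, so we process (and tell clients
        # about) them in order.
        missing_events.sort(key=lambda e: e.depth)
        for ev in missing_events:
            # Tag everything logged while handling `ev` with a context
            # suffixed by its event ID.
            with logcontext.nested_logging_context(ev.event_id):
                yield process_pdu(ev)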
- @log_function
@defer.inlineCallbacks
- def _process_received_pdu(self, origin, pdu, state, auth_chain):
+ def _process_received_pdu(self, origin, event, state, auth_chain):
""" Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.
"""
- event = pdu
+ room_id = event.room_id
+ event_id = event.event_id
- logger.debug("Processing event: %s", event)
+ logger.debug(
+ "[%s %s] Processing event: %s",
+ room_id, event_id, event,
+ )
# FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work
@@ -411,15 +564,16 @@ class FederationHandler(BaseHandler):
# event.
if state and auth_chain and not event.internal_metadata.is_outlier():
is_in_room = yield self.auth.check_host_in_room(
- event.room_id,
+ room_id,
self.server_name
)
else:
is_in_room = True
+
if not is_in_room:
logger.info(
- "Got event for room we're not in: %r %r",
- event.room_id, event.event_id
+ "[%s %s] Got event for room we're not in",
+ room_id, event_id,
)
try:
@@ -431,7 +585,7 @@ class FederationHandler(BaseHandler):
"ERROR",
e.code,
e.msg,
- affected=event.event_id,
+ affected=event_id,
)
else:
@@ -464,6 +618,10 @@ class FederationHandler(BaseHandler):
})
seen_ids.add(e.event_id)
+ logger.info(
+ "[%s %s] persisting newly-received auth/state events %s",
+ room_id, event_id, [e["event"].event_id for e in event_infos]
+ )
yield self._handle_new_events(origin, event_infos)
try:
@@ -480,12 +638,12 @@ class FederationHandler(BaseHandler):
affected=event.event_id,
)
- room = yield self.store.get_room(event.room_id)
+ room = yield self.store.get_room(room_id)
if not room:
try:
yield self.store.store_room(
- room_id=event.room_id,
+ room_id=room_id,
room_creator_user_id="",
is_public=False,
)
@@ -513,7 +671,7 @@ class FederationHandler(BaseHandler):
if newly_joined:
user = UserID.from_string(event.state_key)
- yield self.user_joined_room(user, event.room_id)
+ yield self.user_joined_room(user, room_id)
@log_function
@defer.inlineCallbacks
@@ -1027,7 +1185,8 @@ class FederationHandler(BaseHandler):
try:
logger.info("Processing queued PDU %s which was received "
"while we were joining %s", p.event_id, p.room_id)
- yield self.on_receive_pdu(origin, p)
+ with logcontext.nested_logging_context(p.event_id):
+ yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
except Exception as e:
logger.warn(
"Error handling queued PDU %s from %s: %s",
@@ -1430,12 +1589,10 @@ class FederationHandler(BaseHandler):
else:
defer.returnValue(None)
- @log_function
def get_min_depth_for_context(self, context):
return self.store.get_min_depth(context)
@defer.inlineCallbacks
- @log_function
def _handle_new_event(self, origin, event, state=None, auth_events=None,
backfilled=False):
context = yield self._prep_event(
@@ -1444,6 +1601,9 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
+ # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
+ # hack around with a try/finally instead.
+ success = False
try:
if not event.internal_metadata.is_outlier() and not backfilled:
yield self.action_generator.handle_push_actions_for_event(
@@ -1454,15 +1614,13 @@ class FederationHandler(BaseHandler):
[(event, context)],
backfilled=backfilled,
)
- except: # noqa: E722, as we reraise the exception this is fine.
- tp, value, tb = sys.exc_info()
-
- logcontext.run_in_background(
- self.store.remove_push_actions_from_staging,
- event.event_id,
- )
-
- six.reraise(tp, value, tb)
+ success = True
+ finally:
+ if not success:
+ logcontext.run_in_background(
+ self.store.remove_push_actions_from_staging,
+ event.event_id,
+ )
defer.returnValue(context)
@@ -1475,15 +1633,22 @@ class FederationHandler(BaseHandler):
Notifies about the events where appropriate.
"""
- contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
- [
- logcontext.run_in_background(
- self._prep_event,
+
+ @defer.inlineCallbacks
+ def prep(ev_info):
+ event = ev_info["event"]
+ with logcontext.nested_logging_context(suffix=event.event_id):
+ res = yield self._prep_event(
origin,
- ev_info["event"],
+ event,
state=ev_info.get("state"),
auth_events=ev_info.get("auth_events"),
)
+ defer.returnValue(res)
+
+ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+ [
+ logcontext.run_in_background(prep, ev_info)
for ev_info in event_infos
], consumeErrors=True,
))
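The hunk above is the fan-out pattern in miniature: each event gets its own background prep call (again in a nested logging context), and the results are collected together. A minimal sketch, with `prep_one` standing in for the per-event wrapper around `_prep_event`:

    from twisted.internet import defer

    from synapse.util import logcontext

    def prep_all(event_infos, prep_one):
        # Start one background prep per event, then wait for all of
        # them; consumeErrors=True stops one failure from being reported
        # as unhandled in every sibling deferred.
        return logcontext.make_deferred_yieldable(defer.gatherResults(
            [
                logcontext.run_in_background(prep_one, ev_info)
                for ev_info in event_infos
            ],
            consumeErrors=True,
        ))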
@@ -1635,8 +1800,8 @@ class FederationHandler(BaseHandler):
)
except AuthError as e:
logger.warn(
- "Rejecting %s because %s",
- event.event_id, e.msg
+ "[%s %s] Rejecting: %s",
+ event.room_id, event.event_id, e.msg
)
context.rejected = RejectedReason.AUTH_ERROR
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e484061c..4954b23a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -14,9 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-import sys
-import six
from six import iteritems, itervalues, string_types
from canonicaljson import encode_canonical_json, json
@@ -624,6 +622,9 @@ class EventCreationHandler(object):
event, context
)
+ # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
+ # hack around with a try/finally instead.
+ success = False
try:
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
@@ -636,6 +637,7 @@ class EventCreationHandler(object):
ratelimit=ratelimit,
extra_users=extra_users,
)
+ success = True
return
yield self.persist_and_notify_client_event(
@@ -645,17 +647,16 @@ class EventCreationHandler(object):
ratelimit=ratelimit,
extra_users=extra_users,
)
- except: # noqa: E722, as we reraise the exception this is fine.
- # Ensure that we actually remove the entries in the push actions
- # staging area, if we calculated them.
- tp, value, tb = sys.exc_info()
-
- run_in_background(
- self.store.remove_push_actions_from_staging,
- event.event_id,
- )
- six.reraise(tp, value, tb)
+ success = True
+ finally:
+ if not success:
+ # Ensure that we actually remove the entries in the push actions
+ # staging area, if we calculated them.
+ run_in_background(
+ self.store.remove_push_actions_from_staging,
+ event.event_id,
+ )
@defer.inlineCallbacks
def persist_and_notify_client_event(
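This hunk and the federation-handler one above swap the same `sys.exc_info()`/`six.reraise` dance for a success flag. A minimal sketch of the shape, with `persist` and `cleanup` as placeholder callables:

    from twisted.internet import defer

    from synapse.util.logcontext import run_in_background

    @defer.inlineCallbacks
    def persist_with_cleanup(persist, cleanup):
        # Re-raising via six.reraise loses the stack trace under
        # inlineCallbacks; try/finally lets the original exception
        # propagate untouched while still firing the cleanup on failure.
        success = False
        try:
            yield persist()
            success = True
        finally:
            if not success:
                run_in_background(cleanup)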
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 75b8b7ce..f284d5a3 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -278,7 +278,7 @@ class BaseProfileHandler(BaseHandler):
except Exception as e:
logger.warn(
"Failed to update join event for room %s - %s",
- room_id, str(e.message)
+ room_id, str(e)
)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 1e53f2c6..da914c46 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -534,4 +534,5 @@ class RegistrationHandler(BaseHandler):
room_id=room_id,
remote_room_hosts=remote_room_hosts,
action="join",
+ ratelimit=False,
)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index f6436190..07fd3e82 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -583,6 +583,11 @@ class RoomMemberHandler(object):
room_id = mapping["room_id"]
servers = mapping["servers"]
+ # put the server which owns the alias at the front of the server list.
+ if room_alias.domain in servers:
+ servers.remove(room_alias.domain)
+ servers.insert(0, room_alias.domain)
+
defer.returnValue((RoomID.from_string(room_id), servers))
@defer.inlineCallbacks
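The reordering above in isolation (`prioritise_alias_owner` is an illustrative helper, not a function in this patch):

    def prioritise_alias_owner(servers, alias_domain):
        # The server that owns the alias is the likeliest to accept the
        # join, so try it first.
        if alias_domain in servers:
            servers.remove(alias_domain)
            servers.insert(0, alias_domain)
        return servers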
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9bca4e70..67b8ca28 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -567,13 +567,13 @@ class SyncHandler(object):
# be a valid name or canonical_alias - i.e. we're checking that they
# haven't been "deleted" by blatting {} over the top.
if name_id:
- name = yield self.store.get_event(name_id, allow_none=False)
+ name = yield self.store.get_event(name_id, allow_none=True)
if name and name.content:
defer.returnValue(summary)
if canonical_alias_id:
canonical_alias = yield self.store.get_event(
- canonical_alias_id, allow_none=False,
+ canonical_alias_id, allow_none=True,
)
if canonical_alias and canonical_alias.content:
defer.returnValue(summary)
@@ -722,6 +722,13 @@ class SyncHandler(object):
}
if full_state:
+ if lazy_load_members:
+ # always make sure we LL ourselves so we know we're in the room
+ # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
+ # We only need to apply this on full-state syncs given we disabled
+ # LL for incr syncs in #3840.
+ types.append((EventTypes.Member, sync_config.user.to_string()))
+
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
batch.events[-1].event_id, types=types,
@@ -790,7 +797,7 @@ class SyncHandler(object):
else:
state_ids = {}
if lazy_load_members:
- if types:
+ if types and batch.events:
# We're returning an incremental sync, with no
# "gap" since the previous sync, so normally there would be
# no state to return.
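A condensed sketch of the lazy-loading tweak above, where `types` is the list of (event type, state key) filters later handed to the state store; the helper name is illustrative only:

    from synapse.api.constants import EventTypes

    def add_own_membership(types, user_id, full_state, lazy_load_members):
        # On a full-state lazy-loading sync, always request our own
        # m.room.member event so the client can tell whether we are in
        # the room at all.
        if full_state and lazy_load_members:
            types.append((EventTypes.Member, user_id))
        return types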
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 2d2d3d5a..c610933d 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -20,6 +20,7 @@ from twisted.internet import defer
from synapse.api.errors import AuthError, SynapseError
from synapse.types import UserID, get_domain_from_id
+from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -68,6 +69,11 @@ class TypingHandler(object):
# map room IDs to sets of users currently typing
self._room_typing = {}
+ # caches which room_ids changed at which serials
+ self._typing_stream_change_cache = StreamChangeCache(
+ "TypingStreamChangeCache", self._latest_room_serial,
+ )
+
self.clock.looping_call(
self._handle_timeouts,
5000,
@@ -218,6 +224,7 @@ class TypingHandler(object):
for domain in set(get_domain_from_id(u) for u in users):
if domain != self.server_name:
+ logger.debug("sending typing update to %s", domain)
self.federation.send_edu(
destination=domain,
edu_type="m.typing",
@@ -274,19 +281,29 @@ class TypingHandler(object):
self._latest_room_serial += 1
self._room_serials[member.room_id] = self._latest_room_serial
+ self._typing_stream_change_cache.entity_has_changed(
+ member.room_id, self._latest_room_serial,
+ )
self.notifier.on_new_event(
"typing_key", self._latest_room_serial, rooms=[member.room_id]
)
def get_all_typing_updates(self, last_id, current_id):
- # TODO: Work out a way to do this without scanning the entire state.
if last_id == current_id:
return []
+ changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
+ last_id,
+ )
+
+ if changed_rooms is None:
+ changed_rooms = self._room_serials
+
rows = []
- for room_id, serial in self._room_serials.items():
- if last_id < serial and serial <= current_id:
+ for room_id in changed_rooms:
+ serial = self._room_serials[room_id]
+ if last_id < serial <= current_id:
typing = self._room_typing[room_id]
rows.append((serial, room_id, list(typing)))
rows.sort()
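How the stream-change cache narrows the scan, as a hedged sketch (the helper is hypothetical; `room_serials` plays the role of `self._room_serials`):

    def changed_rooms_since(cache, room_serials, last_id, current_id):
        # get_all_entities_changed returns None when last_id predates
        # what the cache remembers; fall back to scanning every room.
        changed = cache.get_all_entities_changed(last_id)
        if changed is None:
            changed = room_serials

        return sorted(
            (room_serials[room_id], room_id)
            for room_id in changed
            if last_id < room_serials[room_id] <= current_id
        )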
diff --git a/synapse/http/client.py b/synapse/http/client.py
index ec339a92..3d05f83b 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -43,7 +43,7 @@ from twisted.web.http_headers import Headers
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.http.endpoint import SpiderEndpoint
-from synapse.util.async_helpers import add_timeout_to_deferred
+from synapse.util.async_helpers import timeout_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
@@ -99,7 +99,7 @@ class SimpleHttpClient(object):
request_deferred = treq.request(
method, uri, agent=self.agent, data=data, headers=headers
)
- add_timeout_to_deferred(
+ request_deferred = timeout_deferred(
request_deferred, 60, self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
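The crucial difference from the old helper, shown in isolation: `timeout_deferred` returns a new deferred rather than mutating its argument, so the caller must rebind the name.

    from twisted.internet import defer, reactor

    from synapse.util.async_helpers import timeout_deferred

    d = defer.Deferred()
    # Dropping the return value would silently disable the timeout; the
    # old add_timeout_to_deferred modified `d` in place instead.
    d = timeout_deferred(d, 60, reactor)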
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index b0c93695..91025037 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -108,7 +108,7 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
Args:
reactor: Twisted reactor.
- destination (bytes): The name of the server to connect to.
+ destination (unicode): The name of the server to connect to.
tls_client_options_factory
(synapse.crypto.context_factory.ClientTLSOptionsFactory):
Factory which generates TLS options for client connections.
@@ -126,10 +126,17 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
transport_endpoint = HostnameEndpoint
default_port = 8008
else:
+ # the SNI string should be the same as the Host header, minus the port.
+ # as per https://github.com/matrix-org/synapse/issues/2525#issuecomment-336896777,
+ # the Host header and SNI should therefore be the server_name of the remote
+ # server.
+ tls_options = tls_client_options_factory.get_options(domain)
+
def transport_endpoint(reactor, host, port, timeout):
return wrapClientTLS(
- tls_client_options_factory.get_options(host),
- HostnameEndpoint(reactor, host, port, timeout=timeout))
+ tls_options,
+ HostnameEndpoint(reactor, host, port, timeout=timeout),
+ )
default_port = 8448
if port is None:
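A hedged sketch of the SNI fix (the outer factory function is hypothetical): the TLS options, and with them the SNI string, are computed once from the remote server_name rather than from whichever host each SRV record resolves to.

    from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS

    def make_tls_transport_endpoint(reactor, tls_client_options_factory,
                                    server_name):
        # SNI must match the Host header, i.e. the server_name, not the
        # SRV target host.
        tls_options = tls_client_options_factory.get_options(server_name)

        def transport_endpoint(host, port, timeout):
            return wrapClientTLS(
                tls_options,
                HostnameEndpoint(reactor, host, port, timeout=timeout),
            )

        return transport_endpoint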
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 13b19f76..14b12cd1 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -17,10 +17,12 @@ import cgi
import logging
import random
import sys
+from io import BytesIO
from six import PY3, string_types
from six.moves import urllib
+import attr
import treq
from canonicaljson import encode_canonical_json
from prometheus_client import Counter
@@ -28,8 +30,9 @@ from signedjson.sign import sign_json
from twisted.internet import defer, protocol
from twisted.internet.error import DNSLookupError
+from twisted.internet.task import _EPSILON, Cooperator
from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, HTTPConnectionPool
+from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
from twisted.web.http_headers import Headers
import synapse.metrics
@@ -41,13 +44,11 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.http.endpoint import matrix_federation_endpoint
-from synapse.util import logcontext
-from synapse.util.async_helpers import timeout_no_seriously
+from synapse.util.async_helpers import timeout_deferred
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
-outbound_logger = logging.getLogger("synapse.http.outbound")
outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
"", ["method"])
@@ -78,6 +79,99 @@ class MatrixFederationEndpointFactory(object):
)
+_next_id = 1
+
+
+@attr.s
+class MatrixFederationRequest(object):
+ method = attr.ib()
+ """HTTP method
+ :type: str
+ """
+
+ path = attr.ib()
+ """HTTP path
+ :type: str
+ """
+
+ destination = attr.ib()
+ """The remote server to send the HTTP request to.
+ :type: str"""
+
+ json = attr.ib(default=None)
+ """JSON to send in the body.
+ :type: dict|None
+ """
+
+ json_callback = attr.ib(default=None)
+ """A callback to generate the JSON.
+ :type: func|None
+ """
+
+ query = attr.ib(default=None)
+ """Query arguments.
+ :type: dict|None
+ """
+
+ txn_id = attr.ib(default=None)
+ """Unique ID for this request (for logging)
+ :type: str|None
+ """
+
+ def __attrs_post_init__(self):
+ global _next_id
+ self.txn_id = "%s-O-%s" % (self.method, _next_id)
+ _next_id = (_next_id + 1) % (MAXINT - 1)
+
+ def get_json(self):
+ if self.json_callback:
+ return self.json_callback()
+ return self.json
+
+
+@defer.inlineCallbacks
+def _handle_json_response(reactor, timeout_sec, request, response):
+ """
+ Reads the JSON body of a response, with a timeout
+
+ Args:
+ reactor (IReactor): twisted reactor, for the timeout
+ timeout_sec (float): number of seconds to wait for response to complete
+ request (MatrixFederationRequest): the request that triggered the response
+ response (IResponse): response to the request
+
+ Returns:
+ dict: parsed JSON response
+ """
+ try:
+ check_content_type_is_json(response.headers)
+
+ d = treq.json_content(response)
+ d = timeout_deferred(
+ d,
+ timeout=timeout_sec,
+ reactor=reactor,
+ )
+
+ body = yield make_deferred_yieldable(d)
+ except Exception as e:
+ logger.warn(
+ "{%s} [%s] Error reading response: %s",
+ request.txn_id,
+ request.destination,
+ e,
+ )
+ raise
+ logger.info(
+ "{%s} [%s] Completed: %d %s",
+ request.txn_id,
+ request.destination,
+ response.code,
+ response.phrase.decode('ascii', errors='replace'),
+ )
+ defer.returnValue(body)
+
+
class MatrixFederationHttpClient(object):
"""HTTP client used to talk to other homeservers over the federation
protocol. Send client certificates and signs requests.
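A cut-down sketch of the attrs pattern behind `MatrixFederationRequest`; this toy class uses `id(self)` where the real code keeps a module-level counter:

    import attr

    @attr.s
    class ExampleRequest(object):
        method = attr.ib()       # HTTP method, e.g. "GET"
        path = attr.ib()         # HTTP path
        destination = attr.ib()  # remote server name
        json = attr.ib(default=None)
        json_callback = attr.ib(default=None)
        txn_id = attr.ib(default=None)

        def __attrs_post_init__(self):
            # Give every request a transaction ID for log correlation.
            self.txn_id = "%s-O-%s" % (self.method, id(self))

        def get_json(self):
            # A callback (for bodies that must be regenerated on retry)
            # takes precedence over a static body.
            if self.json_callback:
                return self.json_callback()
            return self.json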
@@ -102,34 +196,35 @@ class MatrixFederationHttpClient(object):
self.clock = hs.get_clock()
self._store = hs.get_datastore()
self.version_string = hs.version_string.encode('ascii')
- self._next_id = 1
self.default_timeout = 60
- def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
- return urllib.parse.urlunparse(
- (b"matrix", destination, path_bytes, param_bytes, query_bytes, b"")
- )
+ def schedule(x):
+ reactor.callLater(_EPSILON, x)
+
+ self._cooperator = Cooperator(scheduler=schedule)
@defer.inlineCallbacks
- def _request(self, destination, method, path,
- json=None, json_callback=None,
- param_bytes=b"",
- query=None, retry_on_dns_fail=True,
- timeout=None, long_retries=False,
- ignore_backoff=False,
- backoff_on_404=False):
+ def _send_request(
+ self,
+ request,
+ retry_on_dns_fail=True,
+ timeout=None,
+ long_retries=False,
+ ignore_backoff=False,
+ backoff_on_404=False
+ ):
"""
- Creates and sends a request to the given server.
+ Sends a request to the given server.
Args:
- destination (str): The remote server to send the HTTP request to.
- method (str): HTTP method
- path (str): The HTTP path
- json (dict or None): JSON to send in the body.
- json_callback (func or None): A callback to generate the JSON.
- query (dict or None): Query arguments.
+ request (MatrixFederationRequest): details of request to be sent
+
+ timeout (int|None): number of milliseconds to wait for the response headers
+ (including connecting to the server). 60s by default.
+
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
+
backoff_on_404 (bool): Back off if we get a 404
Returns:
@@ -154,38 +249,32 @@ class MatrixFederationHttpClient(object):
if (
self.hs.config.federation_domain_whitelist is not None and
- destination not in self.hs.config.federation_domain_whitelist
+ request.destination not in self.hs.config.federation_domain_whitelist
):
- raise FederationDeniedError(destination)
+ raise FederationDeniedError(request.destination)
limiter = yield synapse.util.retryutils.get_retry_limiter(
- destination,
+ request.destination,
self.clock,
self._store,
backoff_on_404=backoff_on_404,
ignore_backoff=ignore_backoff,
)
- headers_dict = {}
- path_bytes = path.encode("ascii")
- if query:
- query_bytes = encode_query_args(query)
+ method = request.method
+ destination = request.destination
+ path_bytes = request.path.encode("ascii")
+ if request.query:
+ query_bytes = encode_query_args(request.query)
else:
query_bytes = b""
headers_dict = {
"User-Agent": [self.version_string],
- "Host": [destination],
+ "Host": [request.destination],
}
with limiter:
- url = self._create_url(
- destination.encode("ascii"), path_bytes, param_bytes, query_bytes
- ).decode('ascii')
-
- txn_id = "%s-O-%s" % (method, self._next_id)
- self._next_id = (self._next_id + 1) % (MAXINT - 1)
-
# XXX: Would be much nicer to retry only at the transaction-layer
# (once we have reliable transactions in place)
if long_retries:
@@ -193,16 +282,19 @@ class MatrixFederationHttpClient(object):
else:
retries_left = MAX_SHORT_RETRIES
- http_url = urllib.parse.urlunparse(
- (b"", b"", path_bytes, param_bytes, query_bytes, b"")
- ).decode('ascii')
+ url = urllib.parse.urlunparse((
+ b"matrix", destination.encode("ascii"),
+ path_bytes, None, query_bytes, b"",
+ )).decode('ascii')
+
+ http_url = urllib.parse.urlunparse((
+ b"", b"",
+ path_bytes, None, query_bytes, b"",
+ )).decode('ascii')
- log_result = None
while True:
try:
- if json_callback:
- json = json_callback()
-
+ json = request.get_json()
if json:
data = encode_canonical_json(json)
headers_dict["Content-Type"] = ["application/json"]
@@ -213,29 +305,32 @@ class MatrixFederationHttpClient(object):
data = None
self.sign_request(destination, method, http_url, headers_dict)
- outbound_logger.info(
+ logger.info(
"{%s} [%s] Sending request: %s %s",
- txn_id, destination, method, url
+ request.txn_id, destination, method, url
)
+ if data:
+ producer = FileBodyProducer(
+ BytesIO(data),
+ cooperator=self._cooperator
+ )
+ else:
+ producer = None
+
request_deferred = treq.request(
method,
url,
headers=Headers(headers_dict),
- data=data,
+ data=producer,
agent=self.agent,
reactor=self.hs.get_reactor(),
unbuffered=True
)
- request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
- # Sometimes the timeout above doesn't work, so lets hack yet
- # another layer of timeouts in in the vain hope that at some
- # point the world made sense and this really really really
- # should work.
- request_deferred = timeout_no_seriously(
+ request_deferred = timeout_deferred(
request_deferred,
- timeout=_sec_timeout * 2,
+ timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
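How the new body producer is wired up, as a self-contained sketch; the real code schedules with Twisted's private `_EPSILON` constant, for which a small literal delay stands in here:

    from io import BytesIO

    from twisted.internet import reactor
    from twisted.internet.task import Cooperator
    from twisted.web.client import FileBodyProducer

    # Drive body production through the reactor, so a large request body
    # is written in cooperative chunks rather than in one blocking go.
    cooperator = Cooperator(
        scheduler=lambda work: reactor.callLater(0.0001, work),
    )

    def make_producer(data):
        # `data` is the encoded request body (bytes); the result is an
        # IBodyProducer suitable for treq's data= argument.
        return FileBodyProducer(BytesIO(data), cooperator=cooperator)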
@@ -244,30 +339,19 @@ class MatrixFederationHttpClient(object):
request_deferred,
)
- log_result = "%d %s" % (response.code, response.phrase,)
break
except Exception as e:
- if not retry_on_dns_fail and isinstance(e, DNSLookupError):
- logger.warn(
- "DNS Lookup failed to %s with %s",
- destination,
- e
- )
- log_result = "DNS Lookup failed to %s with %s" % (
- destination, e
- )
- raise
-
logger.warn(
- "{%s} Sending request failed to %s: %s %s: %s",
- txn_id,
+ "{%s} [%s] Request failed: %s %s: %s",
+ request.txn_id,
destination,
method,
url,
_flatten_response_never_received(e),
)
- log_result = _flatten_response_never_received(e)
+ if not retry_on_dns_fail and isinstance(e, DNSLookupError):
+ raise
if retries_left and not timeout:
if long_retries:
@@ -280,33 +364,37 @@ class MatrixFederationHttpClient(object):
delay *= random.uniform(0.8, 1.4)
logger.debug(
- "{%s} Waiting %s before sending to %s...",
- txn_id,
+ "{%s} [%s] Waiting %ss before re-sending...",
+ request.txn_id,
+ destination,
delay,
- destination
)
yield self.clock.sleep(delay)
retries_left -= 1
else:
raise
- finally:
- outbound_logger.info(
- "{%s} [%s] Result: %s",
- txn_id,
- destination,
- log_result,
- )
+
+ logger.info(
+ "{%s} [%s] Got response headers: %d %s",
+ request.txn_id,
+ destination,
+ response.code,
+ response.phrase.decode('ascii', errors='replace'),
+ )
if 200 <= response.code < 300:
pass
else:
# :'(
# Update transactions table?
- with logcontext.PreserveLoggingContext():
- d = treq.content(response)
- d.addTimeout(_sec_timeout, self.hs.get_reactor())
- body = yield make_deferred_yieldable(d)
+ d = treq.content(response)
+ d = timeout_deferred(
+ d,
+ timeout=_sec_timeout,
+ reactor=self.hs.get_reactor(),
+ )
+ body = yield make_deferred_yieldable(d)
raise HttpResponseException(
response.code, response.phrase, body
)
@@ -400,29 +488,26 @@ class MatrixFederationHttpClient(object):
is not on our federation whitelist
"""
- if not json_data_callback:
- json_data_callback = lambda: data
-
- response = yield self._request(
- destination,
- "PUT",
- path,
- json_callback=json_data_callback,
+ request = MatrixFederationRequest(
+ method="PUT",
+ destination=destination,
+ path=path,
query=args,
+ json_callback=json_data_callback,
+ json=data,
+ )
+
+ response = yield self._send_request(
+ request,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
backoff_on_404=backoff_on_404,
)
- if 200 <= response.code < 300:
- # We need to update the transactions table to say it was sent?
- check_content_type_is_json(response.headers)
-
- with logcontext.PreserveLoggingContext():
- d = treq.json_content(response)
- d.addTimeout(self.default_timeout, self.hs.get_reactor())
- body = yield make_deferred_yieldable(d)
+ body = yield _handle_json_response(
+ self.hs.get_reactor(), self.default_timeout, request, response,
+ )
defer.returnValue(body)
@defer.inlineCallbacks
@@ -456,31 +541,30 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
- response = yield self._request(
- destination,
- "POST",
- path,
+
+ request = MatrixFederationRequest(
+ method="POST",
+ destination=destination,
+ path=path,
query=args,
json=data,
+ )
+
+ response = yield self._send_request(
+ request,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
- if 200 <= response.code < 300:
- # We need to update the transactions table to say it was sent?
- check_content_type_is_json(response.headers)
-
- with logcontext.PreserveLoggingContext():
- d = treq.json_content(response)
- if timeout:
- _sec_timeout = timeout / 1000
- else:
- _sec_timeout = self.default_timeout
-
- d.addTimeout(_sec_timeout, self.hs.get_reactor())
- body = yield make_deferred_yieldable(d)
+ if timeout:
+ _sec_timeout = timeout / 1000
+ else:
+ _sec_timeout = self.default_timeout
+ body = yield _handle_json_response(
+ self.hs.get_reactor(), _sec_timeout, request, response,
+ )
defer.returnValue(body)
@defer.inlineCallbacks
@@ -516,25 +600,23 @@ class MatrixFederationHttpClient(object):
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
- response = yield self._request(
- destination,
- "GET",
- path,
+ request = MatrixFederationRequest(
+ method="GET",
+ destination=destination,
+ path=path,
query=args,
+ )
+
+ response = yield self._send_request(
+ request,
retry_on_dns_fail=retry_on_dns_fail,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
- if 200 <= response.code < 300:
- # We need to update the transactions table to say it was sent?
- check_content_type_is_json(response.headers)
-
- with logcontext.PreserveLoggingContext():
- d = treq.json_content(response)
- d.addTimeout(self.default_timeout, self.hs.get_reactor())
- body = yield make_deferred_yieldable(d)
-
+ body = yield _handle_json_response(
+ self.hs.get_reactor(), self.default_timeout, request, response,
+ )
defer.returnValue(body)
@defer.inlineCallbacks
@@ -565,25 +647,23 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
- response = yield self._request(
- destination,
- "DELETE",
- path,
+ request = MatrixFederationRequest(
+ method="DELETE",
+ destination=destination,
+ path=path,
query=args,
+ )
+
+ response = yield self._send_request(
+ request,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
- if 200 <= response.code < 300:
- # We need to update the transactions table to say it was sent?
- check_content_type_is_json(response.headers)
-
- with logcontext.PreserveLoggingContext():
- d = treq.json_content(response)
- d.addTimeout(self.default_timeout, self.hs.get_reactor())
- body = yield make_deferred_yieldable(d)
-
+ body = yield _handle_json_response(
+ self.hs.get_reactor(), self.default_timeout, request, response,
+ )
defer.returnValue(body)
@defer.inlineCallbacks
@@ -611,11 +691,15 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
- response = yield self._request(
- destination,
- "GET",
- path,
+ request = MatrixFederationRequest(
+ method="GET",
+ destination=destination,
+ path=path,
query=args,
+ )
+
+ response = yield self._send_request(
+ request,
retry_on_dns_fail=retry_on_dns_fail,
ignore_backoff=ignore_backoff,
)
@@ -623,14 +707,25 @@ class MatrixFederationHttpClient(object):
headers = dict(response.headers.getAllRawHeaders())
try:
- with logcontext.PreserveLoggingContext():
- d = _readBodyToFile(response, output_stream, max_size)
- d.addTimeout(self.default_timeout, self.hs.get_reactor())
- length = yield make_deferred_yieldable(d)
- except Exception:
- logger.exception("Failed to download body")
+ d = _readBodyToFile(response, output_stream, max_size)
+ d.addTimeout(self.default_timeout, self.hs.get_reactor())
+ length = yield make_deferred_yieldable(d)
+ except Exception as e:
+ logger.warn(
+ "{%s} [%s] Error reading response: %s",
+ request.txn_id,
+ request.destination,
+ e,
+ )
raise
-
+ logger.info(
+ "{%s} [%s] Completed: %d %s [%d bytes]",
+ request.txn_id,
+ request.destination,
+ response.code,
+ response.phrase.decode('ascii', errors='replace'),
+ length,
+ )
defer.returnValue((length, headers))
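Taken together, each verb method now reduces to roughly the following sketch (assuming it lives inside synapse.http.matrixfederationclient, alongside the names it uses):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def get_json_sketch(client, destination, path, args=None):
        # Describe the request once...
        request = MatrixFederationRequest(
            method="GET",
            destination=destination,
            path=path,
            query=args,
        )
        # ...send it, then parse the body via the shared JSON handler.
        response = yield client._send_request(request)
        body = yield _handle_json_response(
            client.hs.get_reactor(), client.default_timeout,
            request, response,
        )
        defer.returnValue(body)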
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index 72c26546..fedb4e6b 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -162,7 +162,7 @@ class RequestMetrics(object):
with _in_flight_requests_lock:
_in_flight_requests.add(self)
- def stop(self, time_sec, request):
+ def stop(self, time_sec, response_code, sent_bytes):
with _in_flight_requests_lock:
_in_flight_requests.discard(self)
@@ -179,35 +179,35 @@ class RequestMetrics(object):
)
return
- response_code = str(request.code)
+ response_code = str(response_code)
- outgoing_responses_counter.labels(request.method, response_code).inc()
+ outgoing_responses_counter.labels(self.method, response_code).inc()
- response_count.labels(request.method, self.name, tag).inc()
+ response_count.labels(self.method, self.name, tag).inc()
- response_timer.labels(request.method, self.name, tag, response_code).observe(
+ response_timer.labels(self.method, self.name, tag, response_code).observe(
time_sec - self.start
)
resource_usage = context.get_resource_usage()
- response_ru_utime.labels(request.method, self.name, tag).inc(
+ response_ru_utime.labels(self.method, self.name, tag).inc(
resource_usage.ru_utime,
)
- response_ru_stime.labels(request.method, self.name, tag).inc(
+ response_ru_stime.labels(self.method, self.name, tag).inc(
resource_usage.ru_stime,
)
- response_db_txn_count.labels(request.method, self.name, tag).inc(
+ response_db_txn_count.labels(self.method, self.name, tag).inc(
resource_usage.db_txn_count
)
- response_db_txn_duration.labels(request.method, self.name, tag).inc(
+ response_db_txn_duration.labels(self.method, self.name, tag).inc(
resource_usage.db_txn_duration_sec
)
- response_db_sched_duration.labels(request.method, self.name, tag).inc(
+ response_db_sched_duration.labels(self.method, self.name, tag).inc(
resource_usage.db_sched_duration_sec
)
- response_size.labels(request.method, self.name, tag).inc(request.sentLength)
+ response_size.labels(self.method, self.name, tag).inc(sent_bytes)
# We always call this at the end to ensure that we update the metrics
# regardless of whether a call to /metrics while the request was in
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 2d5c23e6..b4b25cab 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -84,10 +84,21 @@ def wrap_json_request_handler(h):
logger.info(
"%s SynapseError: %s - %s", request, code, e.msg
)
- respond_with_json(
- request, code, e.error_dict(), send_cors=True,
- pretty_print=_request_user_agent_is_curl(request),
- )
+
+ # Only respond with an error response if we haven't already started
+ # writing, otherwise let's just kill the connection
+ if request.startedWriting:
+ if request.transport:
+ try:
+ request.transport.abortConnection()
+ except Exception:
+ # abortConnection throws if the connection is already closed
+ pass
+ else:
+ respond_with_json(
+ request, code, e.error_dict(), send_cors=True,
+ pretty_print=_request_user_agent_is_curl(request),
+ )
except Exception:
# failure.Failure() fishes the original Failure out
@@ -100,16 +111,26 @@ def wrap_json_request_handler(h):
request,
f.getTraceback().rstrip(),
)
- respond_with_json(
- request,
- 500,
- {
- "error": "Internal server error",
- "errcode": Codes.UNKNOWN,
- },
- send_cors=True,
- pretty_print=_request_user_agent_is_curl(request),
- )
+ # Only respond with an error response if we haven't already started
+ # writing, otherwise let's just kill the connection
+ if request.startedWriting:
+ if request.transport:
+ try:
+ request.transport.abortConnection()
+ except Exception:
+ # abortConnection throws if the connection is already closed
+ pass
+ else:
+ respond_with_json(
+ request,
+ 500,
+ {
+ "error": "Internal server error",
+ "errcode": Codes.UNKNOWN,
+ },
+ send_cors=True,
+ pretty_print=_request_user_agent_is_curl(request),
+ )
return wrap_async_request_handler(wrapped_request_handler)
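The same guard appears in both exception paths above; one way to express it once, as a hypothetical helper (`send_error` is a zero-argument callable that emits the JSON error response):

    def respond_or_abort(request, send_error):
        if not request.startedWriting:
            send_error()
            return
        # Too late for a clean JSON error: part of a response is already
        # on the wire, so kill the connection rather than sending a
        # truncated body that looks like success.
        if request.transport:
            try:
                request.transport.abortConnection()
            except Exception:
                # abortConnection throws if the connection has already
                # been closed
                pass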
diff --git a/synapse/http/site.py b/synapse/http/site.py
index e1e53e8a..e508c0bd 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -75,14 +75,14 @@ class SynapseRequest(Request):
return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
self.__class__.__name__,
id(self),
- self.method,
+ self.get_method(),
self.get_redacted_uri(),
- self.clientproto,
+ self.clientproto.decode('ascii', errors='replace'),
self.site.site_tag,
)
def get_request_id(self):
- return "%s-%i" % (self.method, self.request_seq)
+ return "%s-%i" % (self.get_method(), self.request_seq)
def get_redacted_uri(self):
uri = self.uri
@@ -90,6 +90,21 @@ class SynapseRequest(Request):
uri = self.uri.decode('ascii')
return redact_uri(uri)
+ def get_method(self):
+ """Gets the method associated with the request (or placeholder if not
+ method has yet been received).
+
+ Note: This is necessary as the placeholder value in twisted is str
+ rather than bytes, so we need to sanitise `self.method`.
+
+ Returns:
+ str
+ """
+ method = self.method
+ if isinstance(method, bytes):
+ method = self.method.decode('ascii')
+ return method
+
def get_user_agent(self):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
@@ -119,7 +134,7 @@ class SynapseRequest(Request):
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
- requests_counter.labels(self.method,
+ requests_counter.labels(self.get_method(),
self.request_metrics.name).inc()
@contextlib.contextmanager
@@ -207,14 +222,14 @@ class SynapseRequest(Request):
self.start_time = time.time()
self.request_metrics = RequestMetrics()
self.request_metrics.start(
- self.start_time, name=servlet_name, method=self.method.decode('ascii'),
+ self.start_time, name=servlet_name, method=self.get_method(),
)
self.site.access_logger.info(
"%s - %s - Received request: %s %s",
self.getClientIP(),
self.site.site_tag,
- self.method.decode('ascii'),
+ self.get_method(),
self.get_redacted_uri()
)
@@ -280,15 +295,15 @@ class SynapseRequest(Request):
int(usage.db_txn_count),
self.sentLength,
code,
- self.method,
+ self.get_method(),
self.get_redacted_uri(),
- self.clientproto,
+ self.clientproto.decode('ascii', errors='replace'),
user_agent,
usage.evt_db_fetch_count,
)
try:
- self.request_metrics.stop(self.finish_time, self)
+ self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
except Exception as e:
logger.warn("Failed to stop metrics: %r", e)
@@ -308,7 +323,7 @@ class XForwardedForRequest(SynapseRequest):
C{b"-"}.
"""
return self.requestHeaders.getRawHeaders(
- b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip()
+ b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip().decode('ascii')
class SynapseRequestFactory(object):
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 167167be..17390829 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import threading
import six
@@ -23,6 +24,9 @@ from twisted.internet import defer
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+logger = logging.getLogger(__name__)
+
+
_background_process_start_count = Counter(
"synapse_background_process_start_count",
"Number of background processes started",
@@ -191,6 +195,8 @@ def run_as_background_process(desc, func, *args, **kwargs):
try:
yield func(*args, **kwargs)
+ except Exception:
+ logger.exception("Background process '%s' threw an exception", desc)
finally:
proc.update_metrics()
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 82f39148..340b16ce 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -24,13 +24,10 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import StreamToken
-from synapse.util.async_helpers import (
- DeferredTimeoutError,
- ObservableDeferred,
- add_timeout_to_deferred,
-)
-from synapse.util.logcontext import PreserveLoggingContext, run_in_background
+from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
+from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
@@ -252,7 +249,10 @@ class Notifier(object):
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
- run_in_background(self._notify_app_services, room_stream_id)
+ run_as_background_process(
+ "notify_app_services",
+ self._notify_app_services, room_stream_id,
+ )
if self.federation_sender:
self.federation_sender.notify_new_events(room_stream_id)
@@ -337,7 +337,7 @@ class Notifier(object):
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
- add_timeout_to_deferred(
+ listener.deferred = timeout_deferred(
listener.deferred,
(end_time - now) / 1000.,
self.hs.get_reactor(),
@@ -354,7 +354,7 @@ class Notifier(object):
# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
- except DeferredTimeoutError:
+ except defer.TimeoutError:
break
except defer.CancelledError:
break
@@ -559,15 +559,16 @@ class Notifier(object):
if end_time <= now:
break
- add_timeout_to_deferred(
- listener.deferred.addTimeout,
- (end_time - now) / 1000.,
- self.hs.get_reactor(),
+ listener.deferred = timeout_deferred(
+ listener.deferred,
+ timeout=(end_time - now) / 1000.,
+ reactor=self.hs.get_reactor(),
)
+
try:
with PreserveLoggingContext():
yield listener.deferred
- except DeferredTimeoutError:
+ except defer.TimeoutError:
break
except defer.CancelledError:
break
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index b78ce103..1a5a10d9 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -441,7 +441,7 @@ class Mailer(object):
def make_room_link(self, room_id):
if self.hs.config.email_riot_base_url:
- base_url = self.hs.config.email_riot_base_url
+ base_url = "%s/#/room" % (self.hs.config.email_riot_base_url)
elif self.app_name == "Vector":
# need /beta for Universal Links to work on iOS
base_url = "https://vector.im/beta/#/room"
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 0d8de600..d4d983b0 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -33,32 +33,38 @@ logger = logging.getLogger(__name__)
# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
REQUIREMENTS = {
"jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
- "frozendict>=0.4": ["frozendict"],
+ "frozendict>=1": ["frozendict"],
"unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"],
"canonicaljson>=1.1.3": ["canonicaljson>=1.1.3"],
"signedjson>=1.0.0": ["signedjson>=1.0.0"],
"pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
- "service_identity>=1.0.0": ["service_identity>=1.0.0"],
+ "service_identity>=16.0.0": ["service_identity>=16.0.0"],
"Twisted>=17.1.0": ["twisted>=17.1.0"],
"treq>=15.1": ["treq>=15.1"],
# Twisted has required pyopenssl 16.0 since about Twisted 16.6.
"pyopenssl>=16.0.0": ["OpenSSL>=16.0.0"],
- "pyyaml": ["yaml"],
- "pyasn1": ["pyasn1"],
- "daemonize": ["daemonize"],
- "bcrypt": ["bcrypt>=3.1.0"],
- "pillow": ["PIL"],
- "pydenticon": ["pydenticon"],
- "sortedcontainers": ["sortedcontainers"],
- "pysaml2>=3.0.0": ["saml2>=3.0.0"],
- "pymacaroons-pynacl": ["pymacaroons"],
+ "pyyaml>=3.11": ["yaml"],
+ "pyasn1>=0.1.9": ["pyasn1"],
+ "pyasn1-modules>=0.0.7": ["pyasn1_modules"],
+ "daemonize>=2.3.1": ["daemonize"],
+ "bcrypt>=3.1.0": ["bcrypt>=3.1.0"],
+ "pillow>=3.1.2": ["PIL"],
+ "pydenticon>=0.2": ["pydenticon"],
+ "sortedcontainers>=1.4.4": ["sortedcontainers"],
+ "pysaml2>=3.0.0": ["saml2"],
+ "pymacaroons-pynacl>=0.9.3": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
- "six": ["six"],
- "prometheus_client": ["prometheus_client"],
- "attrs": ["attr"],
+ "six>=1.10": ["six"],
+
+ # prometheus_client 0.4.0 changed the format of counter metrics
+ # (cf https://github.com/matrix-org/synapse/issues/4001)
+ "prometheus_client>=0.0.18,<0.4.0": ["prometheus_client"],
+
+ # we use attr.s(slots), which arrived in 16.0.0
+ "attrs>=16.0.0": ["attr>=16.0.0"],
"netaddr>=0.7.18": ["netaddr"],
}
diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py
index ca90964d..f911b120 100644
--- a/synapse/rest/media/v1/download_resource.py
+++ b/synapse/rest/media/v1/download_resource.py
@@ -52,6 +52,7 @@ class DownloadResource(Resource):
b" script-src 'none';"
b" plugin-types application/pdf;"
b" style-src 'unsafe-inline';"
+ b" media-src 'self';"
b" object-src 'self';"
)
server_name, media_id, name = parse_media_id(request)
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index cad2dec3..af01040a 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -79,7 +79,6 @@ class PreviewUrlResource(Resource):
# don't spider URLs more often than once an hour
expiry_ms=60 * 60 * 1000,
)
- self._cache.start()
self._cleaner_loop = self.clock.looping_call(
self._start_expire_url_cache_data, 10 * 1000,
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index d7ae22a6..b22495c1 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -95,10 +95,6 @@ class StateHandler(object):
self.hs = hs
self._state_resolution_handler = hs.get_state_resolution_handler()
- def start_caching(self):
- # TODO: remove this shim
- self._state_resolution_handler.start_caching()
-
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key="",
latest_event_ids=None):
@@ -428,9 +424,6 @@ class StateResolutionHandler(object):
self._state_cache = None
self.resolve_linearizer = Linearizer(name="state_resolve_lock")
- def start_caching(self):
- logger.debug("start_caching")
-
self._state_cache = ExpiringCache(
cache_name="state_cache",
clock=self.clock,
@@ -440,8 +433,6 @@ class StateResolutionHandler(object):
reset_expiry_on_get=True,
)
- self._state_cache.start()
-
@defer.inlineCallbacks
@log_function
def resolve_state_groups(
diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index c95477d3..7a7157b3 100644
--- a/synapse/state/v1.py
+++ b/synapse/state/v1.py
@@ -65,10 +65,15 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
for event_ids in itervalues(conflicted_state)
for event_id in event_ids
)
+ needed_event_count = len(needed_events)
if event_map is not None:
needed_events -= set(iterkeys(event_map))
- logger.info("Asking for %d conflicted events", len(needed_events))
+ logger.info(
+ "Asking for %d/%d conflicted events",
+ len(needed_events),
+ needed_event_count,
+ )
# dict[str, FrozenEvent]: a map from state event id to event. Only includes
# the state events which are in conflict (and those in event_map)
@@ -85,11 +90,16 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
)
new_needed_events = set(itervalues(auth_events))
+ new_needed_event_count = len(new_needed_events)
new_needed_events -= needed_events
if event_map is not None:
new_needed_events -= set(iterkeys(event_map))
- logger.info("Asking for %d auth events", len(new_needed_events))
+ logger.info(
+ "Asking for %d/%d auth events",
+ len(new_needed_events),
+ new_needed_event_count,
+ )
state_map_new = yield state_map_factory(new_needed_events)
state_map.update(state_map_new)
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 8fc678fa..9ad17b7c 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -119,21 +119,25 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
for entry in iteritems(to_update):
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
- self._simple_upsert_txn(
- txn,
- table="user_ips",
- keyvalues={
- "user_id": user_id,
- "access_token": access_token,
- "ip": ip,
- "user_agent": user_agent,
- "device_id": device_id,
- },
- values={
- "last_seen": last_seen,
- },
- lock=False,
- )
+ try:
+ self._simple_upsert_txn(
+ txn,
+ table="user_ips",
+ keyvalues={
+ "user_id": user_id,
+ "access_token": access_token,
+ "ip": ip,
+ "user_agent": user_agent,
+ "device_id": device_id,
+ },
+ values={
+ "last_seen": last_seen,
+ },
+ lock=False,
+ )
+ except Exception as e:
+ # Failed to upsert, log and continue
+ logger.error("Failed to insert client IP %r: %r", entry, e)
@defer.inlineCallbacks
def get_last_client_ip_by_device(self, user_id, device_id):
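The shape of the client_ips change in isolation: each row of the batch is wrapped individually, so one malformed entry is logged and skipped instead of aborting the whole loop. A hedged sketch with `upsert_row` as a placeholder:

    import logging

    logger = logging.getLogger(__name__)

    def flush_batch(entries, upsert_row):
        for entry in entries:
            try:
                upsert_row(entry)
            except Exception as e:
                # Log and carry on; losing one client IP row is better
                # than losing the whole batch.
                logger.error("Failed to insert client IP %r: %r", entry, e)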
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 80819423..cfb687cb 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -75,7 +75,6 @@ class DirectoryWorkerStore(SQLBaseStore):
},
retcol="creator",
desc="get_room_alias_creator",
- allow_none=True
)
@cached(max_entries=5000)
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 59580949..0fe8c8e2 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -172,6 +172,10 @@ class MonthlyActiveUsersStore(SQLBaseStore):
Deferred[bool]: True if a new entry was created, False if an
existing one was updated.
"""
+ # Am consciously deciding to lock the table on the basis that it ought
+ # never be a big table and alternative approaches (batching multiple
+ # upserts into a single txn) introduced a lot of extra complexity.
+ # See https://github.com/matrix-org/synapse/issues/3854 for more
is_insert = yield self._simple_upsert(
desc="upsert_monthly_active_user",
table="monthly_active_users",
@@ -181,7 +185,6 @@ class MonthlyActiveUsersStore(SQLBaseStore):
values={
"timestamp": int(self._clock.time_msec()),
},
- lock=False,
)
if is_insert:
self.user_last_seen_monthly_active.invalidate((user_id,))
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 4b971efd..3f4cbd61 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -255,7 +255,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
)
@defer.inlineCallbacks
- def get_state_groups_ids(self, room_id, event_ids):
+ def get_state_groups_ids(self, _room_id, event_ids):
+ """Get the event IDs of all the state for the state groups for the given events
+
+ Args:
+ _room_id (str): id of the room for these events
+ event_ids (iterable[str]): ids of the events
+
+ Returns:
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
+ """
if not event_ids:
defer.returnValue({})
@@ -270,7 +280,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_state_ids_for_group(self, state_group):
- """Get the state IDs for the given state group
+ """Get the event IDs of all the state in the given state group
Args:
state_group (int)
@@ -286,7 +296,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
def get_state_groups(self, room_id, event_ids):
""" Get the state groups for the given list of event_ids
- The return value is a dict mapping group names to lists of events.
+ Returns:
+ Deferred[dict[int, list[EventBase]]]:
+ dict of state_group_id -> list of state events.
"""
if not event_ids:
defer.returnValue({})
@@ -324,7 +336,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
member events (if True), or to exclude member events (if False)
Returns:
- dictionary state_group -> (dict of (type, state_key) -> event id)
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
results = {}
@@ -732,8 +746,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
If None, `types` filtering is applied to all events.
Returns:
- Deferred[dict[int, dict[(type, state_key), EventBase]]]
- a dictionary mapping from state group to state dictionary.
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
if types is not None:
non_member_types = [t for t in types if t[0] != EventTypes.Member]
@@ -788,8 +802,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
If None, `types` filtering is applied to all events.
Returns:
- Deferred[dict[int, dict[(type, state_key), EventBase]]]
- a dictionary mapping from state group to state dictionary.
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
if types:
types = frozenset(types)
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 0c42bd33..a3032cdc 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -23,7 +23,7 @@ from canonicaljson import encode_canonical_json
from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.expiringcache import ExpiringCache
from ._base import SQLBaseStore, db_to_json
@@ -50,6 +50,8 @@ _UpdateTransactionRow = namedtuple(
)
)
+SENTINEL = object()
+
class TransactionStore(SQLBaseStore):
"""A collection of queries for handling PDUs.
@@ -60,6 +62,12 @@ class TransactionStore(SQLBaseStore):
self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
+ self._destination_retry_cache = ExpiringCache(
+ cache_name="get_destination_retry_timings",
+ clock=self._clock,
+ expiry_ms=5 * 60 * 1000,
+ )
+
def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have
already responded to it. If so, return the response code and response
@@ -156,7 +164,7 @@ class TransactionStore(SQLBaseStore):
"""
pass
- @cached(max_entries=10000)
+ @defer.inlineCallbacks
def get_destination_retry_timings(self, destination):
"""Gets the current retry timings (if any) for a given destination.
@@ -167,10 +175,20 @@ class TransactionStore(SQLBaseStore):
None if not retrying
Otherwise a dict for the retry scheme
"""
- return self.runInteraction(
+
+ result = self._destination_retry_cache.get(destination, SENTINEL)
+ if result is not SENTINEL:
+ defer.returnValue(result)
+
+ result = yield self.runInteraction(
"get_destination_retry_timings",
self._get_destination_retry_timings, destination)
+ # We don't hugely care about race conditions between getting and
+ # invalidating the cache, since we time out fairly quickly anyway.
+ self._destination_retry_cache[destination] = result
+ defer.returnValue(result)
+
def _get_destination_retry_timings(self, txn, destination):
result = self._simple_select_one_txn(
txn,
@@ -198,8 +216,7 @@ class TransactionStore(SQLBaseStore):
retry_interval (int) - how long until next retry in ms
"""
- # XXX: we could chose to not bother persisting this if our cache thinks
- # this is a NOOP
+ self._destination_retry_cache.pop(destination, None)
return self.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings,
@@ -212,10 +229,6 @@ class TransactionStore(SQLBaseStore):
retry_last_ts, retry_interval):
self.database_engine.lock_table(txn, "destinations")
- self._invalidate_cache_and_stream(
- txn, self.get_destination_retry_timings, (destination,)
- )
-
# We need to be careful here as the data may have changed from under us
# due to a worker setting the timings.
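The replacement caching scheme in miniature: a cache-aside read with a sentinel (so a legitimately cached `None`, meaning "not retrying", is distinguishable from a miss) plus an explicit invalidation before writes. A sketch assuming an ExpiringCache-like object:

    from twisted.internet import defer

    SENTINEL = object()

    @defer.inlineCallbacks
    def get_retry_timings(cache, destination, load_from_db):
        result = cache.get(destination, SENTINEL)
        if result is not SENTINEL:
            defer.returnValue(result)

        result = yield load_from_db(destination)
        # Races with invalidation are tolerable here: entries expire
        # after a few minutes anyway.
        cache[destination] = result
        defer.returnValue(result)

    def set_retry_timings(cache, destination, write_to_db):
        # Drop any cached entry before persisting the new value.
        cache.pop(destination, None)
        return write_to_db(destination)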
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 40c29461..ec7b2c96 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -374,29 +374,25 @@ class ReadWriteLock(object):
defer.returnValue(_ctx_manager())
-class DeferredTimeoutError(Exception):
- """
- This error is raised by default when a L{Deferred} times out.
- """
-
+def _cancelled_to_timed_out_error(value, timeout):
+ if isinstance(value, failure.Failure):
+ value.trap(CancelledError)
+ raise defer.TimeoutError(timeout, "Deferred")
+ return value
-def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
- """
- Add a timeout to a deferred by scheduling it to be cancelled after
- timeout seconds.
- This is essentially a backport of deferred.addTimeout, which was introduced
- in twisted 16.5.
+def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
+ """The in built twisted `Deferred.addTimeout` fails to time out deferreds
+ that have a canceller that throws exceptions. This method creates a new
+ deferred that wraps and times out the given deferred, correctly handling
+ the case where the given deferred's canceller throws.
- If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
- unless a cancelable function was passed to its initialization or unless
- a different on_timeout_cancel callable is provided.
+ NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
Args:
- deferred (defer.Deferred): deferred to be timed out
- timeout (Number): seconds to time out after
- reactor (twisted.internet.reactor): the Twisted reactor to use
-
+ deferred (Deferred)
+ timeout (float): Timeout in seconds
+ reactor (twisted.internet.reactor): The twisted reactor to use
on_timeout_cancel (callable): A callable which is called immediately
after the deferred times out, and not if this deferred is
otherwise cancelled before the timeout.
@@ -406,48 +402,10 @@ def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
the timeout.
The default callable (if none is provided) will translate a
- CancelledError Failure into a DeferredTimeoutError.
- """
- timed_out = [False]
-
- def time_it_out():
- timed_out[0] = True
- deferred.cancel()
-
- delayed_call = reactor.callLater(timeout, time_it_out)
-
- def convert_cancelled(value):
- if timed_out[0]:
- to_call = on_timeout_cancel or _cancelled_to_timed_out_error
- return to_call(value, timeout)
- return value
-
- deferred.addBoth(convert_cancelled)
+ CancelledError Failure into a defer.TimeoutError.
- def cancel_timeout(result):
- # stop the pending call to cancel the deferred if it's been fired
- if delayed_call.active():
- delayed_call.cancel()
- return result
-
- deferred.addBoth(cancel_timeout)
-
-
-def _cancelled_to_timed_out_error(value, timeout):
- if isinstance(value, failure.Failure):
- value.trap(CancelledError)
- raise DeferredTimeoutError(timeout, "Deferred")
- return value
-
-
-def timeout_no_seriously(deferred, timeout, reactor):
- """The in build twisted deferred addTimeout (and the method above)
- completely fail to time things out under some unknown circumstances.
-
- Lets try a different way of timing things out and maybe that will make
- things work?!
-
- TODO: Kill this with fire.
+ Returns:
+ Deferred
"""
new_d = defer.Deferred()
@@ -457,16 +415,20 @@ def timeout_no_seriously(deferred, timeout, reactor):
def time_it_out():
timed_out[0] = True
- if not new_d.called:
- new_d.errback(DeferredTimeoutError(timeout, "Deferred"))
+ try:
+ deferred.cancel()
+ except: # noqa: E722, if we throw any exception it'll break timeouts
+ logger.exception("Canceller failed during timeout")
- deferred.cancel()
+ if not new_d.called:
+ new_d.errback(defer.TimeoutError(timeout, "Deferred"))
delayed_call = reactor.callLater(timeout, time_it_out)
def convert_cancelled(value):
if timed_out[0]:
- return _cancelled_to_timed_out_error(value, timeout)
+ to_call = on_timeout_cancel or _cancelled_to_timed_out_error
+ return to_call(value, timeout)
return value
deferred.addBoth(convert_cancelled)
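
For illustration, a minimal sketch of the new helper in action, assuming `synapse.util.async_helpers.timeout_deferred` is importable; the throwing canceller and the `Clock` stand-in for the reactor are invented for the example:

    from twisted.internet import defer
    from twisted.internet.task import Clock

    from synapse.util.async_helpers import timeout_deferred

    clock = Clock()  # stands in for the reactor; provides callLater()

    def throwing_canceller(d):
        # the case Deferred.addTimeout mishandles: cancel() raises
        raise RuntimeError("canceller blew up")

    d = timeout_deferred(defer.Deferred(throwing_canceller), 2.0, clock)
    d.addErrback(lambda f: f.trap(defer.TimeoutError))

    clock.advance(3.0)  # the wrapper still errbacks with defer.TimeoutError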
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 7b065b19..f37d5bec 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import os
import six
@@ -20,6 +21,8 @@ from six.moves import intern
from prometheus_client.core import REGISTRY, Gauge, GaugeMetricFamily
+logger = logging.getLogger(__name__)
+
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
@@ -76,16 +79,20 @@ def register_cache(cache_type, cache_name, cache):
return []
def collect(self):
- if cache_type == "response_cache":
- response_cache_size.labels(cache_name).set(len(cache))
- response_cache_hits.labels(cache_name).set(self.hits)
- response_cache_evicted.labels(cache_name).set(self.evicted_size)
- response_cache_total.labels(cache_name).set(self.hits + self.misses)
- else:
- cache_size.labels(cache_name).set(len(cache))
- cache_hits.labels(cache_name).set(self.hits)
- cache_evicted.labels(cache_name).set(self.evicted_size)
- cache_total.labels(cache_name).set(self.hits + self.misses)
+ try:
+ if cache_type == "response_cache":
+ response_cache_size.labels(cache_name).set(len(cache))
+ response_cache_hits.labels(cache_name).set(self.hits)
+ response_cache_evicted.labels(cache_name).set(self.evicted_size)
+ response_cache_total.labels(cache_name).set(self.hits + self.misses)
+ else:
+ cache_size.labels(cache_name).set(len(cache))
+ cache_hits.labels(cache_name).set(self.hits)
+ cache_evicted.labels(cache_name).set(self.evicted_size)
+ cache_total.labels(cache_name).set(self.hits + self.misses)
+ except Exception as e:
+ logger.warn("Error calculating metrics for %s: %s", cache_name, e)
+ raise
yield GaugeMetricFamily("__unused", "")
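
As a standalone sketch of the pattern applied above (all names invented): a custom prometheus collector that logs its own failures before re-raising, so a broken gauge shows up in the logs instead of silently failing the whole scrape:

    import logging

    from prometheus_client.core import REGISTRY, GaugeMetricFamily

    logger = logging.getLogger(__name__)

    class SafeCollector(object):
        def __init__(self, name, get_value):
            self.name = name
            self.get_value = get_value  # callable returning the current value

        def collect(self):
            try:
                g = GaugeMetricFamily(self.name, "example gauge")
                g.add_metric([], self.get_value())
            except Exception as e:
                logger.warn("Error calculating metrics for %s: %s", self.name, e)
                raise
            yield g

    REGISTRY.register(SafeCollector("example_cache_size", lambda: 42))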
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index ce85b2ae..f3697802 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -16,12 +16,17 @@
import logging
from collections import OrderedDict
+from six import iteritems, itervalues
+
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache
logger = logging.getLogger(__name__)
+SENTINEL = object()
+
+
class ExpiringCache(object):
def __init__(self, cache_name, clock, max_len=0, expiry_ms=0,
reset_expiry_on_get=False, iterable=False):
@@ -54,11 +59,8 @@ class ExpiringCache(object):
self.iterable = iterable
- self._size_estimate = 0
-
self.metrics = register_cache("expiring", cache_name, self)
- def start(self):
if not self._expiry_ms:
# Don't bother starting the loop if things never expire
return
@@ -75,16 +77,11 @@ class ExpiringCache(object):
now = self._clock.time_msec()
self._cache[key] = _CacheEntry(now, value)
- if self.iterable:
- self._size_estimate += len(value)
-
# Evict if there are now too many items
while self._max_len and len(self) > self._max_len:
_key, value = self._cache.popitem(last=False)
if self.iterable:
- removed_len = len(value.value)
- self.metrics.inc_evictions(removed_len)
- self._size_estimate -= removed_len
+ self.metrics.inc_evictions(len(value.value))
else:
self.metrics.inc_evictions()
@@ -101,6 +98,21 @@ class ExpiringCache(object):
return entry.value
+ def pop(self, key, default=SENTINEL):
+ """Removes and returns the value with the given key from the cache.
+
+ If the key isn't in the cache then `default` will be returned if
+ specified, otherwise `KeyError` will get raised.
+
+ Identical functionality to `dict.pop(..)`.
+ """
+
+ value = self._cache.pop(key, default)
+ if value is SENTINEL:
+ raise KeyError(key)
+
+ return value
+
def __contains__(self, key):
return key in self._cache
@@ -128,14 +140,16 @@ class ExpiringCache(object):
keys_to_delete = set()
- for key, cache_entry in self._cache.items():
+ for key, cache_entry in iteritems(self._cache):
if now - cache_entry.time > self._expiry_ms:
keys_to_delete.add(key)
for k in keys_to_delete:
value = self._cache.pop(k)
if self.iterable:
- self._size_estimate -= len(value.value)
+ self.metrics.inc_evictions(len(value.value))
+ else:
+ self.metrics.inc_evictions()
logger.debug(
"[%s] _prune_cache before: %d, after len: %d",
@@ -144,12 +158,14 @@ class ExpiringCache(object):
def __len__(self):
if self.iterable:
- return self._size_estimate
+ return sum(len(entry.value) for entry in itervalues(self._cache))
else:
return len(self._cache)
class _CacheEntry(object):
+ __slots__ = ["time", "value"]
+
def __init__(self, time, value):
self.time = time
self.value = value
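
The `pop` added above relies on the classic sentinel-default idiom; a self-contained sketch (the `TinyCache` class is invented) of why a private `object()` is needed to distinguish "no default supplied" from an explicit default of `None`:

    SENTINEL = object()

    class TinyCache(object):
        def __init__(self):
            self._data = {}

        def pop(self, key, default=SENTINEL):
            value = self._data.pop(key, default)
            if value is SENTINEL:
                # no default was supplied, so mirror dict.pop() and raise
                raise KeyError(key)
            return value

    cache = TinyCache()
    cache._data["k"] = 1
    assert cache.pop("k") == 1
    assert cache.pop("k", None) is None  # an explicit None default is honoured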
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index a0c2d376..89224b26 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -200,7 +200,7 @@ class LoggingContext(object):
sentinel = Sentinel()
- def __init__(self, name=None, parent_context=None):
+ def __init__(self, name=None, parent_context=None, request=None):
self.previous_context = LoggingContext.current_context()
self.name = name
@@ -218,6 +218,13 @@ class LoggingContext(object):
self.parent_context = parent_context
+ if self.parent_context is not None:
+ self.parent_context.copy_to(self)
+
+ if request is not None:
+ # the request param overrides the request from the parent context
+ self.request = request
+
def __str__(self):
return "%s@%x" % (self.name, id(self))
@@ -256,9 +263,6 @@ class LoggingContext(object):
)
self.alive = True
- if self.parent_context is not None:
- self.parent_context.copy_to(self)
-
return self
def __exit__(self, type, value, traceback):
@@ -439,6 +443,35 @@ class PreserveLoggingContext(object):
)
+def nested_logging_context(suffix, parent_context=None):
+ """Creates a new logging context as a child of another.
+
+ The nested logging context will have a 'request' made up of the parent context's
+ request, plus the given suffix.
+
+ CPU/db usage stats will be added to the parent context's on exit.
+
+ Normal usage looks like:
+
+ with nested_logging_context(suffix):
+ # ... do stuff
+
+ Args:
+ suffix (str): suffix to add to the parent context's 'request'.
+ parent_context (LoggingContext|None): parent context. Will use the current context
+ if None.
+
+ Returns:
+ LoggingContext: new logging context.
+ """
+ if parent_context is None:
+ parent_context = LoggingContext.current_context()
+ return LoggingContext(
+ parent_context=parent_context,
+ request=parent_context.request + "-" + suffix,
+ )
+
+
def preserve_fn(f):
"""Function decorator which wraps the function with run_in_background"""
def g(*args, **kwargs):
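
A usage sketch for the new helper (the request name is invented): each sub-task gets a child context whose 'request' extends the parent's, so its log lines remain attributable to both the request and the item:

    from synapse.util.logcontext import LoggingContext, nested_logging_context

    with LoggingContext(request="GET-42"):
        for i in range(3):
            with nested_logging_context(str(i)):
                # log lines here carry request "GET-42-0", "GET-42-1", ...
                pass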
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 8a3a06fd..26cce7d1 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -188,7 +188,7 @@ class RetryDestinationLimiter(object):
else:
self.retry_interval = self.min_retry_interval
- logger.debug(
+ logger.info(
"Connection to %s was unsuccessful (%s(%s)); backoff now %i",
self.destination, exc_type, exc_val, self.retry_interval
)
diff --git a/synctl b/synctl
index 1bdceda2..356e5cb6 120000..100755
--- a/synctl
+++ b/synctl
@@ -1 +1,294 @@
-./synapse/app/synctl.py \ No newline at end of file
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import collections
+import errno
+import glob
+import os
+import os.path
+import signal
+import subprocess
+import sys
+import time
+
+from six import iteritems
+
+import yaml
+
+SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
+
+GREEN = "\x1b[1;32m"
+YELLOW = "\x1b[1;33m"
+RED = "\x1b[1;31m"
+NORMAL = "\x1b[m"
+
+
+def pid_running(pid):
+ try:
+ os.kill(pid, 0)
+ return True
+ except OSError as err:
+ if err.errno == errno.EPERM:
+ return True
+ return False
+
+
+def write(message, colour=NORMAL, stream=sys.stdout):
+ if colour == NORMAL:
+ stream.write(message + "\n")
+ else:
+ stream.write(colour + message + NORMAL + "\n")
+
+
+def abort(message, colour=RED, stream=sys.stderr):
+ write(message, colour, stream)
+ sys.exit(1)
+
+
+def start(configfile):
+ write("Starting ...")
+ args = SYNAPSE
+ args.extend(["--daemonize", "-c", configfile])
+
+ try:
+ subprocess.check_call(args)
+ write("started synapse.app.homeserver(%r)" %
+ (configfile,), colour=GREEN)
+ except subprocess.CalledProcessError as e:
+ write(
+ "error starting (exit code: %d); see above for logs" % e.returncode,
+ colour=RED,
+ )
+
+
+def start_worker(app, configfile, worker_configfile):
+ args = [
+ "python", "-B",
+ "-m", app,
+ "-c", configfile,
+ "-c", worker_configfile
+ ]
+
+ try:
+ subprocess.check_call(args)
+ write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
+ except subprocess.CalledProcessError as e:
+ write(
+ "error starting %s(%r) (exit code: %d); see above for logs" % (
+ app, worker_configfile, e.returncode,
+ ),
+ colour=RED,
+ )
+
+
+def stop(pidfile, app):
+ if os.path.exists(pidfile):
+ pid = int(open(pidfile).read())
+ try:
+ os.kill(pid, signal.SIGTERM)
+ write("stopped %s" % (app,), colour=GREEN)
+ except OSError as err:
+ if err.errno == errno.ESRCH:
+ write("%s not running" % (app,), colour=YELLOW)
+ elif err.errno == errno.EPERM:
+ abort("Cannot stop %s: Operation not permitted" % (app,))
+ else:
+ abort("Cannot stop %s: Unknown error" % (app,))
+
+
+Worker = collections.namedtuple("Worker", [
+ "app", "configfile", "pidfile", "cache_factor", "cache_factors",
+])
+
+
+def main():
+
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument(
+ "action",
+ choices=["start", "stop", "restart"],
+ help="whether to start, stop or restart the synapse",
+ )
+ parser.add_argument(
+ "configfile",
+ nargs="?",
+ default="homeserver.yaml",
+ help="the homeserver config file, defaults to homeserver.yaml",
+ )
+ parser.add_argument(
+ "-w", "--worker",
+ metavar="WORKERCONFIG",
+ help="start or stop a single worker",
+ )
+ parser.add_argument(
+ "-a", "--all-processes",
+ metavar="WORKERCONFIGDIR",
+ help="start or stop all the workers in the given directory"
+ " and the main synapse process",
+ )
+
+ options = parser.parse_args()
+
+ if options.worker and options.all_processes:
+ write(
+ 'Cannot use "--worker" with "--all-processes"',
+ stream=sys.stderr
+ )
+ sys.exit(1)
+
+ configfile = options.configfile
+
+ if not os.path.exists(configfile):
+ write(
+ "No config file found\n"
+ "To generate a config file, run '%s -c %s --generate-config"
+ " --server-name=<server name>'\n" % (
+ " ".join(SYNAPSE), options.configfile
+ ),
+ stream=sys.stderr,
+ )
+ sys.exit(1)
+
+ with open(configfile) as stream:
+ config = yaml.load(stream)
+
+ pidfile = config["pid_file"]
+ cache_factor = config.get("synctl_cache_factor")
+ start_stop_synapse = True
+
+ if cache_factor:
+ os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
+
+ cache_factors = config.get("synctl_cache_factors", {})
+ for cache_name, factor in iteritems(cache_factors):
+ os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
+
+ worker_configfiles = []
+ if options.worker:
+ start_stop_synapse = False
+ worker_configfile = options.worker
+ if not os.path.exists(worker_configfile):
+ write(
+ "No worker config found at %r" % (worker_configfile,),
+ stream=sys.stderr,
+ )
+ sys.exit(1)
+ worker_configfiles.append(worker_configfile)
+
+ if options.all_processes:
+ # To start the main synapse with -a you need to add a worker file
+ # with worker_app == "synapse.app.homeserver"
+ start_stop_synapse = False
+ worker_configdir = options.all_processes
+ if not os.path.isdir(worker_configdir):
+ write(
+ "No worker config directory found at %r" % (worker_configdir,),
+ stream=sys.stderr,
+ )
+ sys.exit(1)
+ worker_configfiles.extend(sorted(glob.glob(
+ os.path.join(worker_configdir, "*.yaml")
+ )))
+
+ workers = []
+ for worker_configfile in worker_configfiles:
+ with open(worker_configfile) as stream:
+ worker_config = yaml.load(stream)
+ worker_app = worker_config["worker_app"]
+ if worker_app == "synapse.app.homeserver":
+ # We need to special case all of this to pick up options that may
+ # be set in the main config file or in this worker config file.
+ worker_pidfile = (
+ worker_config.get("pid_file")
+ or pidfile
+ )
+ worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
+ worker_cache_factors = (
+ worker_config.get("synctl_cache_factors")
+ or cache_factors
+ )
+ daemonize = worker_config.get("daemonize") or config.get("daemonize")
+ assert daemonize, "Main process must have daemonize set to true"
+
+ # The master process doesn't support using worker_* config.
+ for key in worker_config:
+ if key == "worker_app": # But we allow worker_app
+ continue
+ assert not key.startswith("worker_"), \
+ "Main process cannot use worker_* config"
+ else:
+ worker_pidfile = worker_config["worker_pid_file"]
+ worker_daemonize = worker_config["worker_daemonize"]
+ assert worker_daemonize, "In config %r: expected '%s' to be True" % (
+ worker_configfile, "worker_daemonize")
+ worker_cache_factor = worker_config.get("synctl_cache_factor")
+ worker_cache_factors = worker_config.get("synctl_cache_factors", {})
+ workers.append(Worker(
+ worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
+ worker_cache_factors,
+ ))
+
+ action = options.action
+
+ if action == "stop" or action == "restart":
+ for worker in workers:
+ stop(worker.pidfile, worker.app)
+
+ if start_stop_synapse:
+ stop(pidfile, "synapse.app.homeserver")
+
+ # Wait for synapse to actually shutdown before starting it again
+ if action == "restart":
+ running_pids = []
+ if start_stop_synapse and os.path.exists(pidfile):
+ running_pids.append(int(open(pidfile).read()))
+ for worker in workers:
+ if os.path.exists(worker.pidfile):
+ running_pids.append(int(open(worker.pidfile).read()))
+ if len(running_pids) > 0:
+ write("Waiting for process to exit before restarting...")
+ for running_pid in running_pids:
+ while pid_running(running_pid):
+ time.sleep(0.2)
+ write("All processes exited; now restarting...")
+
+ if action == "start" or action == "restart":
+ if start_stop_synapse:
+ # Check if synapse is already running
+ if os.path.exists(pidfile) and pid_running(int(open(pidfile).read())):
+ abort("synapse.app.homeserver already running")
+ start(configfile)
+
+ for worker in workers:
+ env = os.environ.copy()
+
+ if worker.cache_factor:
+ os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
+
+ for cache_name, factor in iteritems(worker.cache_factors):
+ os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
+
+ start_worker(worker.app, configfile, worker.configfile)
+
+ # Reset env back to the original
+ os.environ.clear()
+ os.environ.update(env)
+
+
+if __name__ == "__main__":
+ main()
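
A sketch of how the per-cache factors in the config surface to workers (cache names and values here are invented); synctl exports one environment variable per entry before spawning the worker:

    import os

    from six import iteritems

    cache_factors = {"state_cache": 2.0, "get_users_in_room": 5.0}
    for cache_name, factor in iteritems(cache_factors):
        os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)

    # -> SYNAPSE_CACHE_FACTOR_STATE_CACHE=2.0
    #    SYNAPSE_CACHE_FACTOR_GET_USERS_IN_ROOM=5.0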
diff --git a/test_postgresql.sh b/test_postgresql.sh
new file mode 100755
index 00000000..1ffcaabd
--- /dev/null
+++ b/test_postgresql.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+# This script builds the Docker image to run the PostgreSQL tests, and then runs
+# the tests.
+
+set -e
+
+# Build, and tag
+docker build docker/ -f docker/Dockerfile-pgtests -t synapsepgtests
+
+# Run, mounting the current directory into /src
+docker run --rm -it -v $(pwd)\:/src synapsepgtests
diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py
index d45d2bcb..f3cb1423 100644
--- a/tests/http/test_fedclient.py
+++ b/tests/http/test_fedclient.py
@@ -18,9 +18,14 @@ from mock import Mock
from twisted.internet.defer import TimeoutError
from twisted.internet.error import ConnectingCancelledError, DNSLookupError
from twisted.web.client import ResponseNeverReceived
+from twisted.web.http import HTTPChannel
-from synapse.http.matrixfederationclient import MatrixFederationHttpClient
+from synapse.http.matrixfederationclient import (
+ MatrixFederationHttpClient,
+ MatrixFederationRequest,
+)
+from tests.server import FakeTransport
from tests.unittest import HomeserverTestCase
@@ -40,7 +45,7 @@ class FederationClientTests(HomeserverTestCase):
"""
If the DNS raising returns an error, it will bubble up.
"""
- d = self.cl._request("testserv2:8008", "GET", "foo/bar", timeout=10000)
+ d = self.cl.get_json("testserv2:8008", "foo/bar", timeout=10000)
self.pump()
f = self.failureResultOf(d)
@@ -51,7 +56,7 @@ class FederationClientTests(HomeserverTestCase):
If the HTTP request is not connected and is timed out, it'll give a
ConnectingCancelledError or TimeoutError.
"""
- d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000)
+ d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000)
self.pump()
@@ -78,7 +83,7 @@ class FederationClientTests(HomeserverTestCase):
If the HTTP request is connected, but gets no response before being
timed out, it'll give a ResponseNeverReceived.
"""
- d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000)
+ d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000)
self.pump()
@@ -108,7 +113,12 @@ class FederationClientTests(HomeserverTestCase):
"""
Once the client gets the headers, _request returns successfully.
"""
- d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000)
+ request = MatrixFederationRequest(
+ method="GET",
+ destination="testserv:8008",
+ path="foo/bar",
+ )
+ d = self.cl._send_request(request, timeout=10000)
self.pump()
@@ -155,3 +165,26 @@ class FederationClientTests(HomeserverTestCase):
f = self.failureResultOf(d)
self.assertIsInstance(f.value, TimeoutError)
+
+ def test_client_sends_body(self):
+ self.cl.post_json(
+ "testserv:8008", "foo/bar", timeout=10000,
+ data={"a": "b"}
+ )
+
+ self.pump()
+
+ clients = self.reactor.tcpClients
+ self.assertEqual(len(clients), 1)
+ client = clients[0][2].buildProtocol(None)
+ server = HTTPChannel()
+
+ client.makeConnection(FakeTransport(server, self.reactor))
+ server.makeConnection(FakeTransport(client, self.reactor))
+
+ self.pump(0.1)
+
+ self.assertEqual(len(server.requests), 1)
+ request = server.requests[0]
+ content = request.content.read()
+ self.assertEqual(content, b'{"a":"b"}')
diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py
index 089cecfb..9e9fbbfe 100644
--- a/tests/replication/slave/storage/_base.py
+++ b/tests/replication/slave/storage/_base.py
@@ -15,8 +15,6 @@
from mock import Mock, NonCallableMock
-import attr
-
from synapse.replication.tcp.client import (
ReplicationClientFactory,
ReplicationClientHandler,
@@ -24,6 +22,7 @@ from synapse.replication.tcp.client import (
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from tests import unittest
+from tests.server import FakeTransport
class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
@@ -56,36 +55,8 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
server = server_factory.buildProtocol(None)
client = client_factory.buildProtocol(None)
- @attr.s
- class FakeTransport(object):
-
- other = attr.ib()
- disconnecting = False
- buffer = attr.ib(default=b'')
-
- def registerProducer(self, producer, streaming):
-
- self.producer = producer
-
- def _produce():
- self.producer.resumeProducing()
- reactor.callLater(0.1, _produce)
-
- reactor.callLater(0.0, _produce)
-
- def write(self, byt):
- self.buffer = self.buffer + byt
-
- if getattr(self.other, "transport") is not None:
- self.other.dataReceived(self.buffer)
- self.buffer = b""
-
- def writeSequence(self, seq):
- for x in seq:
- self.write(x)
-
- client.makeConnection(FakeTransport(server))
- server.makeConnection(FakeTransport(client))
+ client.makeConnection(FakeTransport(server, reactor))
+ server.makeConnection(FakeTransport(client, reactor))
def replicate(self):
"""Tell the master side of replication that something has happened, and then
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index db44d33c..41be5d5a 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from canonicaljson import encode_canonical_json
+
from synapse.events import FrozenEvent, _EventInternalMetadata
from synapse.events.snapshot import EventContext
from synapse.replication.slave.storage.events import SlavedEventStore
@@ -26,7 +28,9 @@ ROOM_ID = "!room:blue"
def dict_equals(self, other):
- return self.__dict__ == other.__dict__
+ me = encode_canonical_json(self._event_dict)
+ them = encode_canonical_json(other._event_dict)
+ return me == them
def patch__eq__(cls):
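
The switch to `encode_canonical_json` above works because canonical JSON is a stable byte encoding, so two event dicts compare equal regardless of key order; a quick sketch:

    from canonicaljson import encode_canonical_json

    # key order no longer matters: both dicts encode to the same bytes
    a = encode_canonical_json({"b": 2, "a": 1})
    b = encode_canonical_json({"a": 1, "b": 2})
    assert a == b == b'{"a":1,"b":2}'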
diff --git a/tests/server.py b/tests/server.py
index 420ec4e0..7bee58df 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -98,7 +98,7 @@ class FakeSite:
return FakeLogger()
-def make_request(method, path, content=b"", access_token=None):
+def make_request(method, path, content=b"", access_token=None, request=SynapseRequest):
"""
Make a web request using the given method and path, feed it the
content, and return the Request and the Channel underneath.
@@ -120,14 +120,16 @@ def make_request(method, path, content=b"", access_token=None):
site = FakeSite()
channel = FakeChannel()
- req = SynapseRequest(site, channel)
+ req = request(site, channel)
req.process = lambda: b""
req.content = BytesIO(content)
if access_token:
req.requestHeaders.addRawHeader(b"Authorization", b"Bearer " + access_token)
- req.requestHeaders.addRawHeader(b"X-Forwarded-For", b"127.0.0.1")
+ if content:
+ req.requestHeaders.addRawHeader(b"Content-Type", b"application/json")
+
req.requestReceived(method, path, b"1.1")
return req, channel
@@ -280,3 +282,84 @@ def get_clock():
clock = ThreadedMemoryReactorClock()
hs_clock = Clock(clock)
return (clock, hs_clock)
+
+
+@attr.s
+class FakeTransport(object):
+ """
+ A twisted.internet.interfaces.ITransport implementation which sends all its data
+ straight into an IProtocol object: it exists to connect two IProtocols together.
+
+ To use it, instantiate it with the receiving IProtocol, and then pass it to the
+ sending IProtocol's makeConnection method:
+
+ server = HTTPChannel()
+ client.makeConnection(FakeTransport(server, self.reactor))
+
+ If you want bidirectional communication, you'll need two instances.
+ """
+
+ other = attr.ib()
+ """The Protocol object which will receive any data written to this transport.
+
+ :type: twisted.internet.interfaces.IProtocol
+ """
+
+ _reactor = attr.ib()
+ """Test reactor
+
+ :type: twisted.internet.interfaces.IReactorTime
+ """
+
+ disconnecting = False
+ buffer = attr.ib(default=b'')
+ producer = attr.ib(default=None)
+
+ def getPeer(self):
+ return None
+
+ def getHost(self):
+ return None
+
+ def loseConnection(self):
+ self.disconnecting = True
+
+ def abortConnection(self):
+ self.disconnecting = True
+
+ def pauseProducing(self):
+ self.producer.pauseProducing()
+
+ def unregisterProducer(self):
+ if not self.producer:
+ return
+
+ self.producer = None
+
+ def registerProducer(self, producer, streaming):
+ self.producer = producer
+ self.producerStreaming = streaming
+
+ def _produce():
+ d = self.producer.resumeProducing()
+ d.addCallback(lambda x: self._reactor.callLater(0.1, _produce))
+
+ if not streaming:
+ self._reactor.callLater(0.0, _produce)
+
+ def write(self, byt):
+ self.buffer = self.buffer + byt
+
+ def _write():
+ if getattr(self.other, "transport") is not None:
+ self.other.dataReceived(self.buffer)
+ self.buffer = b""
+ return
+
+ self._reactor.callLater(0.0, _write)
+
+ _write()
+
+ def writeSequence(self, seq):
+ for x in seq:
+ self.write(x)
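
A self-contained sketch of wiring two protocols together with the relocated `FakeTransport` (one instance per direction, as the docstring notes); `AccumulatingProtocol` and `MemoryReactorClock` are stock Twisted test helpers chosen for the example:

    from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
    from twisted.web.http import HTTPChannel

    from tests.server import FakeTransport

    reactor = MemoryReactorClock()
    server = HTTPChannel()
    client = AccumulatingProtocol()

    # bidirectional: each side writes into the other's dataReceived
    client.makeConnection(FakeTransport(server, reactor))
    server.makeConnection(FakeTransport(client, reactor))

    server.dataReceived(b"GET / HTTP/1.0\r\n\r\n")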
diff --git a/tests/server_notices/test_consent.py b/tests/server_notices/test_consent.py
new file mode 100644
index 00000000..95badc98
--- /dev/null
+++ b/tests/server_notices/test_consent.py
@@ -0,0 +1,100 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.rest.client.v1 import admin, login, room
+from synapse.rest.client.v2_alpha import sync
+
+from tests import unittest
+
+
+class ConsentNoticesTests(unittest.HomeserverTestCase):
+
+ servlets = [
+ sync.register_servlets,
+ admin.register_servlets,
+ login.register_servlets,
+ room.register_servlets,
+ ]
+
+ def make_homeserver(self, reactor, clock):
+
+ self.consent_notice_message = "consent %(consent_uri)s"
+ config = self.default_config()
+ config.user_consent_version = "1"
+ config.user_consent_server_notice_content = {
+ "msgtype": "m.text",
+ "body": self.consent_notice_message,
+ }
+ config.public_baseurl = "https://example.com/"
+ config.form_secret = "123abc"
+
+ config.server_notices_mxid = "@notices:test"
+ config.server_notices_mxid_display_name = "test display name"
+ config.server_notices_mxid_avatar_url = None
+ config.server_notices_room_name = "Server Notices"
+
+ hs = self.setup_test_homeserver(config=config)
+
+ return hs
+
+ def prepare(self, reactor, clock, hs):
+ self.user_id = self.register_user("bob", "abc123")
+ self.access_token = self.login("bob", "abc123")
+
+ def test_get_sync_message(self):
+ """
+ When user consent server notices are enabled, a sync will cause a notice
+ to fire (in a room which the user is invited to). The notice contains
+ the notice URL + an authentication code.
+ """
+ # Initial sync, to get the user consent room invite
+ request, channel = self.make_request(
+ "GET", "/_matrix/client/r0/sync", access_token=self.access_token
+ )
+ self.render(request)
+ self.assertEqual(channel.code, 200)
+
+ # Get the Room ID to join
+ room_id = list(channel.json_body["rooms"]["invite"].keys())[0]
+
+ # Join the room
+ request, channel = self.make_request(
+ "POST",
+ "/_matrix/client/r0/rooms/" + room_id + "/join",
+ access_token=self.access_token,
+ )
+ self.render(request)
+ self.assertEqual(channel.code, 200)
+
+ # Sync again, to get the message in the room
+ request, channel = self.make_request(
+ "GET", "/_matrix/client/r0/sync", access_token=self.access_token
+ )
+ self.render(request)
+ self.assertEqual(channel.code, 200)
+
+ # Get the message
+ room = channel.json_body["rooms"]["join"][room_id]
+ messages = [
+ x for x in room["timeline"]["events"] if x["type"] == "m.room.message"
+ ]
+
+ # One message, with the consent URL
+ self.assertEqual(len(messages), 1)
+ self.assertTrue(
+ messages[0]["content"]["body"].startswith(
+ "consent https://example.com/_matrix/consent"
+ )
+ )
diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py
index c9b02a06..4577e942 100644
--- a/tests/storage/test_client_ips.py
+++ b/tests/storage/test_client_ips.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,35 +13,41 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
from mock import Mock
from twisted.internet import defer
-import tests.unittest
-import tests.utils
+from synapse.http.site import XForwardedForRequest
+from synapse.rest.client.v1 import admin, login
+
+from tests import unittest
-class ClientIpStoreTestCase(tests.unittest.TestCase):
- def __init__(self, *args, **kwargs):
- super(ClientIpStoreTestCase, self).__init__(*args, **kwargs)
- self.store = None # type: synapse.storage.DataStore
- self.clock = None # type: tests.utils.MockClock
+class ClientIpStoreTestCase(unittest.HomeserverTestCase):
+ def make_homeserver(self, reactor, clock):
+ hs = self.setup_test_homeserver()
+ return hs
- @defer.inlineCallbacks
- def setUp(self):
- self.hs = yield tests.utils.setup_test_homeserver(self.addCleanup)
+ def prepare(self, hs, reactor, clock):
self.store = self.hs.get_datastore()
- self.clock = self.hs.get_clock()
- @defer.inlineCallbacks
def test_insert_new_client_ip(self):
- self.clock.now = 12345678
+ self.reactor.advance(12345678)
+
user_id = "@user:id"
- yield self.store.insert_client_ip(
- user_id, "access_token", "ip", "user_agent", "device_id"
+ self.get_success(
+ self.store.insert_client_ip(
+ user_id, "access_token", "ip", "user_agent", "device_id"
+ )
)
- result = yield self.store.get_last_client_ip_by_device(user_id, "device_id")
+ # Trigger the storage loop
+ self.reactor.advance(10)
+
+ result = self.get_success(
+ self.store.get_last_client_ip_by_device(user_id, "device_id")
+ )
r = result[(user_id, "device_id")]
self.assertDictContainsSubset(
@@ -55,18 +62,18 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
r,
)
- @defer.inlineCallbacks
def test_disabled_monthly_active_user(self):
self.hs.config.limit_usage_by_mau = False
self.hs.config.max_mau_value = 50
user_id = "@user:server"
- yield self.store.insert_client_ip(
- user_id, "access_token", "ip", "user_agent", "device_id"
+ self.get_success(
+ self.store.insert_client_ip(
+ user_id, "access_token", "ip", "user_agent", "device_id"
+ )
)
- active = yield self.store.user_last_seen_monthly_active(user_id)
+ active = self.get_success(self.store.user_last_seen_monthly_active(user_id))
self.assertFalse(active)
- @defer.inlineCallbacks
def test_adding_monthly_active_user_when_full(self):
self.hs.config.limit_usage_by_mau = True
self.hs.config.max_mau_value = 50
@@ -76,38 +83,119 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
self.store.get_monthly_active_count = Mock(
return_value=defer.succeed(lots_of_users)
)
- yield self.store.insert_client_ip(
- user_id, "access_token", "ip", "user_agent", "device_id"
+ self.get_success(
+ self.store.insert_client_ip(
+ user_id, "access_token", "ip", "user_agent", "device_id"
+ )
)
- active = yield self.store.user_last_seen_monthly_active(user_id)
+ active = self.get_success(self.store.user_last_seen_monthly_active(user_id))
self.assertFalse(active)
- @defer.inlineCallbacks
def test_adding_monthly_active_user_when_space(self):
self.hs.config.limit_usage_by_mau = True
self.hs.config.max_mau_value = 50
user_id = "@user:server"
- active = yield self.store.user_last_seen_monthly_active(user_id)
+ active = self.get_success(self.store.user_last_seen_monthly_active(user_id))
self.assertFalse(active)
- yield self.store.insert_client_ip(
- user_id, "access_token", "ip", "user_agent", "device_id"
+ # Trigger the saving loop
+ self.reactor.advance(10)
+
+ self.get_success(
+ self.store.insert_client_ip(
+ user_id, "access_token", "ip", "user_agent", "device_id"
+ )
)
- active = yield self.store.user_last_seen_monthly_active(user_id)
+ active = self.get_success(self.store.user_last_seen_monthly_active(user_id))
self.assertTrue(active)
- @defer.inlineCallbacks
def test_updating_monthly_active_user_when_space(self):
self.hs.config.limit_usage_by_mau = True
self.hs.config.max_mau_value = 50
user_id = "@user:server"
- yield self.store.register(user_id=user_id, token="123", password_hash=None)
+ self.get_success(
+ self.store.register(user_id=user_id, token="123", password_hash=None)
+ )
- active = yield self.store.user_last_seen_monthly_active(user_id)
+ active = self.get_success(self.store.user_last_seen_monthly_active(user_id))
self.assertFalse(active)
- yield self.store.insert_client_ip(
- user_id, "access_token", "ip", "user_agent", "device_id"
+ # Trigger the saving loop
+ self.reactor.advance(10)
+
+ self.get_success(
+ self.store.insert_client_ip(
+ user_id, "access_token", "ip", "user_agent", "device_id"
+ )
)
- active = yield self.store.user_last_seen_monthly_active(user_id)
+ active = self.get_success(self.store.user_last_seen_monthly_active(user_id))
self.assertTrue(active)
+
+
+class ClientIpAuthTestCase(unittest.HomeserverTestCase):
+
+ servlets = [admin.register_servlets, login.register_servlets]
+
+ def make_homeserver(self, reactor, clock):
+ hs = self.setup_test_homeserver()
+ return hs
+
+ def prepare(self, hs, reactor, clock):
+ self.store = self.hs.get_datastore()
+ self.user_id = self.register_user("bob", "abc123", True)
+
+ def test_request_with_xforwarded(self):
+ """
+ The IP in X-Forwarded-For is entered into the client IPs table.
+ """
+ self._runtest(
+ {b"X-Forwarded-For": b"127.9.0.1"},
+ "127.9.0.1",
+ {"request": XForwardedForRequest},
+ )
+
+ def test_request_from_getPeer(self):
+ """
+ The IP returned by getPeer is entered into the client IPs table, if
+ there's no X-Forwarded-For header.
+ """
+ self._runtest({}, "127.0.0.1", {})
+
+ def _runtest(self, headers, expected_ip, make_request_args):
+ device_id = "bleb"
+
+ access_token = self.login("bob", "abc123", device_id=device_id)
+
+ # Advance to a known time
+ self.reactor.advance(123456 - self.reactor.seconds())
+
+ request, channel = self.make_request(
+ "GET",
+ "/_matrix/client/r0/admin/users/" + self.user_id,
+ access_token=access_token,
+ **make_request_args
+ )
+ request.requestHeaders.addRawHeader(b"User-Agent", b"Mozzila pizza")
+
+ # Add the optional headers
+ for h, v in headers.items():
+ request.requestHeaders.addRawHeader(h, v)
+ self.render(request)
+
+ # Advance so the save loop occurs
+ self.reactor.advance(100)
+
+ result = self.get_success(
+ self.store.get_last_client_ip_by_device(self.user_id, device_id)
+ )
+ r = result[(self.user_id, device_id)]
+ self.assertDictContainsSubset(
+ {
+ "user_id": self.user_id,
+ "device_id": device_id,
+ "ip": expected_ip,
+ "user_agent": "Mozzila pizza",
+ "last_seen": 123456100,
+ },
+ r,
+ )
diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py
index b9109659..b9c5b39d 100644
--- a/tests/storage/test_state.py
+++ b/tests/storage/test_state.py
@@ -75,6 +75,45 @@ class StateStoreTestCase(tests.unittest.TestCase):
self.assertEqual(len(s1), len(s2))
@defer.inlineCallbacks
+ def test_get_state_groups_ids(self):
+ e1 = yield self.inject_state_event(
+ self.room, self.u_alice, EventTypes.Create, '', {}
+ )
+ e2 = yield self.inject_state_event(
+ self.room, self.u_alice, EventTypes.Name, '', {"name": "test room"}
+ )
+
+ state_group_map = yield self.store.get_state_groups_ids(self.room, [e2.event_id])
+ self.assertEqual(len(state_group_map), 1)
+ state_map = list(state_group_map.values())[0]
+ self.assertDictEqual(
+ state_map,
+ {
+ (EventTypes.Create, ''): e1.event_id,
+ (EventTypes.Name, ''): e2.event_id,
+ },
+ )
+
+ @defer.inlineCallbacks
+ def test_get_state_groups(self):
+ e1 = yield self.inject_state_event(
+ self.room, self.u_alice, EventTypes.Create, '', {}
+ )
+ e2 = yield self.inject_state_event(
+ self.room, self.u_alice, EventTypes.Name, '', {"name": "test room"}
+ )
+
+ state_group_map = yield self.store.get_state_groups(
+ self.room, [e2.event_id])
+ self.assertEqual(len(state_group_map), 1)
+ state_list = list(state_group_map.values())[0]
+
+ self.assertEqual(
+ {ev.event_id for ev in state_list},
+ {e1.event_id, e2.event_id},
+ )
+
+ @defer.inlineCallbacks
def test_get_state_for_event(self):
# this defaults to a linear DAG as each new injection defaults to whatever
diff --git a/tests/storage/test_transactions.py b/tests/storage/test_transactions.py
new file mode 100644
index 00000000..14169afa
--- /dev/null
+++ b/tests/storage/test_transactions.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from tests.unittest import HomeserverTestCase
+
+
+class TransactionStoreTestCase(HomeserverTestCase):
+ def prepare(self, reactor, clock, homeserver):
+ self.store = homeserver.get_datastore()
+
+ def test_get_set_transactions(self):
+ """Tests that we can successfully get a non-existent entry for
+ destination retries, as well as testing that we can set and get
+ correctly.
+ """
+ d = self.store.get_destination_retry_timings("example.com")
+ r = self.get_success(d)
+ self.assertIsNone(r)
+
+ d = self.store.set_destination_retry_timings("example.com", 50, 100)
+ self.get_success(d)
+
+ d = self.store.get_destination_retry_timings("example.com")
+ r = self.get_success(d)
+
+ self.assert_dict({"retry_last_ts": 50, "retry_interval": 100}, r)
+
+ def test_initial_set_transactions(self):
+ """Tests that we can successfully set the destination retries (there
+ was a bug around invalidating the cache that broke this).
+ """
+ d = self.store.set_destination_retry_timings("example.com", 50, 100)
+ self.get_success(d)
diff --git a/tests/test_federation.py b/tests/test_federation.py
index 2540604f..952a0a7b 100644
--- a/tests/test_federation.py
+++ b/tests/test_federation.py
@@ -6,6 +6,7 @@ from twisted.internet.defer import maybeDeferred, succeed
from synapse.events import FrozenEvent
from synapse.types import Requester, UserID
from synapse.util import Clock
+from synapse.util.logcontext import LoggingContext
from tests import unittest
from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver
@@ -117,9 +118,10 @@ class MessageAcceptTests(unittest.TestCase):
}
)
- d = self.handler.on_receive_pdu(
- "test.serv", lying_event, sent_to_us_directly=True
- )
+ with LoggingContext(request="lying_event"):
+ d = self.handler.on_receive_pdu(
+ "test.serv", lying_event, sent_to_us_directly=True
+ )
# Step the reactor, so the database fetches come back
self.reactor.advance(1)
@@ -139,107 +141,3 @@ class MessageAcceptTests(unittest.TestCase):
self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
)
self.assertEqual(self.successResultOf(extrem)[0], "$join:test.serv")
-
- def test_cant_hide_past_history(self):
- """
- If you send a message, you must be able to provide the direct
- prev_events that said event references.
- """
-
- def post_json(destination, path, data, headers=None, timeout=0):
- if path.startswith("/_matrix/federation/v1/get_missing_events/"):
- return {
- "events": [
- {
- "room_id": self.room_id,
- "sender": "@baduser:test.serv",
- "event_id": "three:test.serv",
- "depth": 1000,
- "origin_server_ts": 1,
- "type": "m.room.message",
- "origin": "test.serv",
- "content": "hewwo?",
- "auth_events": [],
- "prev_events": [("four:test.serv", {})],
- }
- ]
- }
-
- self.http_client.post_json = post_json
-
- def get_json(destination, path, args, headers=None):
- if path.startswith("/_matrix/federation/v1/state_ids/"):
- d = self.successResultOf(
- self.homeserver.datastore.get_state_ids_for_event("one:test.serv")
- )
-
- return succeed(
- {
- "pdu_ids": [
- y
- for x, y in d.items()
- if x == ("m.room.member", "@us:test")
- ],
- "auth_chain_ids": list(d.values()),
- }
- )
-
- self.http_client.get_json = get_json
-
- # Figure out what the most recent event is
- most_recent = self.successResultOf(
- maybeDeferred(
- self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
- )
- )[0]
-
- # Make a good event
- good_event = FrozenEvent(
- {
- "room_id": self.room_id,
- "sender": "@baduser:test.serv",
- "event_id": "one:test.serv",
- "depth": 1000,
- "origin_server_ts": 1,
- "type": "m.room.message",
- "origin": "test.serv",
- "content": "hewwo?",
- "auth_events": [],
- "prev_events": [(most_recent, {})],
- }
- )
-
- d = self.handler.on_receive_pdu(
- "test.serv", good_event, sent_to_us_directly=True
- )
- self.reactor.advance(1)
- self.assertEqual(self.successResultOf(d), None)
-
- bad_event = FrozenEvent(
- {
- "room_id": self.room_id,
- "sender": "@baduser:test.serv",
- "event_id": "two:test.serv",
- "depth": 1000,
- "origin_server_ts": 1,
- "type": "m.room.message",
- "origin": "test.serv",
- "content": "hewwo?",
- "auth_events": [],
- "prev_events": [("one:test.serv", {}), ("three:test.serv", {})],
- }
- )
-
- d = self.handler.on_receive_pdu(
- "test.serv", bad_event, sent_to_us_directly=True
- )
- self.reactor.advance(1)
-
- extrem = maybeDeferred(
- self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
- )
- self.assertEqual(self.successResultOf(extrem)[0], "two:test.serv")
-
- state = self.homeserver.get_state_handler().get_current_state_ids(self.room_id)
- self.reactor.advance(1)
- self.assertIn(("m.room.member", "@us:test"), self.successResultOf(state).keys())
diff --git a/tests/test_server.py b/tests/test_server.py
index ef74544e..4045fdad 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -1,14 +1,35 @@
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
import re
+from six import StringIO
+
from twisted.internet.defer import Deferred
-from twisted.test.proto_helpers import MemoryReactorClock
+from twisted.python.failure import Failure
+from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
+from twisted.web.resource import Resource
+from twisted.web.server import NOT_DONE_YET
from synapse.api.errors import Codes, SynapseError
from synapse.http.server import JsonResource
+from synapse.http.site import SynapseSite, logger
from synapse.util import Clock
from tests import unittest
-from tests.server import make_request, render, setup_test_homeserver
+from tests.server import FakeTransport, make_request, render, setup_test_homeserver
class JsonResourceTests(unittest.TestCase):
@@ -121,3 +142,52 @@ class JsonResourceTests(unittest.TestCase):
self.assertEqual(channel.result["code"], b'400')
self.assertEqual(channel.json_body["error"], "Unrecognized request")
self.assertEqual(channel.json_body["errcode"], "M_UNRECOGNIZED")
+
+
+class SiteTestCase(unittest.HomeserverTestCase):
+ def test_lose_connection(self):
+ """
+ We log the URI correctly redacted when we lose the connection.
+ """
+
+ class HangingResource(Resource):
+ """
+ A Resource that strategically hangs, as if it were processing an
+ answer.
+ """
+
+ def render(self, request):
+ return NOT_DONE_YET
+
+ # Set up a logging handler that we can inspect afterwards
+ output = StringIO()
+ handler = logging.StreamHandler(output)
+ logger.addHandler(handler)
+ old_level = logger.level
+ logger.setLevel(10)
+ self.addCleanup(logger.setLevel, old_level)
+ self.addCleanup(logger.removeHandler, handler)
+
+ # Make a resource and a Site, the resource will hang and allow us to
+ # time out the request while it's 'processing'
+ base_resource = Resource()
+ base_resource.putChild(b'', HangingResource())
+ site = SynapseSite("test", "site_tag", {}, base_resource, "1.0")
+
+ server = site.buildProtocol(None)
+ client = AccumulatingProtocol()
+ client.makeConnection(FakeTransport(server, self.reactor))
+ server.makeConnection(FakeTransport(client, self.reactor))
+
+ # Send a request with an access token that will get redacted
+ server.dataReceived(b"GET /?access_token=bar HTTP/1.0\r\n\r\n")
+ self.pump()
+
+ # Lose the connection
+ e = Failure(Exception("Failed123"))
+ server.connectionLost(e)
+ handler.flush()
+
+ # Our access token is redacted and the failure reason is logged.
+ self.assertIn("/?access_token=<redacted>", output.getvalue())
+ self.assertIn("Failed123", output.getvalue())
diff --git a/tests/unittest.py b/tests/unittest.py
index a3d39920..a59291cc 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import hashlib
+import hmac
import logging
from mock import Mock
@@ -26,11 +28,13 @@ from twisted.internet.defer import Deferred
from twisted.trial import unittest
from synapse.http.server import JsonResource
+from synapse.http.site import SynapseRequest
from synapse.server import HomeServer
from synapse.types import UserID, create_requester
from synapse.util.logcontext import LoggingContextFilter
from tests.server import get_clock, make_request, render, setup_test_homeserver
+from tests.utils import default_config
# Set up putting Synapse's logs into Trial's.
rootLogger = logging.getLogger()
@@ -219,7 +223,17 @@ class HomeserverTestCase(TestCase):
Function to be overridden in subclasses.
"""
- raise NotImplementedError()
+ hs = self.setup_test_homeserver()
+ return hs
+
+ def default_config(self, name="test"):
+ """
+ Get a default HomeServer config object.
+
+ Args:
+ name (str): The homeserver name/domain.
+ """
+ return default_config(name)
def prepare(self, reactor, clock, homeserver):
"""
@@ -236,7 +250,9 @@ class HomeserverTestCase(TestCase):
Function to optionally be overridden in subclasses.
"""
- def make_request(self, method, path, content=b""):
+ def make_request(
+ self, method, path, content=b"", access_token=None, request=SynapseRequest
+ ):
"""
Create a SynapseRequest at the path using the method and containing the
given content.
@@ -254,7 +270,7 @@ class HomeserverTestCase(TestCase):
if isinstance(content, dict):
content = json.dumps(content).encode('utf8')
- return make_request(method, path, content)
+ return make_request(method, path, content, access_token, request)
def render(self, request):
"""
@@ -293,3 +309,69 @@ class HomeserverTestCase(TestCase):
return d
self.pump()
return self.successResultOf(d)
+
+ def register_user(self, username, password, admin=False):
+ """
+ Register a user. Requires the Admin API be registered.
+
+ Args:
+ username (bytes/unicode): The user part of the new user.
+ password (bytes/unicode): The password of the new user.
+ admin (bool): Whether the user should be created as an admin
+ or not.
+
+ Returns:
+ The MXID of the new user (unicode).
+ """
+ self.hs.config.registration_shared_secret = u"shared"
+
+ # Create the user
+ request, channel = self.make_request("GET", "/_matrix/client/r0/admin/register")
+ self.render(request)
+ nonce = channel.json_body["nonce"]
+
+ want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
+ nonce_str = b"\x00".join([username.encode('utf8'), password.encode('utf8')])
+ if admin:
+ nonce_str += b"\x00admin"
+ else:
+ nonce_str += b"\x00notadmin"
+ want_mac.update(nonce.encode('ascii') + b"\x00" + nonce_str)
+ want_mac = want_mac.hexdigest()
+
+ body = json.dumps(
+ {
+ "nonce": nonce,
+ "username": username,
+ "password": password,
+ "admin": admin,
+ "mac": want_mac,
+ }
+ )
+ request, channel = self.make_request(
+ "POST", "/_matrix/client/r0/admin/register", body.encode('utf8')
+ )
+ self.render(request)
+ self.assertEqual(channel.code, 200)
+
+ user_id = channel.json_body["user_id"]
+ return user_id
+
+ def login(self, username, password, device_id=None):
+ """
+ Log in a user, and get an access token. Requires the Login API be
+ registered.
+
+ """
+ body = {"type": "m.login.password", "user": username, "password": password}
+ if device_id:
+ body["device_id"] = device_id
+
+ request, channel = self.make_request(
+ "POST", "/_matrix/client/r0/login", json.dumps(body).encode('utf8')
+ )
+ self.render(request)
+ self.assertEqual(channel.code, 200)
+
+ access_token = channel.json_body["access_token"].encode('ascii')
+ return access_token
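
For reference, a standalone sketch of the MAC that `register_user` computes above (values invented): an HMAC-SHA1 over the nonce, user, password and admin flag, NUL-separated:

    import hashlib
    import hmac

    def registration_mac(shared_secret, nonce, username, password, admin):
        mac = hmac.new(key=shared_secret, digestmod=hashlib.sha1)
        mac.update(nonce.encode("ascii"))
        mac.update(b"\x00" + username.encode("utf8"))
        mac.update(b"\x00" + password.encode("utf8"))
        mac.update(b"\x00admin" if admin else b"\x00notadmin")
        return mac.hexdigest()

    assert len(registration_mac(b"shared", "abc", "bob", "abc123", False)) == 40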
diff --git a/tests/util/test_expiring_cache.py b/tests/util/test_expiring_cache.py
index 5cbada4e..50bc7702 100644
--- a/tests/util/test_expiring_cache.py
+++ b/tests/util/test_expiring_cache.py
@@ -65,7 +65,6 @@ class ExpiringCacheTestCase(unittest.TestCase):
def test_time_eviction(self):
clock = MockClock()
cache = ExpiringCache("test", clock, expiry_ms=1000)
- cache.start()
cache["key"] = 1
clock.advance_time(0.5)
diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py
index 4633db77..8adaee3c 100644
--- a/tests/util/test_logcontext.py
+++ b/tests/util/test_logcontext.py
@@ -159,6 +159,11 @@ class LoggingContextTestCase(unittest.TestCase):
self.assertEqual(r, "bum")
self._check_test_key("one")
+ def test_nested_logging_context(self):
+ with LoggingContext(request="foo"):
+ nested_context = logcontext.nested_logging_context(suffix="bar")
+ self.assertEqual(nested_context.request, "foo-bar")
+
# a function which returns a deferred which has been "called", but
# which had a function which returned another incomplete deferred on
diff --git a/tests/utils.py b/tests/utils.py
index 215226de..dd347a0c 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -16,7 +16,9 @@
import atexit
import hashlib
import os
+import time
import uuid
+import warnings
from inspect import getcallargs
from mock import Mock, patch
@@ -94,6 +96,64 @@ def setupdb():
atexit.register(_cleanup)
+def default_config(name):
+ """
+ Create a reasonable test config.
+ """
+ config = Mock()
+ config.signing_key = [MockKey()]
+ config.event_cache_size = 1
+ config.enable_registration = True
+ config.macaroon_secret_key = "not even a little secret"
+ config.expire_access_token = False
+ config.server_name = name
+ config.trusted_third_party_id_servers = []
+ config.room_invite_state_types = []
+ config.password_providers = []
+ config.worker_replication_url = ""
+ config.worker_app = None
+ config.email_enable_notifs = False
+ config.block_non_admin_invites = False
+ config.federation_domain_whitelist = None
+ config.federation_rc_reject_limit = 10
+ config.federation_rc_sleep_limit = 10
+ config.federation_rc_sleep_delay = 100
+ config.federation_rc_concurrent = 10
+ config.filter_timeline_limit = 5000
+ config.user_directory_search_all_users = False
+ config.user_consent_server_notice_content = None
+ config.block_events_without_consent_error = None
+ config.media_storage_providers = []
+ config.auto_join_rooms = []
+ config.limit_usage_by_mau = False
+ config.hs_disabled = False
+ config.hs_disabled_message = ""
+ config.hs_disabled_limit_type = ""
+ config.max_mau_value = 50
+ config.mau_trial_days = 0
+ config.mau_limits_reserved_threepids = []
+ config.admin_contact = None
+ config.rc_messages_per_second = 10000
+ config.rc_message_burst_count = 10000
+
+ config.use_frozen_dicts = False
+
+ # we need a sane default_room_version, otherwise attempts to create rooms will
+ # fail.
+ config.default_room_version = "1"
+
+ # disable user directory updates, because they get done in the
+ # background, which upsets the test runner.
+ config.update_user_directory = False
+
+ def is_threepid_reserved(threepid):
+ return ServerConfig.is_threepid_reserved(config, threepid)
+
+ config.is_threepid_reserved.side_effect = is_threepid_reserved
+
+ return config
+
+
class TestHomeServer(HomeServer):
DATASTORE_CLASS = DataStore
@@ -122,56 +182,8 @@ def setup_test_homeserver(
from twisted.internet import reactor
if config is None:
- config = Mock()
- config.signing_key = [MockKey()]
- config.event_cache_size = 1
- config.enable_registration = True
- config.macaroon_secret_key = "not even a little secret"
- config.expire_access_token = False
- config.server_name = name
- config.trusted_third_party_id_servers = []
- config.room_invite_state_types = []
- config.password_providers = []
- config.worker_replication_url = ""
- config.worker_app = None
- config.email_enable_notifs = False
- config.block_non_admin_invites = False
- config.federation_domain_whitelist = None
- config.federation_rc_reject_limit = 10
- config.federation_rc_sleep_limit = 10
- config.federation_rc_sleep_delay = 100
- config.federation_rc_concurrent = 10
- config.filter_timeline_limit = 5000
- config.user_directory_search_all_users = False
- config.user_consent_server_notice_content = None
- config.block_events_without_consent_error = None
- config.media_storage_providers = []
- config.auto_join_rooms = []
- config.limit_usage_by_mau = False
- config.hs_disabled = False
- config.hs_disabled_message = ""
- config.hs_disabled_limit_type = ""
- config.max_mau_value = 50
- config.mau_trial_days = 0
- config.mau_limits_reserved_threepids = []
- config.admin_contact = None
- config.rc_messages_per_second = 10000
- config.rc_message_burst_count = 10000
-
- # we need a sane default_room_version, otherwise attempts to create rooms will
- # fail.
- config.default_room_version = "1"
-
- # disable user directory updates, because they get done in the
- # background, which upsets the test runner.
- config.update_user_directory = False
-
- def is_threepid_reserved(threepid):
- return ServerConfig.is_threepid_reserved(config, threepid)
-
- config.is_threepid_reserved.side_effect = is_threepid_reserved
-
- config.use_frozen_dicts = True
+ config = default_config(name)
+
config.ldap_enabled = False
if "clock" not in kargs:
@@ -237,20 +249,41 @@ def setup_test_homeserver(
else:
# We need to do cleanup on PostgreSQL
def cleanup():
+ import psycopg2
+
# Close all the db pools
hs.get_db_pool().close()
+ dropped = False
+
# Drop the test database
db_conn = db_engine.module.connect(
database=POSTGRES_BASE_DB, user=POSTGRES_USER
)
db_conn.autocommit = True
cur = db_conn.cursor()
- cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,))
- db_conn.commit()
+
+ # Try a few times to drop the DB. Some things may hold on to the
+ # database for a few more seconds due to flakiness, preventing
+ # us from dropping it when the test is over. If we can't drop
+ # it, warn and move on.
+ for x in range(5):
+ try:
+ cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,))
+ db_conn.commit()
+ dropped = True
+ except psycopg2.OperationalError as e:
+ warnings.warn(
+ "Couldn't drop old db: " + str(e), category=UserWarning
+ )
+ time.sleep(0.5)
+
cur.close()
db_conn.close()
+ if not dropped:
+ warnings.warn("Failed to drop old DB.", category=UserWarning)
+
if not LEAVE_DB:
# Register the cleanup hook
cleanup_func(cleanup)
diff --git a/tox.ini b/tox.ini
index 80ac9324..87b5e478 100644
--- a/tox.ini
+++ b/tox.ini
@@ -4,7 +4,7 @@ envlist = packaging, py27, py36, pep8, check_isort
[base]
deps =
coverage
- Twisted>=15.1
+ Twisted>=17.1
mock
python-subunit
junitxml
@@ -64,12 +64,42 @@ setenv =
{[base]setenv}
SYNAPSE_POSTGRES = 1
+# A test suite for the oldest supported versions of Python libraries, to catch
+# any uses of APIs not available in them.
+[testenv:py27-old]
+skip_install=True
+deps =
+ # Old automat version for Twisted
+ Automat == 0.3.0
+
+ mock
+ lxml
+commands =
+ /usr/bin/find "{toxinidir}" -name '*.pyc' -delete
+ # Make all greater-than version pins exact, so we test the oldest versions
+ # of our direct dependencies, but bump pyopenssl to 17.0.0, which can work
+ # against an OpenSSL 1.1 compiled cryptography (older ones don't compile on Travis).
+ /bin/sh -c 'python -m synapse.python_dependencies | sed -e "s/>=/==/g" -e "s/psycopg2==2.6//" -e "s/pyopenssl==16.0.0/pyopenssl==17.0.0/" | xargs pip install'
+ # Install Synapse itself. This won't update any libraries.
+ pip install -e .
+ {envbindir}/trial {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}
+
[testenv:py35]
usedevelop=true
[testenv:py36]
usedevelop=true
+[testenv:py36-postgres]
+usedevelop=true
+deps =
+ {[base]deps}
+ psycopg2
+setenv =
+ {[base]setenv}
+ SYNAPSE_POSTGRES = 1
+
+
[testenv:packaging]
deps =
check-manifest