summaryrefslogtreecommitdiff
path: root/netdisco/smartglass.py
blob: 4244a2b8ba9abfc8bd9e32799533bc15b0db8710 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""Xbox One SmartGlass device discovery."""
import socket
import struct
import binascii
from datetime import timedelta


DISCOVERY_PORT = 5050
DISCOVERY_ADDRESS_BCAST = '<broadcast>'
DISCOVERY_ADDRESS_MCAST = '239.255.255.250'
DISCOVERY_REQUEST = 0xDD00
DISCOVERY_RESPONSE = 0xDD01
DISCOVERY_TIMEOUT = timedelta(seconds=2)

"""
SmartGlass Client type
XboxOne = 1
Xbox360 = 2
WindowsDesktop = 3
WindowsStore = 4
WindowsPhone = 5
iPhone = 6
iPad = 7
Android = 8
"""
DISCOVERY_CLIENT_TYPE = 4


class XboxSmartGlass:
    """Base class to discover Xbox SmartGlass devices."""

    def __init__(self):
        """Initialize the Xbox SmartGlass discovery."""
        self.entries = []
        self._discovery_payload = self.discovery_packet()

    @staticmethod
    def discovery_packet():
        """Assemble discovery payload."""
        version = 0
        flags = 0
        min_version = 0
        max_version = 2

        payload = struct.pack(
            '>IHHH',
            flags, DISCOVERY_CLIENT_TYPE, min_version, max_version
        )
        header = struct.pack(
            '>HHH',
            DISCOVERY_REQUEST, len(payload), version
        )
        return header + payload

    @staticmethod
    def parse_discovery_response(data):
        """Parse console's discovery response."""
        pos = 0
        # Header
        # pkt_type, payload_len, version = struct.unpack_from(
        #    '>HHH',
        #    data, pos
        # )
        pos += 6
        # Payload
        flags, type_, name_len = struct.unpack_from(
            '>IHH',
            data, pos
        )
        pos += 8
        name = data[pos:pos + name_len]
        pos += name_len + 1  # including null terminator
        uuid_len = struct.unpack_from(
            '>H',
            data, pos
        )[0]
        pos += 2
        uuid = data[pos:pos + uuid_len]
        pos += uuid_len + 1  # including null terminator
        last_error, cert_len = struct.unpack_from(
            '>IH',
            data, pos
        )
        pos += 6
        cert = data[pos:pos + cert_len]

        return {
            'device_type': type_,
            'flags': flags,
            'name': name.decode('utf-8'),
            'uuid': uuid.decode('utf-8'),
            'last_error': last_error,
            'certificate': binascii.hexlify(cert).decode('utf-8')
        }

    def scan(self):
        """Scan the network."""
        self.update()

    def all(self):
        """Scan and return all found entries."""
        self.scan()
        return self.entries

    @staticmethod
    def verify_packet(data):
        """Parse packet if it has correct magic"""
        if len(data) < 2:
            return None

        pkt_type = struct.unpack_from('>H', data)[0]
        if pkt_type != DISCOVERY_RESPONSE:
            return None

        return XboxSmartGlass.parse_discovery_response(data)

    def update(self):
        """Scan network for Xbox SmartGlass devices."""
        entries = []

        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.settimeout(DISCOVERY_TIMEOUT.seconds)
        sock.sendto(self._discovery_payload,
                    (DISCOVERY_ADDRESS_BCAST, DISCOVERY_PORT))
        sock.sendto(self._discovery_payload,
                    (DISCOVERY_ADDRESS_MCAST, DISCOVERY_PORT))

        while True:
            try:
                data, (address, _) = sock.recvfrom(1024)

                response = self.verify_packet(data)
                if response:
                    entries.append((address, response))

            except socket.timeout:
                break

            self.entries = entries

        sock.close()


def main():
    """Test XboxOne discovery."""
    from pprint import pprint
    xbsmartglass = XboxSmartGlass()
    pprint("Scanning for Xbox One SmartGlass consoles devices..")
    xbsmartglass.update()
    pprint(xbsmartglass.entries)


if __name__ == "__main__":
    main()