you are going to need to pass the full path and filename to current_media instead of a URL
this new script sets up an http server that should setup everything the device needs.
example of setting the media
current_media = r'c:\some_folder\some_avi_file.avi'
you need to make sure you put the r in the front or escape all of the backslashes
You can change the PORT to whatever port you want the server to listen on.
Code: Select all
import re
import socket
import threading
from contextlib import contextmanager
from urllib2 import urlopen
import mimetypes
import os
import SimpleHTTPServer
import SocketServer
PORT = 8000
class DLNA(object):
DIDL = '''<?xml version="1.0"?>
<DIDL-Lite xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns:dc="http://purl.org/dc/elements/1.1/">
<item id="{id}" parentID="{parent_id}" restricted="0">
<dc:title>{title}</dc:title>
<dc:creator>{creator}</dc:creator>
<upnp:class>{klass}</upnp:class>
<res protocolInfo="http-get:*:{mime_type}:*">{url}</res>
</item>
</DIDL-Lite>
'''
PORT = PORT
SSDP_GROUP = ("239.255.255.250", 1900)
URN_AVTransport = "urn:schemas-upnp-org:service:AVTransport:1"
URN_AVTransport_Fmt = "urn:schemas-upnp-org:service:AVTransport:{0}"
URN_RenderingControl = "urn:schemas-upnp-org:service:RenderingControl:1"
URN_RenderingControl_Fmt = "urn:schemas-upnp-org:service:RenderingControl:{0}"
SSDP_ALL = "ssdp:all"
class Handler(SimpleHTTPServer.SimpleHTTPRequestHandler):
file_path = ''
def do_HEAD(self):
self.send_response(200)
self.send_header('contentFeatures.dlna.org', '*')
self.send_header('transferMode.dlna.org', 'Streaming')
self.send_header('Accept-Ranges', 'bytes')
self.send_header('Cache-Control', 'no-cache')
self.send_header("Content-type", self.guess_type(self.file_path))
self.send_header("Content-Length", os.path.getsize(self.file_path))
self.end_headers()
self.copyfile(open(self.file_path, 'rb'), self.wfile)
def get_device(self, device):
return self[device]
def __iter__(self):
for device in self._devices:
yield device
def __contains__(self, item):
return self[item] is not None
def __getitem__(self, item):
for device in self._devices:
if item in (device.name, device.ip):
return device
return None
def __init__(self):
self._devices = []
def do():
self.discover(timeout=10)
t = threading.Thread(target=do)
t.daemon = True
t.start()
def discover(self, timeout=1, st=SSDP_ALL, mx=3, ssdp_version=1):
""" Discover UPnP devices in the local network.
name -- name or part of the name to filter devices
timeout -- timeout to perform discover
st -- st field of discovery packet
mx -- mx field of discovery packet
return -- list of DlnapDevice
"""
st = st.format(ssdp_version)
payload = "\r\n".join([
'M-SEARCH * HTTP/1.1',
'User-Agent: EventGhost/0.5',
'HOST: {0}:{1}'.format(*DLNA.SSDP_GROUP),
'Accept: */*',
'MAN: "ssdp:discover"',
'ST: {0}'.format(st),
'MX: {0}'.format(mx),
'',
''])
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(timeout)
sock.bind((socket.gethostbyname(socket.gethostname()), 1025))
for _ in range(3):
sock.sendto(payload, DLNA.SSDP_GROUP)
try:
while True:
data, addr = sock.recvfrom(1024)
if DLNA.URN_AVTransport in data or DLNA.URN_RenderingControl in data:
d = self.DlnapDevice(data, addr[0])
d.ssdp_version = ssdp_version
if d not in self._devices:
self._devices.append(d)
except socket.timeout:
pass
finally:
try:
sock.close()
except socket.error:
pass
class DlnapDevice:
def __init__(self, raw, ip):
self.ip = ip
self.ssdp_version = 1
self.port = None
self.name = 'Unknown'
self.control_url = None
self.rendering_control_url = None
self.has_av_transport = False
self.__raw = raw.decode()
self.location = DLNA._get_location_url(self.__raw)
self.port = DLNA._get_port(self.location)
raw_desc_xml = urlopen(self.location).read().decode()
self.__desc_xml = DLNA._xml2dict(raw_desc_xml)
self.name = DLNA._get_friendly_name(
self.__desc_xml
)
self.control_url = DLNA._get_control_url(
self.__desc_xml,
DLNA.URN_AVTransport
)
self.rendering_control_url = DLNA._get_control_url(
self.__desc_xml,
DLNA.URN_RenderingControl
)
self.has_av_transport = self.control_url is not None
def __repr__(self):
return '{0} @ {1}'.format(self.name, self.ip)
def __eq__(self, d):
return self.name == d.name and self.ip == d.ip
def _payload_from_template(self, action, data, urn):
""" Assembly payload from template.
"""
fields = ''
for tag, value in data.items():
fields += '<{tag}>{value}</{tag}>'.format(tag=tag, value=value)
payload = """<?xml version="1.0" encoding="utf-8"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:{action} xmlns:u="{urn}">
{fields}
</u:{action}>
</s:Body>
</s:Envelope>""".format(action=action, urn=urn, fields=fields)
return payload
def _create_packet(self, action, data):
""" Create packet to send to device control url.
action -- control action
data -- dictionary with XML fields value
"""
if action in ["SetVolume", "SetMute", "GetVolume"]:
url = self.rendering_control_url
urn = DLNA.URN_RenderingControl_Fmt.format(self.ssdp_version)
else:
url = self.control_url
urn = DLNA.URN_AVTransport_Fmt.format(self.ssdp_version)
payload = self._payload_from_template(action=action, data=data,
urn=urn)
packet = "\r\n".join([
'POST {} HTTP/1.1'.format(url),
'User-Agent: EventGhost/0.5',
'Accept: */*',
'Content-Type: text/xml; charset="utf-8"',
'HOST: {0}:{1}'.format(self.ip, self.port),
'Content-Length: {0}'.format(len(payload)),
'SOAPACTION: "{0}#{1}"'.format(urn, action),
'Connection: close',
'',
payload,
])
return packet
def guess_type(self, path):
if not mimetypes.inited:
mimetypes.init()
base, ext = os.path.splitext(path)
if ext in mimetypes.types_map:
return mimetypes.types_map[ext]
return ''
@property
def current_media(self):
return self.media_info
@current_media.setter
def current_media(self, path):
""" Set media to playback.
url -- media url
instance_id -- device instance id
"""
DLNA.Handler.file_path = path
id = 1
parent_id = 1
mime_type = self.guess_type(path)
if mime_type.startswith('audio'):
klass = 'object.item.audioItem.musicTrack'
else:
klass = 'object.item.videoItem'
title = os.path.splitext(os.path.split(path)[1])[0]
creator= 'None'
url = 'http://{host}:port/file_name'.format(
host=socket.gethostbyname(socket.gethostname()),
port=PORT,
file_name=os.path.split(path)[1]
)
packet = self._create_packet(
'SetAVTransportURI',
dict(
InstanceID=0,
CurrentURI=url,
CurrentURIMetaData=DLNA._escape_xml(DLNA.DIDL.format(
url=url,
klass=klass,
mime_type=mime_type,
title=title,
creator=creator,
id=id,
parent_id=parent_id
))
)
)
DLNA._send_tcp((self.ip, self.port), packet)
def play(self):
""" Play media that was already set as current.
instance_id -- device instance id
"""
packet = self._create_packet(
'Play',
dict(InstanceID=0, Speed=1)
)
DLNA._send_tcp((self.ip, self.port), packet)
def pause(self):
""" Pause media that is currently playing back.
instance_id -- device instance id
"""
packet = self._create_packet(
'Pause',
dict(InstanceID=0, Speed=1)
)
DLNA._send_tcp((self.ip, self.port), packet)
def stop(self):
""" Stop media that is currently playing back.
instance_id -- device instance id
"""
packet = self._create_packet(
'Stop',
dict(InstanceID=0, Speed=1)
)
DLNA._send_tcp((self.ip, self.port), packet)
def seek(self, position):
"""
Seek position
"""
packet = self._create_packet(
'Seek',
dict(InstanceID=0, Unit='REL_TIME', Target=position)
)
DLNA._send_tcp((self.ip, self.port), packet)
@property
def volume(self):
packet = self._create_packet(
'GetVolume',
dict(InstanceID=0, Channel='Master')
)
return DLNA._send_tcp((self.ip, self.port), packet)
@volume.setter
def volume(self, volume=10):
""" Stop media that is currently playing back.
instance_id -- device instance id
"""
packet = self._create_packet(
'SetVolume',
dict(InstanceID=0, DesiredVolume=volume,
Channel='Master')
)
DLNA._send_tcp((self.ip, self.port), packet)
@property
def mute(self):
return None
@mute.setter
def mute(self, value):
""" Stop media that is currently playing back.
instance_id -- device instance id
"""
packet = self._create_packet(
'SetMute',
dict(
InstanceID=0,
DesiredMute=str(int(value)),
Channel='Master'
)
)
DLNA._send_tcp((self.ip, self.port), packet)
@property
def info(self):
""" Transport info.
instance_id -- device instance id
"""
packet = self._create_packet(
'GetTransportInfo',
dict(InstanceID=0)
)
return DLNA._send_tcp((self.ip, self.port), packet)
@property
def media_info(self):
""" Media info.
instance_id -- device instance id
"""
packet = self._create_packet(
'GetMediaInfo',
dict(InstanceID=0)
)
return DLNA._send_tcp((self.ip, self.port), packet)
@property
def position_info(self):
""" Position info.
instance_id -- device instance id
"""
packet = self._create_packet(
'GetPositionInfo',
dict(InstanceID=0)
)
return DLNA._send_tcp((self.ip, self.port), packet)
@staticmethod
def _get_tag_value(x, i=0):
""" Get the nearest to 'i' position xml tag name.
x -- xml string
i -- position to start searching tag from
return -- (tag, value) pair.
e.g
<d>
<e>value4</e>
</d>
result is ('d', '<e>value4</e>')
"""
x = x.strip()
value = ''
tag = ''
# skip <? > tag
if x[i:].startswith('<?'):
i += 2
while i < len(x) and x[i] != '<':
i += 1
# check for empty tag like '</tag>'
if x[i:].startswith('</'):
i += 2
in_attr = False
while i < len(x) and x[i] != '>':
if x[i] == ' ':
in_attr = True
if not in_attr:
tag += x[i]
i += 1
return (tag.strip(), '', x[i + 1:])
# not an xml, treat like a value
if not x[i:].startswith('<'):
return ('', x[i:], '')
i += 1 # <
# read first open tag
in_attr = False
while i < len(x) and x[i] != '>':
# get rid of attributes
if x[i] == ' ':
in_attr = True
if not in_attr:
tag += x[i]
i += 1
i += 1 # >
# replace self-closing <tag/> by <tag>None</tag>
empty_elmt = '<' + tag + ' />'
closed_elmt = '<' + tag + '>None</' + tag + '>'
if x.startswith(empty_elmt):
x = x.replace(empty_elmt, closed_elmt)
while i < len(x):
value += x[i]
if x[i] == '>' and value.endswith('</' + tag + '>'):
# Note: will not work with xml like <a> <a></a> </a>
close_tag_len = len(tag) + 2 # />
value = value[:-close_tag_len]
break
i += 1
return (tag.strip(), value[:-1], x[i + 1:])
@staticmethod
def _xml2dict(s, ignoreUntilXML=False):
if ignoreUntilXML:
s = ''.join(re.findall(".*?(<.*)", s, re.M))
d = {}
while s:
tag, value, s = DLNA._get_tag_value(s)
value = value.strip()
isXml, dummy, dummy2 = DLNA._get_tag_value(value)
if tag not in d:
d[tag] = []
if not isXml:
if not value:
continue
d[tag].append(value.strip())
else:
if tag not in d:
d[tag] = []
d[tag].append(DLNA._xml2dict(value))
return d
s = """
hello
this is a bad
strings
<?xml version="1.0"?>
<a any_tag="tag value">
<b><bb>value1</bb></b>
<b><bb>value2</bb> <v>value3</v></b>
</c>
<d>
<e>value4</e>
</d>
<g>value</g>
</a>
"""
@staticmethod
def _xpath(d, path):
""" Return value from xml dictionary at path.
d -- xml dictionary
path -- string path like root/device/serviceList/service@serviceType=URN_AVTransport/controlURL
return -- value at path or None if path not found
"""
for p in path.split('/'):
tag_attr = p.split('@')
tag = tag_attr[0]
if tag not in d:
return None
attr = tag_attr[1] if len(tag_attr) > 1 else ''
if attr:
a, aval = attr.split('=')
for s in d[tag]:
if s[a] == [aval]:
d = s
break
else:
d = d[tag][0]
return d
running = False
@staticmethod
def _get_port(location):
""" Extract port number from url.
location -- string like http://anyurl:port/whatever/path
return -- port number
"""
port = re.findall('http://.*?:(\d+).*', location)
return int(port[0]) if port else 80
@staticmethod
def _get_control_url(xml, urn):
""" Extract AVTransport contol url from device description xml
xml -- device description xml
return -- control url or empty string if wasn't found
"""
return DLNA._xpath(
xml,
'root/device/serviceList/service'
'@serviceType={0}/controlURL'.format(urn)
)
@staticmethod
@contextmanager
def _send_udp(to, packet):
""" Send UDP message to group
to -- (host, port) group to send the packet to
packet -- message to send
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
socket.IPPROTO_UDP)
sock.sendto(packet.encode(), to)
yield sock
sock.close()
@staticmethod
def _escape_xml(xml):
""" Replace escaped xml symbols with real ones.
"""
return xml.replace('<', '<').replace('>', '>').replace('"', '"')
@staticmethod
def _unescape_xml(xml):
""" Replace escaped xml symbols with real ones.
"""
return xml.replace('<', '<').replace('>', '>').replace('"',
'"')
@staticmethod
def _send_tcp(to, payload):
""" Send TCP message to group
to -- (host, port) group to send to payload to
payload -- message to send
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
data = ''
try:
sock.connect(to)
sock.sendall(payload.encode('utf-8'))
data = sock.recv(2048)
data = DLNA._xml2dict(DLNA._unescape_xml(data), True)
errorDescription = DLNA._xpath(
data,
's:Envelope/s:Body/s:Fault/detail/UPnPError/errorDescription'
)
if errorDescription is not None:
data = dict(
data=data,
error=errorDescription
)
except socket.error:
pass
finally:
try:
sock.close()
except socket.error:
pass
return data
@staticmethod
def _get_location_url(raw):
""" Extract device description url from discovery response
raw -- raw discovery response
return -- location url string
"""
t = re.findall('\n(?i)location:\s*(.*)\r\s*', raw, re.M)
if len(t) > 0:
return t[0]
return ''
@staticmethod
def _get_friendly_name(xml):
""" Extract device name from description xml
xml -- device description xml
return -- device name
"""
name = DLNA._xpath(xml, 'root/device/friendlyName')
return name if name is not None else 'Unknown'
@staticmethod
def _get_serve_ip(target_ip, target_port=80):
""" Find ip address of network interface used to communicate with target
target-ip -- ip address of target
return -- ip address of interface connected to target
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect((target_ip, target_port))
my_ip = s.getsockname()[0]
s.close()
return my_ip
eg.globals.DLNA = DLNA()
def run(prt):
eg.globals.DLNA.httpd = SocketServer.TCPServer(("", prt), eg.globals.DLNA.Handler)
eg.globals.DLNA.httpd.serve_forever()
threading.Thread(target=run, args=(PORT,)).start()