Mbed LS
linux.py
Go to the documentation of this file.
1"""
2mbed SDK
3Copyright (c) 2011-2015 ARM Limited
4
5Licensed under the Apache License, Version 2.0 (the "License");
6you may not use this file except in compliance with the License.
7You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11Unless required by applicable law or agreed to in writing, software
12distributed under the License is distributed on an "AS IS" BASIS,
13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14See the License for the specific language governing permissions and
15limitations under the License.
16"""
17
18import re
19from os.path import join, isdir, dirname, abspath
20import os
21
22from .lstools_base import MbedLsToolsBase
23
24import logging
25logger = logging.getLogger("mbedls.lstools_linux")
26logger.addHandler(logging.NullHandler())
27del logging
28
29SYSFS_BLOCK_DEVICE_PATH = '/sys/class/block'
30
31def _readlink(link):
32 content = os.readlink(link)
33 if content.startswith(".."):
34 return abspath(join(dirname(link), content))
35 else:
36 return content
37
38
40 """ mbed-enabled platform for Linux with udev
41 """
def __init__(self, **kwargs):
    """! ctor
    @param kwargs Keyword arguments forwarded unchanged to MbedLsToolsBase
    @details Pre-compiles the regular expressions the other methods rely on
    """
    MbedLsToolsBase.__init__(self, **kwargs)

    # A "(pci|usb)-..." by-id entry name; the "usbid" group is the
    # alphanumeric run between an underscore and the following hyphen.
    by_id_name = r'(pci|usb)-[0-9a-zA-Z:_-]*_(?P<usbid>[0-9a-zA-Z]*)-.*$'
    # "<dev> on <dir> " as it appears in a line printed by 'mount'.
    mount_line = r'(?P<dev>(/[^/ ]*)+) on (?P<dir>(/[^/ ]*)+) '
    # A path component of the form "<digits>-<digits>..." that contains
    # neither a colon nor whitespace.
    usb_device_dir = r'^[0-9]+-[0-9]+[^:\s]*$'

    self.nlp = re.compile(by_id_name)
    self.mmp = re.compile(mount_line)
    self.udp = re.compile(usb_device_dir)
51
def find_candidates(self):
    """! Describe every disk that has a USBID-decorated by-id entry
    @return List of dicts, one per disk, with the keys 'mount_point',
      'serial_port', 'target_id_usb_id', 'vendor_id' and 'product_id'.
      Every value except 'target_id_usb_id' is None when no matching
      information was found
    """
    disk_ids = self._dev_by_id('disk')
    serial_ids = self._dev_by_id('serial')
    mount_ids = dict(self._fat_mounts())
    usb_info = self._sysfs_block_devices(disk_ids.values())
    logger.debug("Mount mapping %r", mount_ids)

    candidates = []
    for usb_id, device in disk_ids.items():
        # Devices without sysfs USB information fall back to an empty dict
        # so both ids come out as None.
        device_usb_info = usb_info.get(device, {})
        candidates.append({
            'mount_point' : mount_ids.get(device),
            'serial_port' : serial_ids.get(usb_id),
            'target_id_usb_id' : usb_id,
            'vendor_id': device_usb_info.get('vendor_id'),
            'product_id': device_usb_info.get('product_id')
        })
    return candidates
68
def _dev_by_id(self, device_type):
    """! Get a dict, USBID -> device, for a device class
    @param device_type The type of devices to search. For example, "serial"
      looks for all serial devices connected to this computer
    @return A dict: Device USBID -> device file in /dev. The dict is empty
      when the /dev/<device_type>/by-id directory does not exist
    """
    by_id_dir = join("/dev", device_type, "by-id")
    if not isdir(by_id_dir):
        logger.error("Could not get %s devices by id. "
                     "This could be because your Linux distribution "
                     "does not use udev, or does not create /dev/%s/by-id "
                     "symlinks. Please submit an issue to github.com/"
                     "armmbed/mbed-ls.", device_type, device_type)
        return {}

    links = [join(by_id_dir, entry) for entry in os.listdir(by_id_dir)]
    found = dict(self._hex_ids(links))
    logger.debug("Found %s devices by id %r", device_type, found)
    return found
87
88 def _fat_mounts(self):
89 """! Lists mounted devices with vfat file system (potential mbeds)
90 @result Returns list of all mounted vfat devices
91 @details Uses Linux shell command: 'mount'
92 """
93 _stdout, _, retval = self._run_cli_process('mount')
94 if not retval:
95 for line in _stdout.splitlines():
96 if b'vfat' in line:
97 match = self.mmp.search(line.decode('utf-8'))
98 if match:
99 yield match.group("dev"), match.group("dir")
100
def _hex_ids(self, dev_list):
    """! Build a USBID map for a device list
    @param dev_list List of "by-id" symbolic link paths with USBID decoration
    @return Yields (USBID, link target) pairs, suitable for building a map
      USBID -> device file in /dev
    @details Uses a regular expression to extract the USBID (TargetID) from
      each "by-id" symbolic link path; paths that do not match are skipped
    """
    logger.debug("Converting device list %r", dev_list)
    for link in dev_list:
        found = self.nlp.search(link)
        if found is None:
            continue
        yield found.group("usbid"), _readlink(link)
113
def _sysfs_block_devices(self, block_devices):
    """! Look up USB vendor and product ids for block devices through sysfs
    @param block_devices Iterable of block device paths (e.g. values
      returned by _dev_by_id('disk'))
    @return A dict: device path (as given in block_devices) ->
      {'vendor_id': ..., 'product_id': ...}. An id is None when its sysfs
      file could not be read. Devices that have no entry in
      SYSFS_BLOCK_DEVICE_PATH, or whose sysfs path has no component
      matching the USB device pattern, are left out of the dict
    @details Each entry in SYSFS_BLOCK_DEVICE_PATH is a symbolic link; its
      target is truncated after the last path component matching self.udp
      and the 'idVendor' / 'idProduct' files are read from that directory
    """
    device_names = {os.path.basename(d): d for d in block_devices}
    sysfs_block_devices = set(os.listdir(SYSFS_BLOCK_DEVICE_PATH))
    common_device_names = sysfs_block_devices.intersection(device_names)
    result = {}

    for common_device_name in common_device_names:
        sysfs_path = os.path.join(SYSFS_BLOCK_DEVICE_PATH, common_device_name)
        full_sysfs_path = os.readlink(sysfs_path)
        path_parts = full_sysfs_path.split('/')

        # Keep the LAST matching component: when several components match,
        # the deepest one is the directory closest to the block device.
        end_index = None
        for index, part in enumerate(path_parts):
            if self.udp.search(part):
                end_index = index

        if end_index is None:
            logger.debug('Did not find suitable usb folder for usb info: %s', full_sysfs_path)
            continue

        # The link target is joined onto the directory the link lives in;
        # the operating system resolves any ".." components on open().
        usb_info_rel_path = path_parts[:end_index + 1]
        usb_info_path = os.path.join(SYSFS_BLOCK_DEVICE_PATH, os.sep.join(usb_info_rel_path))

        usb_ids = {}
        for key, file_name, label in (('vendor_id', 'idVendor', 'vendor'),
                                      ('product_id', 'idProduct', 'product')):
            id_file_path = os.path.join(usb_info_path, file_name)
            try:
                with open(id_file_path, 'r') as id_file:
                    usb_ids[key] = id_file.read().strip()
            except (IOError, OSError) as e:
                # Best effort: a missing or unreadable file leaves the id
                # as None. IOError is listed for Python 2, where open()
                # does not raise OSError.
                usb_ids[key] = None
                logger.debug('Failed to read %s id file %s with error: %s',
                             label, id_file_path, e)

        result[device_names[common_device_name]] = usb_ids

    return result
def _sysfs_block_devices(self, block_devices)
Definition: linux.py:114
def _dev_by_id(self, device_type)
Definition: linux.py:69
def __init__(self, **kwargs)
ctor
Definition: linux.py:42