Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

from __future__ import absolute_import, division, print_function 

 

import time 

import json 

import threading 

from collections import defaultdict, namedtuple, Iterable 

import numpy as np 

from lcm import LCM 

from robotlocomotion import viewer2_comms_t 

from director.thirdparty import transformations 

 

 

def to_lcm(data):
    """
    Wrap a command dictionary in a viewer2_comms_t message.

    data must be JSON-serializable and contain a "utime" entry; that value is
    copied into the message header, and the whole dictionary is stored as the
    JSON-encoded payload tagged as "treeviewer_json" version 1.0.

    Returns the populated (not yet encoded) viewer2_comms_t message.
    """
    message = viewer2_comms_t()
    message.utime = data["utime"]

    # Describe the payload encoding so the receiver knows how to parse it.
    message.format = "treeviewer_json"
    message.format_version_major, message.format_version_minor = 1, 0

    # The payload itself, plus its length as stored in the message.
    message.data = json.dumps(data)
    message.num_bytes = len(message.data)
    return message

 

 

def serialize_transform(tform):
    """
    Convert a transform matrix into a JSON-friendly dictionary.

    The matrix is split by the `transformations` helpers into its translation
    and quaternion parts, each returned as a plain list under the keys
    "translation" and "quaternion".
    """
    translation = list(transformations.translation_from_matrix(tform))
    quaternion = list(transformations.quaternion_from_matrix(tform))
    return dict(translation=translation, quaternion=quaternion)

 

 

class GeometryData(object):
    """
    A geometry together with the color and transform used to draw it.

    Attributes:
        geometry: an object exposing serialize() that returns a dict
            (the BaseGeometry subclasses in this file do).
        color: a sequence of color components; defaults to (1, 1, 1, 1).
        transform: the transform matrix passed to serialize_transform();
            defaults to a fresh 4x4 identity.
    """
    __slots__ = ["geometry", "color", "transform"]

    def __init__(self, geometry, color=(1., 1., 1., 1.), transform=None):
        # A default of np.eye(4) in the signature would be evaluated once and
        # shared by every instance, so an in-place edit of one instance's
        # transform would silently change all the others. Build a new
        # identity per instance instead.
        if transform is None:
            transform = np.eye(4)
        self.geometry = geometry
        self.color = color
        self.transform = transform

    def serialize(self):
        """
        Return the geometry's serialized dict extended with "color" (as a
        list) and "transform" (as produced by serialize_transform).
        """
        # geometry.serialize() is expected to hand back a new dict on every
        # call, since the result is extended in place here.
        params = self.geometry.serialize()
        params["color"] = list(self.color)
        params["transform"] = serialize_transform(self.transform)
        return params

 

 

class BaseGeometry(object):
    """
    Common base class for the geometry primitives in this file.

    Subclasses override serialize() to return a dictionary describing the
    primitive.
    """

    def serialize(self):
        """Must be overridden; the base implementation always raises."""
        raise NotImplementedError

 

 

class Box(BaseGeometry, namedtuple("Box", ["lengths"])):
    """A box primitive described by a sequence of side lengths."""

    def serialize(self):
        """Return the dict form: type "box" plus the lengths as a list."""
        return dict(type="box", lengths=list(self.lengths))

 

 

class Sphere(BaseGeometry, namedtuple("Sphere", ["radius"])):
    """A sphere primitive described by a single radius."""

    def serialize(self):
        """Return the dict form: type "sphere" plus the radius."""
        description = {"type": "sphere"}
        description["radius"] = self.radius
        return description

 

 

class Ellipsoid(BaseGeometry, namedtuple("Ellipsoid", ["radii"])):
    """An ellipsoid primitive described by a sequence of radii."""

    def serialize(self):
        """Return the dict form: type "ellipsoid" plus the radii as a list."""
        return dict(type="ellipsoid", radii=list(self.radii))

 

 

class Cylinder(BaseGeometry, namedtuple("Cylinder", ["length", "radius"])):
    """A cylinder primitive described by a length and a radius."""

    def serialize(self):
        """Return the dict form: type "cylinder" plus length and radius."""
        length, radius = self
        return dict(type="cylinder", length=length, radius=radius)

 

 

class Triad(BaseGeometry, namedtuple("Triad", [])):
    """A triad primitive; it carries no parameters of its own."""

    def serialize(self):
        """Return the dict form, which holds only the type "triad"."""
        return dict(type="triad")

 

 

class LazyTree(object):
    """
    A tree node holding geometries and a transform, whose children are
    created on first access.

    Attributes:
        geometries: list of geometry entries stored at this node.
        transform: transform matrix stored at this node; defaults to a fresh
            4x4 identity.
        children: defaultdict mapping a child key to a LazyTree. Merely
            indexing a missing key creates an empty child.
    """
    __slots__ = ["geometries", "transform", "children"]

    def __init__(self, geometries=None, transform=None):
        if geometries is None:
            geometries = []
        # A default of np.eye(4) in the signature would be one array shared
        # by every node (including all lazily created children), so an
        # in-place edit of one node's transform would leak into the others.
        if transform is None:
            transform = np.eye(4)
        self.geometries = geometries
        self.transform = transform
        self.children = defaultdict(LazyTree)

    def __getitem__(self, item):
        """Return the child stored under item, creating it if missing."""
        return self.children[item]

    def getdescendant(self, path):
        """
        Walk down the tree following each key in path and return the node
        reached. An empty path returns this node. Missing nodes along the
        way are created.
        """
        node = self
        for key in path:
            node = node[key]
        return node

    def descendants(self, prefix=tuple()):
        """
        Return the paths (tuples of keys, each starting with prefix) of
        every node below this one, parents listed before their children.
        """
        result = []
        for (key, child) in self.children.items():
            childpath = prefix + (key,)
            result.append(childpath)
            result.extend(child.descendants(childpath))
        return result

 

 

class CommandQueue(object):
    """
    Three sets of pending items, one per command type: settransform,
    setgeometry and delete.
    """

    def __init__(self):
        # A new queue starts out in the same state empty() produces.
        self.empty()

    def isempty(self):
        """Return True when none of the three sets holds anything."""
        pending = (self.settransform, self.setgeometry, self.delete)
        return not any(pending)

    def empty(self):
        """Discard all pending items by rebinding each set to a new one."""
        self.settransform, self.setgeometry, self.delete = set(), set(), set()

 

 

class Visualizer(object):
    """
    A Visualizer is a lightweight object that contains a CoreVisualizer and a
    path. The CoreVisualizer does all of the work of storing geometries and
    publishing LCM messages. By storing the path in the Visualizer instance,
    we make it easy to do things like store or pass a Visualizer that draws to
    a sub-part of the viewer tree.

    Many Visualizer objects can all share the same CoreVisualizer.
    """
    __slots__ = ["core", "path"]

    def __init__(self, path=None, lcm=None, core=None):
        """
        path may be None (the root), a "/"-separated string, or an iterable
        of path components; it is always stored as a tuple.
        lcm is only used to build a new CoreVisualizer when core is None.
        """
        if core is None:
            core = CoreVisualizer(lcm)
        if path is None:
            path = tuple()
        elif isinstance(path, str):
            # Drop every empty component, not only those produced by a
            # leading "/": "a//b" and "a/" would otherwise leave "" entries
            # in the path.
            path = tuple(p for p in path.split("/") if p)
        else:
            # __getitem__ concatenates self.path with a tuple, so any other
            # iterable (e.g. a list) has to be converted up front.
            path = tuple(path)
        self.core = core
        self.path = path

    def setgeometry(self, geomdata):
        """
        Set the geometries at this visualizer's path to the given
        geomdata (replacing whatever was there before).

        geomdata can be any one of:
          * a single BaseGeometry
          * a single GeometryData
          * a collection of any combinations of BaseGeometry and GeometryData

        Returns this visualizer.
        """
        self.core.setgeometry(self.path, geomdata)
        return self

    def settransform(self, tform):
        """
        Set the transform for this visualizer's path (and, implicitly,
        any descendants of that path).

        tform should be a 4x4 numpy array representing a homogeneous transform
        """
        self.core.settransform(self.path, tform)

    def delete(self):
        """
        Delete the geometry at this visualizer's path.
        """
        self.core.delete(self.path)

    def __getitem__(self, path):
        """
        Indexing into a visualizer returns a new visualizer with the given
        path appended to this visualizer's path.
        """
        return Visualizer(path=self.path + (path,),
                          lcm=self.core.lcm,
                          core=self.core)

    def start_handler(self):
        """
        Start a Python thread that will subscribe to messages from the remote
        viewer and handle those responses. This enables automatic reloading of
        geometry into the viewer if, for example, the viewer is restarted
        later.
        """
        self.core.start_handler()

 

 

class CoreVisualizer(object):
    """
    Stores the local tree of geometries and transforms, and publishes changes
    to it as LCM messages on "DIRECTOR_TREE_VIEWER_REQUEST".

    Every change is recorded in a CommandQueue; when publish_immediately is
    True (the default) the queue is published right after each change,
    otherwise changes accumulate until publish() is called.

    NOTE(review): once start_handler() has been called, _handle_response
    runs on the handler thread and adds to the same queue that publish()
    reads and clears; no locking is visible here -- confirm this is
    acceptable for the intended usage.
    """

    def __init__(self, lcm=None):
        """Use the given LCM instance, or create one when lcm is None."""
        if lcm is None:
            lcm = LCM()
        self.lcm = lcm
        self.tree = LazyTree()
        self.queue = CommandQueue()
        self.publish_immediately = True
        self.lcm.subscribe("DIRECTOR_TREE_VIEWER_RESPONSE",
                           self._handle_response)
        self.handler_thread = None

    def _handler_loop(self):
        """Body of the handler thread: dispatch LCM messages forever."""
        while True:
            self.lcm.handle()

    def start_handler(self):
        """
        Start the daemon thread that runs _handler_loop. Calling this again
        after the thread has been created does nothing.
        """
        if self.handler_thread is not None:
            return
        self.handler_thread = threading.Thread(
            target=self._handler_loop)
        self.handler_thread.daemon = True
        self.handler_thread.start()

    def _handle_response(self, channel, msgdata):
        """
        Handle a message received on "DIRECTOR_TREE_VIEWER_RESPONSE".

        The JSON payload's "status" decides what happens:
          * 0: nothing to do.
          * 1: every path in the local tree is queued again for both
               setgeometry and settransform (presumably the viewer is
               missing data -- TODO confirm against the viewer). Nothing is
               published here; the queued paths go out with the next
               publish().
          * anything else: ValueError is raised.
        """
        msg = viewer2_comms_t.decode(msgdata)
        data = json.loads(msg.data)
        status = data["status"]
        if status == 0:
            pass
        elif status == 1:
            for path in self.tree.descendants():
                self.queue.setgeometry.add(path)
                self.queue.settransform.add(path)
        else:
            raise ValueError(
                "Unhandled response from viewer: {}".format(msg.data))

    def setgeometry(self, path, geomdata):
        """
        Replace the geometries stored at path. geomdata may be a single
        BaseGeometry, a single GeometryData, or an iterable mixing both.
        """
        # BaseGeometry has to be tested before Iterable: the geometry classes
        # in this file are namedtuples and therefore iterable themselves.
        if isinstance(geomdata, BaseGeometry):
            self._load(path, [GeometryData(geomdata)])
        elif isinstance(geomdata, Iterable):
            self._load(path, geomdata)
        else:
            self._load(path, [geomdata])

    def _load(self, path, geoms):
        """
        Store geoms at path, wrapping every item that is not already a
        GeometryData in one, then queue the path for setgeometry.
        """
        converted_geom_data = [
            geom if isinstance(geom, GeometryData) else GeometryData(geom)
            for geom in geoms
        ]
        self.tree.getdescendant(path).geometries = converted_geom_data
        self.queue.setgeometry.add(path)
        self._maybe_publish()

    def settransform(self, path, tform):
        """Store tform at path and queue the path for settransform."""
        self.tree.getdescendant(path).transform = tform
        self.queue.settransform.add(path)
        self._maybe_publish()

    def delete(self, path):
        """
        Remove path (and everything below it) from the local tree and queue
        a delete command for it. An empty path resets the whole tree.
        """
        if not path:
            self.tree = LazyTree()
        else:
            parent = self.tree.getdescendant(path[:-1])
            # The child may never have been created locally. `del` would
            # raise KeyError in that case and the delete command would never
            # be queued, so drop it only if present.
            parent.children.pop(path[-1], None)
        self.queue.delete.add(path)
        self._maybe_publish()

    def _maybe_publish(self):
        """Publish right away when publish_immediately is set."""
        if self.publish_immediately:
            self.publish()

    def publish(self):
        """
        Send all queued commands as one message on
        "DIRECTOR_TREE_VIEWER_REQUEST" and clear the queue. Does nothing
        when the queue is empty.
        """
        if self.queue.isempty():
            return
        data = self.serialize_queue()
        msg = to_lcm(data)
        self.lcm.publish("DIRECTOR_TREE_VIEWER_REQUEST", msg.encode())
        self.queue.empty()

    def serialize_queue(self):
        """
        Build the command dictionary for the current queue contents, with
        keys "utime" (current time in microseconds), "delete", "setgeometry"
        and "settransform".
        """
        delete = [{"path": path} for path in self.queue.delete]

        setgeometry = []
        for path in self.queue.setgeometry:
            geoms = self.tree.getdescendant(path).geometries
            # Paths with nothing stored at them are left out of the message.
            if geoms:
                setgeometry.append({
                    "path": path,
                    "geometries": [geom.serialize() for geom in geoms]
                })

        settransform = [
            {
                "path": path,
                "transform": serialize_transform(
                    self.tree.getdescendant(path).transform)
            }
            for path in self.queue.settransform
        ]

        return {
            "utime": int(time.time() * 1e6),
            "delete": delete,
            "setgeometry": setgeometry,
            "settransform": settransform
        }

 

 

if __name__ == '__main__':
    # A starting path may be supplied when the visualizer is created.
    root_vis = Visualizer(path="/root/folder1")

    # Launch the thread that handles responses from the viewer, so that
    # missing geometry is reloaded automatically if the viewer is restarted.
    root_vis.start_handler()

    # A row of ten randomly colored unit boxes.
    boxes_vis = root_vis["boxes"]
    row_of_boxes = []
    for x in range(10):
        row_of_boxes.append(GeometryData(
            Box([1, 1, 1]),
            color=np.random.rand(4),
            transform=transformations.translation_matrix([x, -2, 0])))
    boxes_vis.setgeometry(row_of_boxes)

    # Indexing a visualizer yields a sub-tree visualizer. The lookup is
    # lazy, so sub-visualizers exist as soon as they are requested.
    group_vis = root_vis["group1"]

    box_vis = group_vis["box"]
    sphere_vis = group_vis["sphere"]

    green_box = GeometryData(Box([1, 1, 1]), color=[0, 1, 0, 0.5])
    box_vis.setgeometry(green_box)

    sphere_vis.setgeometry(Sphere(0.5))
    sphere_vis.settransform(transformations.translation_matrix([1, 0, 0]))

    test_vis = group_vis["test"]
    test_vis.setgeometry(Triad())
    rotation = transformations.rotation_matrix(1.0, [0, 0, 1])
    translation = transformations.translation_matrix([-1, 0, 1])
    test_vis.settransform(
        transformations.concatenate_matrices(rotation, translation))

    group_vis["triad"].setgeometry(Triad())

    # Setting the geometry keeps the transform already stored at that path.
    # Use settransform(np.eye(4)) to reset the transform.
    test_vis.setgeometry(Triad())

    # Known bug: the sphere is loaded and replaces the previous geometry,
    # but it is not drawn with the correct color mode.
    test_vis.setgeometry(Sphere(0.5))

    # Spin the whole group once around the z axis.
    for theta in np.linspace(0, 2 * np.pi, 100):
        spin = transformations.rotation_matrix(theta, [0, 0, 1])
        group_vis.settransform(spin)
        time.sleep(0.01)

    # group_vis.delete()