Spaces:
Build error
Build error
| import gradio as gr | |
| from gradio_rangeslider import RangeSlider | |
| import numpy as np | |
| from scipy.spatial import KDTree | |
| import pandas as pd | |
| from dysts.base import DynSys | |
| from dysts.flows import Lorenz, Chua | |
| import io | |
| import matplotlib.pyplot as plt | |
| from PIL import Image | |
| def fig2img(fig): | |
| """Convert a Matplotlib figure to a PIL Image and return it""" | |
| buf = io.BytesIO() | |
| fig.savefig(buf) | |
| buf.seek(0) | |
| img = Image.open(buf) | |
| return img | |
| """ ----------------------------------------------------------------------------------------- | |
| ROBOTIC ARM SYSTEM | |
| ----------------------------------------------------------------------------------------- """ | |
| class RoboticArm(object): | |
| """A simple robotic arm. | |
| Receives an array of angles as a motor commands. | |
| Return the position of the tip of the arm as a 2-tuple. | |
| """ | |
| def __init__(self, dim=7, limit=150, D=0.05): | |
| """`dim` is the number of joints, which are able to move in (-limit, +limit)""" | |
| self.dim, self.limit, self.D = dim, limit, D | |
| self.trajectory = np.zeros((dim+1, 2)) # hold the last executed posture (x, y position of all joints) | |
| def execute(self, angles, return_img=True): | |
| """Return the position of the end effector. Accepts values in degrees.""" | |
| u, v, sum_a, length = 0, 0, 0, 1.0/len(angles) | |
| self.trajectory[0] = np.array([u, v]) | |
| for i, a in enumerate(angles): | |
| sum_a += np.radians(a) | |
| u, v = u + length*np.sin(sum_a), v + length*np.cos(sum_a) # at zero pose, the tip is at x=0,y=1. | |
| self.trajectory[i+1] = np.array([u, v]) | |
| if return_img: | |
| fig = plt.figure() # save discovery as image for display | |
| plt.plot(self.trajectory[:, 0], self.trajectory[..., 1].T, '-o', color="gray") | |
| plt.xlim([-1.0, 1.0]) | |
| plt.ylim([-1.0, 1.0]) | |
| img = fig2img(fig) | |
| plt.close() | |
| else: | |
| img = None | |
| return np.array([u, v]), img | |
| def execute_random(self, N): | |
| """Random trajectories in parallel""" | |
| def random_params(self): | |
| """Return a random (and legal) motor command.""" | |
| return np.random.uniform(-self.limit, self.limit, size=(self.dim)) | |
| def perturb_params(self, params): | |
| """With a range of ±150°, creates a perturbation of ±15° (5% of 300° in each direction).""" | |
| min_perturb = np.minimum(np.repeat(self.limit, self.dim), params + 2*self.D*self.limit) # create a random perturbation inside the legal values. | |
| max_perturb = np.maximum(np.repeat(-self.limit, self.dim), params - 2*self.D*self.limit) | |
| new_params = np.random.uniform(min_perturb, max_perturb) | |
| return new_params | |
| def random_goal(self, reached_goals=None): | |
| """Goal babbling strategy""" | |
| goal = np.random.uniform(-1, 1, size=(2, )) # goal as random point in [-1, 1]x[-1, 1]. | |
| return goal | |
| """ ----------------------------------------------------------------------------------------- | |
| LORENZ ATTRACTOR SYSTEM | |
| ----------------------------------------------------------------------------------------- """ | |
| class ChaoticAttractor(DynSys): | |
| """The Lorenz Attractor. | |
| """ | |
| def __init__(self, name, params_range, T, D=0.05): | |
| super().__init__() | |
| if name == "Lorenz": | |
| self._rhs = Lorenz._rhs | |
| self._jac = Lorenz._jac | |
| self.pts_per_period = 100 | |
| elif name == "Chua": | |
| self._rhs = Chua._rhs | |
| self._jac = Chua._jac | |
| self.pts_per_period = 100 | |
| else: | |
| raise NotImplementedError | |
| params_range["ic"] = [np.repeat(params_range["ic"][0], self.embedding_dimension), np.repeat(params_range["ic"][1], self.embedding_dimension)] | |
| self.params_range = params_range | |
| self.T, self.D = T, D | |
| self.trajectory = np.zeros((T, self.embedding_dimension)) # hold the executed trajectory (x, y, z positions in time) | |
| def execute(self, params, return_img=True): | |
| """Return the position in 3D space at time T""" | |
| self.params = {} | |
| for k, v in params.items(): | |
| setattr(self, k, v) | |
| if k != "ic": | |
| self.params[k] = v | |
| self.trajectory = self.make_trajectory(self.T, pts_per_period=self.pts_per_period) | |
| if return_img: | |
| fig = plt.figure() # save discovery as image for display | |
| plt.plot(self.trajectory[:, 0], self.trajectory[..., 1].T) | |
| img = fig2img(fig) | |
| plt.close() | |
| else: | |
| img = None | |
| return self.trajectory[-1], img | |
| def random_params(self): | |
| """Return a random (and legal) motor command.""" | |
| params = {} | |
| for k, v in self.params_range.items(): | |
| params[k] = np.random.uniform(v[0], v[1]) | |
| return params | |
| def perturb_params(self, params): | |
| """Creates a perturbation of parameters within allowed range.""" | |
| new_params = {} | |
| for k, v in params.items(): | |
| param_range = self.params_range[k][1] - self.params_range[k][0] | |
| min_perturb = np.minimum(self.params_range[k][1], v + self.D*param_range) | |
| max_perturb = np.maximum(self.params_range[k][0], v - self.D*param_range) | |
| new_params[k] = np.random.uniform(min_perturb, max_perturb) | |
| return new_params | |
| def random_goal(self, reached_goals=None): | |
| """Goal babbling strategy""" | |
| goal = np.random.uniform(np.stack(reached_goals).min(0), np.stack(reached_goals).max(0)) # goal as random point in hypercube of currently reached goals. | |
| return goal | |
| """ ----------------------------------------------------------------------------------------- | |
| RANDOM EXPLORATION IN INPUT PARAMETER SPACE | |
| ----------------------------------------------------------------------------------------- """ | |
| def explore_rip(system, n): | |
| """Explore the system using random input parameters (RIP) during n steps.""" | |
| history = {"params": [], "effects": []} | |
| for t in range(n): | |
| params = system.random_params() # choosing a random parameter command; does not depend on history. | |
| effect, _ = system.execute(params, return_img=False) # executing the parameter command and observing its effect. | |
| history["params"].append(params) # updating history. | |
| history["effects"].append(effect) | |
| return history | |
| """ ----------------------------------------------------------------------------------------- | |
| RANDOM EXPLORATION IN EFFECT (GOAL) SPACE | |
| ----------------------------------------------------------------------------------------- """ | |
| def inverse(system, goal, history): | |
| """Transform a goal into a parameter command""" | |
| tree = KDTree(history["effects"]) # find the nearest neighbor of the goal. | |
| d, i = tree.query(goal, k=1) | |
| nn_params = history["params"][i] | |
| return system.perturb_params(nn_params) | |
| def goal_babbling(system, history): | |
| goal = system.random_goal(history["effects"]) | |
| return inverse(system, goal, history) | |
| def explore_rge(system, n): | |
| """Explore the arm using random goal exploration (RGE) during n steps.""" | |
| history = {"params": [], "effects": [], "images": []} | |
| for t in range(n): | |
| if t < 0.2*n: # random parameter exploration for the first 20% steps. | |
| params = system.random_params() | |
| else: # then random goal exploration. | |
| params = goal_babbling(system, history) # goal exploration depends on history. | |
| effect, img = system.execute(params) # executing the motor command and observing its effect. | |
| history["params"].append(params) # updating history. | |
| history["effects"].append(effect) | |
| history["images"].append((img, f"Run {t}")) | |
| return history | |
| """ ----------------------------------------------------------------------------------------- | |
| MAIN | |
| ----------------------------------------------------------------------------------------- """ | |
| def explore_robotic_arm(seed, N, dim, limit): | |
| system = RoboticArm(dim, limit) | |
| return explore(system, seed, N) | |
| def explore_chaotic_attractor(seed, N, T, name, | |
| lorenz_beta_range, lorenz_rho_range, lorenz_sigma_range, lorenz_ic_range, | |
| chua_alpha_range, chua_beta_range, chua_m0_range, chua_m1_range, chua_ic_range): | |
| if name == "Lorenz": | |
| params_range = {"beta": lorenz_beta_range, "rho": lorenz_rho_range, "sigma": lorenz_sigma_range, "ic": lorenz_ic_range} | |
| elif name == "Chua": | |
| params_range = {"alpha": chua_alpha_range, "beta": chua_beta_range, "m0": chua_m0_range, "m1": chua_m1_range, "ic": chua_ic_range} | |
| else: | |
| raise NotImplementedError | |
| system = ChaoticAttractor(name, params_range, T) | |
| return explore(system, seed, N) | |
| def explore(system, seed, N): | |
| # Random Exploration in Input Space | |
| np.random.seed(seed) | |
| history_rip = explore_rip(system, N) | |
| effects_rip = np.stack(history_rip["effects"]) | |
| # Goal Exploration in Output Space | |
| np.random.seed(seed) | |
| history_rge = explore_rge(system, N) | |
| effects_rge = np.stack(history_rge["effects"]) | |
| df_rip = pd.DataFrame({"x": effects_rip[:, 0], "y": effects_rip[:, 1]}) | |
| df_rge = pd.DataFrame({"x": effects_rge[:, 0], "y": effects_rge[:, 1]}) | |
| gallery_rge = history_rge["images"] | |
| return df_rip , df_rge, gallery_rge | |
| def change_attractor(name): | |
| if name == "Chua": | |
| return gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) | |
| elif name == "Lorenz": | |
| return gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True) | |
| else: | |
| raise NotImplementedError | |
| with gr.Blocks() as demo: | |
| with gr.Tab("Robotic Arm Exploration"): | |
| with gr.Row(): | |
| gr.Markdown("# Goal-based Exploration of a Robotic Arm") | |
| with gr.Row(): | |
| gr.Markdown( | |
| """This example is based on Fabien Benureau goal babbling tutorial, which can also be run in [this notebook](https://fabien.benureau.com/recode/benureau2015_gb/benureau2015_gb.html) | |
| """ | |
| ) | |
| with gr.Row(): | |
| seed = gr.Number(value=0, minimum=0, maximum=1000, precision=0, label="Random seed") | |
| N = gr.Number(value=300, minimum=10, maximum=5000, precision=0, label="Number of exploration steps") | |
| dim = gr.Number(value=7, minimum=2, maximum=100, precision=0, label="Number of joints") | |
| limit = gr.Slider(value=150, minimum=20, maximum=180,step=10, label="Joint Angle Limit") | |
| with gr.Row(): | |
| examples = gr.Examples([[300, 2, 150], [300, 7, 150], [300, 20, 150], [300, 100, 150]], | |
| inputs=[N, dim, limit]) | |
| with gr.Row(): | |
| explo_btn = gr.Button("Launch Exploration! ", variant="primary") | |
| with gr.Row(): | |
| rip_plot = gr.ScatterPlot(pd.DataFrame({"x": [], "y": []}), x="x", y="y", title="Reached Positions with Random Input Parameters", | |
| x_lim=[-1.1, 1.1], y_lim=[-1.1, 1.1], | |
| width=400, height=400, label="Random Input Parameters") | |
| rge_plot = gr.ScatterPlot(pd.DataFrame({"x": [], "y": []}), x="x", y="y", title="Reached Positions with Random Goal Exploration", | |
| x_lim=[-1.1, 1.1], y_lim=[-1.1, 1.1], | |
| width=400, height=400, label="Random Goal Exploration") | |
| with gr.Row(): | |
| gallery = gr.Gallery(object_fit="contain", height="600px", columns=8, label="IMGEP Discoveries") | |
| explo_btn.click(explore_robotic_arm, inputs=[seed, N, dim, limit], outputs=[rip_plot, rge_plot, gallery]) | |
| with gr.Tab("Chaotic Attractors Exploration"): | |
| with gr.Row(): | |
| gr.Markdown("# Goal-based Exploration of a Chaotic System") | |
| with gr.Row(): | |
| gr.Markdown( | |
| """This example uses William Gilpin's `dysts` library, which contains many examples of chaotic systems (see [github repo](https://github.com/williamgilpin/dysts/)) | |
| """ | |
| ) | |
| with gr.Row(): | |
| seed2 = gr.Number(value=0, minimum=0, maximum=1000, precision=0, label="Random seed") | |
| N2 = gr.Number(value=100, minimum=10, maximum=1000, precision=0, label="Number of exploration steps") | |
| T = gr.Number(value=100, minimum=10, maximum=200, precision=0, label="Number of trajectory timesteps") | |
| with gr.Row(): | |
| gr.Markdown("Select Chaotic Attractor and Parameter Range") | |
| with gr.Row(): | |
| name = gr.Dropdown(value="Chua", choices=["Chua", "Lorenz"], label="Chaotic Attractor") | |
| with gr.Row(): | |
| # Chua Attractor | |
| chua_alpha_range = RangeSlider(value=(13, 18), minimum=10, maximum=20, step=0.1, label="Alpha Range", visible=True) | |
| chua_beta_range = RangeSlider(value=(25, 30), minimum=20, maximum=40, step=0.1, label="Beta Range", visible=True) | |
| chua_m0_range = RangeSlider(value=(-1.3, -1), minimum=-1.5, maximum=-1, step=0.001, label="m0 Range", visible=True) | |
| chua_m1_range = RangeSlider(value=(-0.85, -0.55), minimum=-0.9, maximum=-0.5, step=0.001, label="m1 Range", visible=True) | |
| chua_ic_range = RangeSlider(value=(-0.5, 0.5), minimum=-1, maximum=1, step=1, label="Initial Conditions Range", visible=True) | |
| # Lorenz Attractor | |
| lorenz_beta_range = RangeSlider(value=(2, 6), minimum=2, maximum=6, step=0.001, label="Beta Range", visible=False) | |
| lorenz_rho_range = RangeSlider(value=(20, 50), minimum=20, maximum=50, step=10, label="Rho Range", visible=False) | |
| lorenz_sigma_range = RangeSlider(value=(6, 35), minimum=6, maximum=35, step=1, label="Sigma Range", visible=False) | |
| lorenz_ic_range = RangeSlider(value=(-20, 20), minimum=-20, maximum=20, step=1, label="Initial Conditions Range", visible=False) | |
| # Rossler Attractor | |
| with gr.Row(): | |
| explo_btn2 = gr.Button("Launch Exploration! ", variant="primary") | |
| with gr.Row(): | |
| rip_plot2 = gr.ScatterPlot(pd.DataFrame({"x": [], "y": []}), x="x", y="y", title="Reached Positions with Random Input Parameters", | |
| width=400, height=400, label="Random Input Parameters") | |
| rge_plot2 = gr.ScatterPlot(pd.DataFrame({"x": [], "y": []}), x="x", y="y", title="Reached Positions with Random Goal Exploration", | |
| width=400, height=400, label="Random Goal Exploration") | |
| with gr.Row(): | |
| gallery2 = gr.Gallery(object_fit="contain", height="600px", columns=8, label="IMGEP Discoveries") | |
| name.input(change_attractor, inputs=name, queue=False, show_progress="hidden", | |
| outputs=[lorenz_beta_range, lorenz_rho_range, lorenz_sigma_range, lorenz_ic_range, | |
| chua_alpha_range, chua_beta_range, chua_m0_range, chua_m1_range, chua_ic_range]) | |
| explo_btn2.click(explore_chaotic_attractor, | |
| inputs=[seed2, N2, T, name, | |
| lorenz_beta_range, lorenz_rho_range, lorenz_sigma_range, lorenz_ic_range, | |
| chua_alpha_range, chua_beta_range, chua_m0_range, chua_m1_range, chua_ic_range], | |
| outputs=[rip_plot2, rge_plot2, gallery2]) | |
| demo.launch() | |