// DOC 01 / 07
AUTONOMOUS
NAVIGATION
A comprehensive guide to autonomous navigation: LiDAR, ultrasonic, IR, A*, DWA, VFH, AMCL, SLAM, Behavior Trees, EKF sensor fusion — with Python & Arduino C source code.
01
Overview
NAVIGATION ARCHITECTURE
Autonomous navigation solves three problems simultaneously: localization (where is the robot?), mapping (what does the environment look like?), and path planning (how do we reach the goal?).
📍
Localization
Pose estimation (x, y, θ) using an AMCL particle filter or an EKF.
🗺️
Mapping
Occupancy grid built from LiDAR; per-cell log-odds updates via Bresenham ray casting.
🎯
Path Planning
A* for the global path, DWA/VFH for local reactive navigation.
⚠️
Obstacle Avoidance
Reactive: sensor thresholds + costmap inflation + real-time replanning.
Navigation Stack Architecture
# ROS2 Nav2 data flow:
#
#   GOAL ---> Global Planner (A*/NavFn) ---> Global Path
#                                                |
#   SENSORS ---> Costmap 2D ---> Local Planner (DWA/TEB) ---> /cmd_vel
#   (LiDAR)     static+odom      real-time replanning
#      |
#   AMCL Localizer <--- map_server (pgm/yaml)
#      |
#   /odom (encoder odometry)
Differential Drive Kinematics
ΔsL = r·ΔθL_encoder ΔsR = r·ΔθR_encoder
Δs = (ΔsR + ΔsL) / 2 linear displacement
Δθ = (ΔsR - ΔsL) / L angular (L = wheel base)
x += Δs·cos(θ + Δθ/2)
y += Δs·sin(θ + Δθ/2)
θ += Δθ
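The update equations above translate directly into code. A minimal sketch (the wheel radius, wheel base, and counts-per-revolution below are illustrative values, not from the document's hardware):

```python
import math

def odom_update(x, y, theta, d_ticks_l, d_ticks_r,
                r=0.033, wheel_base=0.23, cpr=4096):
    """Integrate one encoder delta into the pose (x, y, theta)."""
    dist_per_tick = 2 * math.pi * r / cpr
    ds_l = d_ticks_l * dist_per_tick
    ds_r = d_ticks_r * dist_per_tick
    ds = (ds_r + ds_l) / 2              # linear displacement
    dth = (ds_r - ds_l) / wheel_base    # heading change
    # midpoint integration: evaluate heading at theta + dth/2
    x += ds * math.cos(theta + dth / 2)
    y += ds * math.sin(theta + dth / 2)
    return x, y, theta + dth

# driving straight: both wheels advance equally, heading unchanged
x, y, th = odom_update(0.0, 0.0, 0.0, 1000, 1000)
```

Equal tick counts give pure translation; equal-and-opposite counts give a pure spin in place, which is a quick way to sanity-check the signs.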
02
Hardware
SENSOR SUITE
LiDAR Comparison
| Model | Range | FOV | Hz | Interface | Est. Price |
|---|---|---|---|---|---|
| RPLiDAR A1 | 12m | 360° | 5.5 | UART | ~$99 |
| RPLiDAR A3 | 25m | 360° | 15 | UART | ~$299 |
| YDLiDAR X4 | 10m | 360° | 6 | UART | ~$60 |
| SICK TiM571 | 25m | 270° | 15 | Ethernet | ~$800 |
# Python: read RPLiDAR, detect obstacles
from rplidar import RPLidar
import numpy as np

lidar = RPLidar('/dev/ttyUSB0')
scan = np.zeros(360)
for s in lidar.iter_scans():
    for (_, ang, dist) in s:
        idx = int(ang) % 360
        if dist > 0:
            scan[idx] = dist / 1000.0  # mm -> m
    # front sector: +/-30 degrees around the heading
    front = np.concatenate([scan[330:], scan[:30]])
    valid = front[front > 0.05]
    if len(valid) > 0 and valid.min() < 0.5:
        print(f"OBSTACLE: {valid.min():.2f}m")
        break
HC-SR04 Ultrasonic — Arduino
Distance (cm) = (PulseWidth_us / 2) / 29.1
Or: Distance = SpeedOfSound (343 m/s) × t / 2
Range: 2–400 cm, Accuracy: ±3 mm, Dead zone: <2 cm
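As a quick numeric check of the conversion formula above (a minimal sketch):

```python
def pulse_to_cm(pulse_us):
    """HC-SR04: echo pulse width (microseconds) to distance in cm.
    Sound travels ~0.0343 cm/us; divide by 2 for the round trip."""
    return (pulse_us / 2) / 29.1

# a ~582 us echo corresponds to ~10 cm
print(round(pulse_to_cm(582), 1))
```

The constant 29.1 is just 1 / 0.0343 cm/us, i.e. the round-trip time per centimeter divided out.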
// Arduino: 5-sensor ultrasonic array with NewPing
#include <NewPing.h>
#define N 5
const int T[N] = {22,24,26,28,30}, E[N] = {23,25,27,29,31};
NewPing sonar[N] = {
  NewPing(T[0],E[0],200), NewPing(T[1],E[1],200), NewPing(T[2],E[2],200),
  NewPing(T[3],E[3],200), NewPing(T[4],E[4],200)
};
float d[N];  // distances in cm

void readAll() {
  for (int i = 0; i < N; i++) {
    unsigned int us = sonar[i].ping_median(3);      // median of 3 pings
    d[i] = (us == 0) ? 200 : us / US_ROUNDTRIP_CM;  // 0 = no echo -> max range
  }
}
IR Sharp GP2Y0A21 — Analog
// Arduino: Sharp IR, 10-80 cm range
float readIR(int pin) {
  int raw = analogRead(pin);
  float v = raw * (5.0 / 1023.0);   // ADC counts -> volts
  if (v < 0.4) return 80;           // below the curve: out of range
  return 27.728 * pow(v, -1.2045);  // inverse power-law fit (cm)
}
03
Algorithms
OBSTACLE AVOIDANCE
VFH — Vector Field Histogram
1. Build a polar histogram H(k) from the LiDAR scan:
H(k) = sum(m_ij^2 * (a - b*d_ij)) for each obstacle in sector k
2. Binary threshold: H_b(k) = 1 if H(k) > tau_high (blocked)
3. Find the widest valley (run of free sectors) closest to the target direction
4. Steering = center of the valley nearest the target heading
import numpy as np

def vfh_steering(scan_360, target_deg, alpha=5, tau_hi=5.0):
    """scan_360: distances[0..359] in meters; target_deg: goal direction."""
    n = 360 // alpha
    hist = np.zeros(n)
    a, b = 4, 0.4
    # 1. polar obstacle density per sector
    for i, dist in enumerate(scan_360):
        if dist < 0.05 or dist > 5:
            continue  # reject invalid / far readings
        k = i // alpha
        m = 1 / (1 + dist)  # magnitude grows as obstacles get closer
        hist[k] += m**2 * (a - b*dist)
    # 2. binary threshold (tau_hi must be tuned to this magnitude model)
    binary = (hist > tau_hi).astype(int)
    target_k = (target_deg // alpha) % n
    # 3. find valleys (runs of free sectors); tile 3x to handle wrap-around
    in_v, vs, valleys = False, 0, []
    ext = np.tile(binary, 3)
    for k in range(n*3):
        if not ext[k] and not in_v:
            vs = k; in_v = True
        elif ext[k] and in_v:
            w = k - vs
            mid = (vs + k - 1)//2 % n
            if w >= 3:               # ignore valleys too narrow to pass
                valleys.append((mid, w))
            in_v = False
    # 4. steer toward the valley center closest to the target heading
    best_angle = target_deg
    if valleys:
        best_k = min(valleys, key=lambda v: abs(v[0] - target_k))[0]
        best_angle = best_k * alpha
    return best_angle
Dynamic Window Approach (DWA)
Feasible Velocity Set = Vs ∩ Vd ∩ Vr
Vd = [v±a·dt] x [ω±α·dt] -- dynamic window (accel limits)
Vr = {v | v ≤ sqrt(2·d_obstacle·a_decel)} -- admissible
Score(v,ω) = α·heading + β·clearance + γ·velocity → maximize
import numpy as np

def dwa_plan(x, y, yaw, v, w, gx, gy, obstacles,
             max_v=0.5, max_w=1.0, max_a=2.0, max_dw=3.2, dt=0.1, pt=2.0):
    """Sample (v, w) pairs inside the dynamic window and score rollouts."""
    best = (-999, 0, 0)
    # dynamic window: velocities reachable within one dt given accel limits
    vmin = max(0, v - max_a*dt);       vmax = min(max_v, v + max_a*dt)
    wmin = max(-max_w, w - max_dw*dt); wmax = min(max_w, w + max_dw*dt)
    for tv in np.arange(vmin, vmax + 0.02, 0.02):
        for tw in np.arange(wmin, wmax + 0.05, 0.05):
            cx, cy, cth = x, y, yaw
            md = 999  # minimum clearance along the rollout
            for _ in np.arange(0, pt, dt):  # forward-simulate pt seconds
                cx += tv*np.cos(cth)*dt; cy += tv*np.sin(cth)*dt; cth += tw*dt
                for ox, oy in obstacles:
                    d = np.hypot(cx - ox, cy - oy) - 0.25  # robot radius 0.25 m
                    if d < 0: md = -1; break
                    md = min(md, d)
                if md < 0: break
            if md <= 0: continue  # collision: discard this candidate
            # score = heading + clearance + velocity terms
            ha = np.arctan2(gy - cy, gx - cx)
            herr = abs(np.arctan2(np.sin(ha - cth), np.cos(ha - cth)))  # wrapped
            sc = (np.pi - herr) + 1.2*md + 0.5*tv/max_v
            if sc > best[0]: best = (sc, tv, tw)
    return best[1], best[2]
04
Algorithms
A* PATH PLANNING
A* with an Inflated Costmap
f(n) = g(n) + h(n)
g(n) = actual cost from start to n
h(n) = octile heuristic: (dx+dy) + (sqrt(2)-2)*min(dx,dy)
Robot radius inflation: binary_dilation(grid, struct=disk(r_cells))
Ensures the robot body does not collide with obstacle edges
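The octile heuristic h(n) above can be checked in isolation (a minimal sketch):

```python
import math

def octile(dx, dy):
    """Octile distance: straight moves cost 1, diagonal moves cost sqrt(2)."""
    return (dx + dy) + (math.sqrt(2) - 2) * min(dx, dy)

print(octile(3, 0))            # pure straight line: 3.0
print(round(octile(3, 3), 3))  # pure diagonal: 3*sqrt(2) ~ 4.243
```

Unlike Euclidean distance, octile is exact on an 8-connected grid, so A* with this heuristic stays admissible without over- or under-estimating.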
import heapq
import numpy as np
from scipy.ndimage import binary_dilation

class AStarGrid:
    MOVES = [(-1,-1),(-1,0),(-1,1),(0,-1),(0,1),(1,-1),(1,0),(1,1)]
    COSTS = [1.414, 1, 1.414, 1, 1, 1.414, 1, 1.414]

    def __init__(self, grid, res=0.05):
        self.g = grid; self.res = res
        self.H, self.W = grid.shape

    def _h(self, a, b):  # octile distance heuristic
        dx, dy = abs(a[0] - b[0]), abs(a[1] - b[1])
        return (dx + dy) + (1.414 - 2)*min(dx, dy)

    def plan(self, start, goal, r_cells=5):
        # inflate obstacles by the robot radius (in cells)
        ig = binary_dilation(self.g, np.ones((2*r_cells + 1,)*2))
        oh = [(self._h(start, goal), 0, start)]  # open heap: (f, g, cell)
        cf = {}; gs = {start: 0}
        while oh:
            _, g, c = heapq.heappop(oh)
            if c == goal:  # reconstruct path back through came-from links
                p = []
                while c in cf:
                    p.append(c); c = cf[c]
                p.append(start); p.reverse()
                return [(r*self.res, c2*self.res) for r, c2 in p]
            for (dr, dc), cost in zip(self.MOVES, self.COSTS):
                nb = (c[0] + dr, c[1] + dc)
                if not (0 <= nb[0] < self.H and 0 <= nb[1] < self.W): continue
                if ig[nb[0], nb[1]]: continue  # inflated cell: blocked
                ng = gs[c] + cost
                if ng < gs.get(nb, 1e9):
                    gs[nb] = ng; cf[nb] = c
                    heapq.heappush(oh, (ng + self._h(nb, goal), ng, nb))
        return []  # no path found
05
Localization
AMCL PARTICLE FILTER
Particle Filter: N particles = N hypotheses of the robot's pose
Predict: x_i = motion_model(x_i-1, u) + noise_motion
Update: w_i = p(z | x_i, map) -- likelihood sensor measurement
Normalize: sum(w_i) = 1
Resample: N new particles sampled proportional to weights
AMCL (Adaptive): KLD-sampling -- N adapts to the current uncertainty
import numpy as np
from scipy.stats import norm

class AMCL:
    def __init__(self, N=500):
        self.N = N; self.p = None
        self.w = np.ones(N)/N
        self.alpha = [0.05, 0.01, 0.05, 0.01]  # odometry noise parameters

    def init_uniform(self, x0, x1, y0, y1):
        self.p = np.column_stack([
            np.random.uniform(x0, x1, self.N),
            np.random.uniform(y0, y1, self.N),
            np.random.uniform(-np.pi, np.pi, self.N)])

    def predict(self, dr1, dt, dr2):
        # odometry motion model: rotate1 -> translate -> rotate2, with noise
        a = self.alpha; N = self.N
        r1 = dr1 + np.random.normal(0, a[0]*abs(dr1) + a[1]*dt, N)
        tr = dt + np.random.normal(0, a[2]*dt + a[3]*(abs(dr1) + abs(dr2)), N)
        r2 = dr2 + np.random.normal(0, a[0]*abs(dr2) + a[1]*dt, N)
        self.p[:,0] += tr*np.cos(self.p[:,2] + r1)
        self.p[:,1] += tr*np.sin(self.p[:,2] + r1)
        self.p[:,2] += r1 + r2

    def sensor_update(self, scan, expect_fn, sigma=0.2):
        for i in range(self.N):
            ex = expect_fn(self.p[i])  # expected scan from the map at this pose
            self.w[i] = np.exp(np.clip(np.sum(norm.logpdf(scan, ex, sigma)), -500, 0))
        self.w += 1e-300; self.w /= self.w.sum()

    def resample(self):
        # low-variance (systematic) resampling
        idx = np.zeros(self.N, dtype=int)
        c = self.w[0]; i = 0
        u = np.random.uniform(0, 1/self.N)
        for j in range(self.N):
            beta = u + j/self.N
            while beta > c:
                i += 1; c += self.w[i]
            idx[j] = i
        self.p = self.p[idx]; self.w[:] = 1/self.N

    def pose(self):
        x = np.average(self.p[:,0], weights=self.w)
        y = np.average(self.p[:,1], weights=self.w)
        # circular mean for the heading
        t = np.arctan2(np.average(np.sin(self.p[:,2]), weights=self.w),
                       np.average(np.cos(self.p[:,2]), weights=self.w))
        return x, y, t
06
Mapping
OCCUPANCY GRID SLAM
Log-odds update:
l_t(m) = l_(t-1)(m) + l_sensor(m|z,x) - l_prior
l_occ = +0.85 (cell occupied) l_free = -0.85 (cell free) l_prior = 0
Probability: P = 1 - 1/(1 + exp(l))
Clamp: l in [-20, +20]
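A quick numeric check of the log-odds to probability conversion above (a minimal sketch):

```python
import math

def logodds_to_prob(l):
    """P(occupied) = 1 - 1/(1 + exp(l))."""
    return 1 - 1/(1 + math.exp(l))

print(round(logodds_to_prob(0.0), 2))    # prior l=0  -> 0.5
print(round(logodds_to_prob(0.85), 2))   # one 'hit'  -> 0.7
print(round(logodds_to_prob(-0.85), 2))  # one 'miss' -> 0.3
```

Working in log-odds keeps each cell update a simple addition; clamping to [-20, +20] stops any cell from becoming numerically irreversible.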
import numpy as np

class OccGrid:
    def __init__(self, W=400, H=400, res=0.05):
        self.W = W; self.H = H; self.res = res
        self.lo = np.zeros((H, W))  # log-odds, prior = 0

    def w2g(self, wx, wy):  # world (m) -> grid (cells), origin at center
        return int(wx/self.res + self.W/2), int(wy/self.res + self.H/2)

    def bres(self, x0, y0, x1, y1):  # Bresenham ray casting
        pts = []
        dx, dy = abs(x1 - x0), abs(y1 - y0)
        sx = 1 if x1 > x0 else -1
        sy = 1 if y1 > y0 else -1
        e = dx - dy
        while True:
            pts.append((x0, y0))
            if x0 == x1 and y0 == y1: break
            e2 = 2*e
            if e2 > -dy: e -= dy; x0 += sx
            if e2 < dx:  e += dx; y0 += sy
        return pts

    def update(self, rx, ry, angles, dists):
        gx, gy = self.w2g(rx, ry)
        for a, d in zip(angles, dists):
            if d < 0.05 or d > 8: continue  # reject out-of-range beams
            ex = rx + d*np.cos(a); ey = ry + d*np.sin(a)
            egx, egy = self.w2g(ex, ey)
            # cells along the beam (except the endpoint) are free
            for px, py in self.bres(gx, gy, egx, egy)[:-1]:
                if 0 <= px < self.W and 0 <= py < self.H:
                    self.lo[py, px] = np.clip(self.lo[py, px] - 0.85, -20, 20)
            # the endpoint cell is occupied
            if 0 <= egx < self.W and 0 <= egy < self.H:
                self.lo[egy, egx] = np.clip(self.lo[egy, egx] + 0.85, -20, 20)

    def prob(self):
        return 1 - 1/(1 + np.exp(self.lo))
07
Arduino C
ARDUINO — ULTRASONIC NAVIGATION
// Arduino Mega: 5x ultrasonic + L298N state machine
#include <NewPing.h>
#define N 5
NewPing sonar[N] = {NewPing(22,23,200), NewPing(24,25,200), NewPing(26,27,200),
                    NewPing(28,29,200), NewPing(30,31,200)};
#define IN1 2
#define IN2 3
#define IN3 4
#define IN4 5
#define ENA 9
#define ENB 10
float d[N];
int turn_ms = 0;
enum {FWD, TL, TR, BACK, STOP} state = FWD;

void mot(int L, int R) {  // signed PWM per side
  digitalWrite(IN1, L >= 0); digitalWrite(IN2, L < 0);
  digitalWrite(IN3, R >= 0); digitalWrite(IN4, R < 0);
  analogWrite(ENA, constrain(abs(L), 0, 255));
  analogWrite(ENB, constrain(abs(R), 0, 255));
}

void readSensors() {
  for (int i = 0; i < N; i++) {
    unsigned int us = sonar[i].ping_median(3);
    d[i] = (us == 0) ? 200 : us / US_ROUNDTRIP_CM;
  }
}

void nav() {
  float F = d[2], L = min(d[0], d[1]), R = min(d[3], d[4]);
  switch (state) {
    case FWD:
      if (F < 18) {            // blocked: back up, then turn toward the open side
        state = (L > R) ? TL : TR;
        turn_ms = 500;
        mot(-150, -150); delay(200);
      } else if (F < 35) {     // slow down, steer toward the clearer side
        int sp = map((int)F, 18, 35, 80, 180);
        mot(sp - (int)(R - L)*2, sp + (int)(R - L)*2);
      } else mot(200, 200);
      break;
    case TL: mot(-150, 150); turn_ms -= 50; if (turn_ms <= 0) state = FWD; break;
    case TR: mot(150, -150); turn_ms -= 50; if (turn_ms <= 0) state = FWD; break;
    case STOP: mot(0, 0); break;
    default: break;
  }
}

void setup() {
  Serial.begin(115200);
  for (int p = 2; p <= 5; p++) pinMode(p, OUTPUT);
  pinMode(ENA, OUTPUT); pinMode(ENB, OUTPUT);
}

void loop() { readSensors(); nav(); delay(50); }
08
Arduino C
ARDUINO — ENCODER + PID VELOCITY
// Quadrature encoders + PID velocity + odometry + ROS serial bridge
#define EL_A 2
#define EL_B 4
#define ER_A 3
#define ER_B 5
volatile long eL = 0, eR = 0;
void ISRL() { eL += (digitalRead(EL_B) == digitalRead(EL_A)) ? 1 : -1; }
void ISRR() { eR += (digitalRead(ER_B) == digitalRead(ER_A)) ? 1 : -1; }

const float RW = 0.033, BL = 0.23, CPR = 4096;
const float DPT = 2*PI*RW/CPR;  // distance per tick
float px = 0, py = 0, pth = 0, tvL = 0, tvR = 0, avL = 0, avR = 0;
long peL = 0, peR = 0;

struct PID { float kp, ki, kd, ii, pe; } pidL = {200,15,8,0,0}, pidR = {200,15,8,0,0};

float pidCalc(PID& p, float tgt, float act, float dt) {
  float e = tgt - act;
  p.ii = constrain(p.ii + e*dt, -80, 80);  // anti-windup clamp
  float out = p.kp*e + p.ki*p.ii + p.kd*(e - p.pe)/dt;
  p.pe = e;
  return out;
}

void odom(float dt) {
  long dL = eL - peL, dR = eR - peR;
  peL = eL; peR = eR;
  float dsL = dL*DPT, dsR = dR*DPT;
  avL = dsL/dt; avR = dsR/dt;  // measured wheel speeds
  float ds = (dsR + dsL)/2, dth = (dsR - dsL)/BL;
  px += ds*cos(pth + dth/2);
  py += ds*sin(pth + dth/2);
  pth += dth;
}

// motorPWM(L, R): drive the H-bridge; wiring depends on your motor driver
// (e.g. the IN/EN pins from the L298N example)
void motorPWM(float L, float R) { /* map outputs to your driver here */ }

void setup() {
  Serial.begin(115200);
  pinMode(EL_A, INPUT_PULLUP); pinMode(EL_B, INPUT_PULLUP);
  pinMode(ER_A, INPUT_PULLUP); pinMode(ER_B, INPUT_PULLUP);
  attachInterrupt(digitalPinToInterrupt(EL_A), ISRL, CHANGE);
  attachInterrupt(digitalPinToInterrupt(ER_A), ISRR, CHANGE);
}

unsigned long t0 = 0;
void loop() {
  unsigned long now = micros();
  float dt = (now - t0)/1e6;
  if (dt < 0.02) return;  // 50 Hz control loop
  t0 = now;
  odom(dt);
  if (Serial.available() >= 8) {  // host sends (v, w) as two 4-byte floats
    float v, w;
    Serial.readBytes((char*)&v, 4);
    Serial.readBytes((char*)&w, 4);
    tvL = v - w*BL/2; tvR = v + w*BL/2;  // twist -> wheel velocity targets
  }
  motorPWM(pidCalc(pidL, tvL, avL, dt), pidCalc(pidR, tvR, avR, dt));
  // odometry out: "O:x,y,theta"
  Serial.print("O:"); Serial.print(px, 4); Serial.print(',');
  Serial.print(py, 4); Serial.print(','); Serial.println(pth, 4);
}
09
Python ROS2
ROS2 NAV2 INTEGRATION
#!/usr/bin/env python3
import math, time
import rclpy
import tf_transformations
from nav2_simple_commander.robot_navigator import BasicNavigator
from geometry_msgs.msg import PoseStamped

def make_pose(nav, x, y, yaw_deg=0):
    p = PoseStamped()
    p.header.frame_id = 'map'
    p.header.stamp = nav.get_clock().now().to_msg()
    p.pose.position.x = float(x); p.pose.position.y = float(y)
    q = tf_transformations.quaternion_from_euler(0, 0, math.radians(yaw_deg))
    p.pose.orientation.z = q[2]; p.pose.orientation.w = q[3]
    return p

rclpy.init()
nav = BasicNavigator()
nav.setInitialPose(make_pose(nav, 0, 0))
nav.waitUntilNav2Active()

# waypoint patrol
wps = [(1.0, 0.0, 0), (1.0, 1.5, 90), (0.0, 1.5, 180), (0.0, 0.0, 270)]
route = [make_pose(nav, x, y, t) for x, y, t in wps]
nav.followWaypoints(route)
while not nav.isTaskComplete():
    fb = nav.getFeedback()
    if fb:
        print(f"WP {fb.current_waypoint + 1}/{len(wps)}")
    time.sleep(0.5)
print(nav.getResult())
Serial Bridge Node (ROS2 ↔ Arduino)
import struct
import rclpy, serial
from rclpy.node import Node
from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry

class Bridge(Node):
    def __init__(self):
        super().__init__('bridge')
        self.ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=0.01)
        self.pub = self.create_publisher(Odometry, '/odom', 10)
        self.create_subscription(Twist, '/cmd_vel', self.cmd_cb, 10)
        self.create_timer(0.02, self.serial_cb)  # poll the port at 50 Hz

    def cmd_cb(self, msg):
        # send (v, w) as two packed floats, matching the Arduino side
        self.ser.write(struct.pack('ff', msg.linear.x, msg.angular.z))

    def serial_cb(self):
        if self.ser.in_waiting:
            line = self.ser.readline().decode().strip()
            if line.startswith('O:'):  # odometry line "O:x,y,theta"
                x, y, th = map(float, line[2:].split(','))
                msg = Odometry()
                msg.header.frame_id = 'odom'
                msg.pose.pose.position.x = x
                msg.pose.pose.position.y = y
                self.pub.publish(msg)

rclpy.init()
n = Bridge()
rclpy.spin(n)
10
Advanced
BEHAVIOR TREES
Behavior Trees (BT) are a modular control architecture. Each node returns SUCCESS, FAILURE, or RUNNING.
| Node | Logic | Use Case |
|---|---|---|
| Sequence → | AND: every child must SUCCEED | Action sequences |
| Selector ? | OR: one SUCCESS suffices | Prioritized fallback |
| Parallel | Run all children concurrently | Concurrent tasks |
| Action | Leaf: executes an action | MoveBase, Pick |
| Condition | Leaf: checks a condition | BatteryOK, ObstacleAhead |
import py_trees as pt
from py_trees.behaviour import Behaviour
from py_trees.common import Status

class BatteryOK(Behaviour):
    def update(self):
        level = pt.blackboard.Blackboard().get('battery')
        return Status.SUCCESS if level > 20 else Status.FAILURE

class ObstacleAhead(Behaviour):
    def update(self):
        return Status.SUCCESS if pt.blackboard.Blackboard().get('obstacle') else Status.FAILURE

class AvoidObstacle(Behaviour):
    def update(self):
        print('Avoiding...')
        return Status.SUCCESS

class Navigate(Behaviour):
    def update(self):
        print('Navigating...')
        return Status.RUNNING

# Root sequence: safety checks must pass before the navigation task runs
root = pt.composites.Sequence('Root', memory=False)
safety = pt.composites.Selector('Safety', memory=False)
safety.add_children([BatteryOK()])
nav = pt.composites.Selector('NavTask', memory=False)
avoid_seq = pt.composites.Sequence('AvoidIfObs', memory=False)
avoid_seq.add_children([ObstacleAhead(), AvoidObstacle()])
nav.add_children([avoid_seq, Navigate()])  # avoid first, otherwise keep navigating
root.add_children([safety, nav])

bt = pt.trees.BehaviourTree(root)
bt.tick_tock(period_ms=100)
11
Advanced
SENSOR FUSION EKF
EKF: state x = [x, y, θ, vx, vy, ω]
Predict:
x_pred = F·x_prev
P_pred = F·P·F^T + Q
Update (per sensor):
K = P_pred·H^T·(H·P_pred·H^T + R)^-1
x = x_pred + K·(z - H·x_pred)
P = (I - K·H)·P_pred
import numpy as np

class NavEKF:
    def __init__(self, dt=0.02):
        self.dt = dt
        self.x = np.zeros(6)    # state: [x, y, theta, vx, vy, omega]
        self.P = np.eye(6)*0.1  # state covariance
        self.F = np.eye(6)
        self.F[0,3] = dt; self.F[1,4] = dt; self.F[2,5] = dt
        self.Q = np.diag([1e-4, 1e-4, 1e-4, 0.01, 0.01, 0.01])  # process noise

    def predict(self, v, w):
        th = self.x[2]
        vx = v*np.cos(th); vy = v*np.sin(th)
        self.x[0] += vx*self.dt; self.x[1] += vy*self.dt; self.x[2] += w*self.dt
        self.x[3] = vx; self.x[4] = vy; self.x[5] = w
        self.P = self.F @ self.P @ self.F.T + self.Q

    def update_gps(self, gx, gy):
        H = np.zeros((2,6)); H[0,0] = 1; H[1,1] = 1
        self._upd(H, np.diag([0.5, 0.5]), np.array([gx, gy]))

    def update_imu(self, omega):
        H = np.zeros((1,6)); H[0,5] = 1
        self._upd(H, np.array([[0.01]]), np.array([omega]))

    def _upd(self, H, R, z):
        y = z - H @ self.x                   # innovation
        S = H @ self.P @ H.T + R             # innovation covariance
        K = self.P @ H.T @ np.linalg.inv(S)  # Kalman gain
        self.x += K @ y
        self.P = (np.eye(6) - K @ H) @ self.P

    def pose(self):
        return self.x[0], self.x[1], self.x[2]
Antonius - bluedragonsec.com