DOC 1 — AUTONOMOUS NAVIGATION
// DOC 01 / 07

AUTONOMOUS
NAVIGATION

Panduan komprehensif navigasi otonom: LiDAR, ultrasonik, IR, A*, DWA, VFH, AMCL, SLAM, Behavior Trees, Sensor Fusion EKF — source code Python & Arduino C.

LiDAR/SonarA* DWA VFHArduino CROS2 Nav2AMCL SLAM
01
Overview

ARSITEKTUR NAVIGASI

Navigasi otonom memecahkan tiga masalah secara bersamaan: lokalisasi (robot di mana?), pemetaan (lingkungan seperti apa?), dan perencanaan jalur (bagaimana ke tujuan?).

📍
Localization
Estimasi pose (x,y,θ) menggunakan AMCL particle filter atau EKF.
🗺️
Mapping
Occupancy grid dari LiDAR, update log-odds per cell via Bresenham ray casting.
🎯
Path Planning
A* untuk global path, DWA/VFH untuk local reactive navigation.
⚠️
Obstacle Avoidance
Reaktif: sensor threshold + costmap inflation + real-time replanning.

Navigation Stack Architecture

# ROS2 Nav2 Data Flow:
# GOAL -> Global Planner (A*/NavFn) -> Global Path
#              |                           |
# SENSORS -> Costmap 2D -> Local Planner (DWA/TEB) -> /cmd_vel
# LiDAR      static+odom    real-time replanning
#    |
# AMCL Localizer <-- map_server (pgm/yaml)
#    |
# /odom (encoder odometry)

Differential Drive Kinematics

ΔsL = r·ΔθL_encoder ΔsR = r·ΔθR_encoder Δs = (ΔsR + ΔsL) / 2 linear displacement Δθ = (ΔsR - ΔsL) / L angular (L = wheel base) x += Δs·cos(θ + Δθ/2) y += Δs·sin(θ + Δθ/2) θ += Δθ
02
Hardware

SENSOR SUITE

LiDAR Comparison

ModelRangeFOVHzInterfaceEst. Harga
RPLiDAR A112m360°5.5UART~$99
RPLiDAR A325m360°15UART~$299
YDLiDAR X410m360°6UART~$60
SICK TiM57125m270°15Ethernet~$800
# Python: Baca RPLiDAR, deteksi obstacle
from rplidar import RPLidar
import numpy as np

lidar = RPLidar('/dev/ttyUSB0')
scan = np.zeros(360)

for s in lidar.iter_scans():
    for (_, ang, dist) in s:
        idx = int(ang) % 360
        if dist > 0: scan[idx] = dist/1000.0
    front = np.concatenate([scan[330:], scan[:30]])
    valid = front[front > 0.05]
    if len(valid) > 0 and valid.min() < 0.5:
        print(f"OBSTACLE: {valid.min():.2f}m")
    break

HC-SR04 Ultrasonik — Arduino

Jarak (cm) = (PulseWidth_us / 2) / 29.1 Atau: Jarak = SpeedOfSound(343 m/s) × t/2 Range: 2-400cm, Akurasi: ±3mm, Dead zone: <2cm
// Arduino: 5-sensor ultrasonik array dengan NewPing
#include <NewPing.h>
#define N 5
const int T[N]={22,24,26,28,30}, E[N]={23,25,27,29,31};
NewPing sonar[N]={NewPing(T[0],E[0],200),NewPing(T[1],E[1],200),
  NewPing(T[2],E[2],200),NewPing(T[3],E[3],200),NewPing(T[4],E[4],200)};
float d[N]; // distances in cm

void readAll() {
  for(int i=0;i<N;i++) {
    unsigned int us = sonar[i].ping_median(3);
    d[i] = (us==0) ? 200 : us/US_ROUNDTRIP_CM;
  }
}

IR Sharp GP2Y0A21 — Analog

// Arduino: Sharp IR jarak 10-80cm
float readIR(int pin) {
  int raw = analogRead(pin);
  float v = raw * (5.0/1023.0);
  if(v < 0.4) return 80;  // out of range
  return 27.728 * pow(v, -1.2045);
}
03
Algoritma

OBSTACLE AVOIDANCE

VFH — Vector Field Histogram

1. Bangun polar histogram H(k) dari scan LiDAR: H(k) = sum(m_ij^2 * (a - b*d_ij)) untuk tiap obstacle di sector k 2. Threshold binary: H_b(k) = 1 jika H(k) > tau_high (blocked) 3. Cari valley (run of free sectors) terlebar mendekati target direction 4. Steering = tengah valley terdekat ke target heading
import numpy as np

def vfh_steering(scan_360, target_deg, alpha=5, tau_hi=3000):
    """scan_360: distances[0..359], target_deg: goal direction"""
    n = 360// alpha
    hist = np.zeros(n)
    a, b = 4, 0.4
    for i, dist in enumerate(scan_360):
        if dist < 0.05 or dist > 5: continue
        k = i // alpha
        m = 1/(1+dist)
        hist[k] += m**2 * (a - b*dist)
    binary = (hist > tau_hi).astype(int)
    target_k = (target_deg // alpha) % n
    best_angle, best_width = target_deg, 0
    in_v, vs, valleys = False, 0, []
    ext = np.tile(binary, 3)
    for k in range(n*3):
        if not ext[k] and not in_v: vs=k; in_v=True
        elif ext[k] and in_v:
            w=k-vs; mid=(vs+k-1)//2%n
            if w>=3: valleys.append((mid,w)); in_v=False
    if valleys:
        best_k = min(valleys, key=lambda v: abs(v[0]-target_k))[0]
        best_angle = best_k * alpha
    return best_angle

Dynamic Window Approach (DWA)

Feasible Velocity Set = Vs ∩ Vd ∩ Vr Vd = [v±a·dt] x [ω±α·dt] -- dynamic window (accel limits) Vr = {v | v ≤ sqrt(2·d_obstacle·a_decel)} -- admissible Score(v,ω) = α·heading + β·clearance + γ·velocity → maximize
import numpy as np

def dwa_plan(x,y,yaw,v,w, gx,gy, obstacles,
              max_v=0.5,max_w=1.0,max_a=2.0,max_dw=3.2,dt=0.1,pt=2.0):
    best = (-999,0,0)
    vmin=max(0,v-max_a*dt); vmax=min(max_v,v+max_a*dt)
    wmin=max(-max_w,w-max_dw*dt); wmax=min(max_w,w+max_dw*dt)
    for tv in np.arange(vmin,vmax+0.02,0.02):
        for tw in np.arange(wmin,wmax+0.05,0.05):
            cx,cy,cth=x,y,yaw; md=999
            for _ in np.arange(0,pt,dt):
                cx+=tv*np.cos(cth)*dt; cy+=tv*np.sin(cth)*dt; cth+=tw*dt
                for ox,oy in obstacles:
                    d=np.hypot(cx-ox,cy-oy)-0.25
                    if d<0: md=-1; break
                    md=min(md,d)
                if md<0: break
            if md<=0: continue
            ha=np.arctan2(gy-cy,gx-cx)
            sc=np.pi-abs(ha-cth)+1.2*md+0.5*tv/max_v
            if sc>best[0]: best=(sc,tv,tw)
    return best[1], best[2]
04
Algoritma

A* PATH PLANNING

A* dengan Inflated Costmap

f(n) = g(n) + h(n) g(n) = biaya aktual dari start ke n h(n) = octile heuristic: (dx+dy) + (sqrt(2)-2)*min(dx,dy) Robot radius inflation: binary_dilation(grid, struct=disk(r_cells)) Ensures robot body tidak menabrak obstacle tepi
import heapq, numpy as np
from scipy.ndimage import binary_dilation

class AStarGrid:
    MOVES=[(-1,-1),(-1,0),(-1,1),(0,-1),(0,1),(1,-1),(1,0),(1,1)]
    COSTS=[1.414,1,1.414,1,1,1.414,1,1.414]

    def __init__(self, grid, res=0.05):
        self.g=grid; self.res=res; self.H,self.W=grid.shape

    def _h(self,a,b):
        dx,dy=abs(a[0]-b[0]),abs(a[1]-b[1])
        return (dx+dy)+(1.414-2)*min(dx,dy)

    def plan(self,start,goal,r_cells=5):
        ig=binary_dilation(self.g,np.ones((2*r_cells+1,)*2))
        oh=[(self._h(start,goal),0,start)]; cf={}; gs={start:0}
        while oh:
            _,g,c=heapq.heappop(oh)
            if c==goal:
                p=[]; while c in cf: p.append(c); c=cf[c]
                p.append(start); p.reverse()
                return [(r*self.res,c2*self.res) for r,c2 in p]
            for (dr,dc),cost in zip(self.MOVES,self.COSTS):
                nb=(c[0]+dr,c[1]+dc)
                if not(0<=nb[0]<self.H and 0<=nb[1]<self.W): continue
                if ig[nb[0],nb[1]]: continue
                ng=gs[c]+cost
                if ng<gs.get(nb,1e9):
                    gs[nb]=ng; cf[nb]=c
                    heapq.heappush(oh,(ng+self._h(nb,goal),ng,nb))
        return []
05
Localization

AMCL PARTICLE FILTER

Particle Filter: N particles = N hypothesis pose robot Predict: x_i = motion_model(x_i-1, u) + noise_motion Update: w_i = p(z | x_i, map) -- likelihood sensor measurement Normalize: sum(w_i) = 1 Resample: N new particles sampled proportional to weights AMCL (Adaptive): KLD-sampling -- N adaptif berdasarkan uncertainty
import numpy as np
from scipy.stats import norm

class AMCL:
    def __init__(self,N=500):
        self.N=N; self.p=None; self.w=np.ones(N)/N
        self.alpha=[0.05,0.01,0.05,0.01]

    def init_uniform(self,x0,x1,y0,y1):
        self.p=np.column_stack([
            np.random.uniform(x0,x1,self.N),
            np.random.uniform(y0,y1,self.N),
            np.random.uniform(-np.pi,np.pi,self.N)])

    def predict(self,dr1,dt,dr2):
        a=self.alpha; N=self.N
        r1=dr1+np.random.normal(0,a[0]*abs(dr1)+a[1]*dt,N)
        tr=dt+np.random.normal(0,a[2]*dt+a[3]*(dr1+dr2),N)
        r2=dr2+np.random.normal(0,a[0]*abs(dr2)+a[1]*dt,N)
        self.p[:,0]+=tr*np.cos(self.p[:,2]+r1)
        self.p[:,1]+=tr*np.sin(self.p[:,2]+r1)
        self.p[:,2]+=r1+r2

    def sensor_update(self,scan,expect_fn,sigma=0.2):
        for i in range(self.N):
            ex=expect_fn(self.p[i])
            self.w[i]=np.exp(np.clip(np.sum(norm.logpdf(scan,ex,sigma)),-500,0))
        self.w+=1e-300; self.w/=self.w.sum()

    def resample(self):
        idx=np.zeros(self.N,dtype=int); c=self.w[0]; i=0
        u=np.random.uniform(0,1/self.N)
        for j in range(self.N):
            beta=u+j/self.N
            while beta>c: i+=1; c+=self.w[i]
            idx[j]=i
        self.p=self.p[idx]; self.w[:]=1/self.N

    def pose(self):
        x=np.average(self.p[:,0],weights=self.w)
        y=np.average(self.p[:,1],weights=self.w)
        t=np.arctan2(np.average(np.sin(self.p[:,2]),weights=self.w),
                     np.average(np.cos(self.p[:,2]),weights=self.w))
        return x,y,t
06
Mapping

OCCUPANCY GRID SLAM

Log-odds update: l_t(m) = l_(t-1)(m) + l_sensor(m|z,x) - l_prior l_occ = +0.85 (cell occupied) l_free = -0.85 (cell free) l_prior = 0 Probability: P = 1 - 1/(1 + exp(l)) Clamp: l in [-20, +20]
import numpy as np

class OccGrid:
    def __init__(self,W=400,H=400,res=0.05):
        self.W=W;self.H=H;self.res=res
        self.lo=np.zeros((H,W))

    def w2g(self,wx,wy): return int(wx/self.res+self.W/2),int(wy/self.res+self.H/2)

    def bres(self,x0,y0,x1,y1):
        pts=[];dx,dy=abs(x1-x0),abs(y1-y0)
        sx=1 if x1>x0 else -1;sy=1 if y1>y0 else -1;e=dx-dy
        while True:
            pts.append((x0,y0))
            if x0==x1 and y0==y1: break
            e2=2*e
            if e2>-dy: e-=dy;x0+=sx
            if e2<dx: e+=dx;y0+=sy
        return pts

    def update(self,rx,ry,angles,dists):
        gx,gy=self.w2g(rx,ry)
        for a,d in zip(angles,dists):
            if d<0.05 or d>8: continue
            ex=rx+d*np.cos(a);ey=ry+d*np.sin(a)
            egx,egy=self.w2g(ex,ey)
            for px,py in self.bres(gx,gy,egx,egy)[:-1]:
                if 0<=px<self.W and 0<=py<self.H:
                    self.lo[py,px]=np.clip(self.lo[py,px]-0.85,-20,20)
            if 0<=egx<self.W and 0<=egy<self.H:
                self.lo[egy,egx]=np.clip(self.lo[egy,egx]+0.85,-20,20)

    def prob(self): return 1-1/(1+np.exp(self.lo))
07
Arduino C

ARDUINO — ULTRASONIK NAVIGATION

// Arduino Mega: 5x ultrasonik + L298N state machine
#include <NewPing.h>
#define N 5
NewPing sonar[N]={NewPing(22,23,200),NewPing(24,25,200),NewPing(26,27,200),
                  NewPing(28,29,200),NewPing(30,31,200)};
#define IN1 2; #define IN2 3; #define IN3 4; #define IN4 5;
#define ENA 9; #define ENB 10;
float d[N]; int turn_ms=0;
enum {FWD,TL,TR,BACK,STOP} state=FWD;

void mot(int L,int R){
  digitalWrite(IN1,L>=0);   digitalWrite(IN2,L<0);
  digitalWrite(IN3,R>=0);   digitalWrite(IN4,R<0);
  analogWrite(ENA,constrain(abs(L),0,255));
  analogWrite(ENB,constrain(abs(R),0,255));}

void read(){
  for(int i=0;i<N;i++){
    unsigned int us=sonar[i].ping_median(3);
    d[i]=(us==0)?200:us/US_ROUNDTRIP_CM;}}

void nav(){
  float F=d[2],L=min(d[0],d[1]),R=min(d[3],d[4]);
  switch(state){
    case FWD:
      if(F<18){state=(L>R)?TL:TR; turn_ms=500; mot(-150,-150); delay(200);}
      else if(F<35){
        int sp=map((int)F,18,35,80,180);
        mot(sp-(int)(R-L)*2,sp+(int)(R-L)*2);}
      else mot(200,200); break;
    case TL: mot(-150,150); turn_ms-=50; if(turn_ms<=0) state=FWD; break;
    case TR: mot(150,-150); turn_ms-=50; if(turn_ms<=0) state=FWD; break;
    case STOP: mot(0,0); break;}}

void setup(){ Serial.begin(115200); }
void loop(){ read(); nav(); delay(50); }
08
Arduino C

ARDUINO — ENCODER + PID VELOCITY

// Encoder quadrature + PID velocity + odometry + ROS serial bridge
#define EL_A 2; #define EL_B 4; #define ER_A 3; #define ER_B 5;
volatile long eL=0,eR=0;
void ISRL(){ eL+=(digitalRead(EL_B)==digitalRead(EL_A))?1:-1; }
void ISRR(){ eR+=(digitalRead(ER_B)==digitalRead(ER_A))?1:-1; }

const float RW=0.033,BL=0.23,CPR=4096;
const float DPT=2*PI*RW/CPR;  // dist per tick
float px=0,py=0,pth=0,tvL=0,tvR=0,avL=0,avR=0;
long peL=0,peR=0;
struct PID{float kp,ki,kd,ii,pe;} pidL={200,15,8,0,0},pidR={200,15,8,0,0};

float pidCalc(PID& p,float tgt,float act,float dt){
  float e=tgt-act;
  p.ii=constrain(p.ii+e*dt,-80,80);
  float out=p.kp*e+p.ki*p.ii+p.kd*(e-p.pe)/dt;
  p.pe=e; return out;}

void odom(float dt){
  long dL=eL-peL,dR=eR-peR; peL=eL; peR=eR;
  float dsL=dL*DPT,dsR=dR*DPT;
  avL=dsL/dt; avR=dsR/dt;
  float ds=(dsR+dsL)/2,dth=(dsR-dsL)/BL;
  px+=ds*cos(pth+dth/2); py+=ds*sin(pth+dth/2); pth+=dth;}

void setup(){
  Serial.begin(115200);
  attachInterrupt(digitalPinToInterrupt(EL_A),ISRL,CHANGE);
  attachInterrupt(digitalPinToInterrupt(ER_A),ISRR,CHANGE);}

unsigned long t0=0;
void loop(){
  unsigned long now=micros();
  float dt=(now-t0)/1e6; if(dt<0.02) return; t0=now;
  odom(dt);
  if(Serial.available()>=8){
    float v,w; Serial.readBytes((char*)&v,4); Serial.readBytes((char*)&w,4);
    tvL=v-w*BL/2; tvR=v+w*BL/2;}
  motorPWM(pidCalc(pidL,tvL,avL,dt),pidCalc(pidR,tvR,avR,dt));
  Serial.printf("O:%.4f,%.4f,%.4f\n",px,py,pth);}
09
Python ROS2

ROS2 NAV2 INTEGRATION

#!/usr/bin/env python3
import rclpy
from nav2_simple_commander.robot_navigator import BasicNavigator, TaskResult
from geometry_msgs.msg import PoseStamped
import tf_transformations, time, math

def make_pose(nav,x,y,yaw_deg=0):
    p=PoseStamped(); p.header.frame_id='map'
    p.header.stamp=nav.get_clock().now().to_msg()
    p.pose.position.x=x; p.pose.position.y=y
    q=tf_transformations.quaternion_from_euler(0,0,math.radians(yaw_deg))
    p.pose.orientation.z=q[2]; p.pose.orientation.w=q[3]
    return p

rclpy.init(); nav=BasicNavigator()
nav.setInitialPose(make_pose(nav,0,0))
nav.waitUntilNav2Active()

# Waypoint patrol
wps=[(1.0,0.0,0),(1.0,1.5,90),(0.0,1.5,180),(0.0,0.0,270)]
route=[make_pose(nav,x,y,t) for x,y,t in wps]
nav.followWaypoints(route)

while not nav.isTaskComplete():
    fb=nav.getFeedback()
    if fb: print(f"WP {fb.current_waypoint}/{len(wps)} dist={fb.distance_remaining:.2f}m")
    time.sleep(0.5)

print(nav.getResult())

Serial Bridge Node (ROS2 ↔ Arduino)

import rclpy, serial, struct
from rclpy.node import Node
from geometry_msgs.msg import Twist; from nav_msgs.msg import Odometry

class Bridge(Node):
    def __init__(self):
        super().__init__('bridge')
        self.ser=serial.Serial('/dev/ttyUSB0',115200,timeout=0.01)
        self.pub=self.create_publisher(Odometry,'/odom',10)
        self.create_subscription(Twist,'/cmd_vel',self.cmd_cb,10)
        self.create_timer(0.02,self.serial_cb)
    def cmd_cb(self,msg):
        self.ser.write(struct.pack('ff',msg.linear.x,msg.angular.z))
    def serial_cb(self):
        if self.ser.in_waiting:
            line=self.ser.readline().decode().strip()
            if line.startswith('O:'):
                x,y,th=map(float,line[2:].split(','))
                msg=Odometry(); msg.header.frame_id='odom'
                msg.pose.pose.position.x=x; msg.pose.pose.position.y=y
                self.pub.publish(msg)

rclpy.init(); n=Bridge(); rclpy.spin(n)
010
Advanced

BEHAVIOR TREES

Behavior Trees (BT) adalah arsitektur kontrol modular. Setiap node mengembalikan SUCCESS, FAILURE, atau RUNNING.

NodeLogikaUse Case
Sequence →AND: semua child harus SUCCESSUrutan aksi
Selector ?OR: cukup satu SUCCESSPrioritized fallback
ParallelJalankan semua serentakConcurrent tasks
ActionLeaf: eksekusi aksiMoveBase, Pick
ConditionLeaf: cek kondisiBatteryOK, ObstacleAhead
import py_trees as pt
from py_trees.behaviour import Behaviour
from py_trees.common import Status

class BatteryOK(Behaviour):
    def update(self):
        level=pt.blackboard.Blackboard().get('battery')
        return Status.SUCCESS if level>20 else Status.FAILURE

class ObstacleAhead(Behaviour):
    def update(self):
        return Status.SUCCESS if pt.blackboard.Blackboard().get('obstacle') else Status.FAILURE

class AvoidObstacle(Behaviour):
    def update(self): print('Avoiding...'); return Status.SUCCESS

class Navigate(Behaviour):
    def update(self): print('Navigating...'); return Status.RUNNING

root=pt.composites.Sequence('Root',memory=False)
safety=pt.composites.Selector('Safety',memory=False)
safety.add_children([BatteryOK()])
nav=pt.composites.Selector('NavTask',memory=False)
avoid_seq=pt.composites.Sequence('AvoidIfObs')
avoid_seq.add_children([ObstacleAhead(),AvoidObstacle()])
nav.add_children([avoid_seq,Navigate()])
root.add_children([safety,nav])
bt=pt.trees.BehaviourTree(root)
bt.tick_tock(period_ms=100)
011
Advanced

SENSOR FUSION EKF

EKF: state x = [x, y, θ, vx, vy, ω] Predict: x_pred = F·x_prev P_pred = F·P·F^T + Q Update (per sensor): K = P_pred·H^T·(H·P_pred·H^T + R)^-1 x = x_pred + K·(z - H·x_pred) P = (I - K·H)·P_pred
import numpy as np

class NavEKF:
    def __init__(self,dt=0.02):
        self.dt=dt; self.x=np.zeros(6)
        self.P=np.eye(6)*0.1
        self.F=np.eye(6); self.F[0,3]=dt; self.F[1,4]=dt; self.F[2,5]=dt
        self.Q=np.diag([1e-4,1e-4,1e-4,0.01,0.01,0.01])

    def predict(self,v,w):
        th=self.x[2]; vx=v*np.cos(th); vy=v*np.sin(th)
        self.x[0]+=vx*self.dt; self.x[1]+=vy*self.dt; self.x[2]+=w*self.dt
        self.x[3]=vx; self.x[4]=vy; self.x[5]=w
        [email protected]@self.F.T+self.Q

    def update_gps(self,gx,gy):
        H=np.zeros((2,6)); H[0,0]=1; H[1,1]=1
        self._upd(H,np.diag([0.5,0.5]),np.array([gx,gy]))

    def update_imu(self,omega):
        H=np.zeros((1,6)); H[0,5]=1
        self._upd(H,np.array([[0.01]]),np.array([omega]))

    def _upd(self,H,R,z):
        [email protected]; [email protected]@H.T+R
        [email protected]@np.linalg.inv(S)
        self.x+=K@y; self.P=(np.eye(6)-K@H)@self.P

    def pose(self): return self.x[0],self.x[1],self.x[2]
AUTONOMOUS NAVIGATION — DOC 01 / 07
Antonius - bluedragonsec.com