path: root/EXAMPLES
diff options
authorjoan <>2014-11-25 17:10:05 +0000
committerjoan <>2014-11-25 17:10:05 +0000
commite8a8ce19824a469cc75f162334cd46c44781ee1b (patch)
tree9af3ba448025f39a48b742960ca23ffa9e7790e9 /EXAMPLES
parent6e8073871d0fefd8f9c25a2844a4f877c7831fe0 (diff)
Diffstat (limited to 'EXAMPLES')
26 files changed, 1712 insertions, 2 deletions
diff --git a/EXAMPLES/CPP/Readme b/EXAMPLES/CPP/Readme
deleted file mode 100644
index 4d0ddc7..0000000
--- a/EXAMPLES/CPP/Readme
+++ /dev/null
@@ -1 +0,0 @@
-pigpio C++ examples
diff --git a/EXAMPLES/Python/DHT22_AM2302_SENSOR/ b/EXAMPLES/Python/DHT22_AM2302_SENSOR/
new file mode 100755
index 0000000..8a10f17
--- /dev/null
+++ b/EXAMPLES/Python/DHT22_AM2302_SENSOR/
@@ -0,0 +1,283 @@
+#!/usr/bin/env python
+# 2014-07-11
+import time
+import atexit
+import pigpio
+class sensor:
+ """
+ A class to read relative humidity and temperature from the
+ DHT22 sensor. The sensor is also known as the AM2302.
+ The sensor can be powered from the Pi 3V3 or the Pi 5V rail.
+ Powering from the 3V3 rail is simpler and safer. You may need
+ to power from 5V if the sensor is connected via a long cable.
+ For 3V3 operation connect pin 1 to 3V3 and pin 4 to ground.
+ Connect pin 2 to a gpio.
+ For 5V operation connect pin 1 to 5V and pin 4 to ground.
+ The following pin 2 connection works for me. Use at YOUR OWN RISK.
+ 5V--5K_resistor--+--10K_resistor--Ground
+ |
+ DHT22 pin 2 -----+
+ |
+ gpio ------------+
+ """
+ def __init__(self, pi, gpio, LED=None, power=None):
+ """
+ Instantiate with the Pi and gpio to which the DHT22 output
+ pin is connected.
+ Optionally a LED may be specified. This will be blinked for
+ each successful reading.
+ Optionally a gpio used to power the sensor may be specified.
+ This gpio will be set high to power the sensor. If the sensor
+ locks it will be power cycled to restart the readings.
+ Taking readings more often than about once every two seconds will
+ eventually cause the DHT22 to hang. A 3 second interval seems OK.
+ """
+ self.pi = pi
+ self.gpio = gpio
+ self.LED = LED
+ self.power = power
+ if power is not None:
+ pi.write(power, 1) # Switch sensor on.
+ time.sleep(2)
+ self.powered = True
+ self.cb = None
+ atexit.register(self.cancel)
+ self.bad_CS = 0 # Bad checksum count.
+ self.bad_SM = 0 # Short message count.
+ self.bad_MM = 0 # Missing message count.
+ self.bad_SR = 0 # Sensor reset count.
+ # Power cycle if timeout > MAX_TIMEOUTS.
+ self.no_response = 0
+ self.MAX_NO_RESPONSE = 2
+ self.rhum = -999
+ self.temp = -999
+ self.tov = None
+ self.high_tick = 0
+ self.bit = 40
+ pi.set_pull_up_down(gpio, pigpio.PUD_OFF)
+ pi.set_watchdog(gpio, 0) # Kill any watchdogs.
+ self.cb = pi.callback(gpio, pigpio.EITHER_EDGE, self._cb)
+ def _cb(self, gpio, level, tick):
+ """
+ Accumulate the 40 data bits. Format into 5 bytes, humidity high,
+ humidity low, temperature high, temperature low, checksum.
+ """
+ diff = pigpio.tickDiff(self.high_tick, tick)
+ if level == 0:
+ # Edge length determines if bit is 1 or 0.
+ if diff >= 50:
+ val = 1
+ if diff >= 200: # Bad bit?
+ self.CS = 256 # Force bad checksum.
+ else:
+ val = 0
+ if self.bit >= 40: # Message complete.
+ self.bit = 40
+ elif self.bit >= 32: # In checksum byte.
+ self.CS = (self.CS<<1) + val
+ if self.bit == 39:
+ # 40th bit received.
+ self.pi.set_watchdog(self.gpio, 0)
+ self.no_response = 0
+ total = self.hH + self.hL + self.tH + self.tL
+ if (total & 255) == self.CS: # Is checksum ok?
+ self.rhum = ((self.hH<<8) + self.hL) * 0.1
+ if self.tH & 128: # Negative temperature.
+ mult = -0.1
+ self.tH = self.tH & 127
+ else:
+ mult = 0.1
+ self.temp = ((self.tH<<8) + self.tL) * mult
+ self.tov = time.time()
+ if self.LED is not None:
+ self.pi.write(self.LED, 0)
+ else:
+ self.bad_CS += 1
+ elif self.bit >=24: # in temp low byte
+ self.tL = (self.tL<<1) + val
+ elif self.bit >=16: # in temp high byte
+ self.tH = (self.tH<<1) + val
+ elif self.bit >= 8: # in humidity low byte
+ self.hL = (self.hL<<1) + val
+ elif self.bit >= 0: # in humidity high byte
+ self.hH = (self.hH<<1) + val
+ else: # header bits
+ pass
+ self.bit += 1
+ elif level == 1:
+ self.high_tick = tick
+ if diff > 250000:
+ self.bit = -2
+ self.hH = 0
+ self.hL = 0
+ self.tH = 0
+ self.tL = 0
+ self.CS = 0
+ else: # level == pigpio.TIMEOUT:
+ self.pi.set_watchdog(self.gpio, 0)
+ if self.bit < 8: # Too few data bits received.
+ self.bad_MM += 1 # Bump missing message count.
+ self.no_response += 1
+ if self.no_response > self.MAX_NO_RESPONSE:
+ self.no_response = 0
+ self.bad_SR += 1 # Bump sensor reset count.
+ if self.power is not None:
+ self.powered = False
+ self.pi.write(self.power, 0)
+ time.sleep(2)
+ self.pi.write(self.power, 1)
+ time.sleep(2)
+ self.powered = True
+ elif self.bit < 39: # Short message receieved.
+ self.bad_SM += 1 # Bump short message count.
+ self.no_response = 0
+ else: # Full message received.
+ self.no_response = 0
+ def temperature(self):
+ """Return current temperature."""
+ return self.temp
+ def humidity(self):
+ """Return current relative humidity."""
+ return self.rhum
+ def staleness(self):
+ """Return time since measurement made."""
+ if self.tov is not None:
+ return time.time() - self.tov
+ else:
+ return -999
+ def bad_checksum(self):
+ """Return count of messages received with bad checksums."""
+ return self.bad_CS
+ def short_message(self):
+ """Return count of short messages."""
+ return self.bad_SM
+ def missing_message(self):
+ """Return count of missing messages."""
+ return self.bad_MM
+ def sensor_resets(self):
+ """Return count of power cycles because of sensor hangs."""
+ return self.bad_SR
+ def trigger(self):
+ """Trigger a new relative humidity and temperature reading."""
+ if self.powered:
+ if self.LED is not None:
+ self.pi.write(self.LED, 1)
+ self.pi.write(self.gpio, pigpio.LOW)
+ time.sleep(0.017) # 17 ms
+ self.pi.set_mode(self.gpio, pigpio.INPUT)
+ self.pi.set_watchdog(self.gpio, 200)
+ def cancel(self):
+ """Cancel the DHT22 sensor."""
+ self.pi.set_watchdog(self.gpio, 0)
+ if self.cb != None:
+ self.cb.cancel()
+ self.cb = None
+if __name__ == "__main__":
+ import time
+ import pigpio
+ import DHT22
+ # Intervals of about 2 seconds or less will eventually hang the DHT22.
+ pi = pigpio.pi()
+ s = DHT22.sensor(pi, 22, LED=16, power=8)
+ r = 0
+ next_reading = time.time()
+ while True:
+ r += 1
+ s.trigger()
+ time.sleep(0.2)
+ print("{} {} {} {:3.2f} {} {} {} {}".format(
+ r, s.humidity(), s.temperature(), s.staleness(),
+ s.bad_checksum(), s.short_message(), s.missing_message(),
+ s.sensor_resets()))
+ next_reading += INTERVAL
+ time.sleep(next_reading-time.time()) # Overall INTERVAL second polling.
+ s.cancel()
+ pi.stop()
new file mode 100644
index 0000000..328c023
--- /dev/null
@@ -0,0 +1,2 @@
+Class to read the relative humidity and temperature from a DHT22/AM2302 sensor.
new file mode 100644
index 0000000..4ee48d3
--- /dev/null
@@ -0,0 +1,2 @@
+Script to display the status of gpios 0-31.
new file mode 100755
index 0000000..82c022c
--- /dev/null
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+import time
+import curses
+import atexit
+import pigpio
+MODES=["INPUT", "OUTPUT", "ALT5", "ALT4", "ALT0", "ALT1", "ALT2", "ALT3"]
+def cleanup():
+ curses.nocbreak()
+ curses.echo()
+ curses.endwin()
+ pi.stop()
+pi = pigpio.pi()
+stdscr = curses.initscr()
+cb = []
+for g in range(GPIOS):
+ cb.append(pi.callback(g, pigpio.EITHER_EDGE))
+# disable gpio 28 as the PCM clock is swamping the system
+stdscr.addstr(0, 23, "Status of gpios 0-31", curses.A_REVERSE)
+while True:
+ for g in range(GPIOS):
+ tally = cb[g].tally()
+ mode = pi.get_mode(g)
+ col = (g / 11) * 25
+ row = (g % 11) + 2
+ stdscr.addstr(row, col, "{:2}".format(g), curses.A_BOLD)
+ stdscr.addstr(
+ "={} {:>6}: {:<10}".format(, MODES[mode], tally))
+ stdscr.refresh()
+ time.sleep(0.1)
+ c = stdscr.getch()
+ if c != curses.ERR:
+ break
new file mode 100644
index 0000000..57f9143
--- /dev/null
@@ -0,0 +1,2 @@
+Program to show status changes for a Hall effect sensor.
new file mode 100755
index 0000000..a3a19bb
--- /dev/null
@@ -0,0 +1,32 @@
+#!/usr/bin/env python
+import time
+import pigpio
+# OH3144E or equivalent Hall effect sensor
+# Pin 1 - 5V
+# Pin 2 - Ground
+# Pin 3 - gpio (here P1-8, gpio 14, TXD is used)
+# The internal gpio pull-up is enabled so that the sensor
+# normally reads high. It reads low when a magnet is close.
+pi = pigpio.pi() # connect to local Pi
+pi.set_mode(HALL, pigpio.INPUT)
+pi.set_pull_up_down(HALL, pigpio.PUD_UP)
+start = time.time()
+while (time.time() - start) < 60:
+ print("Hall = {}".format(
+ time.sleep(0.2)
diff --git a/EXAMPLES/Python/I2C_SNIFFER/ b/EXAMPLES/Python/I2C_SNIFFER/
new file mode 100755
index 0000000..e45fa21
--- /dev/null
@@ -0,0 +1,163 @@
+#!/usr/bin/env python
+import time
+import pigpio
+class sniffer:
+ """
+ A class to passively monitor activity on an I2C bus. This should
+ work for an I2C bus running at 100kbps or less. You are unlikely
+ to get any usable results for a bus running any faster.
+ """
+ def __init__(self, pi, SCL, SDA, set_as_inputs=True):
+ """
+ Instantiate with the Pi and the gpios for the I2C clock
+ and data lines.
+ If you are monitoring one of the Raspberry Pi buses you
+ must set set_as_inputs to False so that they remain in
+ I2C mode.
+ The pigpio daemon should have been started with a higher
+ than default sample rate.
+ For an I2C bus rate of 100Kbps sudo pigpiod -s 2 should work.
+ A message is printed for each I2C transaction formatted with
+ "[" for the START
+ "XX" two hex characters for each data byte
+ "+" if the data is ACKd, "-" if the data is NACKd
+ "]" for the STOP
+ E.g. Reading the X, Y, Z values from an ADXL345 gives:
+ [A6+32+]
+ [A7+01+FF+F2+FF+06+00-]
+ """
+ self.pi = pi
+ self.gSCL = SCL
+ self.gSDA = SDA
+ self.FALLING = 0
+ self.RISING = 1
+ self.STEADY = 2
+ self.in_data = False
+ self.byte = 0
+ self.bit = 0
+ self.oldSCL = 1
+ self.oldSDA = 1
+ self.transact = ""
+ if set_as_inputs:
+ self.pi.set_mode(SCL, pigpio.INPUT)
+ self.pi.set_mode(SDA, pigpio.INPUT)
+ self.cbA = self.pi.callback(SCL, pigpio.EITHER_EDGE, self._cb)
+ self.cbB = self.pi.callback(SDA, pigpio.EITHER_EDGE, self._cb)
+ def _parse(self, SCL, SDA):
+ """
+ Accumulate all the data between START and STOP conditions
+ into a string and output when STOP is detected.
+ """
+ if SCL != self.oldSCL:
+ self.oldSCL = SCL
+ if SCL:
+ xSCL = self.RISING
+ else:
+ xSCL = self.FALLING
+ else:
+ xSCL = self.STEADY
+ if SDA != self.oldSDA:
+ self.oldSDA = SDA
+ if SDA:
+ xSDA = self.RISING
+ else:
+ xSDA = self.FALLING
+ else:
+ xSDA = self.STEADY
+ if xSCL == self.RISING:
+ if self.in_data:
+ if self.bit < 8:
+ self.byte = (self.byte << 1) | SDA
+ self.bit += 1
+ else:
+ self.transact += '{:02X}'.format(self.byte)
+ if SDA:
+ self.transact += '-'
+ else:
+ self.transact += '+'
+ self.bit = 0
+ self.byte = 0
+ elif xSCL == self.STEADY:
+ if xSDA == self.RISING:
+ if SCL:
+ self.in_data = False
+ self.byte = 0
+ self.bit = 0
+ self.transact += ']' # STOP
+ print (self.transact)
+ self.transact = ""
+ if xSDA == self.FALLING:
+ if SCL:
+ self.in_data = True
+ self.byte = 0
+ self.bit = 0
+ self.transact += '[' # START
+ def _cb(self, gpio, level, tick):
+ """
+ Check which line has altered state (ignoring watchdogs) and
+ call the parser with the new state.
+ """
+ SCL = self.oldSCL
+ SDA = self.oldSDA
+ if gpio == self.gSCL:
+ if level == 0:
+ SCL = 0
+ elif level == 1:
+ SCL = 1
+ if gpio == self.gSDA:
+ if level == 0:
+ SDA = 0
+ elif level == 1:
+ SDA = 1
+ self._parse(SCL, SDA)
+ def cancel(self):
+ """Cancel the I2C callbacks."""
+ self.cbA.cancel()
+ self.cbB.cancel()
+if __name__ == "__main__":
+ import time
+ import pigpio
+ import I2C_sniffer
+ pi = pigpio.pi()
+ s = I2C_sniffer.sniffer(pi, 1, 0, False) # leave gpios 1/0 in I2C mode
+ time.sleep(60000)
+ s.cancel()
+ pi.stop()
new file mode 100644
index 0000000..68aa6a7
--- /dev/null
@@ -0,0 +1,2 @@
+A program to passively sniff I2C transactions (100kHz bus maximum) and display the results.
new file mode 100644
index 0000000..7b3cf33
--- /dev/null
@@ -0,0 +1,2 @@
+Class to hash a code from an IR receiver (reading an IR remote control).
new file mode 100755
index 0000000..61a443f
--- /dev/null
@@ -0,0 +1,165 @@
+#!/usr/bin/env python
+import pigpio
+class hasher:
+ """
+ This class forms a hash over the IR pulses generated by an
+ IR remote.
+ The remote key press is not converted into a code in the manner of
+ the lirc module. No attempt is made to decode the type of protocol
+ used by the remote. The hash is likely to be unique for different
+ keys and different remotes but this is not guaranteed.
+ This hashing process works for some remotes/protocols but not for
+ others. The only way to find out if it works for one or more of
+ your remotes is to try it and see.
+ #!/usr/bin/env python
+ import time
+ import pigpio
+ import ir_hasher
+ def callback(hash):
+ print("hash={}".format(hash));
+ pi = pigpio.pi()
+ ir = ir_hasher.hasher(pi, 7, callback, 5)
+ print("ctrl c to exit");
+ time.sleep(300)
+ pi.stop()
+ """
+ def __init__(self, pi, gpio, callback, timeout=5):
+ """
+ Initialises an IR remote hasher on a pi's gpio. A gap of timeout
+ milliseconds indicates the end of the remote key press.
+ """
+ self.pi = pi
+ self.gpio = gpio
+ self.code_timeout = timeout
+ self.callback = callback
+ self.in_code = False
+ pi.set_mode(gpio, pigpio.INPUT)
+ self.cb = pi.callback(gpio, pigpio.EITHER_EDGE, self._cb)
+ def _hash(self, old_val, new_val):
+ if new_val < (old_val * 0.60):
+ val = 13
+ elif old_val < (new_val * 0.60):
+ val = 23
+ else:
+ val = 2
+ self.hash_val = self.hash_val ^ val
+ self.hash_val *= 16777619 # FNV_PRIME_32
+ self.hash_val = self.hash_val & ((1<<32)-1)
+ def _cb(self, gpio, level, tick):
+ if level != pigpio.TIMEOUT:
+ if self.in_code == False:
+ self.in_code = True
+ self.pi.set_watchdog(self.gpio, self.code_timeout)
+ self.hash_val = 2166136261 # FNV_BASIS_32
+ self.edges = 1
+ self.t1 = None
+ self.t2 = None
+ self.t3 = None
+ self.t4 = tick
+ else:
+ self.edges += 1
+ self.t1 = self.t2
+ self.t2 = self.t3
+ self.t3 = self.t4
+ self.t4 = tick
+ if self.t1 is not None:
+ d1 = pigpio.tickDiff(self.t1,self.t2)
+ d2 = pigpio.tickDiff(self.t3,self.t4)
+ self._hash(d1, d2)
+ else:
+ if self.in_code:
+ self.in_code = False
+ self.pi.set_watchdog(self.gpio, 0)
+ if self.edges > 12:
+ self.callback(self.hash_val)
+if __name__ == "__main__":
+ import time
+ import pigpio
+ import ir_hasher
+ hashes = {
+ 142650387: '2', 244341844: 'menu', 262513468: 'vol-',
+ 272048826: '5', 345069212: '6', 363685443: '',
+ 434191356: '1', 492745084: 'OK', 549497027: 'mute',
+ 603729091: 'text', 646476378: 'chan-', 832916949: 'home',
+ 923778138: 'power', 938165610: 'power', 953243510: 'forward',
+ 1009731980:'1', 1018231875:'TV', 1142888517:'c-up',
+ 1151589683:'chan+', 1344018636:'OK', 1348032067:'chan+',
+ 1367109971:'', 1370712102:'c-left', 1438405361:'rewind',
+ 1452589043:'pause', 1518578730:'chan-', 1554432645:'8',
+ 1583569525:'0', 1629745313:'rewind', 1666513749:'record',
+ 1677653754:'c-down', 1825951717:'c-right', 1852412236:'6',
+ 1894279468:'9', 1904895749:'vol+', 1941947509:'ff',
+ 2076573637:'0', 2104823531:'back', 2141641957:'home',
+ 2160787557:'record', 2398525299:'7', 2468117013:'8',
+ 2476712746:'play', 2574308838:'forward', 2577952149:'4',
+ 2706654902:'stop', 2829002741:'c-up', 2956097083:'back',
+ 3112717386:'5', 3263244773:'ff', 3286088195:'pause',
+ 3363767978:'c-down', 3468076364:'vol-', 3491068358:'stop',
+ 3593710134:'c-left', 3708232515:'3', 3734134565:'back',
+ 3766109107:'TV', 3798010010:'play', 3869937700:'menu',
+ 3872715523:'7', 3885097091:'2', 3895301587:'text',
+ 3931058739:'mute', 3983900853:'c-right', 4032250885:'4',
+ 4041913909:'vol+', 4207017660:'9', 4227138677:'back',
+ 4294027955:'3'}
+ def callback(hash):
+ if hash in hashes:
+ print("key={} hash={}".format(hashes[hash], hash));
+ pi = pigpio.pi()
+ ir = ir_hasher.hasher(pi, 7, callback, 5)
+ print("ctrl c to exit");
+ time.sleep(300)
+ pi.stop()
new file mode 100644
index 0000000..2ceb79c
--- /dev/null
@@ -0,0 +1,2 @@
+Script to transmit the morse code corresponding to a text string.
new file mode 100755
index 0000000..a452747
--- /dev/null
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+import pigpio
+'a':'.-' , 'b':'-...' , 'c':'-.-.' , 'd':'-..' , 'e':'.' ,
+'f':'..-.' , 'g':'--.' , 'h':'....' , 'i':'..' , 'j':'.---' ,
+'k':'-.-' , 'l':'.-..' , 'm':'--' , 'n':'-.' , 'o':'---' ,
+'p':'.--.' , 'q':'--.-' , 'r':'.-.' , 's':'...' , 't':'-' ,
+'u':'..-' , 'v':'...-' , 'w':'.--' , 'x':'-..-' , 'y':'-.--' ,
+'z':'--..' , '1':'.----', '2':'..---', '3':'...--', '4':'....-',
+'5':'.....', '6':'-....', '7':'--...', '8':'---..', '9':'----.',
+def transmit_string(pi, gpio, str):
+ pi.wave_clear() # start a new waveform
+ wf=[]
+ for C in str:
+ c=C.lower()
+ print(c)
+ if c in morse:
+ k = morse[c]
+ for x in k:
+ if x == '.':
+ wf.append(pigpio.pulse(1<<gpio, NONE, DOT * MICROS))
+ else:
+ wf.append(pigpio.pulse(1<<gpio, NONE, DASH * MICROS))
+ wf.append(pigpio.pulse(NONE, 1<<gpio, GAP * MICROS))
+ wf.append(pigpio.pulse(NONE, 1<<gpio, LETTER_GAP * MICROS))
+ elif c == ' ':
+ wf.append(pigpio.pulse(NONE, 1<<gpio, WORD_GAP * MICROS))
+ pi.wave_add_generic(wf)
+ pi.wave_tx_start()
+pi = pigpio.pi()
+pi.set_mode(GPIO, pigpio.OUTPUT)
+transmit_string(pi, GPIO, "Now is the winter of our discontent")
+while pi.wave_tx_busy():
+ pass
+transmit_string(pi, GPIO, "made glorious summer by this sun of York")
+while pi.wave_tx_busy():
+ pass
diff --git a/EXAMPLES/Python/PCF8591_YL-40/ b/EXAMPLES/Python/PCF8591_YL-40/
new file mode 100755
index 0000000..c2c898c
--- /dev/null
+++ b/EXAMPLES/Python/PCF8591_YL-40/
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+# 2014-08-26
+import time
+import curses
+import pigpio
+# sudo pigpiod
+# ./
+# Connect Pi 5V - VCC, Ground - Ground, SDA - SDA, SCL - SCL.
+pi = pigpio.pi() # Connect to local Pi.
+handle = pi.i2c_open(1, YL_40, 0)
+stdscr = curses.initscr()
+aout = 0
+stdscr.addstr(10, 0, "Brightness")
+stdscr.addstr(12, 0, "Temperature")
+stdscr.addstr(14, 0, "AOUT->AIN2")
+stdscr.addstr(16, 0, "Resistor")
+ while True:
+ for a in range(0,4):
+ aout = aout + 1
+ pi.i2c_write_byte_data(handle, 0x40 | ((a+1) & 0x03), aout&0xFF)
+ v = pi.i2c_read_byte(handle)
+ hashes = v / 4
+ spaces = 64 - hashes
+ stdscr.addstr(10+a*2, 12, str(v) + ' ')
+ stdscr.addstr(10+a*2, 16, '#' * hashes + ' ' * spaces )
+ stdscr.refresh()
+ time.sleep(0.04)
+ c = stdscr.getch()
+ if c != curses.ERR:
+ break
+ pass
diff --git a/EXAMPLES/Python/PCF8591_YL-40/README b/EXAMPLES/Python/PCF8591_YL-40/README
new file mode 100644
index 0000000..cdd9374
--- /dev/null
+++ b/EXAMPLES/Python/PCF8591_YL-40/README
@@ -0,0 +1,2 @@
+Script to display readings from the (I2C) PCF8591.
new file mode 100644
index 0000000..4c6e46b
--- /dev/null
@@ -0,0 +1,2 @@
+Script to benchmark the pigpio Python module's performance.
new file mode 100755
index 0000000..0a66bdc
--- /dev/null
@@ -0,0 +1,91 @@
+#!/usr/bin/env python
+# #
+# Unless you know what you are doing don't run this script with anything #
+# connected to the gpios. You will CAUSE damage. #
+# #
+import time
+import pigpio
+delay = 30
+class gpioTest:
+ def __init__(self, pi, gpio, edge, freq, duty):
+ self.pi = pi
+ self.gpio = gpio
+ self.edge = edge
+ self.freq = freq
+ self.duty = duty
+ self.calls = 0
+ def cb(self, g, t, l):
+ self.calls = self.calls + 1
+ # print g,t,l
+ def num(self):
+ return self.calls
+ def start(self):
+ self.pi.set_PWM_frequency(self.gpio, self.freq)
+ self.pi.set_PWM_range(self.gpio, 25)
+ self.pi.set_PWM_dutycycle(self.gpio, self.duty)
+ self.n = self.pi.callback(self.gpio, self.edge, self.cb)
+ def stop(self):
+ self.pi.set_PWM_dutycycle(self.gpio, 0)
+ self.n.cancel()
+pi = pigpio.pi()
+t1 = gpioTest(pi, 4, pigpio.EITHER_EDGE, 4000, 1)
+t2 = gpioTest(pi, 7, pigpio.RISING_EDGE, 8000, 2)
+t3 = gpioTest(pi, 8, pigpio.FALLING_EDGE, 8000, 3)
+t4 = gpioTest(pi, 9, pigpio.EITHER_EDGE, 4000, 4)
+t5 = gpioTest(pi,10, pigpio.RISING_EDGE, 8000, 5)
+t6 = gpioTest(pi,11, pigpio.FALLING_EDGE, 8000, 6)
+t7 = gpioTest(pi,14, pigpio.EITHER_EDGE, 4000, 7)
+t8 = gpioTest(pi,15, pigpio.RISING_EDGE, 8000, 8)
+t9 = gpioTest(pi,17, pigpio.FALLING_EDGE, 8000, 9)
+t10 = gpioTest(pi,18, pigpio.EITHER_EDGE, 4000, 10)
+t11 = gpioTest(pi,22, pigpio.RISING_EDGE, 8000, 11)
+t12 = gpioTest(pi,23, pigpio.FALLING_EDGE, 8000, 12)
+t13 = gpioTest(pi,24, pigpio.EITHER_EDGE, 4000, 13)
+t14 = gpioTest(pi,25, pigpio.RISING_EDGE, 8000, 14)
+# R1: 0 1 4 7 8 9 10 11 14 15 17 18 21 22 23 24 25
+# R2: 2 3 4 7 8 9 10 11 14 15 17 18 22 23 24 25 27 28 29 30 31
+tests = [t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11, t12, t13, t14]
+for i in tests: i.start()
+for i in tests: i.stop()
+tot = 0
+msg = ""
+for i in tests:
+ tot += i.num()
+ msg += str(i.num()) + " "
+print("eps={} ({}/{})".format(tot/delay, tot, delay))
new file mode 100644
index 0000000..8dbe585
--- /dev/null
@@ -0,0 +1,2 @@
+Class to decode a mechanical rotary encoder.
new file mode 100755
index 0000000..884ff61
--- /dev/null
@@ -0,0 +1,135 @@
+#!/usr/bin/env python
+import pigpio
+class decoder:
+ """Class to decode mechanical rotary encoder pulses."""
+ def __init__(self, pi, gpioA, gpioB, callback):
+ """
+ Instantiate the class with the pi and gpios connected to
+ rotary encoder contacts A and B. The common contact
+ should be connected to ground. The callback is
+ called when the rotary encoder is turned. It takes
+ one parameter which is +1 for clockwise and -1 for
+ counterclockwise.
+ import time
+ import pigpio
+ import rotary_encoder
+ pos = 0
+ def callback(way):
+ global pos
+ pos += way
+ print("pos={}".format(pos))
+ pi = pigpio.pi()
+ decoder = rotary_encoder.decoder(pi, 7, 8, callback)
+ time.sleep(300)
+ decoder.cancel()
+ pi.stop()
+ """
+ self.pi = pi
+ self.gpioA = gpioA
+ self.gpioB = gpioB
+ self.callback = callback
+ self.levA = 0
+ self.levB = 0
+ self.lastGpio = None
+ self.pi.set_mode(gpioA, pigpio.INPUT)
+ self.pi.set_mode(gpioB, pigpio.INPUT)
+ self.pi.set_pull_up_down(gpioA, pigpio.PUD_UP)
+ self.pi.set_pull_up_down(gpioB, pigpio.PUD_UP)
+ self.cbA = self.pi.callback(gpioA, pigpio.EITHER_EDGE, self._pulse)
+ self.cbB = self.pi.callback(gpioB, pigpio.EITHER_EDGE, self._pulse)
+ def _pulse(self, gpio, level, tick):
+ """
+ Decode the rotary encoder pulse.
+ +---------+ +---------+ 0
+ | | | |
+ A | | | |
+ | | | |
+ +---------+ +---------+ +----- 1
+ +---------+ +---------+ 0
+ | | | |
+ B | | | |
+ | | | |
+ ----+ +---------+ +---------+ 1
+ """
+ if gpio == self.gpioA:
+ self.levA = level
+ else:
+ self.levB = level;
+ if gpio != self.lastGpio: # debounce
+ self.lastGpio = gpio
+ if gpio == self.gpioA and level == 1:
+ if self.levB == 1:
+ self.callback(1)
+ elif gpio == self.gpioB and level == 1:
+ if self.levA == 1:
+ self.callback(-1)
+ def cancel(self):
+ """
+ Cancel the rotary encoder decoder.
+ """
+ self.cbA.cancel()
+ self.cbB.cancel()
+if __name__ == "__main__":
+ import time
+ import pigpio
+ import rotary_encoder
+ pos = 0
+ def callback(way):
+ global pos
+ pos += way
+ print("pos={}".format(pos))
+ pi = pigpio.pi()
+ decoder = rotary_encoder.decoder(pi, 7, 8, callback)
+ time.sleep(300)
+ decoder.cancel()
+ pi.stop()
diff --git a/EXAMPLES/Python/Readme b/EXAMPLES/Python/Readme
deleted file mode 100644
index 499e7cb..0000000
--- a/EXAMPLES/Python/Readme
+++ /dev/null
@@ -1 +0,0 @@
-pigpio Python examples
new file mode 100644
index 0000000..effc9e6
--- /dev/null
@@ -0,0 +1,2 @@
+Class to read sonar rangers with separate trigger and echo pins.
new file mode 100755
index 0000000..9485ef4
--- /dev/null
@@ -0,0 +1,114 @@
+#!/usr/bin/env python
+import time
+import pigpio
+class ranger:
+ """
+ This class encapsulates a type of acoustic ranger. In particular
+ the type of ranger with separate trigger and echo pins.
+ A pulse on the trigger initiates the sonar ping and shortly
+ afterwards a sonar pulse is transmitted and the echo pin
+ goes high. The echo pins stays high until a sonar echo is
+ received (or the response times-out). The time between
+ the high and low edges indicates the sonar round trip time.
+ """
+ def __init__(self, pi, trigger, echo):
+ """
+ The class is instantiated with the Pi to use and the
+ gpios connected to the trigger and echo pins.
+ """
+ self.pi = pi
+ self._trig = trigger
+ self._echo = echo
+ self._ping = False
+ self._high = None
+ self._time = None
+ self._triggered = False
+ self._trig_mode = pi.get_mode(self._trig)
+ self._echo_mode = pi.get_mode(self._echo)
+ pi.set_mode(self._trig, pigpio.OUTPUT)
+ pi.set_mode(self._echo, pigpio.INPUT)
+ self._cb = pi.callback(self._trig, pigpio.EITHER_EDGE, self._cbf)
+ self._cb = pi.callback(self._echo, pigpio.EITHER_EDGE, self._cbf)
+ self._inited = True
+ def _cbf(self, gpio, level, tick):
+ if gpio == self._trig:
+ if level == 0: # trigger sent
+ self._triggered = True
+ self._high = None
+ else:
+ if self._triggered:
+ if level == 1:
+ self._high = tick
+ else:
+ if self._high is not None:
+ self._time = tick - self._high
+ self._high = None
+ self._ping = True
+ def read(self):
+ """
+ Triggers a reading. The returned reading is the number
+ of microseconds for the sonar round-trip.
+ round trip cms = round trip time / 1000000.0 * 34030
+ """
+ if self._inited:
+ self._ping = False
+ self.pi.gpio_trigger(self._trig)
+ start = time.time()
+ while not self._ping:
+ if (time.time()-start) > 5.0:
+ return 20000
+ time.sleep(0.001)
+ return self._time
+ else:
+ return None
+ def cancel(self):
+ """
+ Cancels the ranger and returns the gpios to their
+ original mode.
+ """
+ if self._inited:
+ self._inited = False
+ self._cb.cancel()
+ self.pi.set_mode(self._trig, self._trig_mode)
+ self.pi.set_mode(self._echo, self._echo_mode)
+if __name__ == "__main__":
+ import time
+ import pigpio
+ import sonar_trigger_echo
+ pi = pigpio.pi()
+ sonar = sonar_trigger_echo.ranger(pi, 23, 18)
+ end = time.time() + 600.0
+ r = 1
+ while time.time() < end:
+ print("{} {}".format(r,
+ r += 1
+ time.sleep(0.03)
+ sonar.cancel()
+ pi.stop()
new file mode 100644
index 0000000..106c186
--- /dev/null
@@ -0,0 +1,2 @@
+Class to send and receive radio messages compatible with the Virtual Wire library for Arduinos.
new file mode 100755
index 0000000..e871f4f
--- /dev/null
@@ -0,0 +1,372 @@
+#!/usr/bin/env python
+This module provides a 313MHz/434MHz radio interface compatible
+with the Virtual Wire library used on Arduinos.
+It has been tested between a Pi, TI Launchpad, and Arduino Pro Mini.
+# 2014-08-14
+import time
+import pigpio
+_HEADER=[0x2a, 0x2a, 0x2a, 0x2a, 0x2a, 0x2a, 0x38, 0x2c]
+ 0x0d, 0x0e, 0x13, 0x15, 0x16, 0x19, 0x1a, 0x1c,
+ 0x23, 0x25, 0x26, 0x29, 0x2a, 0x2c, 0x32, 0x34]
+def _sym2nibble(symbol):
+ for nibble in range(16):
+ if symbol == _SYMBOL[nibble]:
+ return nibble
+ return 0
+def _crc_ccitt_update(crc, data):
+ data = data ^ (crc & 0xFF);
+ data = (data ^ (data << 4)) & 0xFF;
+ return (
+ (((data << 8) & 0xFFFF) | (crc >> 8)) ^
+ ((data >> 4) & 0x00FF) ^ ((data << 3) & 0xFFFF)
+ )
+class tx():
+ def __init__(self, pi, txgpio, bps=2000):
+ """
+ Instantiate a transmitter with the Pi, the transmit gpio,
+ and the bits per second (bps). The bps defaults to 2000.
+ The bps is constrained to be within MIN_BPS to MAX_BPS.
+ """
+ self.pi = pi
+ self.txbit = (1<<txgpio)
+ if bps < MIN_BPS:
+ bps = MIN_BPS
+ elif bps > MAX_BPS:
+ bps = MAX_BPS
+ self.mics = int(1000000 / bps)
+ self.wave_id = None
+ pi.wave_add_new()
+ pi.set_mode(txgpio, pigpio.OUTPUT)
+ def _nibble(self, nibble):
+ for i in range(6):
+ if nibble & (1<<i):
+, 0, self.mics))
+ else:
+, self.txbit, self.mics))
+ def _byte(self, crc, byte):
+ self._nibble(_SYMBOL[byte>>4])
+ self._nibble(_SYMBOL[byte&0x0F])
+ return _crc_ccitt_update(crc, byte)
+ def put(self, data):
+ """
+ Transmit a message. If the message is more than
+ MAX_MESSAGE_BYTES in size it is discarded. If a message
+ is currently being transmitted it is aborted and replaced
+ with the new message. True is returned if message
+ transmission has successfully started. False indicates
+ an error.
+ """
+ if len(data) > MAX_MESSAGE_BYTES:
+ return False
+ = []
+ self.cancel()
+ for i in _HEADER:
+ self._nibble(i)
+ crc = self._byte(0xFFFF, len(data)+_CTL)
+ for i in data:
+ if type(i) == type(""):
+ v = ord(i)
+ else:
+ v = i
+ crc = self._byte(crc, v)
+ crc = ~crc
+ self._byte(0, crc&0xFF)
+ self._byte(0, crc>>8)
+ self.pi.wave_add_generic(
+ self.wave_id = self.pi.wave_create()
+ if self.wave_id >= 0:
+ self.pi.wave_send_once(self.wave_id)
+ return True
+ else:
+ return False
+ def ready(self):
+ """
+ Returns True if a new message may be transmitted.
+ """
+ return not self.pi.wave_tx_busy()
+ def cancel(self):
+ """
+ Cancels the wireless transmitter, aborting any message
+ in progress.
+ """
+ if self.wave_id is not None:
+ self.pi.wave_tx_stop()
+ self.pi.wave_delete(self.wave_id)
+ self.pi.wave_add_new()
+ self.wave_id = None
+class rx():
+ def __init__(self, pi, rxgpio, bps=2000):
+ """
+ Instantiate a receiver with the Pi, the receive gpio, and
+ the bits per second (bps). The bps defaults to 2000.
+ The bps is constrained to be within MIN_BPS to MAX_BPS.
+ """
+ self.pi = pi
+ self.rxgpio = rxgpio
+ self.messages = []
+ self.bad_CRC = 0
+ if bps < MIN_BPS:
+ bps = MIN_BPS
+ elif bps > MAX_BPS:
+ bps = MAX_BPS
+ slack = 0.20
+ self.mics = int(1000000 / bps)
+ slack_mics = int(slack * self.mics)
+ self.min_mics = self.mics - slack_mics # Shortest legal edge.
+ self.max_mics = (self.mics + slack_mics) * 4 # Longest legal edge.
+ self.timeout = 8 * self.mics / 1000 # 8 bits time in ms.
+ if self.timeout < 8:
+ self.timeout = 8
+ self.last_tick = None
+ self.good = 0
+ self.bits = 0
+ self.token = 0
+ self.in_message = False
+ self.message = [0]*(MAX_MESSAGE_BYTES+_CTL)
+ self.message_len = 0
+ self.byte = 0
+ pi.set_mode(rxgpio, pigpio.INPUT)
+ self.cb = pi.callback(rxgpio, pigpio.EITHER_EDGE, self._cb)
+ def _calc_crc(self):
+ crc = 0xFFFF
+ for i in range(self.message_length):
+ crc = _crc_ccitt_update(crc, self.message[i])
+ return crc
+ def _insert(self, bits, level):
+ for i in range(bits):
+ self.token >>= 1
+ if level == 0:
+ self.token |= 0x800
+ if self.in_message:
+ self.bits += 1
+ if self.bits >= 12: # Complete token.
+ byte = (
+ _sym2nibble(self.token & 0x3f) << 4 |
+ _sym2nibble(self.token >> 6))
+ if self.byte == 0:
+ self.message_length = byte
+ if byte > (MAX_MESSAGE_BYTES+_CTL):
+ self.in_message = False # Abort message.
+ return
+ self.message[self.byte] = byte
+ self.byte += 1
+ self.bits = 0
+ if self.byte >= self.message_length:
+ self.in_message = False
+ self.pi.set_watchdog(self.rxgpio, 0)
+ crc = self._calc_crc()
+ if crc == 0xF0B8: # Valid CRC.
+ self.messages.append(
+ self.message[1:self.message_length-2])
+ else:
+ self.bad_CRC += 1
+ else:
+ if self.token == 0xB38: # Start message token.
+ self.in_message = True
+ self.pi.set_watchdog(self.rxgpio, self.timeout)
+ self.bits = 0
+ self.byte = 0
+ def _cb(self, gpio, level, tick):
+ if self.last_tick is not None:
+ if level == pigpio.TIMEOUT:
+ self.pi.set_watchdog(self.rxgpio, 0) # Switch watchdog off.
+ if self.in_message:
+ self._insert(4, not self.last_level)
+ self.good = 0
+ self.in_message = False
+ else:
+ edge = pigpio.tickDiff(self.last_tick, tick)
+ if edge < self.min_mics:
+ self.good = 0
+ self.in_message = False
+ elif edge > self.max_mics:
+ if self.in_message:
+ self._insert(4, level)
+ self.good = 0
+ self.in_message = False
+ else:
+ self.good += 1
+ if self.good > 8:
+ bitlen = (100 * edge) / self.mics
+ if bitlen < 140:
+ bits = 1
+ elif bitlen < 240:
+ bits = 2
+ elif bitlen < 340:
+ bits = 3
+ else:
+ bits = 4
+ self._insert(bits, level)
+ self.last_tick = tick
+ self.last_level = level
+ def get(self):
+ """
+ Returns the next unread message, or None if none is avaiable.
+ """
+ if len(self.messages):
+ return self.messages.pop(0)
+ else:
+ return None
+ def ready(self):
+ """
+ Returns True if there is a message available to be read.
+ """
+ return len(self.messages)
+ def cancel(self):
+ """
+ Cancels the wireless receiver.
+ """
+ if self.cb is not None:
+ self.cb.cancel()
+ self.pi.set_watchdog(self.rxgpio, 0)
+ self.cb = None
+if __name__ == "__main__":
+ import time
+ import pigpio
+ import vw
+ RX=11
+ TX=25
+ BPS=2000
+ pi = pigpio.pi() # Connect to local Pi.
+ rx = vw.rx(pi, RX, BPS) # Specify Pi, rx gpio, and baud.
+ tx = vw.tx(pi, TX, BPS) # Specify Pi, tx gpio, and baud.
+ msg = 0
+ start = time.time()
+ while (time.time()-start) < 300:
+ msg += 1
+ while not tx.ready():
+ time.sleep(0.1)
+ time.sleep(0.2)
+ tx.put([48, 49, 65, ((msg>>6)&0x3F)+32, (msg&0x3F)+32])
+ while not tx.ready():
+ time.sleep(0.1)
+ time.sleep(0.2)
+ tx.put("Hello World #{}!".format(msg))
+ while rx.ready():
+ print("".join(chr (c) for c in rx.get()))
+ rx.cancel()
+ tx.cancel()
+ pi.stop()
new file mode 100644
index 0000000..e19bc7c
--- /dev/null
@@ -0,0 +1,2 @@
+Class to decode a Wiegand code.
new file mode 100755
index 0000000..f7c4610
--- /dev/null
@@ -0,0 +1,135 @@
+#!/usr/bin/env python
+import pigpio
+class decoder:
+ """
+ A class to read Wiegand codes of an arbitrary length.
+ The code length and value are returned.
+ #!/usr/bin/env python
+ import time
+ import pigpio
+ import wiegand
+ def callback(bits, code):
+ print("bits={} code={}".format(bits, code))
+ pi = pigpio.pi()
+ w = wiegand.decoder(pi, 14, 15, callback)
+ time.sleep(300)
+ w.cancel()
+ pi.stop()
+ """
+ def __init__(self, pi, gpio_0, gpio_1, callback, bit_timeout=5):
+ """
+ Instantiate with the pi, gpio for 0 (green wire), the gpio for 1
+ (white wire), the callback function, and the bit timeout in
+ milliseconds which indicates the end of a code.
+ The callback is passed the code length in bits and the value.
+ """
+ self.pi = pi
+ self.gpio_0 = gpio_0
+ self.gpio_1 = gpio_1
+ self.callback = callback
+ self.bit_timeout = bit_timeout
+ self.in_code = False
+ self.pi.set_mode(gpio_0, pigpio.INPUT)
+ self.pi.set_mode(gpio_1, pigpio.INPUT)
+ self.pi.set_pull_up_down(gpio_0, pigpio.PUD_UP)
+ self.pi.set_pull_up_down(gpio_1, pigpio.PUD_UP)
+ self.cb_0 = self.pi.callback(gpio_0, pigpio.FALLING_EDGE, self._cb)
+ self.cb_1 = self.pi.callback(gpio_1, pigpio.FALLING_EDGE, self._cb)
+ def _cb(self, gpio, level, tick):
+ """
+ Accumulate bits until both gpios 0 and 1 timeout.
+ """
+ if level < pigpio.TIMEOUT:
+ if self.in_code == False:
+ self.bits = 1
+ self.num = 0
+ self.in_code = True
+ self.code_timeout = 0
+ self.pi.set_watchdog(self.gpio_0, self.bit_timeout)
+ self.pi.set_watchdog(self.gpio_1, self.bit_timeout)
+ else:
+ self.bits += 1
+ self.num = self.num << 1
+ if gpio == self.gpio_0:
+ self.code_timeout = self.code_timeout & 2 # clear gpio 0 timeout
+ else:
+ self.code_timeout = self.code_timeout & 1 # clear gpio 1 timeout
+ self.num = self.num | 1
+ else:
+ if self.in_code:
+ if gpio == self.gpio_0:
+ self.code_timeout = self.code_timeout | 1 # timeout gpio 0
+ else:
+ self.code_timeout = self.code_timeout | 2 # timeout gpio 1
+ if self.code_timeout == 3: # both gpios timed out
+ self.pi.set_watchdog(self.gpio_0, 0)
+ self.pi.set_watchdog(self.gpio_1, 0)
+ self.in_code = False
+ self.callback(self.bits, self.num)
+ def cancel(self):
+ """
+ Cancel the Wiegand decoder.
+ """
+ self.cb_0.cancel()
+ self.cb_1.cancel()
+if __name__ == "__main__":
+ import time
+ import pigpio
+ import wiegand
+ def callback(bits, value):
+ print("bits={} value={}".format(bits, value))
+ pi = pigpio.pi()
+ w = wiegand.decoder(pi, 14, 15, callback)
+ time.sleep(300)
+ w.cancel()
+ pi.stop()