forked from adafruit/Adafruit_CircuitPython_PIO_UART
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadafruit_pio_uart.py
214 lines (188 loc) · 6.98 KB
/
adafruit_pio_uart.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# SPDX-FileCopyrightText: Copyright (c) 2023 Scott Shawcroft for Adafruit Industries
#
# SPDX-License-Identifier: MIT
# PIO assembly is based on the examples in the RP2040 datasheet.
# SPDX-FileCopyrightText: Copyright (c) 2020 Raspberry Pi (Trading) Ltd.
# SPDX-License-Identifier: BSD-3-Clause
"""
`adafruit_pio_uart`
================================================================================
PIO implementation of CircuitPython UART API
* Author(s): Scott Shawcroft
"""
import array
import time
import adafruit_pioasm
import busio
import rp2pio
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_PIO_UART.git"
class UART:
"""PIO implementation of CircuitPython UART API."""
Parity = busio.UART.Parity
def __init__(
self, tx=None, rx=None, baudrate=9600, bits=8, parity=None, stop=1, timeout=1
): # pylint: disable=invalid-name, too-many-arguments
self.bitcount = bits + (1 if parity else 0)
self.bits = bits
self.parity = parity
self.mask = (1 << bits) - 1
self.shift = 8 - (self.bitcount % 8)
self._timeout = timeout
self.rx_pio = None
if rx:
# Minimum viable 8n1 UART receiver. Wait for the start bit, then sample 8 bits
# with the correct timing.
# IN pin 0 is mapped to the GPIO used as UART RX.
# Autopush must be enabled, with a threshold of 8.
# Line by line explanation:
# * Wait for start bit
# * Preload bit counter, delay until eye of first data bit
# * Loop 8 times
# * Sample data
# * Each iteration is 8 cycles
rx_code = adafruit_pioasm.assemble(
".program uart_rx_mini\n"
+ "start:\n"
+ " wait 0 pin 0\n"
+ f" set x, {self.bitcount - 1} [10]\n"
+ "bitloop:\n"
+ " in pins, 1\n"
+ " jmp x-- bitloop [6]\n"
+ " jmp pin good_stop\n"
+ " wait 1 pin 0\n" # Skip IRQ
+ " jmp start\n"
+ "good_stop:\n"
+ " push\n"
)
self.rx_pio = rp2pio.StateMachine(
rx_code,
first_in_pin=rx,
jmp_pin=rx,
frequency=8 * baudrate,
auto_push=False,
push_threshold=self.bitcount,
)
self.tx_pio = None
if tx:
stop_delay = stop * 8 - 1
# An 8n1 UART transmit program.
# OUT pin 0 and side-set pin 0 are both mapped to UART TX pin.
# Line by line explanation:
# * Assert stop bit, or stall with line in idle state
# * Preload bit counter, assert start bit for 8 clocks
# * This loop will run 8 times (8n1 UART)
# * Shift 1 bit from OSR to the first OUT pin
# * Each loop iteration is 8 cycles.
tx_code = adafruit_pioasm.Program(
".program uart_tx\n"
+ ".side_set 1 opt\n"
+ f" pull side 1 [{stop_delay}]\n"
+ f" set x, {self.bitcount - 1} side 0 [7]\n"
+ "bitloop:\n"
+ " out pins, 1\n"
+ " jmp x-- bitloop [6]\n"
)
self.tx_pio = rp2pio.StateMachine(
tx_code.assembled,
first_out_pin=tx,
first_sideset_pin=tx,
frequency=8 * baudrate,
initial_sideset_pin_state=1,
initial_sideset_pin_direction=1,
initial_out_pin_state=1,
initial_out_pin_direction=1,
**tx_code.pio_kwargs,
)
def deinit(self):
"""De-initialize the UART object."""
if self.rx_pio:
self.rx_pio.deinit()
if self.tx_pio:
self.tx_pio.deinit()
@property
def timeout(self):
"""Return the UART timeout."""
return self._timeout
@timeout.setter
def timeout(self, value):
"""Set the UART timeout."""
self._timeout = value
@property
def baudrate(self):
"""Return the UART baudrate."""
if self.tx_pio:
return self.tx_pio.frequency // 8
return self.rx_pio.frequency // 8
@baudrate.setter
def baudrate(self, frequency):
"""Set the UART baudrate."""
if self.rx_pio:
self.rx_pio.frequency = frequency * 8
if self.tx_pio:
self.tx_pio.frequency = frequency * 8
@property
def in_waiting(self):
"""Return whether the UART is waiting."""
return self.rx_pio.in_waiting
def reset_input_buffer(self):
"""Clear the UART input buffer."""
self.rx_pio.clear_rxfifo()
def readinto(self, buf):
"""Read UART data into buf and return the number of bytes read."""
if self.bitcount > 8:
raw_in = array.array("H")
for _ in range(len(buf)):
raw_in.append(0)
else:
raw_in = buf
mem_view = memoryview(raw_in)
count = 0
start_time = time.monotonic()
while count < len(buf) and (
self.timeout == 0 or (time.monotonic() - start_time) < self.timeout
):
waiting = min(len(buf) - count, self.rx_pio.in_waiting)
self.rx_pio.readinto(mem_view[count : count + waiting])
if self.timeout == 0 and waiting == 0:
return None if count == 0 else count
count += waiting
if self.parity is not None:
for i in range(count):
# TODO: Check parity bits instead of just masking them.
buf[i] = (raw_in[i] >> self.shift) & self.mask
return count
def read(self, n):
"""Read and return an array of up to n values from the UART."""
if self.bits > 8:
buf = array.array(n)
else:
buf = bytearray(n)
n = self.readinto(buf)
if n == 0:
return None
if n < len(buf):
return buf[:n]
return buf
def write(self, buf):
"""Write the contents of buf to the UART."""
# Compute parity if we need to
if self.parity:
if self.bitcount > 8:
a = array.array("H") # pylint: disable=invalid-name
for i, v in enumerate(buf): # pylint: disable=invalid-name
a.append(v)
ones = 0
for pos in range(self.bitcount - 1):
if (v & (1 << pos)) != 0:
ones += 1
parity = 0
if self.parity == self.Parity.ODD:
if (ones % 2) == 0:
parity = 1
elif (ones % 2) == 1:
# even parity needs another one if the data is odd
parity = 1
a[i] |= parity << (self.bitcount - 1)
buf = a
return self.tx_pio.write(buf)