
Abusing yahi (a log-based statistics tool à la awstats) to plot histograms/date series from CSV

Foreword: what is yahi?



Yahi is a python module, installable with pip, that builds an all-in-one static HTML page aggregating data from a web server. Actually, as shown with the parsing of auth.log in the documentation, it is a fairly versatile regexp-based log analyser enabling various aggregations: by geo IP, by histogram, or as chronological series.
The demo is here. It is pretty close to awstats (which is discontinued) or goaccess.

As the author of yahi I may not be very objective, but I claim it has one quality other tools don't have: it is easy to abuse for formats other than web logs. Here is an example out of the norm: parsing CSV.

Plotting histograms or time series from CSV

CSV that can be parsed with a regexp

There are simple cases where CSV files have no embedded strings and are literally comma-separated integers/floats.

In this case, the CSV can be parsed with a regexp, and it's all the more convenient when the CSV has no header line.

Here is an example using the CSV generated by trollometre.

A line is made of a timestamp followed by various (int) counters.

Tip

For the sake of ease of use, I hacked the date_pattern format to accept "%s" as a unix timestamp (while date_pattern normally only accepts valid strptime formats).

from archery import mdict
from yahi import notch, shoot
from json import dump
import re


context=notch(
    off="user_agent,geo_ip",
    log_format="custom",
    output_format="json",
    date_pattern="%s",
    log_pattern="""^(?P<datetime>[^,]+),
    (?P<nb_fr>[^,]+),
    (?P<nb_total>[^,]+),?.*
    $""")

date_formater= lambda dt :"%s-%s-%s" % ( dt.year, dt.month, dt.day)
res= shoot(
        context,
        lambda data: mdict({
            "date_fr" :
                mdict({ date_formater(data["_datetime"]) : 
                    int(data["nb_fr"]) }),
            "hour_fr" : 
                mdict({ "%02d" % data["_datetime"].hour : 
                    int(data["nb_fr"]) }),
            "date_all" : 
                mdict({ date_formater(data["_datetime"]) : 
                    int(data["nb_total"]) }),
            "hour_all" : 
                mdict({ "%02d" % data["_datetime"].hour : 
                    int(data["nb_total"]) }),
            "total" : 1
        })
    )
dump(res,open("data.js","w"),  indent=4)

Then, all that remains to do is

python test.py < ~/trollometre.csv && yahi_all_in_one_maker && firefox aio.html

You click on time series and can see either the chronological time series

_images/csv_1.png

Or the profile by hour

Raw approach with csv.DictReader

Let's take the use case where my unemployment insurance sent me the data of all the 10000 jobless persons in my vicinity, each line consisting of:

opaque id, civility, firstname, lastname, email, email of the counselor following the jobless person

For this CSV, the first line is a header, and some strings may contain ","; hence the regexp approach is strongly ill-advised.

What we want here are 2 histograms:

  • the frequency of the firstnames (which does not violate the GDPR), that I can share,
  • how many persons each counselor is advising.

Here is the code

from csv import DictReader
from json import dump
from archery import mdict

res=mdict()
with open("/home/jul/Téléchargements/GEMESCAPEG.csv") as f:
    for l in DictReader(f):
        res+=mdict(by_ref = mdict({l["Referent"]: 1}), by_prenom=mdict({l["Prenom"]:1}))

dump(res, open("data.js", "w"), indent=4)

Then, all that remains to do is

yahi_all_in_one_maker && firefox aio.html

And here we can see that each counselor is following ~250 jobless persons on average.

And the frequency of the firstnames:

Which, correlated with the demographics of firstnames included below, tends to prove that the older you are, the less likely you are to be jobless.

I am not saying ageism, the data are doing it for me.

The true cost, and code, of parsing the entirety of (french-speaking) bluesky ATPROTO in python

I was reading this news on ycombinator and was flabbergasted by people's assertions regarding the cost and complexity of parsing the entirety of bluesky.
>I suspect that the cost of running AT proto servers/relays is prohibitive for smaller players compared to a Mastadon server selectively syndicating with a few peers, but I say this with only a vague understanding of the internals of both of these ecosystems.
NB: the news is about contradicting this assertion, in terms I don't understand.

Well, suspecting is nice, but what about reality?

I actually run, from my family's outdated PC, a full realtime scan of bluesky with some python code. And then, later in the thread:
>AppViews are actual "application backends". Bluesky operates the bsky.app appview, i.e. what people know as the Bluesky app. Importantly, in ATProto, there is no reason for everyone to run their own AppView. You can run one (and it costs about $300/mo to run a Bluesky AppView ingesting all data currently on the network in real time if you want to do that).


Ok, I run my bot without any extra cost other than electricity and my everyday FTTH line (common in social housing).

It actually takes:
  • 25% of CPU
  • less than a third of my domestic bandwidth
  • on a dual-core core i3 with hyperthreading disabled
  • on a standard Mint distribution
  • using 640MB of memory

I may not be smart, but the bot actually runs. So, without being a specialist, I can assure you that even if, like me, you are not the sharpest knife in the drawer, you can run your own atproto bot at home without investing $300/mo, and in python.

So here is my feedback on making a bot in python that can scan the whole of bluesky (sort of): how much volume it represents, with actual code and hints to do it.



Volume

For my bot @trollometre.bsky.social to see the most reposted skeets of bluesky in french, I must inspect them all, with a rate limit of 10 requests per second.
But scanning the firehose is free. The rate limiting applies to requests such as get_post, which I use intensively.
If you look at the volume per event type of bluesky, available here, you will notice that post events are fairly few compared to the total (like events being the most common):

With a free rate limit of 10 requests per second against 50 posts per second, it seems scanning the whole of bluesky will be tough. (Spoiler alert: if you want a fair sampling, I will hint at how I experimentally achieved it.)

Post events coming from the firehose are complete, and they have a langs field. Luckily, since I scan only the posts in french, the rate-limited part has far fewer events per second to handle: 2-3% of the mass, less than 1 post per second. Hence, my bot relies heavily on post events.
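As a sketch of that filtering step (the langs field does exist on post records; the dict layout and the helper name here are simplifications of mine, not the bot's actual code), the decision is just a membership test:

```python
# Hypothetical helper: decide whether a decoded firehose post record
# deserves a rate-limited get_post call. Only the `langs` field matters.
def is_french_post(record: dict) -> bool:
    """True when the post declares french among its languages."""
    return "fr" in (record.get("langs") or [])

# Fake records standing in for decoded firehose events:
assert is_french_post({"text": "bonjour", "langs": ["fr", "en"]})
assert not is_french_post({"text": "hello", "langs": ["en"]})
assert not is_french_post({"text": "no langs field at all"})
```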

Spam and blocks? What are those?

Around 10% of the accounts actively posting every day are showing the pieces of flesh that are symptomatic of them being mammals, and, if not blocked, they represent two thirds of the most reposted traffic.

For you that may be OK; for me, who am not very fond of these, it diminishes the signal/noise ratio of what the other 90% of users wish to convey. Thus, I decided to block them.

Also, some active users, representing ~25% of the traffic of the remainder of the french-speaking community (664 accounts), may not have been favorably impressed by the initial wording of my bot and its name.

Well, it cannot be helped. I have always had terrible taste when naming my projects.

By using bluesky's own tagging of porn and building a blacklist out of it, I consolidated a filter with 95% efficiency. As my detector tends to show, I am converging towards a full list.
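A minimal sketch of such a label-driven blacklist (the class and method names are mine, not the bot's actual code): any author once seen with a blocked label gets filtered from then on.

```python
class LabelBlacklist:
    """Consolidate a blacklist out of moderation labels seen on the stream."""

    def __init__(self, blocked_labels=("porn",)):
        self.blocked_labels = set(blocked_labels)
        self.blocked_dids = set()

    def observe(self, did, labels):
        # Remember every author whose post carries a blocked label.
        if self.blocked_labels & set(labels):
            self.blocked_dids.add(did)

    def allows(self, did):
        return did not in self.blocked_dids

bl = LabelBlacklist()
bl.observe("did:plc:alice", ["porn"])
bl.observe("did:plc:bob", [])
assert not bl.allows("did:plc:alice")  # blocked after one labeled post
assert bl.allows("did:plc:bob")
```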


Let's talk about coding



Resources



First, you don't dive into code without resources. Normally I should direct you towards the official one but... I can hardly read it.

I did most of the bot as frankencode: copy/pasting from the python API examples.

Then I discovered that having access to the API is quite nice, and I think the source code of the client is clear.



Scanning the whole of bluesky randomly without burning your rate limit



I made a mistake at the beginning by taking a non-multi-worker example of the firehose, and achieved serendipity: by having the event loop and the worker in the same process, without multiprocessing, you starve your worker, and that seems (it must be reproduced to be sure) to keep you under the rate limit.
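Relying on accidental starvation works, but the deliberate version of the same effect is an explicit limiter in front of the calls. Here is a token-bucket sketch (the class is mine, not part of the bot) that caps a worker at 10 requests per second:

```python
import time

class TokenBucket:
    """Allow at most `rate` calls per second, with a small burst allowance."""

    def __init__(self, rate, burst=10):
        self.rate, self.capacity = rate, burst
        self.tokens = float(burst)
        self.last = time.monotonic()

    def try_acquire(self):
        # Refill proportionally to elapsed time, then spend one token if any.
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate=10)
granted = sum(bucket.try_acquire() for _ in range(1000))
assert 10 <= granted <= 20  # only the burst (plus a tiny refill) goes through
```

The worker simply drops (or requeues) a get_post call when try_acquire returns False, instead of tripping the server-side limit.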

Websockets?



I tried the websocket firehose, but either I did something wrong or it is unreliable.

The code (insufficiently documented and not PEP 8 compliant)

Especially lacking is the documentation on how to run the moving parts.

The database structure is pretty thin : it has one table.

The main part of the code is here : trollometre.py

It is a classical (according to the example) multiprocessing architecture.
Let's tackle some features:

A websocket interface to administer the HAM/SPAM classification



The main code embeds a websocket server, and there is an HTML page as a client.

This web page has actions for each published post: HAM (tag as normal content), SPAM (tag as spam), and POST.
For it to work, you need the flask backend running.

The learning stuff



Natural language processing is fun, so I added spam detection as a last line of defense, built with learn.py.

Score setting



I decided to aim at a fairly constant number of posts per day; the heart of the score setter is an independent process that is here.

Plotting



Once the rrd archive is created, nothing beats a perl one-liner to transform the data from the CSV into a proper rrd graph.

Conclusion



I always feel like an impostor, because I cannot speak as loudly and vehemently, with technical words, as people on ycombinator or lobsters do.

I write « toy code » that I can show, with graphs of it actually RUNNING in my dining room.

And from the experience of my toy code, under a free software licence, which has actually been running for 2 months on a random PC that is not fancy, I think YOU can probably give the atproto/Bluesky API a try.

I would pretty much advise throwing my code away and starting from the examples of MarshalX.

The advantages of HTML as a data model over the basic declarative ORM approach

Very often, backend devs don't want to write code.

For this, we use one trick: derive the HTML widgets for presentation, the database access, and the REST endpoints from ONE SOURCE of truth, and call it the MODEL.

A tradition, and I insist it is a conservative tradition, is to use a declarative model where we make the truth of the model out of python classes.

By declaring a class, we implicitly declare its SQL structure, the HTML input form for human-readable interaction, and the REST endpoint to access a graph of objects, all mapped onto the database.

Since the arrival of pydantic, this makes all the more sense when it comes to empowering a strongly typed approach in python.

But is it the only worthy approach?

I speak here as a veteran of the trenches whose job is to read a list of customer entries in an xls file from a project manager and, based on the reverse engineering of an HTML form, change the faulty values into whatever the freak the right values are supposed to be.

In this case your job is, in fact, to short-circuit the web framework, to which you don't have access, and change the values directly in the database.

More often than not in these real-life cases, you don't have access to the team who built the framework (too much bureaucracy to even get a question answered before the situation gets critical)... So you look at the form.

And you guess the name of the impacted table by looking at the « network » tab of the developer tools when you hit the submit button.

And you guess, from the names of the impacted fields in the form, the names of the columns.

And then you use your only magical tool, which is write access to the database, to reflect the expected object with an automapper and change the values.

You could do it in raw SQL, I agree, but sometimes you need to do a web query in the middle of changing the value, because you have to ask a REST service what the new ID of the client is.

And you see, the more I live this experience of having to tweak real-life frameworks that often surprise their users for the sake of a limited source of truth, the more I want the HTML to be the source of truth.

Hence the most stoic approach to the full-stack framework: derive Everything from an HTML page.

The views, the controllers, the routes, the model, in such a true way that if you modify the HTML, you modify in real time the database model, the routes, and the displayed form.



What are the advantages of HTML as a declarative language?



Here, one tradition is to prefer human-readable languages such as YAML and JSON, or machine-readable ones such as XML, over HTML.

However, JSON and YAML are more limited than HTML in the expressiveness of their data structures (can you have a dict as a key of a dict in JSON? I can.)

And, on the other hand, XML is quite a pain to read and write without mistakes.

HTML is just XML



HTML is a lax and lenient, grammarless XML. No parser will raise an exception because you wrote "<br>" instead of "<br/>" (or the opposite). You can add non-existent attributes to tags, and the parser will take them without your having to redefine a full-fledged grammar.
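The stdlib parser demonstrates this leniency nicely: an unclosed <br>, an invented tag and made-up attributes all come through, with no grammar declared anywhere:

```python
from html.parser import HTMLParser

class Collector(HTMLParser):
    """Record every opening tag with its attributes, whatever they are."""
    def __init__(self):
        super().__init__()
        self.seen = []

    def handle_starttag(self, tag, attrs):
        self.seen.append((tag, dict(attrs)))

c = Collector()
c.feed('<br><unique_constraint col=a,b name=u></unique_constraint>'
       '<input type=number name=total nullable=false>')
assert ("br", {}) in c.seen                       # no slash, no complaint
assert ("unique_constraint", {"col": "a,b", "name": "u"}) in c.seen
assert dict(c.seen)["input"]["nullable"] == "false"  # invented attribute kept
```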

HTML is an XML YOU CAN SEE.



Some tags are related to a grammar of visual widgets with which non-computer people are familiar.

If you use a FORM as a mapping to a database table, with every input inside bearing a column name, you already have your inputs drawn on the screen.



Modern « remote procedure calls » are web based



Call it RPC, call it SOAP, call it REST: nowadays, web technologies carry 99% of how computer systems exchange data with each other.

You buy something on the internet; at the end, you interact with a web form or a web call. Hence we can assert with strong conviction that 100% of web technology stacks can serve web pages. Thus, if you use your HTML as a model and present it, you can deduce the data model from the form without needing a new pivot language.

Proof of concept



For the convenience of « fun », we are gonna imagine a backend for « agile by micro-blogging » (à la former twitter).

We are gonna assume the platform structures micro-blogging around where agile shines the most: not when things are done, but in moving things on.

Things that are done will be called statements. Like: « the software is delivered. Here is a factoid (a git url, for instance) ». We will call these nodes in a graph, and they are supposed to be immutable states that can't be contested.

Each statement answers another statement's factoid (a delivery statement tends to follow, or at least should follow, a story point) by the means of a transition.

Hence, in this application, we will micro-blog about the transitions... like on a social network, with the members of the concerned group.
The idea of the application is to replace scrum meetings with micro-blogging.

« Are you blocked? Do you need anything? » can be answered on the micro-blogging platform, and every thread presented is archived and used for machine learning (about what you want to hear as good news), in a data form that is convenient for large language models.

As such, we want to harvest texts long enough to express emotions, constrained to a laughingly small number of characters so that finesse and ambiguity are tough to produce. That's the heart of the application: harvesting comments tagged with the associated emotions to ease the work of tagging for Artificial Intelligence.

Hear me out: this is just a stupid idea of mine to illustrate a graph-like structure described with HTML, not a real-life idea. I just love to represent State Machine Diagrams with everything that falls into my hands.

Here is the entity relationship diagram I have in mind:


Let's see what a table declaration might look like in HTML; let's take transition:


<form action=/transition  >
	<input type=number name=id />
	<input type=number name=user_group_id nullable=false reference=user_group.id />
	<textarea name=message rows=10 cols=50 nullable=false ></textarea>
	<input type=url name=factoid />
	<select name="emotion_for_group_triggered" value=neutral >
		<option value="">please select a value</option>
		<option value=positive >Positive</option>
		<option value=neutral >Neutral</option>
		<option value=negative >Negative</option>
	</select>
	<input type=number name=expected_fun_for_group />
	<input type=number name=previous_statement_id reference=statement.id nullable=false />
	<input type=number name=next_statement_id reference=statement.id />
	<unique_constraint col=next_statement_id,previous_statement_id name=unique_transition ></unique_constraint>
	<input type=checkbox name=is_exception />
</form>


Through the use of additional HTML tags and attributes, we can convey a lot of information usable for database construction/querying that will stay silent at presentation time (like unique_constraint). And with a little bit of javascript and CSS, this HTML generates the following rendering (indicating the webservice endpoints as input type=submit):


Meaning that you can now serve a landing page that serves the purpose of human interaction, describes a « curl way » of automating interactions, and gives a full model of your database.

Most startups think the data model should be obfuscated to prevent it being copied; most free software projects think that sharing the non-valuable assets helps the adoption of the technology.

And thanks to this, I can now create my own test suite that uses the HTML form to work on a doppelganger of the real database, by parsing the HTML served by the application service (pdca.py), and launch a perfectly functioning service out of it:
from requests import post
from html.parser import HTMLParser

import requests
import os
from dateutil import parser
from passlib.hash import scrypt as crypto_hash # we can change the hash easily
from urllib.parse import parse_qsl, urlparse

# heavyweight
from requests import get
from sqlalchemy import *
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
DB=os.environ.get('DB','test.db')
DB_DRIVER=os.environ.get('DB_DRIVER','sqlite')
DSN=f"{DB_DRIVER}://{DB_DRIVER == 'sqlite' and not DB.startswith('/') and '/' or ''}{DB}"
ENDPOINT="http://127.0.0.1:5000"
os.chdir("..")
os.system(f"rm {DB}")
os.system(f"DB={DB} DB_DRIVER={DB_DRIVER} python pdca.py & sleep 2")
url = lambda table : ENDPOINT + "/" + table
os.system(f"curl {url('group')}?_action=search")

# NB: transtype_input is redefined below for column options; only form_to_db keeps this lambda
form_to_db = lambda attrs : {  k: (
                # handling of input having date/time in the name
                "date" in k or "time" in k )
                    and parser.parse(v) or
                # handling of boolean mapping which input begins with "is_"
                k.startswith("is_") and [False, True][v == "on"] or
                # password ?
                "password" in k and crypto_hash.hash(v) or
                v
                for k,v in attrs.items() if v  and not k.startswith("_")
}

post(url("user"), params = dict(id=1,  secret_password="toto", name="jul2", email="[email protected]", _action="create"), files=dict(pic_file=open("./assets/diag.png", "rb").read())).status_code
#os.system(f"curl {ENDPOINT}/user?_action=search")
#os.system(f"sqlite3 {DB} .dump")

engine = create_engine(DSN)
metadata = MetaData()


transtype_true = lambda p : (p[0],[False,True][p[1]=="true"])
def dispatch(p):
    return dict(
        nullable=transtype_true,
        unique=transtype_true,
        default=lambda p:("server_default",eval(p[1])),
    ).get(p[0], lambda *a:None)(p)

transtype_input = lambda attrs : dict(filter(lambda x :x, map(dispatch, attrs.items())))

class HTMLtoData(HTMLParser):
    def __init__(self):
        global engine, tables, metadata
        self.cols = []
        self.table = ""
        self.tables= []
        self.enum =[]
        self.engine= engine
        self.meta = metadata
        super().__init__()

    def handle_starttag(self, tag, attrs):
        global tables
        attrs = dict(attrs)
        simple_mapping = {
            "email" : UnicodeText, "url" : UnicodeText, "phone" : UnicodeText,
            "text" : UnicodeText, "checkbox" : Boolean, "date" : Date, "time" : Time,
            "datetime-local" : DateTime, "file" : Text, "password" : Text, "uuid" : Text, #UUID is postgres specific
        }

        if tag in {"select", "textarea"}:
            self.enum=[]
            self.current_col = attrs["name"]
            self.attrs= attrs
        if tag == "option":
            self.enum.append( attrs["value"] )
        if tag == "unique_constraint":
            self.cols.append( UniqueConstraint(*attrs["col"].split(','), name=attrs["name"]) )
        if tag in { "input" }:
            if attrs.get("name") == "id":
                self.cols.append( Column('id', Integer,  **( dict(primary_key = True) | transtype_input(attrs ))))
                return
            try:
                if attrs.get("name").endswith("_id"):
                    self.cols.append( Column(attrs["name"], Integer, ForeignKey(attrs["reference"])) )
                    return
            except Exception as e:
                print(e)

            if attrs.get("type") in simple_mapping:
                self.cols.append(
                    Column(
                        attrs["name"], simple_mapping[attrs["type"]],
                        **transtype_input(attrs)
                    )
                )
            if attrs.get("type") == "number":
                if attrs.get("step","") == "any":
                    self.cols.append( Column(attrs["name"], Float) )
                else:
                    self.cols.append( Column(attrs["name"], Integer) )
        if tag== "form":
            self.table = urlparse(attrs["action"]).path[1:]

    def handle_endtag(self, tag):
        global tables
        if tag == "select":
            # self.cols.append( Column(self.current_col,Enum(*[(k,k) for k in self.enum]), **transtype_input(self.attrs)) )

            self.cols.append( Column(self.current_col, Text, **transtype_input(self.attrs)) )
            
        if tag == "textarea":
            self.cols.append(
                Column(
                    self.current_col,
                    String(int(self.attrs["cols"])*int(self.attrs["rows"])),
                    **transtype_input(self.attrs)) 
           )
        if tag=="form":
            self.tables.append( Table(self.table, self.meta, *self.cols), )
            #tables[self.table] = self.tables[-1]

            self.cols = []
            with engine.connect() as cnx:
                self.meta.create_all(engine)
                cnx.commit()

HTMLtoData().feed(get("http://127.0.0.1:5000/").text)
os.system("pkill -f pdca.py")



#metadata.reflect(bind=engine)
Base = automap_base(metadata=metadata)

Base.prepare()

with Session(engine) as session:
    for table,values in tuple([
        ("user", form_to_db(dict( name="him", email="[email protected]", secret_password="toto"))),
        ("group", dict(id=1, name="trolol") ),
        ("group", dict(id=2, name="serious") ),
        ("user_group", dict(id=1,user_id=1, group_id=1, secret_token="secret")),
        ("user_group", dict(id=2,user_id=1, group_id=2, secret_token="")),
        ("user_group", dict(id=3,user_id=2, group_id=1, secret_token="")),
        ("statement", dict(id=1,user_group_id=1, message="usable agile workflow", category="story" )),
        ("statement", dict(id=2,user_group_id=1, message="How do we code?", category="story_item" )),
        ("statement", dict(id=3,user_group_id=1, message="which database?", category="question")),
        ("statement", dict(id=4,user_group_id=1, message="which web framework?", category="question")),
        ("statement", dict(id=5,user_group_id=1, message="preferably less", category="answer")),
        ("statement", dict(id=6,user_group_id=1, message="How do we test?", category="story_item" )),
        ("statement", dict(id=7,user_group_id=1, message="QA framework here", category="delivery" )),
        ("statement", dict(id=8,user_group_id=1, message="test plan", category="test" )),
        ("statement", dict(id=9,user_group_id=1, message="OK", category="finish" )),
        ("statement", dict(id=10, user_group_id=1, message="PoC delivered",category="delivery")),

        ("transition", dict( user_group_id=1, previous_statement_id=1, next_statement_id=2, message="something bugs me",is_exception=True, )),
        ("transition", dict( 
            user_group_id=1, 
            previous_statement_id=2, 
            next_statement_id=4, 
            message="standup meeting feedback",is_exception=True, )),
        ("transition", dict( 
            user_group_id=1, 
            previous_statement_id=2, 
            next_statement_id=3, 
            message="standup meeting feedback",is_exception=True, )),
        ("transition", dict( user_group_id=1, previous_statement_id=2, next_statement_id=6, message="change accepted",is_exception=True, )),
        ("transition", dict( user_group_id=1, previous_statement_id=4, next_statement_id=5, message="arbitration",is_exception=True, )),
        ("transition", dict( user_group_id=1, previous_statement_id=3, next_statement_id=5, message="arbitration",is_exception=True, )),
        ("transition", dict( user_group_id=1, previous_statement_id=6, next_statement_id=7, message="R&D", )),
        ("transition", dict( user_group_id=1, previous_statement_id=7, next_statement_id=8, message="Q&A", )),
        ("transition", dict( user_group_id=1, previous_statement_id=8, next_statement_id=9, message="CI action", )),
        ("transition", dict( user_group_id=1, previous_statement_id=2, next_statement_id=10, message="situation unblocked", )),
        ("transition", dict( user_group_id=1, previous_statement_id=9, next_statement_id=10, message="situation unblocked", )),
        ]):
        session.add(getattr(Base.classes,table)(**values))
        session.commit()
os.system("python ./generate_state_diagram.py sqlite:///test.db > out.dot ;dot -Tpng out.dot > diag2.png; xdot out.dot")
s = requests.session()

os.system(f"DB={DB} DB_DRIVER={DB_DRIVER} python pdca.py & sleep 1")


print(s.post(url("group"), params=dict(_action="delete", id=3,name=1)).status_code)
print(s.post(url("grant"), params = dict(secret_password="toto", email="[email protected]",group_id=1, )).status_code)
print(s.post(url("grant"), params = dict(_redirect="/group",secret_password="toto", email="[email protected]",group_id=2, )).status_code)
print(s.cookies["Token"])
print(s.post(url("user_group"), params=dict(_action="search", user_id=1)).text)
print(s.post(url("group"), params=dict(_action="create", id=3,name=2)).text)
print(s.post(url("group"), params=dict(_action="delete", id=3)).status_code)
print(s.post(url("group"), params=dict(_action="search", )).text)
os.system("pkill -f pdca.py")
Which gives me a nice set of data to play with while I experiment on how to handle the business logic, where the core of the value is.

The crudest CRUD of them all: the smallest CRUD possible in 150 lines of python

Right now, I am on a never-ending quest that requires me to think of building a full-fledged MVC framework: an anti-jira tracker that would favour HARD-CHECKED facts over wishful thinking.

To begin this, I am not really motivated by starting with a full-fledged MVC (Model View Controller) framework à la django, because there is a lot of boilerplate and many actions to do before getting a result. But it has a lot of features I want, including authentication, authorization and security handling.

For prototypes, we normally favour lightweight frameworks (à la flask), and CRUD.

The CRUD approach is a factorisation of the whole framework into a single dynamic form that adapts itself to the model: from the python class declaration it generates the HTML forms to input data, the tabulated views, the REST endpoints and the search over them, and it generates the database model. One language to rule them all: PYTHON. With enough talent, you can even generate from python the javascript that handles autocompletion on the generated view.
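To make that direction concrete, here is a toy sketch of the class-as-single-source-of-truth idea (the names and the type mapping are mine, not any real framework's): one annotated python class, and the HTML form is derived from it.

```python
from dataclasses import dataclass, fields

# Hypothetical adapter: python annotations to HTML input types.
TYPE_TO_INPUT = {int: "number", str: "text", bool: "checkbox"}

@dataclass
class User:
    id: int
    name: str
    is_admin: bool

def to_form(cls):
    """Derive the HTML input form from the class declaration."""
    inputs = "".join(
        f'<input type={TYPE_TO_INPUT[f.type]} name={f.name} />'
        for f in fields(cls)
    )
    return f'<form action=/{cls.__name__.lower()}>{inputs}</form>'

html = to_form(User)
assert html.startswith('<form action=/user>')
assert '<input type=number name=id />' in html
assert '<input type=checkbox name=is_admin />' in html
```

The rest of the section argues for inverting exactly this arrow: make the form the declaration and derive the class (well, the table) from it.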

But before using a CRUD framework, we need a cruder one: ugly, disgusting, but useful for a human, before building the REST APIs, writing the python classes, the HTML forms and the controllers.

I call this the crudest CRUD of them all.

Think hard about what you want when prototyping...

  • to write no CONTROLLERS; flask documentation has a very verbose approach to exposing and writing routes, and writing controllers for loading and searching databases is boring;
  • to write the fewest HTML views possible; one and only one would be great;
  • to avoid having to fiddle with the many files reflecting the separation of concerns: the fewer python files and classes you touch, the better;
  • to avoid having to write SQL or use an ORM (at least a verbose declarative one);
  • show me your code and you can mesmerize and even fool me; however, show me your data structures and I'll know everything I have to know about your application: the data structure should be under your nose, in a readable fashion, in the code;
  • to have AT LEAST one endpoint for inserting and searching, so that curl can be used to begin automation and testing, preferably in a factorisable fashion;
  • only one point of failure is accepted.

Once we set these few conditions, we see that whatever we do WE NEED a dynamic http server at the core. Python being the topic here, we are gonna do it in python.

What is the simplest dynamic web server in python?

The reference implementation of wsgi, which is the crudest wsgi server of them all: wsgiref. And you don't need to download it, since it is provided in the python stdlib.
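For reference, a complete wsgiref application is a single callable; this sketch shows it, and note that you can exercise the callable directly, without even binding a port:

```python
from wsgiref.simple_server import make_server

def app(environ, start_response):
    # The whole "framework": method and path in, bytes out.
    body = f"{environ['REQUEST_METHOD']} {environ['PATH_INFO']}".encode()
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [body]

# Serving would be: make_server("", 8000, app).serve_forever()
# The callable itself is testable with a bare environ dict:
out = app({"REQUEST_METHOD": "GET", "PATH_INFO": "/"}, lambda *a: None)
assert out == [b"GET /"]
```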

First things first, we are gonna add a default view, so that we can serve a static HTML page with the minimal HTML we need to interact with data: sets of inputs and forms.

Here, we stop. And we see that these forms are describing the data model.

Wouldn't it be nice if we could parse the HTML form easily with a tool from the standard library : html.parser and maybe deduce the database model and even more than fields coud add relationship, and well since we are dreaming : what about creating the tables on the fly from the form if they don't exists ?

The encoding of the relationships does require hijacking a convention: when the parser crosses a field named whatever_id in the form, it deduces it is a foreign key to table « whatever », column « id ».
Once this is done, we can parse the html, do some magic to match HTML input types to database types (an adapter), and it's almost over. We can even dream of creating the database, if it does not exist, in a one-liner for sqlite.
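The deduction itself is tiny; here is a sketch of the convention (the helper name is mine, and the real form can still carry an explicit reference attribute for the cases the convention cannot guess, like previous_statement_id):

```python
def foreign_key_of(field_name):
    """Deduce (table, column) from the `whatever_id` naming convention."""
    if field_name.endswith("_id") and field_name != "id":
        return field_name[:-len("_id")], "id"
    return None  # plain column, no relationship

assert foreign_key_of("group_id") == ("group", "id")
assert foreign_key_of("message") is None
assert foreign_key_of("id") is None  # the primary key is not a reference
```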

We just need to throw all frugality of dependencies out the window and spoil our karma of « digital sobriety » by adding the almighty sqlalchemy: the crudest (but still heavy) ORM when it comes to the introspective features needed to map a database object to a python object in a clear and consistent way. With this, just one function is needed in the controller to switch between storing (POST method) and searching (GET).
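A sketch of that single controller function against an in-memory sqlite, stripped of the HTTP plumbing (the table and column names are illustrative): POST inserts the parameters, GET turns them into a search.

```python
from sqlalchemy import (Column, Integer, MetaData, Table, Text,
                        create_engine, insert, select)

engine = create_engine("sqlite://")
meta = MetaData()
user = Table("user", meta,
             Column("id", Integer, primary_key=True),
             Column("name", Text))
meta.create_all(engine)

def controller(method, table, params):
    """One function for both verbs: POST stores, GET searches."""
    with engine.begin() as cnx:
        if method == "POST":
            cnx.execute(insert(table).values(**params))
            return []
        stmt = select(table)
        for col, val in params.items():
            stmt = stmt.where(getattr(table.c, col) == val)
        return [dict(row._mapping) for row in cnx.execute(stmt)]

controller("POST", user, {"id": 1, "name": "jul"})
assert controller("GET", user, {"name": "jul"}) == [{"id": 1, "name": "jul"}]
```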

Well, that is if the DOM is passed in the request. So of course I see the critics here:
  • we can't pass the DOM in the request, because the HTML form ignores the DOM;
  • aren't you scared of error 414 (URI too long) in the GET method if you pass the DOM?
That's where we obviously need two important tools: 1) javascript, 2) limitations.

Since we are human, we would also like the form to be readable when served because, well, humans don't read the source and can't see the name attributes of the inputs. A tad of improvement over the raw html would be nice. It would also bring consistency, and it would diminish the required size of the form to send. Here, javascript again is the right answer. Fine, we serve the static page at the top of the controller. Let's use jquery to make it terse enough. Oh, and if we have javascript, wouldn't it be able to clone the relevant part of the invented model tag inside every form, so that we can pass the relevant part of the DOM to the controller?

I think we have everything to write the crudest CRUD server of them all :D

Happy code reading :
import multipart
from wsgiref.simple_server import make_server
from json import dumps
from sqlalchemy import *
from html.parser import HTMLParser
from base64 import b64encode
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from dateutil import parser
from sqlalchemy_utils import database_exists, create_database
from urllib.parse import parse_qsl, urlparse

engine = create_engine("sqlite:///this.db")
if not database_exists(engine.url):
    create_database(engine.url)

tables = dict()

class HTMLtoData(HTMLParser):
    def __init__(self):
        global engine, tables
        self.cols = []
        self.table = ""
        self.tables= []
        self.engine= engine
        self.meta = MetaData()
        super().__init__()

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        simple_mapping = dict(
            email = UnicodeText, url = UnicodeText, phone = UnicodeText, text = UnicodeText,
            date = Date, time = Time, datetime = DateTime, file = Text
        )
        if tag == "input":
            if attrs.get("name") == "id":
                self.cols += [ Column('id', Integer, primary_key = True), ]
                return
            try:
                if attrs.get("name").endswith("_id"):
                    table,_=attrs.get("name").split("_")
                    self.cols += [ Column(attrs["name"], Integer, ForeignKey(table + ".id")) ]
                    return
            except Exception as e: print(e)

            if attrs.get("type") in simple_mapping.keys():
                self.cols += [ Column(attrs["name"], simple_mapping[attrs["type"]]), ]

            if attrs["type"] == "number":
                if attrs["step"] == "any":
                    self.cols+= [ Column(attrs["name"], Float), ]
                else:
                    self.cols+= [ Column(attrs["name"], Integer), ]
        if tag== "form":
            self.table = urlparse(attrs["action"]).path[1:]

    def handle_endtag(self, tag):
        if tag=="form":
            self.tables += [ Table(self.table, self.meta, *self.cols), ]
            tables[self.table] = self.tables[-1]
            self.table = ""
            self.cols = []
            with engine.connect() as cnx:
                self.meta.create_all(engine)
                cnx.commit()
html = """
<!doctype html>
<html>
<head>
<style>
* {    font-family:"Sans Serif" }
body { text-align: center; }
fieldset {  border: 1px solid #666;  border-radius: .5em; width: 30em; margin: auto; }
form { text-align: left; display:inline-block; }
input { margin-bottom:1em; padding:.5em;}
[value=create] { background:#ffffba} [value=delete] { background:#bae1ff} [value=update] { background:#ffdfda}
[value=read] { background:#baffc9}
[type=submit] { margin-right:1em; margin-bottom:0em; border:1px solid #333; padding:.5em; border-radius:.5em; }
</style>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/3.7.1/jquery.min.js"></script>
<script>
$(document).ready(function() {
    $("form").each((i,el) => {
        $(el).wrap("<fieldset></fieldset>"  );
        $(el).before("<legend>" + el.action + "</legend>");
        $(el).append("<input name=_action type=submit value=create ><input name=_action type=submit value=read >")
        $(el).append("<input name=_action type=submit value=update ><input name=_action type=submit value=delete >")
    });
    $("input:not([type=hidden],[type=submit])").each((i,el) => {
        $(el).before("<label>" + el.name+ "</label><br/>");
        $(el).after("<br>");
    });
});
</script>
</head>
<body >
    <form  action=/user method=post >
        <input type=number name=id />
        <input type=text name=name />
        <input type=email name=email />
    </form>
    <form action=/event method=post >
        <input type=number name=id />
        <input type=date name=from_date />
        <input type=date name=to_date />
        <input type=text name=text />
        <input type=number name=user_id />
    </form>
</body>
</html>

"""


router = dict({"" : lambda fo: html,})

def simple_app(environ, start_response):
    fo, fi=multipart.parse_form_data(environ)
    fo.update(**{ k: dict(
            name=fi[k].filename,
            content_type=fi[k].content_type,
            content=b64encode(fi[k].file.read())
        ) for k,v in fi.items()})
    table = route = environ["PATH_INFO"][1:]
    fo.update(**dict(parse_qsl(environ["QUERY_STRING"])))
    HTMLtoData().feed(html)
    metadata = MetaData()
    metadata.reflect(bind=engine)
    Base = automap_base(metadata=metadata)
    Base.prepare()
    attrs_to_dict = lambda attrs : {  k: (
                    "date" in k or "time" in k ) and type(k) == str
                        and parser.parse(v) or
                    "file" in k and f"""data:{fo[k]["content_type"]}; base64, {fo[k]["content"].decode()}""" or v
                    for k,v in attrs.items() if v and not k.startswith("_")
    }
    if route in tables.keys():
        start_response('200 OK', [('Content-type', 'application/json; charset=utf-8')])
        with Session(engine) as session:
            try:
                action = fo.get("_action", "")
                Item = getattr(Base.classes, table)
                if action == "delete":
                    session.delete(session.get(Item, fo["id"]))
                    session.commit()
                    fo["result"] = "deleted"
                if action == "create":
                    new_item = Item(**attrs_to_dict(fo))
                    session.add(new_item)
                    session.flush()
                    session.commit()
                    fo["result"] = new_item.id
                if action == "update":
                    session.delete(session.get(Item, fo["id"]))
                    new_item = Item(**attrs_to_dict(fo))
                    session.add(new_item)
                    session.commit()
                    fo["result"] = new_item.id
                if action in { "read", "search" }:
                    result = []
                    for elt in session.execute(
                        select(Item).filter_by(**attrs_to_dict(fo))).all():
                        result += [{ k.name:getattr(elt[0], k.name) for k in tables[table].columns}]
                    fo["result"] = result
            except Exception as e:
                fo["error"] = e
                session.rollback()
    else:
        start_response('200 OK', [('Content-type', 'text/html; charset=utf-8')])

    return [ router.get(route,lambda fo:dumps(fo.dict, indent=4, default=str))(fo).encode() ]

print("Crudest CRUD of them all on port 5000...")
make_server('', 5000, simple_app).serve_forever()

Is chatgpt good at generating code for tuning a guitar ?

I was on a French-speaking IRC channel, bragging a tad about how I was writing a guitar tuner and paying attention not to fall into the pit of confusing precise with exact figures, like a random computer engineer would.

Since he was a patented CS engineer, he wanted to prove to me that my new guitar tuner was useless, since AI could come up with a better, less convoluted, exact example in fewer lines of code than mine (mine is adapted from a blog on audio processing and the fast Fourier transform, because it was commented and refreshed my basics of signal processing).

And I asked him: have you ever heard of the Nyquist frequency? Or of the tradeoff an oscilloscope has to make between time locality and accuracy?

Of course he hadn't. And he was proud that a wannabe coder would be proven useless thanks to the dumbest AI.



So I actually made this guitar tuner because this time I wanted an exact figure around the Hertz.
The problem, stated by the FFT/Nyquist formula, is that if I want an exact number around 1Hz (1 period per second) I should sample at least half a period (hence .5 second), and should not expect a better resolution.

The quoted chatgpt code takes 1024 samples out of 44100/sec, giving it a nice reactivity of 1/44th of a second, but an accuracy of 44100/1024/2 => 21Hz.
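These figures follow from the resolution formula used later in the code (resolution = samp_rate / (2 * chunk)):

```python
samp_rate = 44100

def resolution(chunk, samp_rate=44100):
    # frequency resolution claimed in the post: sample rate over twice the window size
    return samp_rate / (2 * chunk)

print(round(resolution(1024), 1))   # 21.5 Hz: very reactive, but coarse
print(resolution(samp_rate // 2))   # 1.0 Hz: half a second of sampling
```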

I know chatgpt does not tune guitars, but shouldn't the chatgpt user bragging about the superiority of pros like him remember that we may tune not only at A = 440Hz but at A = 432Hz or other references?

A note is defined as a power of the twelfth root of 2 relative to an arbitrary reference (remember, an octave is a doubling => 12 half tones = 2); what makes a temperate scale is not the reference but the relationship between notes, and this enables a band of instruments to tune themselves according to the most unreliable but also nicest instrument: the human voice.
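In code, this equal-temperament relationship is a one-liner, independent of the chosen reference (the helper is my own sketch):

```python
def note_freq(ref, half_tones):
    # each half tone multiplies the frequency by the twelfth root of 2
    return ref * 2 ** (half_tones / 12)

print(note_freq(440.0, 12))   # one octave up from A440: 880.0
print(note_freq(432.0, 12))   # the same ratio holds with A = 432 Hz: 864.0
```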

Giving the user 3 decimals after the comma is called being precise: it makes you look like a genius in the eyes of the crowd. Giving the user 0 decimals but an accurate frequency is what makes you look dumb in the eyes of the computer science engineer, but it is way more useful in real life.

Here I took the liberty, with pysine, of generating A over 6 octaves (ref A=440), and compared the chatgpt-recommended buffer size, acknowledged by a pro CS engineer for tuning your guitar, with my default choice.
for i in 55.0 110.0 220.0 440.0 880.0 1760.0 ; do python -m pysine $i 3; done
Here is the result with a chunk size of 1024 :
And here is the result with a chunk size corresponding to half a second of sampling :

I may not be a computer engineer, I am dumb, but checking with easy-to-use tools that your final result is in sync with your goal is, for me, more important than diplomas and professional training.



The code is yet another basic animation in matplotlib, with a nice arrow pointing at the frequency of the best-fitting note. It is not the best algorithm, but it does the job.

Showing the harmonics as well as the fundamental has another benefit: it answers the question, why would I tune my string on the note of the upper string?
Like tuning E on A?

Well because (at least for my half broken guitar) it ensures that I will tune on the fundamental.

Here is a sample of tuning E on the empty string :
And this is how tuning the E string on the A note looks like :
And don't pay attention to the 56Hz residual noise triggered by my fans/appliances turning and making a constant noise :D Here is the code:
import pyaudio
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import numpy as np
import time
from sys import argv

A = 440.0
try:
    A=float(argv[1])
except IndexError:
    pass

form_1 = pyaudio.paInt16 # 16-bit resolution
chans = 1 # 1 channel
samp_rate = 44100 # 44.1kHz sampling rate
chunk = 44100//2# .5 seconds of sampling for 1Hz accuracy

audio = pyaudio.PyAudio() # create pyaudio instantiation

# create pyaudio stream
stream = audio.open(
    format = form_1,rate = samp_rate,channels = chans,
    input = True , frames_per_buffer=chunk
)

fig = plt.figure(figsize=(13,8))
ax = fig.add_subplot(111)
plt.grid(True)
def compute_freq(ref, half_tones):
    return [ 1.0*ref*(2**((half_tones+12*i )/12)) for i in range(-4,4)   ]

print(compute_freq(A,0))
note_2_freq = dict(
    E = compute_freq(A,-5),
    A = compute_freq(A, 0),
    D = compute_freq(A, 5),
    G = compute_freq(A,-2),
    B = compute_freq(A, 2),
    )
resolution = samp_rate/(2*chunk)

def closest_to(freq):
    res = dict()
    for note, freqs in note_2_freq.items():
        res[note]=max(freqs)
        for f in freqs:
            res[note]= min(res[note], abs(freq -f))
    note,diff_freq = sorted(res.items(), key = lambda item : item[1])[0]

    for f in note_2_freq[note]:
        if abs(freq-f) == diff_freq:
            return "%s %s %2.1f %d" % (
                note,
                abs(freq - f ) < resolution and "=" or
                    ( freq > f and "+" or "-"),
                abs(freq-f),
                freq
            )

def init_func():
    plt.rcParams['font.size']=18
    plt.xlabel('Frequency [Hz]')
    plt.ylabel('Amplitude [Arbitrary Unit]')
    plt.grid(True)
    ax.set_xscale('log')
    ax.set_yscale('log')
    ax.set_xticks( note_2_freq["E"] + note_2_freq["A"]+
                  note_2_freq["D"]+ note_2_freq["G"]+
                  note_2_freq["B"] ,
                labels = (
                    [ "E" ] * len(note_2_freq["E"]) +
                    [ "A" ] * len(note_2_freq["A"]) +
                    [ "D" ] * len(note_2_freq["D"]) +
                    [ "G" ] * len(note_2_freq["G"]) +
                    [ "B" ] * len(note_2_freq["B"])
                    )
    )

    ax.set_xlim(40, 4000)
    return ax

def data_gen():
    stream.start_stream()
    data = np.frombuffer(stream.read(chunk),dtype=np.int16)
    stream.stop_stream()
    yield data
i=0
def animate(data):
    global i
    i+=1
    ax.cla()
    init_func()
    # compute FFT parameters
    f_vec = samp_rate*np.arange(chunk/2)/chunk # frequency vector based on window
                                               # size and sample rate
    mic_low_freq = 50 # low frequency response of the mic
    low_freq_loc = np.argmin(np.abs(f_vec-mic_low_freq))
    fft_data = (np.abs(np.fft.fft(data))[0:int(np.floor(chunk/2))])/chunk
    fft_data[1:] = 2*fft_data[1:]
    plt.plot(f_vec,fft_data)

    max_loc = np.argmax(fft_data[low_freq_loc:])+low_freq_loc

# max frequency resolution
    plt.annotate(r'$\Delta f_{max}$: %2.1f Hz, A = %2.1f Hz' % (
            resolution, A), xy=(0.7,0.92), xycoords='figure fraction'
    )
    ax.set_ylim([0,2*np.max(fft_data)])

    # annotate peak frequency
    annot = ax.annotate(
        'Freq: %s'%(closest_to(f_vec[max_loc])),
        xy=(f_vec[max_loc], fft_data[max_loc]),\
        xycoords='data', xytext=(0,30), textcoords='offset points',
        arrowprops=dict(arrowstyle="->"),
        ha='center',va='bottom')
    #fig.savefig('full_figure-%04d.png' % i)
    return ax,

ani = animation.FuncAnimation(
    fig, animate, data_gen, init_func, interval=.15,
    cache_frame_data=False, repeat=True, blit=False
)
plt.show()

Tune your guitar with python

Today's exercise is just about turning a very nice example from the python sounddevice module into something that works for me© to help me tune my bass.

Long story short, I suck at tuning my instrument and just lost my tuner...

This will require the python modules sounddevice and matplotlib.

So in order to tune my guitar I indeed need a spectrogram that displays the frequencies captured in real time by an audio device, with an output readable enough that I can actually know whether I am nearing a legit frequency called a note.

The frequencies for the notes are pretty arbitrary and I chose to only show the frequencies for E, A, D, G, B since I have a 5-string bass.
I chose frequencies between 100 and 2000 Hz, knowing that anyway any frequency below will trigger harmonics, and any above will trigger resonance, in the right frequency frame.

Plotting a spectrogram is done by tweaking the eponymous matplotlib grapher with values chosen to fit my needs and show me a laser-thin beam around the right frequency.
#!/usr/bin/env python3
"""Show a text-mode spectrogram using live microphone data."""
import argparse
import math
import shutil
import matplotlib.pyplot as plt
from multiprocessing import Process, Queue
import matplotlib.animation as animation

import numpy as np
import sounddevice as sd

usage_line = ' press enter to quit,'

def int_or_str(text):
    """Helper function for argument parsing."""
    try:
        return int(text)
    except ValueError:
        return text

try:
    columns, _ = shutil.get_terminal_size()
except AttributeError:
    columns = 80

parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
    '-l', '--list-devices', action='store_true',
    help='show list of audio devices and exit')
args, remaining = parser.parse_known_args()
if args.list_devices:
    print(sd.query_devices())
    parser.exit(0)
parser = argparse.ArgumentParser(
    description=__doc__ + '\n\nSupported keys:' + usage_line,
    formatter_class=argparse.RawDescriptionHelpFormatter,
    parents=[parser])
parser.add_argument(
    '-b', '--block-duration', type=float, metavar='DURATION', default=50,
    help='block size (default %(default)s milliseconds)')
parser.add_argument(
    '-d', '--device', type=int_or_str,
    help='input device (numeric ID or substring)')
parser.add_argument(
    '-g', '--gain', type=float, default=10,
    help='initial gain factor (default %(default)s)')
parser.add_argument(
    '-r', '--range', type=float, nargs=2,
    metavar=('LOW', 'HIGH'), default=[50, 4000],
    help='frequency range (default %(default)s Hz)')
args = parser.parse_args(remaining)
low, high = args.range
if high <= low:
    parser.error('HIGH must be greater than LOW')
q = Queue()
try:
    samplerate = sd.query_devices(args.device, 'input')['default_samplerate']
    def plot(q):
        global samplerate
        fig, ( ax,axs) = plt.subplots(nrows=2)
        plt.ioff()
        def animate(i,q):
            data = q.get()
            ax.clear()
            axs.clear()
            axs.plot(data)
            ax.set_yticks([
                41.20,	82.41,	164.8,	329.6,	659.3,  # E
                55.00, 	110.0, 	220.0, 	440.0, 	880.0,  # A
                73.42,	146.8,	293.7,	587.3,          # D
                49.00, 	98.00, 	196.0, 	392.0, 	784.0,  #G 
                61.74, 	123.5, 	246.9, 	493.9, 	987.8 ])#B 
            ax.specgram(data[:,-1],mode="magnitude", Fs=samplerate*2, scale="linear",NFFT=9002)
            ax.set_ylim(150,1000)
        ani = animation.FuncAnimation(fig, animate,fargs=(q,), interval=500)
        plt.show()

    plotrt = Process(target=plot, args=(q,))
    plotrt.start()

    def callback(indata, frames, time, status):
        if any(indata):
            q.put(indata)
        else:
            print('no input')

    with sd.InputStream(device=args.device, channels=1, callback=callback,
                        blocksize=int(samplerate * args.block_duration /50 ),
                        samplerate=samplerate) as sound:
        while True:
            response = input()
            if response in ('', 'q', 'Q'):
                break
            for ch in response:
                if ch == '+':
                    args.gain *= 2
                elif ch == '-':
                    args.gain /= 2
                else:
                    print('\x1b[31;40m', usage_line.center(columns, '#'),
                          '\x1b[0m', sep='')
                    break
except KeyboardInterrupt:
    parser.exit('Interrupted by user')
except Exception as e:
    parser.exit(type(e).__name__ + ': ' + str(e))

Hello world part II : actually recoding print

In part I we explored the prerequisite to coding print: having a grasp on the framebuffer.

Here, we are gonna dive inside one of the most overlooked object-oriented abstractions, the file, and actually print what we can of hello world in 100 lines of code.


The file handler and the file descriptor



These two abstractions are the low-level and high-level views of the same thing: something more complex whose access has been encapsulated in generic methods. Actually, when you code a framebuffer driver you provide function pointers specialized to your device, and you may omit those common to the class. This is done with a double lookup on the major node, minor node numbers. Among those « generic » methods you have: seek, write, tell, truncate, open, read, close ...
The file handler in python also handles extra bytes (pun) of facilities, like character encoding, stats, and buffering.

Here, we work with the low-level abstraction: the file, which we access through its file descriptor via fileno. And thanks to this abstraction, you don't care if the underlying implementation fragments the file itself (e.g. on a hard drive): you can magically ask to read any arbitrary block of characters at any given position, without caring about the gory details.

Two of the most used methods on files here are seek and write.

The file descriptor method write is sensitive to the positioning set by seek. Hence, we can write one pixel row of a character, then position ourselves one line below to write the next row.
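The seek-then-write dance can be illustrated on a plain temporary file (a sketch, not touching the framebuffer):

```python
import os, tempfile

# Demonstrate position-sensitive writes through a raw file descriptor
fd, path = tempfile.mkstemp()
os.write(fd, b"......")        # six placeholder bytes
os.lseek(fd, 2, os.SEEK_SET)   # position the "cursor" at byte 2
os.write(fd, b"XY")            # overwrite two bytes in place, no shifting
os.lseek(fd, 0, os.SEEK_SET)
data = os.read(fd, 6)
os.close(fd)
os.remove(path)
print(data)  # b'..XY..'
```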

matrices as a view of a row based array



When I speak of rows and columns I evoke abstractions that are not related to the framebuffer.

The coordinates are an abstraction we build for convenience, to say: I want to write from this line, at this column.
And since human beings bug after 2 dimensions, we split the last dimension into a vector of dimension 4 called a pixel.
The go_at function illustrates our use of this trick to position the (invisible) cursor at any given position on the screen, expressed for clarity in glyph sizes.
We could actually code this whole exercise through a 3D view of the framebuffer. I just wouldn't be able to pack the code in less than 100 lines, and it would introduce useless abstractions.

But if you have doubts about the numerous seeks I do, and why I multiply the line and column values the way I do, check the preceding link for an understanding of row-based arrays and their n-dimensional matricial views.
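If in doubt, the arithmetic behind those seeks can be checked in isolation (w = 1920 is an assumption for the example):

```python
so_pixel = 4            # bytes per pixel (b, g, r, a)
w = 1920                # pixels per screen line, assumed for the example
stride = w * so_pixel   # bytes per screen line

def offset(x, y):
    # byte position of pixel (x, y) in the linear framebuffer memory
    return x * so_pixel + y * stride

print(offset(0, 0))    # 0: top-left corner
print(offset(10, 2))   # 10*4 + 2*7680 = 15400
```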

fonts, chars glyphs...



Here we are gonna take matrices defining the glyphs (what you actually see on screen) as 8x8 = 64-element 1D arrays, and map them onto the screen with put_char. put_char does a little bit of magic by relying on python to do the character-to-glyph conversion through a dict lookup which, expecting strings, performs a de facto codepoint-to-glyph conversion without having to handle codepoints explicitly.

The set of character-to-glyph conversions, with their common properties, is a font.

The hidden console



The console is an abstraction that keeps track of global state, such as the current line we print at. Thus, here, being lazy, I use global variables instead of a singleton named « console » or « term » to keep track of them. But first and foremost, these « abstractions » are just expectations we share in a common mental model. Like we expect « print » to add a newline at the end of the string and to begin printing at the next line.

The to be finished example



I limited the code to 100 lines so that it's fairly readable. I leave as an exercise the following points:
  • encoding the missing glyphs in the font to actually be able to write "hello world!",
  • handling the edge case of reaching the bottom of the screen,
I want to point out that the true python print function is WAY MORE COMPLEX than this terse example: it also handles magic conversion from memory objects to their string counterparts (like integers converted to their decimal representation), buffering, encoding, and so much more. This is merely a toy to dive into the complexity of the mission at hand.
This example is a part of a project to write « hello world » on the framebuffer in numerous languages, bash included.

Annexe : the code




#!/usr/bin/env python3
from struct import pack
from os import SEEK_CUR, lseek as  seek, write
w,h =map(int, open("/sys/class/graphics/fb0/virtual_size").read().split(","))
so_pixel = 4
stride = w * so_pixel

encode = lambda b,g,r,a : pack("4B",b,g,r,a)

font = {
    "height":8,
    "width":8,
    'void' : [ 
        0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 1, 1, 1, 1, 0, 0,
        0, 1, 0, 0, 0, 0, 1, 0, 
        0, 0, 1, 0, 0, 1, 1, 0,
        0, 0, 0, 0, 1, 0, 0, 0,
        0, 0, 0, 0, 1, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 1, 0, 0, 0, 
       ],
    "l":[ 
        0, 0, 0, 0, 0, 0, 0, 0,
        0, 1, 0, 0, 0, 0, 0, 0,
        0, 1, 0, 0, 0, 0, 0, 0,
        0, 1, 0, 0, 0, 0, 0, 0,
        0, 1, 0, 0, 0, 0, 0, 0,
        0, 1, 0, 0, 0, 0, 0, 0,
        0, 1, 0, 0, 0, 0, 0, 0,
        0, 0, 1, 1, 1, 1, 0, 0,
        ],
    "o": [
        0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 1, 1, 1, 1, 0, 0,
        0, 1, 0, 0, 0, 0, 1, 0,
        0, 1, 0, 0, 0, 0, 1, 0,
        0, 1, 0, 0, 0, 0, 1, 0,
        0, 1, 0, 0, 0, 0, 1, 0,
        0, 0, 1, 0, 0, 1, 0, 0,
        0, 0, 0, 1, 1, 0, 0, 0],
    "h": [
        0, 0, 0, 0, 0, 0, 0, 0,
        0, 1, 0, 0, 0, 0, 0, 0,
        0, 1, 0, 0, 0, 0, 0, 0,
        0, 1, 0, 0, 0, 0, 0, 0,
        0, 1, 1, 1, 1, 0, 0, 0,
        0, 1, 0, 0, 1, 0, 0, 0,
        0, 1, 1, 0, 1, 0, 0, 0,
        0, 1, 0, 0, 1, 0, 0, 0],
}

def go_at(fh, x, y): 
   global stride
   seek(fh.fileno(),x*so_pixel + y *stride, 0)

def next_line(fh, reminder):
    seek(fh.fileno(), stride - reminder, SEEK_CUR)

def put_char(fh, x,y, letter):
    go_at(fh, x, y)
    black = encode(0,0,0,255)
    white = encode(255,255,255,255)
    char = font.get(letter, None) or font["void"]
    for col,pixel in enumerate(char):
        write(fh.fileno(), white if pixel else black)
        if (col%font["width"]==font["width"]-1):
            next_line(fh, so_pixel * font["width"])
COL=0
LIN=0

OUT = open("/dev/fb0", "bw")
FD = OUT.fileno()

def newline():
    global OUT,LIN,COL
    COL=0
    LIN+=1
    go_at(OUT, 0, LIN * font["height"])

def print_(line):
    global OUT, COL, LIN
    COL=0
    for c in line:
        if c == "\n":
            newline()
            continue  # the newline itself has no glyph to draw
        put_char(OUT,COL * font["width"] , LIN * font['height'], c)
        COL+=1
    newline() 

for i in range(30):
    print_("hello lol")

Revisiting hello world : coding print from scratch part I

The « hello world » example is about standing on the shoulders of giants and learning how to use functions as tools.

Most coders will use print during their whole life without actually coding it. However, it is a fun exercise.

The framebuffer



Given you are on linux, you probably have a device named /dev/fb0; if you don't, you can't do this. The framebuffer is a view of the linear graphical memory used by your video card, where what you see on the screen is stored ... on the condition that you are in console mode and you have the rights.

On my debian-centric distribution, to give the_user permission to write to the framebuffer I must add the_user to the video group. This can be done with sudo adduser the_user video or sudo vigr.

Then, you have to be in console mode. To switch between xorg/wayland and the console back and forth, I use the Ctrl + Alt + Fn combination to switch off from X, and Alt + Fn to switch back to X (it's called switching the virtual console).

Once this is done, you check you have the rights by doing
cat /dev/urandom > /dev/fb0
which should fill your screen with random colors and insult you, stating there is no more room left on the device. SNAFU: everything works as intended.


The pixel



Framebuffers don't know about pixels made of Red, Green, Blue and alpha (given you have a video card that is less than 20 years old); they are just made of memory. We will have to slowly build up our understanding of what this is all about.

The in-memory layout may differ according to the hardware: some have an RGBA layout; mine, the i915, has a BGRA layout. The following example may need to be rewritten for different hardware if the output is not consistent with your assumptions.

Determining the memory layout and coordinates



We will do a test-and-validate code session: first we make assumptions on where the colours are by writing 3 squares of Red, Blue and Green on the screen; then, we will snapshot the screen.

$ cat fb.py
#!/usr/bin/env python3
from struct import pack
w,h =map(int, open("/sys/class/graphics/fb0/virtual_size").read().split(","))
midx = w//2
midy = h//2

encode = lambda b,g,r,a : pack("4B",b,g,r,a)

with open("/dev/fb0", "wb") as f:
    for y in range(0,h):
        for x in range(0,w):
            f.write(encode(
                not x%100 and 0xA0 or x<midx and 0xFF or 0, #blue
                y<midy and 0xFF or 0,                       #green
                x>midx and y>midy and 0xFF or 0,            #red
                0,
            ))
The only « trick » is the use of pack to encode the four colour bytes in a byte array that is written to the framebuffer file handler. If the code works correctly we should validate the following assumptions:
  • coordinates are such that 0 is the top left of the screen, where green and blue should superpose
  • my 1920x1080 screen should have 19 weird stripes (hence validating the geometry)
  • each colour should be in its square: red bottom right, green top right, blue bottom left.
  • accessing the framebuffer char device is like accessing a low-level file
And if the world is consistent we can read from the framebuffer and snapshot it in the most trivial picture encoding, which is the Portable PixMap format. A portable pixmap is made of:
  • a magic number P3 followed by
  • width
  • height
  • the maximum colour value (here 255)
  • the 3 colour values Red, Green, Blue (without the alpha value) per pixel

The code for this is straightforward:
$ cat snap.py
#!/usr/bin/env python
from struct import unpack
w,h = map( int,open("/sys/class/graphics/fb0/virtual_size").read().split(","))

# returns b g r a
decode = lambda pixel : unpack("4B", pixel)

def pr(b,g,r,a):
    print("%d %d %d" % (r,g,b))

print(f"""P3
{w} {h}
255
""")

with open("/dev/fb0", "rb") as fin:
    while pixel := fin.read(4):
        pr(*decode(pixel))
Here the only trick is that we use the symmetrical function of pack, unpack, to decode the pixel into the four colour bytes.
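That symmetry can be checked in isolation:

```python
from struct import pack, unpack

# Round-trip one BGRA pixel through pack/unpack, as fb.py and snap.py do
pixel = pack("4B", 0xA0, 0x00, 0xFF, 0x00)   # b, g, r, a
b, g, r, a = unpack("4B", pixel)
print(len(pixel), r, g, b)  # 4 bytes; what snap.py would emit as "r g b"
```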

wrapping up part one



Assuming you can install fim, the framebuffer image viewer, and you installed imagemagick, you can now do
./fb.py && ./snap.py > this.ppm && convert this.ppm this.jpg && fim this.jpg
Doing so, you should have the same picture showing twice, without an error, like this:
As an exercise, you can vary fb.py to make funny output, or code a PPM viewer that prints your ppm back to the screen.

3D plotter in python-tk with matplotlib.

Wishing to prove wrong my assertion on python-tk that piping python directly into the tk/tcl interpreter is simple, I tried to contradict myself by making a full GUI in matplotlib. Because, if you are not aware: matplotlib supports multi-target (Wx, Qt, gtk, tk, html), multi-platform widgets (Button, checkbox, text entry and much more).

The challenge seemed pretty easy: assemble an easy demo from simple examples. Thus, the added value resided in letting people fill in the min/max/step information for the relevant dimensions.

Without the colorbar I would have just been slightly annoyed by the slowness of matplotlib's reactions as a GUI, but the colorbar posed a new challenge, because it would either stack at each drawing, or make plt.clf/ax.cla erase too much (see this great resource on when to use cla/clf in matplotlib).

So ... I tried python-tk with matplotlib, knowing all too well that you can embed matplotlib natively in a tkinter interface.

And since it was working I kept it.

Here is a screenshot of the interface :
WARNING: this code should not be left in inventive hands (such as bored teenagers') because there is an evil eval; it requires the care of consenting adults.

Some highlights of the code :
  • bidirectional python-tk requires setting the Popen PIPEs to non-blocking and using select.select on the output
  • matplotlib is unusable in non-blocking mode: once matplotlib has the focus you need to destroy it to plot another function
  • from numpy import * is evil, but it gives you access to all the array-oriented math functions (sin, cos, exp, ...)
This said, we have pretty compact code of 128 lines that is more reactive than using matplotlib's widgets for a 3D plotter.

Writing an interactive tcl/tk interpreter proxy to wish in python

Maybe you want to experiment with small stuff in the wish (tcl/tk) interpreter, because of a post claiming that direct python-to-tcl/tk is simpler, in some simple cases, than tkinter.

As a convinced tkinter/FreeSimpleGUI user, I see this as an extreme claim that requires solid evidence.

When all is said and done, the wish interpreter is not interactive, and for testing simple stuff it can get annoying very fast. Thus, it would be nice to add readline to the interface.

So here is a less-than-100-lines-of-code exercise in doing exactly so, while having fun with readline and multiprocessing (I would have picked multithreading if threads were easy to terminate).

About readline, I quote the python documentation :
The readline module defines a number of functions to facilitate completion and reading/writing of history files from the Python interpreter.
Basically, it adds arrow navigation in the history, backward search with Ctrl+R, Ctrl+K for cutting to the right, Ctrl+Y for yanking ... all the interaction facilities you have in bash or ipython, for instance.
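Readline can also do tab-completion. Here is a minimal sketch wiring a completer over the magic commands of the proxy (the COMMANDS list is just an illustration, it is not part of the iwish code) :

```python
import readline

# hypothetical command set, mirroring the magics of the session transcript
COMMANDS = ["#l", "#?", "#!save", "#!load", "bye", "quit", "exit"]

def complete(text, state):
    # readline calls this with state = 0, 1, 2, ... until it returns None
    matches = [c for c in COMMANDS if c.startswith(text)]
    return matches[state] if state < len(matches) else None

readline.set_completer(complete)
readline.parse_and_bind("tab: complete")
```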

We are gonna use multiprocessing because tcl/tk is event oriented, hence asynchronous : strings may arrive on the tcl stdout while we are doing nothing, and we would like to print them as they come.

We also introduce, like in ipython, some magic commands prefixed with # (the comment character in tcl), such as #? for the help. A session should look like this :
# pack [ button .c -text that -command { puts "hello" } ]
# 
tcl output> hello # here we pressed the button "that"
tcl output> hello # here we pressed the button "that"


# set name 32
# puts $name

tcl output> 32

# #?

#l print current recorded session
#? print current help
#! calls python code like
  #!save(name="temp") which saves the current session in current dir in "temp" file
bye exit quit quit the current session

# #l
pack [ button .c -text that -command { puts "hello" } ]
set name 32
puts $name

# #!save("my_test.tcl")
# quit
The code in itself is fairly easy to read. The only catch is that wish accepts multiline input; I can't, because I don't know how to parse tcl. As a result I « eval in tcl » every line to know if there is an error, and politely ask tcl to do the job of signaling the error with a « catch/error » (the equivalent of python's try + raise).
#!/usr/bin/env python3
# -*- coding: utf8 -*-

from subprocess import Popen, PIPE, STDOUT
from multiprocessing import Process
import sys, os
import atexit
import readline
from select import select
from time import sleep

### interactive session with history with readline
histfile = os.path.join(os.path.expanduser("~"), ".wish_history")
try:
    readline.read_history_file(histfile)
    # default history len is -1 (infinite), which may grow unruly
    readline.set_history_length(-1)
except FileNotFoundError:
    pass

### saving history at the end of the session
atexit.register(readline.write_history_file, histfile)

### opening wish
wish = Popen(['wish'], 
        stdin=PIPE,
        stdout=PIPE,
        stderr=PIPE,
        bufsize=-1,
        )

os.set_blocking(wish.stdout.fileno(), False)
os.set_blocking(wish.stderr.fileno(), False)
os.set_blocking(wish.stdin.fileno(), False)

def puts(s):
    out = f"""set code [ catch {{ {s} }} p ]
if {{$code}} {{ error $p }}
"""
    select([], [wish.stdin], [])
    wish.stdin.write(out.encode())
    wish.stdin.flush()  # the pipe is buffered: push the command through


def gets():
    # background reader: poll the non-blocking tcl stdout and echo it
    while True:
        tin = wish.stdout.read()
        if tin:
            print("\ntcl output> " + tin.decode())
        sleep(.1)

def save(fn="temp"):
    with open(fn,"wt") as f:
        f.write(session)

session = ""
def load(fn="temp"):
    global session
    with open(fn, "rt") as f:
        while l := f.readline():
            session += l   # readline keeps the trailing newline
            puts(l)


# async io in tcl requires a background process to read the input
t = Process(target=gets, args=())
t.start()

while True:
    s = input("# ")
    if s in { "bye", "quit", "exit" }:
        t.terminate()
        wish.stdin.write("destroy .\n".encode())
        wish.stdin.flush()
        break
    elif s == "#l":
        print(session)
    elif s == "#?":
        print("""
#l print current recorded session
#? print current help
#! calls python code like
  #!save(name="temp") which saves the current session in current dir in "temp" file
  #!load(name="temp") which load the session stored in current dir in "temp" file
bye exit quit quit the current session
""" )
        continue
    elif s.startswith("#!"):
        print(eval(s[2:]))
        continue
    else:
        puts(s)
        sleep(.1)  # give wish a moment to report a possible error
        if err := wish.stderr.readline():
            sys.stderr.write(err.decode())
        else:
            if s and not s.startswith("#"):
                session += s + "\n"

This code is available on pypi as iwish (interactive wish) and the git link is in the README.