aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRunasSudo <runassudo@yingtongli.me>2017-08-20 22:45:56 +1000
committerRunasSudo <runassudo@yingtongli.me>2017-08-20 22:47:07 +1000
commite62ea2a4a7294d9695e4f0effa856ef946231c2a (patch)
tree994ce8e1b465dae92e6cd2a9ae2e7479873fa163
parent9c7c23d6740a6f0aaedfff96e7e756761b5f4a79 (diff)
Change to directly-executable format for module
-rw-r--r--README.md2
-rw-r--r--pblive/__init__.py247
-rw-r--r--pblive/__main__.py247
3 files changed, 248 insertions, 248 deletions
diff --git a/README.md b/README.md
index 234beff..b701a3d 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@ PBLive is an open-source self-hosted live online quiz tool, similar to Kahoot an
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
- python -m pblive.__init__
+ python -m pblive
Navigate to http://1.2.3.4:5000/admin to begin.
diff --git a/pblive/__init__.py b/pblive/__init__.py
index 55469e1..e69de29 100644
--- a/pblive/__init__.py
+++ b/pblive/__init__.py
@@ -1,247 +0,0 @@
-# PBLive
-# Copyright © 2017 RunasSudo (Yingtong Li)
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import eventlet
-import flask
-import flask_socketio
-
-# idk why it doesn't work...
-try:
- from . import data
-except ImportError:
- import data
-
-import os, os.path
-import random
-import socket
-import sys
-import yaml
-
-eventlet.monkey_patch()
-
-app = flask.Flask('pblive')
-app.jinja_env.globals['data'] = data
-
-socketio = flask_socketio.SocketIO(app)
-
-# Get server IP address
-tmp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-tmp_socket.connect(('8.8.8.8', 0)) # Connecting to a UDP socket sends no packets
-data.server_ip = tmp_socket.getsockname()[0]
-tmp_socket.close()
-
-# Load session data
-for f in os.listdir('data'):
- if f.endswith('.yaml') and not f.startswith('.'):
- session_name = f[:-5]
- with open(os.path.join('data', f)) as fh:
- data.sessions[session_name] = data.Session.from_dict(yaml.load(fh), session_name)
-
-@app.route('/')
-def index():
- return flask.render_template('index.html')
-
-@app.route('/admin')
-def admin():
- return flask.render_template('admin.html')
-
-@app.route('/session/<session_name>')
-def session(session_name):
- return flask.render_template('session.html', session=data.sessions[session_name])
-
-@app.route('/image/<location>')
-def image(location):
- # TODO: Relative path
- return flask.send_from_directory(os.path.join(os.getcwd(), 'data/img'), location)
-
-@app.route('/admin/session/<session_name>')
-def admin_session(session_name):
- return flask.render_template('admin_session.html', session=data.sessions[session_name])
-
-@app.route('/debug')
-def debug():
- assert app.debug == False
-
-@socketio.on('join')
-def socket_join(session_name):
- app.logger.debug('New client {} connected'.format(flask.request.sid))
-
- session = data.sessions[session_name]
- user = data.User(sid=flask.request.sid, session=session)
- data.users[flask.request.sid] = user
-
- # Send initial colour picker
- flask_socketio.emit('update', flask.render_template('colour_picker.html', session=session), room=flask.request.sid)
- flask_socketio.emit('update_left', render_sidebar(user, session), room=flask.request.sid)
-
-def render_question(user, session, question_num):
- return flask.render_template(session.questions[question_num].template, session=session, user=user, question_num=session.question_num)
-
-def render_question_admin(session, question_num):
- return flask.render_template(session.questions[question_num].template_admin, session=session, question_num=session.question_num)
-
-def render_sidebar(user, session):
- return flask.render_template('users.html', session=session, user=user)
-
-@socketio.on('join_admin')
-def socket_join(session_name):
- app.logger.debug('New admin {} connected'.format(flask.request.sid))
-
- session = data.sessions[session_name]
- user = data.Admin(sid=flask.request.sid, session=session)
- data.admins_lock.acquire()
- data.admins[flask.request.sid] = user
- data.admins_lock.release()
-
- # Send initial screen
- flask_socketio.emit('update', render_question_admin(session, session.question_num), room=flask.request.sid)
- flask_socketio.emit('update_left', render_sidebar(user, session), room=flask.request.sid)
-
-@socketio.on('disconnect')
-def socket_disconnect():
- app.logger.debug('Client {} disconnected'.format(flask.request.sid))
-
- if flask.request.sid in data.users:
- user = data.users[flask.request.sid]
-
- data.users_lock.acquire()
- del data.users[flask.request.sid]
- data.users_lock.release()
-
- # Release the colour if it's being held
- if user.colour:
- user.session.colours.append(user.colour)
-
- # Relay change
- for _, other_user in data.iterate_users():
- if other_user != user and other_user.session == user.session:
- flask_socketio.emit('update_left', render_sidebar(other_user, user.session), room=other_user.sid)
- if not other_user.colour:
- flask_socketio.emit('update', flask.render_template('colour_picker.html', session=user.session), room=other_user.sid)
- for _, admin in data.iterate_admins():
- if admin.session == user.session:
- flask_socketio.emit('update_left', render_sidebar(admin, user.session), room=admin.sid)
-
-@socketio.on('register')
-def socket_register(colour_id, colour_name):
- user = data.users[flask.request.sid]
-
- if not user.colour and (colour_id, colour_name) in user.session.colours:
- user.colour = (colour_id, colour_name)
- user.session.colours.remove(user.colour)
-
- # Relay change
- for _, other_user in data.iterate_users():
- if other_user != user and other_user.session == user.session:
- flask_socketio.emit('update_left', render_sidebar(other_user, user.session), room=other_user.sid)
- if not other_user.colour:
- flask_socketio.emit('update', flask.render_template('colour_picker.html', session=user.session), room=other_user.sid)
- for _, admin in data.iterate_admins():
- if admin.session == user.session:
- flask_socketio.emit('update_left', render_sidebar(admin, user.session), room=admin.sid)
-
- flask_socketio.emit('update', render_question(user, user.session, user.session.question_num), room=user.sid)
- flask_socketio.emit('update_left', render_sidebar(user, user.session), room=user.sid)
-
-@socketio.on('answer')
-def socket_answer(question_num, answer):
- user = data.users[flask.request.sid]
-
- if question_num == user.session.question_num:
- if isinstance(user.session.questions[user.session.question_num], data.SpeedQuestion):
- if question_num in user.answers:
- # Only one shot!
- return
-
- user.answers[question_num] = answer
-
- if isinstance(user.session.questions[user.session.question_num], data.MCQQuestion):
- flask_socketio.emit('update', render_question(user, user.session, user.session.question_num), room=user.sid)
-
- # Hurry!
- if isinstance(user.session.questions[user.session.question_num], data.SpeedQuestion):
- if user.session.questions[user.session.question_num].timer_thread is None:
- user.session.questions[user.session.question_num].timer_thread = data.SpeedQuestionTimerThread(flask.copy_current_request_context(do_goto_question), user.session, user.session.question_num + 1)
- user.session.questions[user.session.question_num].timer_thread.start()
-
- # Relay change
- for _, other_user in data.iterate_users():
- if other_user.session == user.session:
- flask_socketio.emit('update_left', render_sidebar(other_user, user.session), room=other_user.sid)
- if isinstance(user.session.questions[user.session.question_num], data.SpeedQuestion):
- flask_socketio.emit('update', render_question(other_user, user.session, user.session.question_num), room=other_user.sid)
- for _, admin in data.iterate_admins():
- if admin.session == user.session:
- flask_socketio.emit('update', render_question_admin(user.session, user.session.question_num), room=admin.sid)
- flask_socketio.emit('update_left', render_sidebar(admin, user.session), room=admin.sid)
-
-@socketio.on('reveal_answers')
-def socket_reveal_answers(question_num):
- user = data.admins[flask.request.sid]
-
- user.session.questions[question_num].revealed = True
-
- flask_socketio.emit('update', render_question_admin(user.session, user.session.question_num), room=flask.request.sid)
-
-def do_goto_question(session, question_num):
- # Cleanup old question
- if isinstance(session.questions[session.question_num], data.SpeedQuestion):
- if session.questions[session.question_num].timer_thread is not None:
- session.questions[session.question_num].timer_thread.stop()
-
- session.question_num = question_num
-
- # Do work for some questions
- if isinstance(session.questions[question_num], data.RandomQuestion):
- session.questions[question_num].answerer = random.choice([other_user for _, other_user in data.users.items() if other_user.session == session and other_user.colour])
-
- # Relay change
- for _, other_user in data.iterate_users():
- if other_user.session == session and other_user.colour:
- flask_socketio.emit('update', render_question(other_user, session, session.question_num), room=other_user.sid)
- flask_socketio.emit('update_left', render_sidebar(other_user, session), room=other_user.sid)
- for _, admin in data.iterate_admins():
- if admin.session == session:
- flask_socketio.emit('update', render_question_admin(session, session.question_num), room=admin.sid)
- flask_socketio.emit('update_left', render_sidebar(admin, session), room=admin.sid)
-
-@socketio.on('goto_question')
-def socket_goto_question(question_num):
- user = data.admins[flask.request.sid]
-
- do_goto_question(user.session, question_num)
-
-@socketio.on('pass_question')
-def socket_pass_question():
- user = data.admins[flask.request.sid] if flask.request.sid in data.admins else data.users[flask.request.sid]
-
- if isinstance(user.session.questions[user.session.question_num], data.RandomQuestion):
- # Re-randomise answerer
- user.session.questions[user.session.question_num].answerer = random.choice([other_user for _, other_user in data.users.items() if other_user.session == user.session and other_user.colour])
-
- # Relay change
- for _, other_user in data.iterate_users():
- if other_user.session == user.session and other_user.colour:
- flask_socketio.emit('update', render_question(other_user, other_user.session, other_user.session.question_num), room=other_user.sid)
- flask_socketio.emit('update_left', render_sidebar(other_user, user.session), room=other_user.sid)
- for _, admin in data.iterate_admins():
- if admin.session == user.session:
- flask_socketio.emit('update', render_question_admin(admin.session, admin.session.question_num), room=admin.sid)
- flask_socketio.emit('update_left', render_sidebar(admin, user.session), room=admin.sid)
-
-# Start server
-if __name__ == '__main__':
- socketio.run(app, host='0.0.0.0')
diff --git a/pblive/__main__.py b/pblive/__main__.py
new file mode 100644
index 0000000..efdb8d0
--- /dev/null
+++ b/pblive/__main__.py
@@ -0,0 +1,247 @@
+# PBLive
+# Copyright © 2017 RunasSudo (Yingtong Li)
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import eventlet
+import flask
+import flask_socketio
+
+# idk why it doesn't work...
+try:
+ from . import data
+except ImportError:
+ import data
+
+import os, os.path
+import random
+import socket
+import sys
+import yaml
+
+eventlet.monkey_patch()
+
+app = flask.Flask('pblive')
+app.jinja_env.globals['data'] = data
+
+socketio = flask_socketio.SocketIO(app)
+
+# Get server IP address
+tmp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+tmp_socket.connect(('118.138.0.0', 0)) # Connecting to a UDP socket sends no packets
+data.server_ip = tmp_socket.getsockname()[0]
+tmp_socket.close()
+
+# Load session data
+for f in os.listdir('data'):
+ if f.endswith('.yaml') and not f.startswith('.'):
+ session_name = f[:-5]
+ with open(os.path.join('data', f)) as fh:
+ data.sessions[session_name] = data.Session.from_dict(yaml.load(fh), session_name)
+
+@app.route('/')
+def index():
+ return flask.render_template('index.html')
+
+@app.route('/admin')
+def admin():
+ return flask.render_template('admin.html')
+
+@app.route('/session/<session_name>')
+def session(session_name):
+ return flask.render_template('session.html', session=data.sessions[session_name])
+
+@app.route('/image/<location>')
+def image(location):
+ # TODO: Relative path
+ return flask.send_from_directory(os.path.join(os.getcwd(), 'data/img'), location)
+
+@app.route('/admin/session/<session_name>')
+def admin_session(session_name):
+ return flask.render_template('admin_session.html', session=data.sessions[session_name])
+
+@app.route('/debug')
+def debug():
+ assert app.debug == False
+
+@socketio.on('join')
+def socket_join(session_name):
+ app.logger.debug('New client {} connected'.format(flask.request.sid))
+
+ session = data.sessions[session_name]
+ user = data.User(sid=flask.request.sid, session=session)
+ data.users[flask.request.sid] = user
+
+ # Send initial colour picker
+ flask_socketio.emit('update', flask.render_template('colour_picker.html', session=session), room=flask.request.sid)
+ flask_socketio.emit('update_left', render_sidebar(user, session), room=flask.request.sid)
+
+def render_question(user, session, question_num):
+ return flask.render_template(session.questions[question_num].template, session=session, user=user, question_num=session.question_num)
+
+def render_question_admin(session, question_num):
+ return flask.render_template(session.questions[question_num].template_admin, session=session, question_num=session.question_num)
+
+def render_sidebar(user, session):
+ return flask.render_template('users.html', session=session, user=user)
+
+@socketio.on('join_admin')
+def socket_join(session_name):
+ app.logger.debug('New admin {} connected'.format(flask.request.sid))
+
+ session = data.sessions[session_name]
+ user = data.Admin(sid=flask.request.sid, session=session)
+ data.admins_lock.acquire()
+ data.admins[flask.request.sid] = user
+ data.admins_lock.release()
+
+ # Send initial screen
+ flask_socketio.emit('update', render_question_admin(session, session.question_num), room=flask.request.sid)
+ flask_socketio.emit('update_left', render_sidebar(user, session), room=flask.request.sid)
+
+@socketio.on('disconnect')
+def socket_disconnect():
+ app.logger.debug('Client {} disconnected'.format(flask.request.sid))
+
+ if flask.request.sid in data.users:
+ user = data.users[flask.request.sid]
+
+ data.users_lock.acquire()
+ del data.users[flask.request.sid]
+ data.users_lock.release()
+
+ # Release the colour if it's being held
+ if user.colour:
+ user.session.colours.append(user.colour)
+
+ # Relay change
+ for _, other_user in data.iterate_users():
+ if other_user != user and other_user.session == user.session:
+ flask_socketio.emit('update_left', render_sidebar(other_user, user.session), room=other_user.sid)
+ if not other_user.colour:
+ flask_socketio.emit('update', flask.render_template('colour_picker.html', session=user.session), room=other_user.sid)
+ for _, admin in data.iterate_admins():
+ if admin.session == user.session:
+ flask_socketio.emit('update_left', render_sidebar(admin, user.session), room=admin.sid)
+
+@socketio.on('register')
+def socket_register(colour_id, colour_name):
+ user = data.users[flask.request.sid]
+
+ if not user.colour and (colour_id, colour_name) in user.session.colours:
+ user.colour = (colour_id, colour_name)
+ user.session.colours.remove(user.colour)
+
+ # Relay change
+ for _, other_user in data.iterate_users():
+ if other_user != user and other_user.session == user.session:
+ flask_socketio.emit('update_left', render_sidebar(other_user, user.session), room=other_user.sid)
+ if not other_user.colour:
+ flask_socketio.emit('update', flask.render_template('colour_picker.html', session=user.session), room=other_user.sid)
+ for _, admin in data.iterate_admins():
+ if admin.session == user.session:
+ flask_socketio.emit('update_left', render_sidebar(admin, user.session), room=admin.sid)
+
+ flask_socketio.emit('update', render_question(user, user.session, user.session.question_num), room=user.sid)
+ flask_socketio.emit('update_left', render_sidebar(user, user.session), room=user.sid)
+
+@socketio.on('answer')
+def socket_answer(question_num, answer):
+ user = data.users[flask.request.sid]
+
+ if question_num == user.session.question_num:
+ if isinstance(user.session.questions[user.session.question_num], data.SpeedQuestion):
+ if question_num in user.answers:
+ # Only one shot!
+ return
+
+ user.answers[question_num] = answer
+
+ if isinstance(user.session.questions[user.session.question_num], data.MCQQuestion):
+ flask_socketio.emit('update', render_question(user, user.session, user.session.question_num), room=user.sid)
+
+ # Hurry!
+ if isinstance(user.session.questions[user.session.question_num], data.SpeedQuestion):
+ if user.session.questions[user.session.question_num].timer_thread is None:
+ user.session.questions[user.session.question_num].timer_thread = data.SpeedQuestionTimerThread(flask.copy_current_request_context(do_goto_question), user.session, user.session.question_num + 1)
+ user.session.questions[user.session.question_num].timer_thread.start()
+
+ # Relay change
+ for _, other_user in data.iterate_users():
+ if other_user.session == user.session:
+ flask_socketio.emit('update_left', render_sidebar(other_user, user.session), room=other_user.sid)
+ if isinstance(user.session.questions[user.session.question_num], data.SpeedQuestion):
+ flask_socketio.emit('update', render_question(other_user, user.session, user.session.question_num), room=other_user.sid)
+ for _, admin in data.iterate_admins():
+ if admin.session == user.session:
+ flask_socketio.emit('update', render_question_admin(user.session, user.session.question_num), room=admin.sid)
+ flask_socketio.emit('update_left', render_sidebar(admin, user.session), room=admin.sid)
+
+@socketio.on('reveal_answers')
+def socket_reveal_answers(question_num):
+ user = data.admins[flask.request.sid]
+
+ user.session.questions[question_num].revealed = True
+
+ flask_socketio.emit('update', render_question_admin(user.session, user.session.question_num), room=flask.request.sid)
+
+def do_goto_question(session, question_num):
+ # Cleanup old question
+ if isinstance(session.questions[session.question_num], data.SpeedQuestion):
+ if session.questions[session.question_num].timer_thread is not None:
+ session.questions[session.question_num].timer_thread.stop()
+
+ session.question_num = question_num
+
+ # Do work for some questions
+ if isinstance(session.questions[question_num], data.RandomQuestion):
+ session.questions[question_num].answerer = random.choice([other_user for _, other_user in data.users.items() if other_user.session == session and other_user.colour])
+
+ # Relay change
+ for _, other_user in data.iterate_users():
+ if other_user.session == session and other_user.colour:
+ flask_socketio.emit('update', render_question(other_user, session, session.question_num), room=other_user.sid)
+ flask_socketio.emit('update_left', render_sidebar(other_user, session), room=other_user.sid)
+ for _, admin in data.iterate_admins():
+ if admin.session == session:
+ flask_socketio.emit('update', render_question_admin(session, session.question_num), room=admin.sid)
+ flask_socketio.emit('update_left', render_sidebar(admin, session), room=admin.sid)
+
+@socketio.on('goto_question')
+def socket_goto_question(question_num):
+ user = data.admins[flask.request.sid]
+
+ do_goto_question(user.session, question_num)
+
+@socketio.on('pass_question')
+def socket_pass_question():
+ user = data.admins[flask.request.sid] if flask.request.sid in data.admins else data.users[flask.request.sid]
+
+ if isinstance(user.session.questions[user.session.question_num], data.RandomQuestion):
+ # Re-randomise answerer
+ user.session.questions[user.session.question_num].answerer = random.choice([other_user for _, other_user in data.users.items() if other_user.session == user.session and other_user.colour])
+
+ # Relay change
+ for _, other_user in data.iterate_users():
+ if other_user.session == user.session and other_user.colour:
+ flask_socketio.emit('update', render_question(other_user, other_user.session, other_user.session.question_num), room=other_user.sid)
+ flask_socketio.emit('update_left', render_sidebar(other_user, user.session), room=other_user.sid)
+ for _, admin in data.iterate_admins():
+ if admin.session == user.session:
+ flask_socketio.emit('update', render_question_admin(admin.session, admin.session.question_num), room=admin.sid)
+ flask_socketio.emit('update_left', render_sidebar(admin, user.session), room=admin.sid)
+
+# Start server
+if __name__ == '__main__':
+ socketio.run(app, host='0.0.0.0')
Contact (issues, pull requests, etc.) at git@yingtongli.me. Generated by cgit.