/* eslint no-console:0 */

/*
 * Imports CloudFront access logs (gzipped files in ./RAW) into a local SQLite
 * database (./database.sqlite), keeping only analytics pixel hits ("/i").
 */

const path = require('path');
const { URL } = require('url');
const CloudFrontParser = require('cloudfront-log-parser');
const parseUA = require('ua-parser-js');
const format = require('date-fns/format');
const zlib = require('zlib');
const { pipeline } = require('./fs');
const { Readable, Transform, Writable } = require('stream');
const { open: opensql } = require('sqlite');
const sqlite3 = require('sqlite3');
const sql = require('./sql-tag');

// Merge the callback API (createReadStream) with the promise API (opendir).
let fs = require('fs');
fs = { ...fs, ...fs.promises };

const TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss';

/**
 * Parses a URL without throwing.
 *
 * @param {string} input - Candidate URL string.
 * @returns {?object} A plain object carrying the parsed URL's fields, or
 *   `null` when `input` is not a valid absolute URL.
 */
function url (input) {
  try {
    const {
      hash,
      host,
      hostname,
      href,
      origin,
      password,
      pathname,
      port,
      protocol,
      search,
      searchParams,
      username,
    } = new URL(input);
    return {
      hash,
      host,
      hostname,
      href,
      origin,
      password,
      pathname,
      port,
      protocol,
      search,
      searchParams,
      username,
    };
  } catch (e) {
    return null;
  }
}

/**
 * Formats an epoch-milliseconds value for storage.
 *
 * @param {number} ms - Milliseconds since the Unix epoch.
 * @returns {?string} The formatted timestamp, or `null` when `ms` does not
 *   describe a valid date (date-fns `format` throws on an Invalid Date, so
 *   that case is screened out here).
 */
function formatTimestamp (ms) {
  const date = new Date(ms);
  if (Number.isNaN(date.getTime())) return null;
  return format(date, TIMESTAMP_FORMAT);
}

/**
 * Joins the defined, non-empty parts with a single space.
 *
 * @param {...(string|undefined)} parts - Pieces of a label.
 * @returns {?string} The joined label, or `null` when every part is empty.
 */
function joinParts (...parts) {
  return parts.filter(Boolean).join(' ') || null;
}

/**
 * Converts one parsed CloudFront log row into a `records` table row.
 *
 * @param {object} rawRow - Row emitted by the CloudFront parser, keyed by the
 *   hyphenated log field names (e.g. `cs-uri-stem`).
 * @returns {?object} The record to insert, or `null` when the row is not a
 *   usable analytics hit and must be skipped.
 */
function toRecord (rawRow) {
  // filter out OPTIONS calls
  if (rawRow['cs-method'] === 'OPTIONS') return null;

  // I only care about the pixel hits, nothing else.
  if (rawRow['cs-uri-stem'] !== '/i') return null;

  // this isn't an analytics event
  if (rawRow['cs-referer'] === '-') return null;

  // Swap hyphens for underscores so the fields can be read as properties.
  const row = Object.fromEntries(
    Object.entries(rawRow).map(([ k, v ]) => [ k.replace(/-/g, '_'), v ]),
  );

  const query = row.cs_uri_query === '-'
    ? {}
    : Object.fromEntries(new URLSearchParams(row.cs_uri_query));

  // we didn't get analytics data from this load, ignore it
  if (!query.start) return null;

  const sessionStart = Number(query.start);
  const clientStart = formatTimestamp(sessionStart);
  // A non-numeric `start` cannot be stored; skip the row instead of crashing.
  if (clientStart === null) return null;

  // The client sends the literal string "null" when it has no end time.
  const sessionEnd = query.end === 'null' ? 0 : Number(query.end);
  const clientEnd = sessionEnd ? formatTimestamp(sessionEnd) : null;
  const duration = clientEnd !== null && sessionEnd > sessionStart
    ? Math.floor((sessionEnd - sessionStart) / 1000)
    : null;

  const useragent = parseUA(row.cs_user_agent);
  const device = useragent.device || {};

  // The page that fired the pixel is the request's Referer header ...
  const { pathname } = url(row.cs_referer) || {};
  // ... while the page's own referrer arrives as a query parameter.
  const { hostname: referrer_host, href: referrer } = url(query.referrer) || {};

  return {
    dts: `${row.date} ${row.time}`,
    ip: row.c_ip,
    tid: query.tid !== 'false' ? query.tid : null,
    url: pathname,
    referrer,
    referrer_host,
    client_start: clientStart,
    client_end: clientEnd,
    duration,
    language: query.language,
    scrolled: query.viewed,
    max_scroll: parseInt(query.max_scroll, 10) || 0,
    page_height: parseInt(query.page_height, 10) || 0,
    viewport_height: parseInt(query.viewport_height, 10) || 0,
    browser: useragent.browser.name,
    browser_version: useragent.browser.major,
    os: joinParts(useragent.os.name, useragent.os.version),
    device_type: device.type || null,
    device: device.vendor ? joinParts(device.vendor, device.model) : null,
  };
}

const parser = new CloudFrontParser({ format: 'web' });

/**
 * Streams the decompressed contents of every `.gz` file directly inside the
 * `RAW` directory next to this script, one file after another.
 *
 * @yields {Buffer} Chunks of gunzipped log data.
 */
async function* loadFiles () {
  const dir = path.resolve(__dirname, 'RAW');

  for await (const entry of await fs.opendir(dir)) {
    if (!entry.isFile()) continue;
    if (path.extname(entry.name) !== '.gz') continue;

    const fpath = path.resolve(dir, entry.name);
    const filestream = fs.createReadStream(fpath).pipe(zlib.createGunzip());
    for await (const chunk of filestream) {
      yield chunk;
    }
  }
}

(async () => {
  // open the database
  const db = await opensql({
    filename: path.resolve(__dirname, 'database.sqlite'),
    driver: sqlite3.Database,
  });

  try {
    await db.run(sql`
      CREATE TABLE IF NOT EXISTS records (
        dts INTEGER,
        ip TEXT,
        tid INTEGER,
        url TEXT,
        referrer TEXT,
        referrer_host TEXT,
        client_start INTEGER,
        client_end INTEGER,
        duration INTEGER,
        language TEXT,
        scrolled INTEGER,
        max_scroll INTEGER,
        page_height INTEGER,
        viewport_height INTEGER,
        browser TEXT,
        browser_version INTEGER,
        os TEXT,
        device_type TEXT,
        device TEXT
      )
    `);

    // The unique key lets REPLACE overwrite rows when a log file is re-imported.
    await db.exec(sql`
      CREATE UNIQUE INDEX IF NOT EXISTS entries ON records (
        dts, ip, tid
      );
    `);

    await db.run('PRAGMA busy_timeout = 6000');

    const stmt = await db.prepare(sql`
      REPLACE INTO records VALUES (
        :dts,
        :ip,
        :tid,
        :url,
        :referrer,
        :referrer_host,
        :client_start,
        :client_end,
        :duration,
        :language,
        :scrolled,
        :max_scroll,
        :page_height,
        :viewport_height,
        :browser,
        :browser_version,
        :os,
        :device_type,
        :device
      );
    `);

    let counter = 0;

    try {
      await pipeline(
        Readable.from(loadFiles()),
        parser,
        new Transform({
          readableObjectMode: true,
          writableObjectMode: true,
          transform (row, encoding, done) {
            let record;
            try {
              record = toRecord(row);
            } catch (err) {
              // Surface the failure through the pipeline rather than throwing
              // out of the stream machinery.
              done(err);
              return;
            }
            if (record) this.push(record);
            done();
          },
        }),
        new Writable({
          objectMode: true,
          // highWaterMark: 2,
          write (record, encoding, done) {
            (async () => {
              // Named parameters need the ":" prefix; falsy values become NULL.
              const params = Object.fromEntries(
                Object.entries(record).map(([ k, v ]) => [ `:${k}`, v || null ]),
              );

              // Keep retrying while the database is locked; anything else is fatal.
              while (true) {
                try {
                  await stmt.run(params);
                  break;
                } catch (err) {
                  if (err.code !== 'SQLITE_BUSY') throw err;
                }
              }

              counter++;
              if (!(counter % 10)) process.stdout.write('.');
            })().then(() => done(), done);
          },
        }),
      );
    } finally {
      // Release the prepared statement even when the import fails.
      await stmt.finalize();
    }
  } finally {
    await db.close();
  }
})().then(
  () => process.exit(),
  (err) => {
    console.error(err.stack);
    process.exit(1);
  },
);